I am trying to implement an Apache Spark Streaming job that reads from a Redis Stream, and I get the following error:
java.io.NotSerializableException: java.util.ArrayList$Itr
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
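`java.util.ArrayList$Itr` is the iterator class returned by `ArrayList.iterator()`, and Java iterators do not implement `Serializable`. Spark serializes whatever it ships to executors. For a receiver-based stream such as `createRedisXStream`, that includes the receiver together with the configuration passed to it, so the exception means something in that object graph still holds a live iterator. The dependency-free sketch below reproduces the same exception with plain Java serialization. `IteratorHolder` and `ListHolder` are hypothetical stand-ins, not classes from Spark or spark-redis.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorSerializationDemo {

    // Hypothetical holder that still references a live iterator, the way a lazily
    // built collection can before all of its elements have been consumed.
    public static class IteratorHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final Iterator<String> remaining;

        public IteratorHolder(Iterator<String> remaining) {
            this.remaining = remaining;
        }
    }

    // Hypothetical holder that keeps a materialized copy of the elements instead.
    public static class ListHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> items;

        public ListHolder(List<String> items) {
            this.items = new ArrayList<>(items);
        }
    }

    // Java-serializes the object; returns null on success, or the exception message
    // (the name of the non-serializable class) if serialization fails.
    public static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> streams = new ArrayList<>(Arrays.asList("stream-a", "stream-b"));
        // Fails: the iterator field is a java.util.ArrayList$Itr
        System.out.println("iterator holder: " + trySerialize(new IteratorHolder(streams.iterator())));
        // Succeeds: only an ArrayList of Strings is written
        System.out.println("list holder: " + trySerialize(new ListHolder(streams)));
    }
}
```

A plausible culprit in this job is the `Seq<ConsumerConfig>` passed to `createRedisXStream`. This is an assumption, because the code that builds `cfg.config` is not shown. A common Java idiom, `JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq()`, returns a lazy `Stream` in Scala 2.12, and its unevaluated tail keeps a reference to the Java iterator. Building the `Seq` from the list itself, for example with `JavaConverters.asScalaBufferConverter(list).asScala().toList()`, yields a strict immutable list with no iterator inside.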
Here is the Java code:
cfg = new RedisStreamConfig(ctx.getParams());
SparkConf conf = ctx.getSession().sparkContext().getConf();
conf
    .set("redis.host", cfg.host)
    .set("redis.port", cfg.port);
this.jssc = new JavaStreamingContext(new JavaSparkContext(ctx.getSession().sparkContext()),
    Durations.milliseconds(cfg.window));
RedisConfig redisConfig = new RedisConfig(new RedisEndpoint(conf));
RedisStreamingContext redisStreamingContext = new RedisStreamingContext(jssc.ssc());
this.redisxstream = redisStreamingContext.createRedisXStream(this.cfg.config, StorageLevel.MEMORY_ONLY(), redisConfig);
this.redisxstream.print();
}
DStream<String> dstream = redisxstream.map(new StreamItemConverter(), ClassTag$.MODULE$.apply(String.class));
return JavaDStream.fromDStream(dstream, ClassTag$.MODULE$.apply(String.class))
    .map(rec -> {
        JSONObject obj = (JSONObject) new JSONParser().parse(rec);
        return new AquaRecord((String) obj.get(KeyConverter.KEY), (String) obj.get(KeyConverter.RECORD));
    });
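To find out which of the job's objects carries the iterator, each candidate can be run through Java serialization on the driver before the streaming context starts. The stack trace shows `ObjectOutputStream`, i.e. Java serialization, so an object that fails here would also fail when Spark ships it. The helper below is hypothetical (`SerializationProbe` is not a Spark or spark-redis API). In the job above, natural candidates are `cfg.config`, `redisConfig` and a `StreamItemConverter` instance, since they are handed to the receiver or captured by `map`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

public class SerializationProbe {

    // Tries to Java-serialize each named candidate and returns the ones that fail,
    // mapped to the exception text (for NotSerializableException, it names the class).
    public static Map<String, String> findNonSerializable(Map<String, Object> candidates) {
        Map<String, String> failures = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : candidates.entrySet()) {
            try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(entry.getValue());
            } catch (IOException e) {
                failures.put(entry.getKey(), e.toString());
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        Map<String, Object> candidates = new LinkedHashMap<>();
        candidates.put("plain string", "stream-a");
        candidates.put("bare object", new Object());
        // Hypothetical usage in the real job (these objects come from the question's code):
        // candidates.put("cfg.config", cfg.config);
        // candidates.put("redisConfig", redisConfig);
        // candidates.put("converter", new StreamItemConverter());
        System.out.println(findNonSerializable(candidates));
    }
}
```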
Here is the converter:
public class StreamItemConverter extends AbstractFunction1 implements Serializable {
    public static final String KEY = "k";
    public static final String RECORD = "v";

    @Override
    public String apply(Object v1) {
        StreamItem rec = (StreamItem) v1;
        JSONObject obj = new JSONObject();
        obj.put(KEY, rec.streamKey());
        obj.put(RECORD, rec.id().toString());
        System.out.println("REDIS MESSAGE" + obj.toString());
        return obj.toString();
    }
}
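I looked for examples of Apache Spark Streaming reading from Redis Streams but couldn't find any. This is an existing application that already works with Kafka.
Any input is highly appreciated.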
Dependency in pom.xml:
<dependency>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis_2.12</artifactId>
<version>3.0.0</version>
</dependency>