I'm trying to implement an Apache Spark Streaming job that reads from a Redis Stream, but I get the following error:

java.io.NotSerializableException: java.util.ArrayList$Itr
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
        at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
        at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
        at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
        at java.base/java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1379)
        at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
        at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
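To confirm that the class named in the trace is the problem independently of Spark, here is a minimal JDK-only sketch. It serializes a few objects with `ObjectOutputStream.writeObject`, the same call that appears in the trace. An `ArrayList` serializes fine, but its iterator (`java.util.ArrayList$Itr`) does not implement `Serializable`, so any object graph that holds a reference to one fails the same way. `SerializationCheck`, `isSerializable` and `Holder` are hypothetical names used only for illustration. Which object in the job actually captures the iterator (for example, whatever builds `cfg.config`) is an assumption that the code in this post does not show.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SerializationCheck {

    // Attempts standard Java serialization (ObjectOutputStream.writeObject,
    // the same call at the top of the stack trace) and reports the result.
    public static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            // e.getMessage() is the offending class name, e.g. "java.util.ArrayList$Itr"
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Hypothetical Serializable class that keeps a reference to an iterator,
    // standing in for any object graph that captured java.util.ArrayList$Itr.
    static final class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Iterator<String> it;

        Holder(Iterator<String> it) {
            this.it = it;
        }
    }

    public static void main(String[] args) {
        // Hypothetical stream names, for illustration only
        List<String> streams = new ArrayList<>(List.of("stream-1", "stream-2"));

        System.out.println(isSerializable(streams));                        // true: ArrayList is Serializable
        System.out.println(isSerializable(streams.iterator()));             // false: ArrayList$Itr is not
        System.out.println(isSerializable(new Holder(streams.iterator()))); // false: a field holds ArrayList$Itr
    }
}
```

Running `main` should print `true`, `false`, `false`. The third case mirrors the trace, where `defaultWriteFields` reaches a field whose value is the iterator.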

Here is the Java code:

            cfg = new RedisStreamConfig(ctx.getParams());
            SparkConf conf = ctx.getSession().sparkContext().getConf();
            conf
                    .set("redis.host", cfg.host)
                    .set("redis.port", cfg.port);

            this.jssc = new JavaStreamingContext(new JavaSparkContext(ctx.getSession().sparkContext()),
                                                 Durations.milliseconds(cfg.window));
            RedisConfig redisConfig = new RedisConfig(new RedisEndpoint(conf));
            RedisStreamingContext redisStreamingContext = new RedisStreamingContext(jssc.ssc());

            this.redisxstream = redisStreamingContext.createRedisXStream(this.cfg.config, StorageLevel.MEMORY_ONLY(), redisConfig);
            this.redisxstream.print();
        }

        DStream<String> dstream = redisxstream.map(new StreamItemConverter(), ClassTag$.MODULE$.apply(String.class));

        return JavaDStream.fromDStream(dstream, ClassTag$.MODULE$.apply(String.class))
                .map(rec -> {
                    JSONObject obj = (JSONObject) new JSONParser().parse(rec);
                    return new AquaRecord((String) obj.get(KeyConverter.KEY), (String) obj.get(KeyConverter.RECORD));
                });

Here is the converter:

public class StreamItemConverter extends AbstractFunction1 implements Serializable {
    public static final String KEY = "k";
    public static final String RECORD = "v";

    @Override
    public String apply(Object v1) {
        StreamItem rec = (StreamItem) v1;
        JSONObject obj = new JSONObject();
        obj.put(KEY, rec.streamKey());
        obj.put(RECORD, rec.id().toString());
        System.out.println("REDIS MESSAGE" + obj.toString());
        return obj.toString();
    }
}

I looked for Apache Spark Streaming examples that read from Redis Streams but couldn't find any. This is an existing application that already works with Kafka.

Any input would be greatly appreciated.

pom.xml:

<dependency>
   <groupId>com.redislabs</groupId>
   <artifactId>spark-redis_2.12</artifactId>
   <version>3.0.0</version>
</dependency>