2

我希望我的代码能够读取每分钟生成的 json 文本文件(它是来自 Citibike 的车站馈送数据),并且我尝试使用 Spark Streaming。但我不断收到未知主机异常错误。

我的代码:

    String url = "http://citibikenyc.com/stations/json";

    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Streaming");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(60000));
    JavaDStream<String> lines = jssc.socketTextStream(url, 9999);
    lines.print();

    jssc.start();
    jssc.awaitTermination();

和错误:

14/11/22 15:32:54 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting        receiver with delay 2000ms: Error receiving data - java.net.UnknownHostException: http://citibikenyc.com/stations/json
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:579)
    at java.net.Socket.connect(Socket.java:528)
    at java.net.Socket.<init>(Socket.java:425)
    at java.net.Socket.<init>(Socket.java:208)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:71)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:57)
14/11/22 15:32:54 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0
4

1 回答 1

2

.socketTextStream用于完全不同的目的。Spark Streaming 没有任何接收器来定期获取 URL。

您将需要编写一个单独的程序来定期获取 URL 并将其提供给 Spark Streaming。你有很多选择:

  • 编写一个 shell 脚本定期将 URL 下载到一个目录,然后使用Apache Flume读取该目录中的文件并将它们发送到 Spark Streaming。有集成指南:Spark Streaming + Flume 集成指南
  • 编写您自己的 Spark Streaming 接收器。你可以从这里开始
  • 在您的 Spark 应用程序中,启动一个定期获取 URL 的线程并打开一个套接字以发送内容,然后连接到该套接字(例如.socketTextStream(127.0.0.1, 9999))。

有很多变体和一些更高级的解决方案,但我会说这些更方便。

于 2014-11-24T14:09:31.340 回答