问题标签 [spark-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - 为什么 Spark Streaming 需要一定数量的 CPU 内核才能正常运行?
重要的是要记住 Spark Streaming 应用程序需要分配足够的内核来处理接收到的数据,以及运行接收器
接着:
如果分配给应用程序的核数小于或等于输入 DStreams/接收器的数量,则系统将接收数据,但无法处理它们
这似乎令人惊讶,因为操作系统会调度 CPU 以使应用程序继续运行,无论那里有多少 CPU 内核,除非它以某种方式阻止这样做。我的问题是:
- Spark 会做一些特殊的事情来阻止正常的 CPU 调度吗?
- 如果是这样,其背后的理性是什么?
apache-spark - 集成 SQL 和 Spark Streaming 时出现不可序列化异常
除了集成 Spark SQL 和 Spark Streaming 时出现的 Not Serializable 异常
我的源代码
JavaSQLContext 也在 ForeachRDD 循环之外声明,但我仍然收到 NonSerializableException
23 年 14 月 12 日 23:49:38 错误 JobScheduler:运行作业流作业时出错 1419378578000 ms.1 org.apache.spark.SparkException:在 org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala :166) org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) org.apache.spark.SparkContext.clean(SparkContext.scala:1435) org.apache.spark.rdd.RDD .map(RDD.scala:271) 在 org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78) 在 org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD .scala:42) 在 com.basic.spark.NumberCount$2.call(NumberCount.java:79) 在 com.basic.spark.NumberCount$2.call(NumberCount.java:67) 在 org.apache.spark.streaming。 api.java.JavaDStreamLike$$anonfun$foreachRDD$1。在 org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274) 在 org.apache.spark.streaming.dstream.DStream$ 应用(JavaDStreamLike.scala:274) $anonfun$foreachRDD$1.apply(DStream.scala:529) 在 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529) 在 org.apache.spark.streaming。 dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) 在 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) 在 org.apache。 spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run (Job.scala:32) 在 org.apache.spark.streaming.scheduler。JobScheduler$JobHandler.run(JobScheduler.scala:171) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java .lang.Thread.run(Thread.java:724) 引起:java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) 在 java.io。 java.io.ObjectOutputStream 中的 ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)。defaultWriteFields(ObjectOutputStream.java:1541) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) 在 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java :1175) 在 java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541) 在 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506) 在 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429) 在 java .io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 上的 apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) ... 还有 20 个
如果您有任何建议,我将不胜感激。
apache-spark - 块推送线程中的错误:Kafka Spark 流式传输
运行 kafka consumer 时出现以下错误:
build.sbt 文件:
错误的原因可能是什么?
scala - 如何在 Spark Streaming 中使用无限 Scala 流作为源?
假设我本质上想要Stream.from(0)
as InputDStream
。我该怎么办?我能看到的唯一方法是使用StreamingContext#queueStream
,但我必须将来自另一个线程或子类的元素排入Queue
队列以创建一个行为类似于无限流的队列,这两者都感觉像黑客。
这样做的正确方法是什么?
authentication - Spark Streaming 使用 MQTTutils 通过身份验证从 activemq 订阅主题
好像 MQTTUtils 只提供了三个方法, def createStream(jssc: JavaStreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel): JavaDStream[String]
创建一个接收 MQTT 发布者推送的消息的输入流。def createStream(jssc:JavaStreamingContext,brokerUrl:字符串,主题:字符串):JavaDStream [字符串]
创建一个接收 MQTT 发布者推送的消息的输入流。def createStream(ssc: StreamingContext, brokerUrl: String, topic: String, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): DStream[String]
创建一个接收 MQTT 发布者推送的消息的输入流。
但是,如果代理启用了身份验证,我如何提供用户名和密码?
hdfs - 将部分 spark DStream 窗口保存到 HDFS
我正在计算每个窗口中的值并找到最高值,并且只想将每个窗口的前 10 个频繁值保存到 hdfs 而不是所有值。
我可以打印上面的前 10 名(评论)。
我也试过
但我收到以下错误。我的数组不可序列化
15/01/05 17:12:23 错误 actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
apache-spark - Spark Streaming 中三叉戟的 persistentAggregate 等价物是什么?
假设我有这份三叉戟工作:
我如何在 Spark Streaming 中实现同样的目标?我看了看,updateStateByKey
但这似乎将状态保持在内部(而不是将其保持在像 Memcached 这样的外部状态)并且无限期地保持。当我保存它时,它看起来也试图转储每个批次上的所有内容,例如saveAsTextFile
,而不是仅发出在该批次上更新的键值。
我知道我可以简单地与外部状态交互foreachRDD
,但在这种情况下,我如何确保我只处理一次记录?
apache-kafka - 如何将日志文件从多个 Windows 服务器传输到中央服务器?
我有一个连续生成日志的 Tableau 服务器(Windows 平台)。我想将它流式传输到 apache spark 进行一些实时分析。我查看了以下解决方案,但似乎没有一个满足要求。
1) 使用 nxlog 代理。这是不可扩展的,因为将来日志可能来自多个 tableau 服务器。
2) fluented , flume 与 windows 不兼容。
3) Kafka 是毫无疑问的,因为它不跟踪日志文件。
什么是此类问题的可扩展解决方案?主要限制是 Tableau Server 在 Windows 上运行,
scala - Spark Streaming StreamingContext 依赖问题
我正在尝试将 Spark Streaming 与 Scala 一起使用,但我遇到了错误,我不知道为什么。
StreamingContext 是给出错误的行:
这是2个错误:
错误的符号引用。StreamingContext.class 中的签名是指包 org.apache.hadoop 中不可用的术语 conf。当前类路径中可能完全缺少它,或者类路径上的版本可能与编译 StreamingContext.class 时使用的版本不兼容。
和:
加载类文件“StreamingContext.class”时检测到缺少或无效的依赖项。无法访问包 org.apache.hadoop 中的术语 conf,因为它(或其依赖项)丢失。检查您的构建定义是否存在缺失或冲突的依赖项。(使用 -Ylog-classpath 重新运行以查看有问题的类路径。)如果“StreamingContext.class”是针对不兼容的 org.apache.hadoop 版本编译的,则完全重建可能会有所帮助。
之前有人问过这个问题: Spark Streaming StreamingContext 错误,这些错误似乎来自依赖问题,但据我所知,我的依赖关系都是正常的。
scala - Spark 流 StreamingContext.start() - 启动接收器时出错 0
我有一个使用火花流的项目,我正在使用“火花提交”运行它,但我遇到了这个错误:
这是错误来自的代码,一切都运行良好,直到 ssc.start()
我已经使用“spark-submit”运行了 SparkPi 示例,它运行良好,所以我似乎无法弄清楚是什么导致了我的应用程序出现问题,任何帮助将不胜感激。