问题标签 [spark-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1456 浏览

scala - 在scala中编写代码时如何在DStream上应用RDD函数

我正在尝试在 Scala 中编写一个简单的 Spark 代码。

在这里,我得到了一个 DStream。我成功地打印了这个 DStream。但是,当我尝试在此 DStream 上执行任何类型的“foreach”、“foreachRDD”或“transform”功能时,在我的程序执行期间,我的控制台就会冻结。在这里我没有收到任何错误,但控制台只是变得无响应,直到我手动终止 Eclipse 控制台操作。我在这里附上代码。请告诉我我做错了什么。

我的主要目标是在 DStream 上应用 RDD 操作,据我所知,我需要使用“foreach”、“foreachRDD”或“transform”函数将我的 DStream 转换为 RDD。

我已经通过使用 Java 实现了相同的目标。但是在scala中我遇到了这个问题。

还有其他人面临同样的问题吗?如果没有,请帮助我。谢谢

我在这里写一个示例代码

0 投票
2 回答
10078 浏览

json - 在火花流中解析 json

我对 spark 很陌生,我正在尝试从 kafka 主题接收结构为 json 的 DStream,我想解析每个 json 的内容。我收到的 json 是这样的:

我正在尝试仅提取 ident 字段,至少目前是这样,并且我正在使用 lift-json 库来解析数据。我的程序如下所示:

但它向我抛出了以下异常:

问题是,如果在不使用 spark(从文件中读取)的情况下运行相同的程序,它会完美运行。当我尝试将其放入火花程序时,问题就开始了。另外,如果我将解析器函数更改为如下所示:

它也有效。但是当我尝试提取实际的字符串时,我得到了同样的错误。

感谢您的帮助。我希望我已经解释得很好。

0 投票
2 回答
4187 浏览

scala - 使用过滤逻辑将 Spark 流式传输到 HBase

我一直在尝试了解 spark 流和 hbase 是如何连接的,但没有成功。我想要做的是给定一个火花流,处理该流并将结果存储在一个 hbase 表中。到目前为止,这就是我所拥有的:

我目前正在 spark-shell 中运行上述代码。我不确定我做错了什么。
我在 shell 中收到以下错误:

我还仔细检查了 hbase 表,以防万一,那里没有写任何新内容。

我在另一个终端上运行 nc -lk 9999 以将数据输入 spark-shell 进行测试。

0 投票
1 回答
705 浏览

apache-spark - 开发火花流应用程序

所以我要解决的问题如下:

  • 我需要一个以特定频率发出消息的数据源
  • 有 N 个神经网络需要单独处理每条消息
  • 汇总所有神经网络的输出,并且仅当收集了每个消息的所有 N 个输出时,才应将消息声明为已完全处理
  • 最后,我应该测量一条消息被完全处理所花费的时间(从它发出到收集该消息的所有 N 个神经网络输出之间的时间)

我很好奇如何使用火花流处理这样的任务。

我当前的实现使用了 3 种类型的组件:一个自定义接收器和两个实现 Function 的类,一个用于神经网络,一个用于最终聚合器。

概括地说,我的应用程序构建如下:

不过,我遇到的主要问题是它在本地模式下的运行速度比提交到 4 节点集群时要快。

我的实施一开始是错误的还是这里发生了其他事情?

这里还有一个完整的帖子http://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-td12893.html详细介绍了这三个中的每一个的实现前面提到的组件。

0 投票
3 回答
1351 浏览

intellij-idea - 在 spark-1.0 中实现 kafka 消费者

我需要在 spark 1.0 的 spark 流中实现 kafka 消费者。我写了一个卡夫卡制作人。谁能帮我写一个火花接收器来从kafka中提取消息?另外,请问如何在 Intellij IDEA 中运行 kafka spark 流项目?

0 投票
1 回答
1393 浏览

scala - 自定义接收器在 Spark Streaming 中停止工作人员

我正在尝试使用自定义接收器编写 Spark 流应用程序。我应该通过提供具有预定义间隔的随机值来模拟实时输入数据。(简化的)接收器如下所示,相应的 Spark Streaming 应用程序代码如下:

运行此代码,我看到接收器正在工作(存储项目接收到单个日志条目)。但是,saveAsTextFiles永远不会输出值。

local[2]我可以通过将主服务器更改为使用两个线程(更具体地说,我需要至少比注册的自定义接收器数量多一个线程才能获得任何输出。

在我看来,工作线程似乎被接收者停止了。

谁能解释这种效果,以及如何修复我的代码?

0 投票
2 回答
285 浏览

scala - 为什么加入两个数据集并应用过滤器会导致“错误:构造函数无法实例化为预期类型”?

我正在加入两个数据集——第一个来自流,第二个来自 HDFS。

我在火花中使用斯卡拉。加入两个数据集后,我需要对加入的数据集应用过滤器,但在这里我遇到了问题。请协助解决。

我正在使用下面的代码,

使用过滤器时出现以下错误

0 投票
0 回答
634 浏览

apache-spark - 为什么在 DStreams 上运行 SparkSQL 会为 org.apache.spark.rdd.ShuffledRDDPartition 提供 ClassCastException?

在 DStream 中的每个 RDD 上运行 SparkSQL 时出现 ClassCastException。

0 投票
1 回答
736 浏览

amazon-ec2 - Spark Streaming 应用程序流式传输已流式传输的文件

我们在 YARN ec2 集群中部署了一个 Spark 流应用程序,该集群具有 1 个名称节点和 2 个数据节点。我们提交的应用程序有 11 个执行程序,每个执行程序有 1 个内核和 588 MB RAM。该应用程序从 S3 中不断写入的目录中流出;这是实现这一目标的代码行:

使用 fileStream 而不是 textFileStream 的目的是自定义 spark 在进程启动时处理现有文件的方式。我们只想处理进程启动后添加的新文件并忽略现有文件。我们配置了 10 秒的批处理持续时间。

当我们将少量文件添加到 s3 时,这个过程很顺利,比如 4 或 5 个。我们可以在流式 UI 中看到阶段是如何在执行器中成功执行的,每个处理的文件都有一个。但有时当我们尝试添加大量文件时,我们会遇到奇怪的行为;应用程序开始流式传输已经流式传输的文件。

例如,我在 s3 中添加了 20 个文件。文件分 3 批处理。第一批处理 7 个文件,第二批处理 8 个,第三批处理 5 个。此时不再向 S3 添加文件,但 spark 开始使用相同的文件无休止地重复这些阶段! 有什么想法会导致这种情况吗?

我已经为此问题发布了一张 Jira 票: https ://issues.apache.org/jira/browse/SPARK-3553

0 投票
1 回答
237 浏览

scala - Twitter Apache Spark 上的热门标签

我是apache spark的新手。我试图运行https://github.com/prabeesh/SparkTwitterAnalysis/tree/0.2.0示例,但控制台给了我以下错误:

我已经使用启动服务器nc -lk 9999,我已经通过 sbt/sbt 包编译了代码,
并使用sbt/执行了代码sbt 'run spark://localhost:9999 <keys as specifies> hashtag'

此错误的原因是什么以及如何解决它

提前致谢。