问题标签 [spark-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1245 浏览

cassandra - kafka spark-streaming 数据没有被写入 cassandra。插入零行

从 spark 向 cassandra 写入数据时,没有写入数据。
闪回是:
我正在做一个 kafka-sparkStreaming-cassandra 集成。
我正在阅读 kafka 消息并试图将其放在 cassandra 表中CREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT)
kafka 到 spark-streaming 运行很酷,但是 spark 到 cassandra,有一些问题......数据没有写入表。
我能够创建与 cassandra 的连接,但数据没有插入到 cassandra 表中。输出显示它已连接,下一秒将断开连接。
for 的字符串System.out.print()都在输出中。

Cassandra shell 显示 0 行。
完整代码以及日志和依赖项如下:

日志是:

POM.xml 依赖项是:

代码有问题吗?还是cassandra配置?

0 投票
2 回答
1436 浏览

apache-spark - 为什么运行简单的火花流这么难?spark 1.1.1 Maven 依赖

我正在使用火花流,当我使用 spark1.0.2 时它工作正常,现在我反复遇到一些问题

就像找不到类一样,我正在使用相同的 pom.xml 和更新版本的所有 spark 模块,我正在使用 spark-core、streaming、streaming with kafka modules..

它不断地抛出错误,因为没有公共配置、公共语言、日志记录

如何获得运行火花流的所有依赖项。有什么办法,或者我们只需要通过试错法找到?

我的 pom 依赖项

我在这里错过了什么吗?

0 投票
1 回答
1225 浏览

apache-spark - Spark 自定义流式传输丢弃大部分数据

我正在按照Spark 客户接收器提供的 spark 站点中给出的使用客户接收器的 spark 流式传输示例进行操作。

但是,这项工作似乎丢弃了我的大部分数据。无论我流传输的数据量是多少,消费者都会成功接收到它。但是,当我对其进行任何地图/平面地图操作时,我只会看到 10 行数据。无论我传输多少数据,情况总是如此。

我已修改此程序以从ActiveMQ队列中读取。如果我查看 ActiveMQ Web 界面,火花作业成功地使用了我生成的所有数据。但是,每批只处理 10 个数据。我尝试将批量大小更改为各种值,并在本地以及 6 节点 Spark 集群上进行了尝试——到处都是相同的结果。

这真的很令人沮丧,因为我不知道为什么要处理有限数量的数据。我在这里缺少什么吗?

这是我的火花程序。包括自定义接收器。此外,我并没有真正创建任何套接字连接。相反,我为测试目的对消息进行了硬编码。行为与为流创建套接字连接时相同。

0 投票
2 回答
648 浏览

apache-kafka - Spark Streaming 中的 ERROR actor.OneForOneStrategy 错误

我的 Spark Streaming Kafka 接收器程序中出现以下错误:

可能是什么原因?

0 投票
2 回答
5917 浏览

apache-spark - 如何更新 RDD?

我们正在开发 Spark 框架,其中我们将历史数据移动到 RDD 集中。

基本上,RDD 是不可变的,我们在其上进行操作的只读数据集。基于此,我们将历史数据移动到 RDD 中,并在此类 RDD 上进行过滤/映射等计算。

现在有一个用例,其中 RDD 中的数据子集被更新,我们必须重新计算这些值。

HistoricalData 采用 RDD 的形式。我根据请求范围创建另一个 RDD,并将该 RDD 的引用保存在ScopeCollection中

到目前为止,我已经能够想到以下方法 -

方法1:广播变化:

  1. 对于每个更改请求,我的服务器都会获取特定于范围的 RDD 并生成一个作业
  2. 在作业中,在该 RDD 上应用映射阶段 -

    2.a。对于 RDD 中的每个节点,在广播中查找并创建一个现在已更新的新值,从而创建一个新的 RDD
    2.b。现在我在 step2.a 再次对这个新的 RDD 进行所有计算。像乘法,减少等
    2.c。我将此 RDD 引用保存回我的ScopeCollection

方法2:为更新创建一个RDD

  1. 对于每个更改请求,我的服务器都会获取特定于范围的 RDD 并生成一个作业
  2. 在每个 RDD 上,与具有更改的新 RDD 进行联接
  3. 现在我在第 2 步再次对这个新的 RDD 进行所有计算,例如乘法、减法等

方法3:

我曾想过创建流式 RDD,在其中不断更新相同的 RDD 并进行重新计算。但据我了解,它可以从 Flume 或 Kafka 获取流。而在我的情况下,这些值是根据用户交互在应用程序本身中生成的。因此,在我的上下文中,我看不到任何流式 RDD 的集成点。

关于哪种方法更好或适合这种情况的任何其他方法的任何建议。

蒂亚!

0 投票
2 回答
3214 浏览

scala - Spark Streaming groupByKey 和 updateStateByKey 实现

我正在尝试通过从 Kafka 读取的(假)apache Web 服务器日志运行有状态的 Spark Streaming 计算。目标是“会话化”类似于此博客文章的网络流量

唯一的区别是我想对 IP 命中的每个页面进行“会话化”,而不是整个会话。我能够在批处理模式下使用 Spark 从虚假网络流量文件中读取,但现在我想在流式上下文中进行。

从 Kafka 读取日志文件并解析K/V成对(String, (String, Long, Long))

(IP, (requestPage, time, time)).

然后我呼吁groupByKey()这个K/V pair。在批处理模式下,这将产生:

(String, CollectionBuffer((String, Long, Long), ...)或者

(IP, CollectionBuffer((requestPage, time, time), ...)

在 StreamingContext 中,它产生一个:

(String, ArrayBuffer((String, Long, Long), ...)像这样:

然而,随着下一个微批处理(DStream)的到来,该信息被丢弃。

最终,我想要的是ArrayBuffer随着时间的推移,随着给定 IP 继续交互并对其数据运行一些计算以“会话化”页面时间,它会随着时间的推移而填满。

我相信实现这一点的运营商是“ updateStateByKey。” 我在使用这个运算符时遇到了一些问题(我对 Spark 和 Scala 都是新手);

任何帮助表示赞赏。

迄今:

0 投票
1 回答
712 浏览

apache-spark - 通过 Java 中的 Spark 流向 Twitter 流添加地理位置过滤器

我想要仅与特定地理位置相关的推文。在谷歌搜索之后,我发现这可以通过向 TwitterUtils 和 TwitterInputDStream 类添加额外的方法/功能来实现。但我不能这样做,因为这些是最后的课程。

帮助我我们如何实现这一目标?

提前致谢。

0 投票
2 回答
3353 浏览

amazon-ec2 - 如何从 EC2 上的 Spark Streaming 向 VisualVM 报告 JMX?

我一直在尝试获得 Spark Streaming 作业,在 EC2 实例上运行以使用 JMX 向 VisualVM 报告。

截至目前,我有以下配置文件:

火花/conf/metrics.properties:

我开始这样的火花流工作:(我后来添加的-D位希望远程访问ec2的jmx)

终端

0 投票
1 回答
1055 浏览

java - 在创建 FlumeDStream java.net.BindException 时出现 Yarn 错误上的 Spark 流:无法分配请求的地址

我正在尝试从基于水槽推送的方法创建火花流。我在我的 Yarn 集群上运行火花。在启动流时它无法绑定请求的地址。我正在使用 scala-shell 来执行程序,下面是我正在使用的代码

Flume Agent 无法写入此端口,因为此代码无法绑定 5858 端口。

水槽堆栈跟踪:


来自火花流的堆栈跟踪如下。

0 投票
2 回答
720 浏览

scala - 无法执行我的 SparkStreaming 程序

我编写了以下 Scala 代码,我的平台是 CentOS 6.5 上的 Cloudera CDH 5.2.1

教程.scala

我的 build.sbt 文件看起来像

我还创建了一个名为 projects/plugin.sbt 的文件,其中包含以下内容

和项目/build.scala

在此之后,我可以使用构建我的“超级”程序集

现在我使用运行我的代码

我得到错误