问题标签 [spark-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
cassandra - kafka spark-streaming 数据没有被写入 cassandra。插入零行
从 spark 向 cassandra 写入数据时,没有写入数据。
闪回是:
我正在做一个 kafka-sparkStreaming-cassandra 集成。
我正在阅读 kafka 消息并试图将其放在 cassandra 表中CREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT)
。
kafka 到 spark-streaming 运行很酷,但是 spark 到 cassandra,有一些问题......数据没有写入表。
我能够创建与 cassandra 的连接,但数据没有插入到 cassandra 表中。输出显示它已连接,下一秒将断开连接。
for 的字符串System.out.print()
都在输出中。
Cassandra shell 显示 0 行。
完整代码以及日志和依赖项如下:
日志是:
POM.xml 依赖项是:
代码有问题吗?还是cassandra配置?
apache-spark - 为什么运行简单的火花流这么难?spark 1.1.1 Maven 依赖
我正在使用火花流,当我使用 spark1.0.2 时它工作正常,现在我反复遇到一些问题
就像找不到类一样,我正在使用相同的 pom.xml 和更新版本的所有 spark 模块,我正在使用 spark-core、streaming、streaming with kafka modules..
它不断地抛出错误,因为没有公共配置、公共语言、日志记录
如何获得运行火花流的所有依赖项。有什么办法,或者我们只需要通过试错法找到?
我的 pom 依赖项
我在这里错过了什么吗?
apache-spark - Spark 自定义流式传输丢弃大部分数据
我正在按照Spark 客户接收器提供的 spark 站点中给出的使用客户接收器的 spark 流式传输示例进行操作。
但是,这项工作似乎丢弃了我的大部分数据。无论我流传输的数据量是多少,消费者都会成功接收到它。但是,当我对其进行任何地图/平面地图操作时,我只会看到 10 行数据。无论我传输多少数据,情况总是如此。
我已修改此程序以从ActiveMQ
队列中读取。如果我查看 ActiveMQ Web 界面,火花作业成功地使用了我生成的所有数据。但是,每批只处理 10 个数据。我尝试将批量大小更改为各种值,并在本地以及 6 节点 Spark 集群上进行了尝试——到处都是相同的结果。
这真的很令人沮丧,因为我不知道为什么要处理有限数量的数据。我在这里缺少什么吗?
这是我的火花程序。包括自定义接收器。此外,我并没有真正创建任何套接字连接。相反,我为测试目的对消息进行了硬编码。行为与为流创建套接字连接时相同。
apache-kafka - Spark Streaming 中的 ERROR actor.OneForOneStrategy 错误
我的 Spark Streaming Kafka 接收器程序中出现以下错误:
可能是什么原因?
apache-spark - 如何更新 RDD?
我们正在开发 Spark 框架,其中我们将历史数据移动到 RDD 集中。
基本上,RDD 是不可变的,我们在其上进行操作的只读数据集。基于此,我们将历史数据移动到 RDD 中,并在此类 RDD 上进行过滤/映射等计算。
现在有一个用例,其中 RDD 中的数据子集被更新,我们必须重新计算这些值。
HistoricalData 采用 RDD 的形式。我根据请求范围创建另一个 RDD,并将该 RDD 的引用保存在ScopeCollection中
到目前为止,我已经能够想到以下方法 -
方法1:广播变化:
- 对于每个更改请求,我的服务器都会获取特定于范围的 RDD 并生成一个作业
- 在作业中,在该 RDD 上应用映射阶段 -
2.a。对于 RDD 中的每个节点,在广播中查找并创建一个现在已更新的新值,从而创建一个新的 RDD
2.b。现在我在 step2.a 再次对这个新的 RDD 进行所有计算。像乘法,减少等
2.c。我将此 RDD 引用保存回我的ScopeCollection
方法2:为更新创建一个RDD
- 对于每个更改请求,我的服务器都会获取特定于范围的 RDD 并生成一个作业
- 在每个 RDD 上,与具有更改的新 RDD 进行联接
- 现在我在第 2 步再次对这个新的 RDD 进行所有计算,例如乘法、减法等
方法3:
我曾想过创建流式 RDD,在其中不断更新相同的 RDD 并进行重新计算。但据我了解,它可以从 Flume 或 Kafka 获取流。而在我的情况下,这些值是根据用户交互在应用程序本身中生成的。因此,在我的上下文中,我看不到任何流式 RDD 的集成点。
关于哪种方法更好或适合这种情况的任何其他方法的任何建议。
蒂亚!
scala - Spark Streaming groupByKey 和 updateStateByKey 实现
我正在尝试通过从 Kafka 读取的(假)apache Web 服务器日志运行有状态的 Spark Streaming 计算。目标是“会话化”类似于此博客文章的网络流量
唯一的区别是我想对 IP 命中的每个页面进行“会话化”,而不是整个会话。我能够在批处理模式下使用 Spark 从虚假网络流量文件中读取,但现在我想在流式上下文中进行。
从 Kafka 读取日志文件并解析K/V
成对(String, (String, Long, Long))
或
(IP, (requestPage, time, time))
.
然后我呼吁groupByKey()
这个K/V pair
。在批处理模式下,这将产生:
(String, CollectionBuffer((String, Long, Long), ...)
或者
(IP, CollectionBuffer((requestPage, time, time), ...)
在 StreamingContext 中,它产生一个:
(String, ArrayBuffer((String, Long, Long), ...)
像这样:
然而,随着下一个微批处理(DStream)的到来,该信息被丢弃。
最终,我想要的是ArrayBuffer
随着时间的推移,随着给定 IP 继续交互并对其数据运行一些计算以“会话化”页面时间,它会随着时间的推移而填满。
我相信实现这一点的运营商是“ updateStateByKey
。” 我在使用这个运算符时遇到了一些问题(我对 Spark 和 Scala 都是新手);
任何帮助表示赞赏。
迄今:
apache-spark - 通过 Java 中的 Spark 流向 Twitter 流添加地理位置过滤器
我想要仅与特定地理位置相关的推文。在谷歌搜索之后,我发现这可以通过向 TwitterUtils 和 TwitterInputDStream 类添加额外的方法/功能来实现。但我不能这样做,因为这些是最后的课程。
帮助我我们如何实现这一目标?
提前致谢。
amazon-ec2 - 如何从 EC2 上的 Spark Streaming 向 VisualVM 报告 JMX?
我一直在尝试获得 Spark Streaming 作业,在 EC2 实例上运行以使用 JMX 向 VisualVM 报告。
截至目前,我有以下配置文件:
火花/conf/metrics.properties:
我开始这样的火花流工作:(我后来添加的-D位希望远程访问ec2的jmx)
终端:
java - 在创建 FlumeDStream java.net.BindException 时出现 Yarn 错误上的 Spark 流:无法分配请求的地址
我正在尝试从基于水槽推送的方法创建火花流。我在我的 Yarn 集群上运行火花。在启动流时它无法绑定请求的地址。我正在使用 scala-shell 来执行程序,下面是我正在使用的代码
Flume Agent 无法写入此端口,因为此代码无法绑定 5858 端口。
水槽堆栈跟踪:
来自火花流的堆栈跟踪如下。
scala - 无法执行我的 SparkStreaming 程序
我编写了以下 Scala 代码,我的平台是 CentOS 6.5 上的 Cloudera CDH 5.2.1
教程.scala
我的 build.sbt 文件看起来像
我还创建了一个名为 projects/plugin.sbt 的文件,其中包含以下内容
和项目/build.scala
在此之后,我可以使用构建我的“超级”程序集
现在我使用运行我的代码
我得到错误