问题标签 [apache-spark-2.0]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
4 回答
12182 浏览

scala - 使用 Spark 2.0.2(结构化流)从 Kafka 读取 Avro 消息

我有一个 spark 2.0 应用程序,它使用 spark 流(使用 spark-streaming-kafka-0-10_2.11)从 kafka 读取消息。

结构化流看起来真的很酷,所以我想尝试迁移代码,但我不知道如何使用它。

在常规流中,我使用 kafkaUtils 来创建 Dstrean,在我传递的参数中,它是值反序列化器。

在结构化流中,文档说我应该使用 DataFrame 函数进行反序列化,但我无法准确理解这意味着什么。

我查看了诸如this example之类的示例,但是我在Kafka中的Avro对象非常复杂,不能像示例中的String那样简单地转换..

到目前为止,我尝试了这种代码(我在这里看到了另一个问题):

我得到“数据类型不匹配:无法将 BinaryType 转换为 StructType(StructField(....”

如何反序列化该值?

0 投票
1 回答
2156 浏览

java - 将 Json 的 Dataset 列解析为 Dataset

拥有Dataset<Row>单列 json 字符串:

json示例:

我怎样才能最有效地得到Dataset<Row>这样的:

我正在流中处理这些数据,我知道当我从文件中读取数据时,spark 可以自己执行此操作:

但现在我正在从 kafka 读取数据,它以另一种形式为我提供数据。我知道我可以使用像 Gson 这样的解析器,但我想让 spark 为我做这件事。

0 投票
6 回答
40983 浏览

scala - 程序执行期间 Apache-Spark 中的超时异常

我在 MAC 中运行 Bash 脚本。该脚本多次调用Scala语言编写的spark方法。我目前正在尝试使用 for 循环调用此 spark 方法 100,000 次。

运行少量迭代(大约 3000 次迭代)后,代码退出并出现以下异常。

有人可以帮忙吗,这个错误是因为大量调用spark方法引起的吗?

0 投票
0 回答
2320 浏览

python - PySpark 中缺少 lit() 函数?

也许我错了,但我认为pyspark.sql.functions.lit()PySpark 缺少该功能。

当我尝试导入它时首先注意到。我也查了源码,没找到。只是想先在这里检查一下,如果我是正确的,根本就存在一个错误。

最后,有谁知道我如何从字符串输入创建一个文字列,最好是使用 lambda 函数?

0 投票
1 回答
400 浏览

apache-spark - Spark 2.0 Standalone模式动态资源分配Worker启动错误

我在独立模式下运行 Spark 2.0,成功将其配置为在服务器上启动,并且还能够将 Ipython Kernel PySpark 配置为 Jupyter Notebook 的选项。一切正常,但我面临的问题是,对于我启动的每个 Notebook,我的所有 4 个工作人员都分配给该应用程序。因此,如果我的团队中的另一个人尝试使用 PySpark 内核启动另一个 Notebook,它根本无法工作,直到我停止第一个 notebook 并释放所有工作人员。

为了解决这个问题,我尝试按照Spark 2.0 Documentation中的说明进行操作。所以,在我的身上,$SPARK_HOME/conf/spark-defaults.conf我有以下几行:

另外,$SPARK_HOME/conf/spark-env.sh我有:

但是当我尝试使用 启动工作人员时$SPARK_HOME/sbin/start-slaves.sh,只有第一个工作人员成功启动。第一个工人的日志最终是这样的:

16/11/24 13:32:06 INFO Worker: 成功注册master spark://cerberus:7077

但是工人 2-4 的日志向我显示了这个错误:

信息 ExternalShuffleService:使用 useSasl = false 16/11/24 13:32:08 在端口 7337 上启动 shuffle 服务错误收件箱:忽略错误 java.net.BindException:地址已在使用中

似乎(对我来说)第一个工作人员在端口 7337 成功启动了 shuffle 服务,但是 2-4 工作人员“不知道”这一点并尝试在同一端口上启动另一个 shuffle 服务。

如果我首先启动 shuffle-service(使用$SPARK_HOME/sbin/start-shuffle-service.sh)然后尝试启动所有工作人员($SPARK_HOME/sbin/start-slaves.sh),那么所有工作人员(1-4)也会出现问题。

有什么办法可以解决这个问题吗?如果有一个 shuffle 服务正在运行并连接到它,而不是尝试创建一个新服务,那么能够验证所有工作人员吗?

0 投票
0 回答
1119 浏览

amazon-web-services - Spark EMR 集群在运行时正在删除执行程序,因为它们处于空闲状态

我有一个在独立模式下运行良好的 spark 应用程序,我现在正试图让相同的应用程序在 AWS EMR 集群上运行,但目前它失败了。

该消息是我以前从未见过的消息,暗示工人没有得到工作并且正在被关闭。

DAG 显示工人已初始化,然后是一个收集(一个相对较小的),然后在它们都失败后不久。启用了动态分配,因此有人认为驱动程序可能没有向它们发送任何任务,因此它们超时 - 为了证明这一理论,我在没有动态分配的情况下启动了另一个集群,并且发生了同样的事情。

主人设置为纱线。

非常感谢任何帮助,谢谢。

我的步骤很简单——spark-submit --deploy-mode client --master yarn --class Run app.jar

0 投票
0 回答
777 浏览

apache-spark - 如何使用pyspark从数据框中的单列创建两列

我有一个转换数据框,看起来像这样

现在我必须把它转换成一个由三列组成的数据框,看起来像这样

0 投票
1 回答
1530 浏览

apache-spark - 如何在 Apache Spark 上进行非随机数据集拆分?

我知道我可以使用 randomSplit 方法进行随机拆分:

我可以使用一些“nonRandomSplit 方法”将数据分成连续的部分吗?

Apache Spark 2.0.1。提前致谢。

UPD:数据顺序很重要,我将在“较小 ID”的数据上训练我的模型,并在“较大 ID”的数据上对其进行测试。所以我想把数据分成连续的部分而不用改组。

例如

我能想到的唯一解决方案是使用countlimit,但可能有更好的解决方案。

0 投票
1 回答
976 浏览

scala - Apache Spark 加入动态重新分区

我试图在两个表上做一个相当简单的连接,没什么复杂的。加载两个表,进行连接和更新列,但它不断抛出异常。

我注意到任务卡在最后一个分区上199/200并最终崩溃。我怀疑数据是倾斜的,导致所有数据都加载到最后一个分区199中。

SELECT COUNT(DISTINCT report_audit) FROM ReportDs = 1.5million.

尽管

SELECT COUNT(*) FROM ReportDs = 57million.

集群详情:CPU:40核,内存:160G。

这是我的示例代码:

我认为应该有一种优雅的方式来处理这种数据偏斜。

0 投票
1 回答
152 浏览

apache-spark - 在 Spark 上下文中使用多个同时作业的 Spark 2 作业监控 (JobProgressListener)

在 Spark 2.0.x 上,我一直在使用JobProgressListener实现从我们的集群中实时检索 Job/Stage/Task 进度信息。我了解事件流程的工作原理,并成功接收工作更新。

我的问题是我们在同一个 Spark Context 上同时运行了几个不同的提交,并且似乎无法区分每个提交属于哪个 Job/Stage/Task。每个 Job/Stage/Task 都会收到一个唯一的 id,这很棒。但是,我正在寻找一种方法来提供将与收到的 JobProgressListener 事件对象一起返回的提交“id”或“name”。

我意识到可以在 Spark Context 上设置“作业组”,但是如果多个作业同时在同一个上下文上运行,它们就会变得混乱。

有没有一种方法可以潜入将与单个 SQLContext 的侦听器事件一起返回的自定义属性?这样做,我应该能够连接后续的 Stage 和 Task 事件并得到我需要的东西。

请注意:我没有为这些工作使用 spark-submit。它们是使用对 SparkSession/SQLContext 的 Java 引用执行的。

感谢您提供任何解决方案或想法。