问题标签 [apache-flink]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1121 浏览

scala - Apache Flink 中 Join 的输出

在 Apache Flink 中,如果我在一个主键上加入两个数据集,我会得到一个元组 2,其中包含每个数据集的相应数据集条目。

问题是,当将map()方法应用于输出的元组 2 数据集时,它看起来并不好,特别是如果两个数据集的条目都具有大量特征。

在两个输入数据集中使用元组可以得到如下代码:

我不介意使用 POJO 或案例类,但我不知道这会如何使它变得更好。

问题 1:有没有一种很好的方法来扁平化元组 2?例如使用另一个运算符。

问题2:如何处理同一个键上3个数据集的连接?它会使示例源更加混乱。

感谢您的帮助。

0 投票
1 回答
1298 浏览

scala - 如何在 Apache Flink 中对 GroupedDataSet 上的函数进行平面映射

我想将函数 viaflatMap应用于由DataSet.groupBy. 尝试调用flatMap我得到编译器错误:

我的代码:

事实上,在flink-scala 0.9-SNAPSHOT的文档中没有map列出或类似的。有没有类似的方法可以使用?如何在节点上单独实现每个组的所需分布式映射?

0 投票
1 回答
768 浏览

maven - Flink 错误 - org.apache.hadoop.ipc.RemoteException:服务器 IPC 版本 9 无法与客户端版本 4 通信

我正在尝试使用 HDFS 中的文件运行 flink 作业。我创建了一个数据集如下 -

我正在使用 flink 的最新版本 - 0.9.0-milestone-1-hadoop1(我也尝试过使用 0.9.0-milestone-1)

而我的 Hadoop 版本是 2.6.0

但是,当我尝试执行作业时出现以下异常。我搜索了类似的问题,它与客户端和hdfs之间的版本不兼容有关。

您能否让我知道我应该在我的 pom 中进行哪些更改,以便它指向正确的 Hadoop/HDFS 版本?或其他地方的变化?或者我需要降级hadoop安装?

0 投票
1 回答
427 浏览

python - Flink Python API 错误

我刚刚试用了 flink python api。当我尝试:

我得到:

该错误是由于以下路径处理引起的:

而不是使用:

可以使用以下命令创建映射文件:

这同样适用于 mmap 输入文件。那会是一个修复吗?

0 投票
1 回答
383 浏览

scala - ALS 的 OutOfBoundsException - Flink MLlib

我正在使用此处提供的 MovieLens 数据集为电影制作推荐系统:http: //grouplens.org/datasets/movielens/

为了计算这个推荐系统,我在 scala 中使用了 Flink 的 ML 库,特别是 ALS 算法 ( org.apache.flink.ml.recommendation.ALS)。

我首先将电影的评分映射到 a DataSet[(Int, Int, Double)],然后创建 atrainingSet和 a testSet(参见下面的代码)。

我的问题是当我将ALS.fit函数与整个数据集(所有评级)一起使用时没有错误,但如果我只删除一个评级,拟合函数不再起作用,我不明白为什么.

你有什么想法?:)

使用的代码:

Rating.scala

预处理.scala

处理.scala

“但如果我只删除一个评级”

错误 :

06/19/2015 15:00:24 CoGroup(在 org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570) 上的 CoGroup)(4/4)切换到 FAILED

java.lang.ArrayIndexOutOfBoundsException:5

在 org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)

在 org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)

在 org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)

...

0 投票
1 回答
42 浏览

apache-flink - #flink 不同算子传递消息使用MQ

我看到链接将使用kafka for MQ以便重播失败,并且在不同运营商中传输消息时是否使用内存MQ?

0 投票
1 回答
1097 浏览

apache-spark - Apache Spark 和 Apache Flink 中的“流”是什么意思?

Apache Spark Streaming Website 的时候,看到一句话:

Spark Streaming 使构建可扩展的容错流应用程序变得容易。

而在Apache Flink网站中,有一句话:

Apache Flink 是一个开源平台,用于可扩展的批处理和流数据处理。

streaming applicationbatch data processing,是什么意思stream data processing?你能举一些具体的例子吗?它们是为传感器数据设计的吗?

0 投票
1 回答
941 浏览

jdbc - org.apache.flink.api.java.io.jdbc.JDBCInputFormat 不在 FLINK JARS 内

  • 我在 eclipse-jee-kepler-SR2-win32-x86_64 中创建了一个新的 Java 项目。
  • 我已将 Jars 包含在 flink-0.8.1\lib 中。
  • 我已经创建了标准 WordCount 并且它可以工作。
  • 我已经修改了我的 WordCount 以从文本文件和 csv 文件中获取输入,并且它可以工作。
  • 所有的进口都完美无缺。
  • 然后我尝试了 import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
  • Eclipse没有找到它?

为什么 Eclipse 找不到导入?

因为 jar flink-java-0.8.1.jar里面没有 io/jdbc 目录。

我用flink-0.9.0-bin-hadoop27尝试了同样的事情,并且在 jar flink-dist-0.9.0.jar中没有org/apache/flink/api/java/io/jdbc目录。我解压缩 jar 并搜索字符串“jdbcinputformat”,结果为 0。我搜索了字符串“jdbc”,它只在org/apache/log4jorg/eclipse/jetty和其他不是org.apache.flink.api.java.io的地方提到

所以我的问题是:我在哪里可以找到类JDBCInputFormat

我可以做些什么来访问 Flink 中的 SqlServer2012(除了在 Flink 之外访问它,创建 csv 文件,然后在 Flink 中读取它们(这听起来很可怕,因为应该有一个特定的类))?

0 投票
1 回答
1028 浏览

apache-spark - Apache Spark Streaming:在内存中积累数据并在很久以后才输出

如果我理解这一点,火花流是用于通过一组转换管道您的 RDD 批次,并在转换后进行输出操作。这是针对每个批次执行的,因此输出操作也针对每个批次执行。但是由于每次输出都非常昂贵,我想处理批次并累积结果,并且仅在某些事件(例如在一定时间段之后)写出累积的结果并结束程序。

我知道我可以积累数据,例如,updateStateByKey但我不知道如何告诉 Spark 使用输出操作(例如saveAsTextFiles),直到很久以后,当某些条件到达时。

这可能吗?

这在flink中可能吗?

0 投票
1 回答
938 浏览

apache-flink - Flink CSV 文件阅读器无法将 LongType 转换为 PojoType

我试图在 Flink 中执行的部分代码:

我想pages用于其他目的,但是当我编译时,Flink 会向我抛出一条错误消息

线程“主”java.lang.ClassCastException 中的异常:
org.apache.flink.api.common.typeinfo.IntegerTypeInfo 无法转换为
org.apache.flink.api.java.typeutils.PojoTypeInfo

顺便说一句,我使用的是 Flink 的 0.9 快照版本。任何朝着正确方向的帮助都将受到高度赞赏。