问题标签 [flink-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
564 浏览

apache-flink - 如何同时使用键值状态和检查点?

据我了解,键值状态接口允许在我的流程失败后恢复状态,但在重新部署流程后无法恢复状态。

如果我想在重新部署后恢复状态,我应该实现Checkpointed接口并从/到键值状态使用Checkpointed接口的方法保存/恢复状态(+不要忘记在部署之前创建保存点并使用创建的保存点运行新流程)。对?

0 投票
1 回答
362 浏览

scala - 尝试在 PatternStream 上选择时出现“输入不匹配:预期元组类型”

我在测试新的 Flink 1.0.0 功能时遇到了一些麻烦。我一直在修补 CEP,但还没有设法运行一个简单的演示代码:

代码编译良好,并且 maven 没有显示警告。TrafficEvent 是一个包含几个简单字段的类,stream 是该类的 Scala DataStream。当代码在 Flink 上运行时会出现错误。它运行一秒钟,然后代码退出并显示以下错误消息:

该程序完成,但出现以下异常:

我试图通过构建这样的静态类将功能移至 Java(也许从 Scala 调用 API 存在一些奇怪的问题):

但是结果是完全一样的,它在 PatternStream.select 行中抛出了同样的错误。关于我可以尝试什么或我做错了什么的任何提示?如您所见,该模式非常愚蠢,仅用于测试目的。它只接受所有事件,并返回该事件作为响应。Flink 是 1.0.0,使用 Scala 2.10 版本。

谢谢

0 投票
1 回答
135 浏览

apache-flink - 在 Scala Shell 中实例化 StreamExecutionEnvironment

我启动了 scala shell。它会自动从 org.apache.flink.api.scala.ExecutionEnvironment 类型创建一个 ExecutionEnvironment

但是,我想使用流媒体环境。

导致异常,因为 ExecutionEnvironement 只能实例化一次。

0 投票
1 回答
1396 浏览

apache-flink - Flink 中基于流数据的全局聚合

我目前正在使用 Flink 1.0 编写一个聚合用例,作为用例的一部分,我需要获取最近 10 分钟登录的 api 计数。

我可以使用 keyBy("api") 轻松做到这一点,然后应用 10 分钟的窗口并执行 sum(count) 操作。

但问题是我的数据可能出现乱序,所以我需要一些方法来获取 10 分钟窗口内的 api 计数..

例如:如果相同的 api 日志出现在 2 个不同的窗口中,我应该得到一个全局计数,即 2,而不是两个单独的记录显示每个窗口的计数为 1。

我也不想要增量计数,即具有相同键的每条记录显示多次,计数等于增量值..

我希望记录以全局计数显示一次,例如 Spark 中的 updateStateByKey() 。

我们可以这样做吗?

0 投票
1 回答
1280 浏览

apache-flink - Flink:数据集和数据流 API 在一个程序中。可能吗?

我想首先使用数据集 API 操作静态数据,然后使用 DataStream API 运行流式作业。如果我在 IDE 上编写代码,它可以完美运行。但是当我尝试在本地 flink jobmanager 上运行(所有并行度为 1)时,流代码永远不会执行!

例如,以下代码不起作用:

我应该怎么做才能让这件事发挥作用?

日志:上述程序的执行日志

执行计划:plan 似乎是非循环的。

0 投票
0 回答
113 浏览

apache-flink - flink 中的流式实现

我的任务是从本地驱动器读取 csv 文件。它包含定期更新的股票数据。接下来,对读取的数据执行一些操作,并定期写回一个新的 csv 文件。

我的代码改编自WikipediaAnalysis代码,问题是执行只发生一次。在初始运行后,程序结束并且不会保持在运行状态。它不像 WikipediaAnalysis 那样以 5 秒(我已设置)的定期间隔运行。下面是我的代码:

你能建议吗!

0 投票
1 回答
1040 浏览

apache-flink - 如何在 Flink 中对 String DataStream 执行 timeWindow()?

我想在 Apache Flink 中创建一个流数据的时间窗口。我的数据看起来有点像这样:

每 20 秒,我想要所有行的标记总和(最后一列,例如 Mokshda 的标记是 84.85)。timeWindow() 函数在 KeyedStream 上运行,因此我必须 keyBy() 这个 DataStream。我可以通过卷号输入它(第一列,例如 Mokshda 的 52)。

但显然,Flink 并没有将我的数据作为列表读取。它将它作为字符串读取,因此,我得到以下异常:

如何对字符串数据执行 timeWindow,或者如何将此数据转换为元组?

0 投票
1 回答
349 浏览

apache-flink - 我们如何在 Flink 中对 WindowedStream 进行自定义操作?

我想在 Flink 中的 WindowedStream 上执行一些操作,比如说平均。但是预定义的操作非常有限,例如 sum、min、max 等。

假设我想找到平均值,我该怎么做?

0 投票
1 回答
796 浏览

apache-flink - Flink 中的自定义 Windows 充电

我正在使用 Flink 的TimeWindow功能来执行一些计算。我正在创建一个 5 分钟Window。但是我只想第一次创建一个小时Window。我需要的下一个窗口是 5 分钟。

这样在第一个小时内,收集数据并对其执行我的操作。完成此操作后,每五分钟执行一次相同的操作。

我发现这可以用 a 来实现,trigger但我不确定trigger应该使用哪个以及如何使用。

更新:我认为甚至triggers没有帮助,据我所知,他们只是定义触发的时间/计数window,而不是触发第一个window的时间。

0 投票
0 回答
1368 浏览

apache-flink - 如何在 Apache Flink 上编写“自定义源”

我想写一个 DataSource,它是来自 Tarantool-java https://github.com/tarantool/tarantool-java的 DataStream 。

谁能给我一个关于如何通过用户定义编写数据源的指南。

这是我的代码:

}