问题标签 [flink-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
453 浏览

apache-flink - Apache Flink 导入 scala api 流扩展

我正在尝试为 Apache Flink 导入 scala api 流扩展,如https://ci.apache.org/projects/flink/flink-docs-master/apis/scala_api_extensions.html中所述

但是,我的 ScalaIDE 抱怨以下消息:对象扩展不是包 org.apache.flink.streaming.api.scala 的成员

我正在使用 scala 2.11 和 Flink 1.0.1。

这是我的导入声明:import org.apache.flink.streaming.api.scala.extensions._

这是我的 pom.xml:

0 投票
2 回答
1201 浏览

apache-flink - Flink 自定义触发器给出意外的输出

我想创建一个Trigger第一次在 20 秒内触发,之后每五秒触发一次。我用过GlobalWindows和一个习惯Trigger

这是中的代码TradeTrigger

所以基本上,when flagis false,即第一次,Trigger应该在 20 秒内触发并将 设置flagtrue。从下一次开始,它应该每 5 秒触发一次。

我面临的问题是,每次Trigger触发时我只会在输出中收到一条消息。也就是说,我在 20 秒后收到一条消息,每五秒收到一条消息。我预计每次触发时输出中有 20 条消息。

如果我使用.timeWindow(Time.seconds(5))并创建一个 5 秒的时间窗口,则每 5 秒输出 20 条消息。请帮助我正确获取此代码。有什么我想念的吗?

0 投票
1 回答
344 浏览

apache-flink - 使用 Apache Flink,我如何按时加入两个流?

我有两个流。它们都是超过 1 小时窗口的聚合数据。我想压缩这些流,以便将同一时间跨度上的聚合元组在一起,如果现在存在这样的对应匹配,则可能具有空值。

我怎样才能做到这一点?

0 投票
1 回答
1462 浏览

apache-flink - 如何处理 Flink 中的应用程序错误

我目前想知道如何处理 Apache Flink 流应用程序中的应用程序错误。一般来说,我看到两种情况:

  1. 暂时性错误,您希望重播输入数据并且处理可能在第二次尝试时成功。一个示例是对暂时不可用的外部服务的依赖。
  2. 永久性错误,重复处理仍会失败;例如无效的输入数据。

对于第一种情况,常见的解决方案似乎只是抛出一些异常。或者有没有更好的方法,例如一种特殊类型的异常,用于更有效地处理,例如FailedException来自 Apache Storm Trident(请参阅Storm Trident 拓扑中的错误处理)。

对于永久性错误,我在网上找不到任何信息。例如,一个map()操作总是必须返回一些东西,因此不能像在 Trident 中那样默默地丢弃消息

有哪些可用的 API 或最佳实践?谢谢你的帮助。

0 投票
0 回答
278 浏览

apache-flink - flink流 - cep找不到以前的共享缓冲区条目与密钥

我尝试在 Flink 上运行 cep,并从本地路径获取测试数据,一开始,我将文件大小设置为 1G 左右,它运行良好。但是当我将文件大小设置为 10G 时,出现了下面的问题.

这是我的代码。谢谢帮助

0 投票
1 回答
1040 浏览

scala - 无法将()自定义函数应用于 Flink 上的 WindowedStream

我一直在尝试为 Window 的 apply() 方法编写自定义逻辑。基本上我想减少窗口中的所有元素,然后将时间戳附加到该值,所以我从 DataStream 创建了一个 WindowedStream,但是当我尝试为 apply() 定义函数时,它在编译时失败。

这是代码:

DataStream 的类型是 [Int, String, Int],键是 [Int, String]。没有 apply() 的代码运行和编译没有错误,但是当我输入:

当它失败并且无法编译时,给出错误:

0 投票
1 回答
2689 浏览

apache-flink - 对 Apache Flink 中的两个消息流使用相同的接收器

我们有两种消息传到 Flink

  1. 控制消息 -> 仅滚动文件
  2. 数据消息 -> 将使用 sink 存储在 S3

我们对两个消息都有单独的源流。我们将相同的接收器附加到两个流。我们要做的是广播控制消息,以便所有并行运行的接收器都应该接收它。

下面是相同的代码:

但我观察到的是,它正在创建 4 个接收器实例,并且控制消息仅广播到 2 个接收器(由控制消息流创建)。所以我的理解是两个流都应该通过相同的运算符链来执行我们不希望的操作,因为数据消息会有多个转换。我们已经编写了自己的接收器,如果它是控制消息,它将读取消息,然后它只会滚动文件。

示例代码:

输出:

我们可以看到 LASTNAME 值不正确,它被每条记录的 FIRSTNAME 值替换

0 投票
1 回答
857 浏览

flink-streaming - 无法解析符号 ValueState

我正在尝试使用 flink-streaming 状态后端,遵循本指南:https ://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html ,但我收到错误:无法解析符号“ValueState”

看了一会儿,我意识到ValueState不在我的依赖项中。相反,只有OperatorState在 org.apache.flink.api.common.state (flink-core) 中。

但是,如果我查看 Github,我会在该包中看到 ValueState:https ://github.com/apache/flink/tree/master/flink-core/src/main/java/org/apache/flink/api/common /状态

我猜我要么没有正确版本的 flink 以按照指南显示的方式使用 StateBackend,要么我有正确的版本,但 ValueState 已移至另一个 maven 依赖项。

下面是我的 pom.xml:

这是我的代码:

非常感谢您的帮助!

劳伦特。

0 投票
0 回答
920 浏览

apache-flink - Flink Streaming - 如何基于第一个(按事件时间戳)键控事件创建窗口

我正在使用 Flink 的 DataStream API 来处理带有翻滚窗口的键控流。

简化的问题如下:

输入事件对象示例:

  1. 我使用 assignTimestampsAndWatermarks 来使用事件时间戳
  2. 我使用 user_id 字段键入数据

.

现在我有了键控数据,我希望有 10 分钟的窗口,该窗口基于事件时间戳的最早事件,然后在该窗口上执行某种聚合。

如何创建具有静态大小(10 分钟)的窗口,该窗口将根据该窗口上该 user_id 的第一个开始,这意味着当窗口结束并且将再次看到 user_id 时,将创建一个新窗口。

我发现唯一能回答类似要求(但不完全是我需要的要求)是使用事件时间会话窗口(在写这个问题时不稳定,Flink 1.1 - https://ci.apache .org/projects/flink/flink-docs-master/apis/streaming/windows.html )

知道如何实现吗?

谢谢

0 投票
2 回答
1351 浏览

apache-flink - Apache Flink 联合运算符给出错误响应

我在两个DataStream通用记录类型上应用联合运算符。

输出:

如您所见,合并后 dataMessageGenericRecordStream 中的记录不正确。所有字段值都将被第一个字段值替换。