问题标签 [pyflink]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
162 浏览

pyflink - 如何使用 table.where() 过滤 PyFlink 中的子字段?

我正在使用 pyflink 和 Flink 11.2,并且我已经像这样定义了我的表:

我想使用 table.where 过滤子字段:

到目前为止,我所做的一切都导致了错误,这就是其中之一:

它尝试e在 d 上执行函数,而不是检索子字段。

0 投票
1 回答
116 浏览

apache-flink - 运行 flink 1.12.0 wordcount 示例中的语法无效?

我正在研究来自https://github.com/uncleguanghui/pyflink_learn的第一个 flink wordcount 示例。

我的环境是flink 1.12.0和ubuntu,flink在后台运行。

wordcount 示例相当简单。

使用命令运行代码

但是出现了一个问题

错误指向这部分代码

任何人都可以找出解决方案是什么?

0 投票
2 回答
120 浏览

python - flink run -py 和 python run 的区别

我最近在学习pyflink,但我有点困惑。我们知道 pyflink table API 将流/批处理转换为 table 并对其进行一些工作,最后下沉到你想要的地方。但是,有几种方法可以创建表环境:

对于批处理示例:

但我发现其中一个仅适用于 flink run -py batch.py​​,其他一些样式可以作为 python batch.py​​ 运行以进行本地运行。

我的问题是 executionEnvironment 和 BatchTableEnvironment/StreamTableEnvironment 的区别是什么,什么时候可以使用 flink run -py 以及什么时候可以通过安装了 pyflink 的 python 解释器运行?

谢谢

0 投票
1 回答
195 浏览

pandas - 将 Flink 动态表转换为 Pandas 数据框

我正在使用 pyflink table api 从 Kafka 读取数据。现在我想将结果表转换为 Pandas 数据框。这是我的代码,

但在这里我既没有得到错误也没有得到结果。我正在使用 Flink 1.11.3。有没有办法将此动态表转换为静态表或使 table.to_pandas() 工作的东西?

0 投票
1 回答
148 浏览

apache-flink - 如何在 Flink Web UI 中上传 python 文件?

我只能看到上传 jar 文件的按钮。

我想知道如何在 Flink Web UI 中上传 Python 文件?

0 投票
2 回答
161 浏览

apache-flink - Pyflink:“JavaPackage”对象不可调用

当我使用以下代码在 Flink CLI 中运行 Python 文件时:

我得到这样的错误:

我改变了运行这个 Python 文件的方式:

我得到另一个错误:

0 投票
0 回答
650 浏览

apache-flink - Apache Flink 1.11 流式接收器到 S3

我正在使用 Flink FileSystem SQL 连接器从 Kafka 读取事件并写入 S3(使用 MinIo)。这是我的代码,

我正在使用 Flink 1.11.3 和 flink-s3-fs-hadoop 1.11.3。我已将 flink-s3-fs-hadoop-1.11.3.jar 复制到 plugins 文件夹中。

我还在 flink-conf.yaml 中添加了以下配置。

MinIo 运行正常,我在 MinIo 中创建了“测试桶”。当我运行这个作业时,作业提交不会发生,Flink Dashboard 进入某种等待状态。15-20 分钟后,我得到以下异常,

这里似乎有什么问题?

0 投票
0 回答
244 浏览

apache-flink - pyflink(flink) 1.12.0 通过 to_append_stream (java api is: toAppendStream) 将表转换为数据流时出现错误

非常感谢您的帮助!!!

代码:

错误信息:

环境:

  1. apache-flink 1.12.0(python flink)
  2. py4j 0.10.8.1(当pip3安装apache-flink时,py4j会自动安装依赖)
  3. python 3.7(蟒蛇)
  4. pycharm 2020.1.1 版本
  5. macOS 11.1

调试信息图 1:

调试信息图像 1

调试信息图 2:

调试信息图像 2

繁殖步骤:

  1. 相同环境,本地运行代码(本地模式)
  2. 代码行处的断点:“stream = env_stream.to_append_stream(table1, row_type_info)”
  3. debug运行,断点会触发两次,第一次找不到toAppendStream方法,第二次找到toAppendStream方法。但是第一次提出异常。
0 投票
1 回答
531 浏览

python - 使用本地系统中的 pyflink 以批处理模式读取 csv 文件

我试图在编写 pyflink 作业时读取已建立的 csv 文件。我正在使用文件系统连接器来获取数据,但是在 ddl 上执行 execute_sql() 并稍后对表进行查询后,我收到一个错误,说明它无法获取下一个结果。我无法解决此错误。我检查了 csv 文件,它完全正确并且可以使用 pandas,但在这里我不知道为什么它无法获取下一行。供参考,请找到随附的代码。

即将发生的错误:-

0 投票
1 回答
283 浏览

pyspark - 我可以将 PyFlink 与 PyTorch/Tensorflow/ScikitLearn/Xgboost/LightGBM 一起使用吗?

我正在探索 PyFlink,我想知道是否可以将 PyFlink 与 ML 工程师通常使用的所有这些 ML 库一起使用:PyTorch、Tensorflow、Scikit Learn、Xgboost、LightGBM 等。

根据这个 SO thread,PySpark 不能直接在 UDF 内部使用 Scikit Learn,因为 Scikit Learn 算法不是分布式实现的,而 Spark 是分布式运行的。

鉴于 PyFlink 类似于 PySpark,我猜答案可能是“否”。但我很想仔细检查,看看我需要做什么才能使 PyFlink 能够使用这些 ML 库定义 UDF。