5

非常感谢您的帮助!!!

代码:

from pyflink.common.typeinfo import RowTypeInfo, Types, BasicTypeInfo, TupleTypeInfo
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# stream 模式的env创建
env_settings_stream = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
env_stream = StreamTableEnvironment.create(environment_settings=env_settings_stream)

table1 = env_stream.from_elements([(1, 23.4, 'lili'), (2, 33.4, 'er'), (3, 45.6, 'yu')], ['id', 'order_amt', 'name'])
table2 = env_stream.from_elements([(1, 43.4, 'xixi'), (2, 53.4, 'rr'), (3, 65.6, 'ww')], ['id2', 'order_amt2', 'name'])

# types: List[TypeInformation], field_names: List[str]
# row_type_info = RowTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()], ['id', 'order_amt', 'name'])
row_type_info = TupleTypeInfo([BasicTypeInfo.STRING_TYPE_INFO(), BasicTypeInfo.FLOAT_TYPE_INFO(), BasicTypeInfo.STRING_TYPE_INFO()])


stream = env_stream.to_append_stream(table1, row_type_info)

错误信息:

Traceback (most recent call last):
  File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/Users/hulc/anaconda3/envs/myenv_3_6/lib/python3.6/site-packages/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o4.toAppendStream. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method toAppendStream([class org.apache.flink.table.api.internal.TableImpl, class org.apache.flink.api.java.typeutils.TupleTypeInfo]) does not exist
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:274)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

环境:

  1. apache-flink 1.12.0(python flink)
  2. py4j 0.10.8.1(当pip3安装apache-flink时,py4j会自动安装依赖)
  3. python 3.7(蟒蛇)
  4. pycharm 2020.1.1 版本
  5. macOS 11.1

调试信息图 1:

调试信息图像 1

调试信息图 2:

调试信息图像 2

繁殖步骤:

  1. 相同环境,本地运行代码(本地模式)
  2. 代码行处的断点:“stream = env_stream.to_append_stream(table1, row_type_info)”
  3. debug运行,断点会触发两次,第一次找不到toAppendStream方法,第二次找到toAppendStream方法。但是第一次提出异常。
4

0 回答 0