
I'm working through the first Flink wordcount example from https://github.com/uncleguanghui/pyflink_learn.

My environment is Flink 1.12.0 on Ubuntu, with Flink running in the background.

The wordcount example is fairly simple:

import os
import shutil
from pyflink.table import BatchTableEnvironment, EnvironmentSettings
from pyflink.table import DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)

dir_word = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'word.csv')

t_env.execute_sql(f"""
    CREATE TABLE source (
        id BIGINT,     -- ID
        word STRING    -- word
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file://{dir_word}',
        'format' = 'csv'
    )
""")

dir_result = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'result.csv')

if os.path.exists(dir_result):
    if os.path.isfile(dir_result):
        os.remove(dir_result)
    else:
        shutil.rmtree(dir_result, True)

t_env.execute_sql(f"""
    CREATE TABLE sink (
        word STRING,   -- word
        cnt BIGINT     -- cnt
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file://{dir_result}',
        'format' = 'csv'
    )
""")

t_env.execute_sql("""
    INSERT INTO sink
    SELECT word
           , count(1) AS cnt
    FROM source
    GROUP BY word
""")

I run the code with this command:

flink run -m localhost:8081 -py batch.py

but it fails with the following error:

# flink run -m localhost:8081 -py batch.py
  File "batch1.py", line 24
    """)
      ^
SyntaxError: invalid syntax
org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:113)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)

The error points at this part of the code:

t_env.execute_sql(f"""
    CREATE TABLE source (
        id BIGINT,     -- ID
        word STRING    -- word
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file://{dir_word}',
        'format' = 'csv'
    )
""")  # the SyntaxError is reported here

Can anyone figure out what the fix is?
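A SyntaxError reported at the closing `""")` of an f"""...""" literal is what you would expect if the script is parsed by an interpreter older than Python 3.6, since f-string literals (PEP 498) only exist from 3.6 on. A minimal sketch for checking the interpreter; `supports_fstrings` is a hypothetical helper name, and the interpreter that `flink run` invokes may not be the same one you get from your shell, so it is worth running the check under that exact executable.

```python
import sys


def supports_fstrings(version_info=sys.version_info):
    # f-string literals (PEP 498) were introduced in Python 3.6.
    return tuple(version_info[:2]) >= (3, 6)


if __name__ == "__main__":
    # Print which interpreter is running this file and its version.
    print(sys.executable, sys.version.split()[0])
    if not supports_fstrings():
        print("This interpreter cannot parse f-strings; use Python 3.6 or newer.")
```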


1 Answer


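Try using the Table API version the author references. If that works, it means your Python version is too old (f-strings require Python 3.6 or newer). Also make sure PyFlink is installed with pip install apache-flink. Finally, you can ask the author directly; he answers very quickly. I'm also working through his examples to learn PyFlink. The API changes often, so make sure you use the same version he does :)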
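If switching to a newer interpreter is not an option, the same DDL can be built without f-string syntax, which pre-3.6 interpreters cannot parse. A minimal sketch using str.format; the helper name `source_ddl` and the demo path are hypothetical, and the resulting string would be passed to `t_env.execute_sql(...)` exactly as in the original script.

```python
def source_ddl(path):
    # Same statement as the f-string version in the question, but filled in
    # with str.format, which interpreters older than Python 3.6 can also parse.
    return """
    CREATE TABLE source (
        id BIGINT,     -- ID
        word STRING    -- word
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file://{}',
        'format' = 'csv'
    )
""".format(path)


if __name__ == "__main__":
    # Hypothetical path for illustration; batch.py builds it from __file__.
    print(source_ddl('/tmp/word.csv'))
```

The sink statement can be rewritten the same way. This only avoids the parse error; PyFlink itself still has to support whatever Python version you end up running.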

answered 2020-12-18T02:10:20.987