0

我正在尝试执行下面的代码,其中我使用命名元组进行 PCollection 和 SQL 转换来进行简单的选择。

根据视频链接 (4:06):https ://www.youtube.com/watch?v=zx4p-UNSmrA 。

除了在 SQLTransform 查询中使用 PCOLLECTION 之外,还可以提供命名的 PCollections,如下所示。

代码块

class EmployeeType(typing.NamedTuple):
    name:str
    age:int

beam.coders.registry.register_coder(EmployeeType, beam.coders.RowCoder)

pcol = p | "Create" >> beam.Create([EmployeeType(name="ABC", age=10)]).with_output_types(EmployeeType)

(
{'a':pcol} | SqlTransform(
    """ SELECT age FROM a """) 
    | "Map" >> beam.Map(lambda row: row.age)
    | "Print" >> beam.Map(print) 
)

p.run()

但是,下面的代码块出错并出现错误

Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.validate.SqlValidatorException: Object 'a' not found

使用的 Apache Beam SDK 是 2.35.0,使用命名 PCollection 是否有任何已知限制

4

0 回答 0