我正在尝试执行下面的代码,其中我使用命名元组进行 PCollection 和 SQL 转换来进行简单的选择。
根据视频链接 (4:06):https ://www.youtube.com/watch?v=zx4p-UNSmrA 。
除了在 SQLTransform 查询中使用 PCOLLECTION 之外,还可以提供命名的 PCollections,如下所示。
代码块
class EmployeeType(typing.NamedTuple):
name:str
age:int
beam.coders.registry.register_coder(EmployeeType, beam.coders.RowCoder)
pcol = p | "Create" >> beam.Create([EmployeeType(name="ABC", age=10)]).with_output_types(EmployeeType)
(
{'a':pcol} | SqlTransform(
""" SELECT age FROM a """)
| "Map" >> beam.Map(lambda row: row.age)
| "Print" >> beam.Map(print)
)
p.run()
但是,下面的代码块出错并出现错误
Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.validate.SqlValidatorException: Object 'a' not found
使用的 Apache Beam SDK 是 2.35.0,使用命名 PCollection 是否有任何已知限制