0

在本地运行管道时出现以下异常。提交云执行时也不例外。

谢谢, 格纳迪

INFO: Executing pipeline using the DirectPipelineRunner.
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for GroupedValues [GroupedValues]
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:606)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:583)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:327)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70)
at app.Main.main(Main.java:124)

代码大纲基本上是这样的:

PCollection<KV<MyKey, Iterable<MyValue>>> groupedByMyKey = ...
PCollection<KV<MyKey, MyAggregated>> aggregated = groupedByMyKey.apply(
        Combine.<MyKey, MyValue, MyAggregated>groupedValues(new Aggregator()));

聚合器类扩展CombineFn<MyValue, List<MyValue>, MyAggregated>

4

2 回答 2

1

你能分享一个触发这个的代码片段吗?GroupedValues 是一种 PTransform,经常在各种组合变换中使用,因此它可能来自使用诸如 Min、Max 等之类的东西。

该错误意味着 DirectPipelineRunner 不知道如何评估 GroupedValues。但是,这是出乎意料的,因为它应该在执行之前扩展为 ParDo。

于 2014-12-29T16:46:30.093 回答
1

我找到了这种行为的原因

我正在使用命令行参数以远程模式(--runner=BlockingDataflowPipelineRunner)运行它,然后强制它在本地运行

PipelineRunner<?> runner = DirectPipelineRunner.fromOptions(options);
runner.run(p);

删除这些行并仅使用--runner=DirectPipelineRunner参数后,它按预期工作。

于 2014-12-30T09:56:13.837 回答