
(1) There is a BigQuery source table, e.g. ...

column_name | is_nullable | data_type
------------|-------------|----------
OrderId     | YES         | STRING
items       | NO          | ARRAY<STRUCT<articleId STRING, quantity FLOAT64>>

From a relational-table point of view, "OrderId" should be the key.
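For reference, this is roughly the schema the BigQuery source hands to the next stage, written in the same JSON notation as the Wrangler output schema further below (a sketch only: the inner record name is illustrative, and the types follow the "string"/"float" mapping used in that JSON):

{
    "type": "record",
    "name": "etlSchemaBody",
    "fields": [
        { "name": "OrderId", "type": [ "string", "null" ] },
        {
            "name": "items",
            "type": {
                "type": "array",
                "items": {
                    "type": "record",
                    "name": "itemsRecord",
                    "fields": [
                        { "name": "articleId", "type": [ "string", "null" ] },
                        { "name": "quantity",  "type": [ "float", "null" ] }
                    ]
                }
            }
        }
    ]
}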

(2) Now I want to normalize the ARRAY/STRUCT records into a separate table. To do this, I use the Wrangler transform.

Note: this is the Wrangler from the Transform section of the Data Fusion Studio! When you instead try to open Wrangler via the hamburger menu and select the BQ source table, it tells you: BigQuery type STRUCT is not supported.

The output of the source table is linked to the input of the Wrangler transform.
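In pipeline-JSON terms, the wiring looks roughly like this ("Normalize-items" is the Wrangler stage name that shows up in the error log below; the source and sink stage names are illustrative):

{
    "connections": [
        { "from": "BigQuery-Source", "to": "Normalize-items" },
        { "from": "Normalize-items", "to": "BigQuery-Sink" }
    ]
}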

In Wrangler I define ...

  • Input field name: *
  • Precondition: false
  • Directives / recipe: keep combiOrderId,items,articleId,quantity
  • Output schema (Name | Type | Null), matching the source table (JSON attached below):
      combiOrderId | string | yes
      items        | array  | no
        record [ {articleId | string | yes}, {quantity | float | yes} ]

(Screenshot: the Wrangler parameters screen)
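In the exported pipeline JSON, this configuration would appear roughly as the following plugin properties (a sketch: the property keys follow the CDAP Wrangler transform as I understand it, but I have not verified every key):

{
    "name": "Normalize-items",
    "plugin": {
        "name": "Wrangler",
        "type": "transform",
        "properties": {
            "field": "*",
            "precondition": "false",
            "directives": "keep combiOrderId,items,articleId,quantity"
        }
    }
}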

(3) The BQ sink table takes the Wrangler output as its input schema, and I define the final schema as (Name | Type | Null):

combiOrderId | string | yes
articleId    | string | yes
quantity     | float  | yes
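In the same JSON notation as the Wrangler output schema below, that flat sink schema would look like this (derived directly from the three fields above):

{
    "type": "record",
    "name": "etlSchemaBody",
    "fields": [
        { "name": "combiOrderId", "type": [ "string", "null" ] },
        { "name": "articleId",    "type": [ "string", "null" ] },
        { "name": "quantity",     "type": [ "float", "null" ] }
    ]
}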

Now, when running the pipeline (in preview mode), the following error message is logged:

Problem converting into output record. Reason : Unable to decode array 'items'

(full message below)

Any hints or alternative solutions are welcome :-)

Thank you.

JSON of the Wrangler output schema:

[
    {
        "name": "etlSchemaBody",
        "schema": {
            "type": "record",
            "name": "etlSchemaBody",
            "fields": [
                {
                    "name": "combiOrderId",
                    "type": [
                        "string",
                        "null"
                    ]
                },
                {
                    "name": "items",
                    "type": {
                        "type": "array",
                        "items": {
                            "type": "record",
                            "name": "a6adafef5943d4757b2fad43a10732952",
                            "fields": [
                                {
                                    "name": "articleId",
                                    "type": [
                                        "string",
                                        "null"
                                    ]
                                },
                                {
                                    "name": "quantity",
                                    "type": [
                                        "float",
                                        "null"
                                    ]
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
]

Full (first) error log:

java.lang.Exception: Stage:Normalize-items - Reached error threshold 1, terminating processing due to error : Problem converting into output record. Reason : Unable to decode array 'items'
	at io.cdap.wrangler.Wrangler.transform(Wrangler.java:412) ~[1576661389534-0/:na]
	at io.cdap.wrangler.Wrangler.transform(Wrangler.java:94) ~[1576661389534-0/:na]
	at io.cdap.cdap.etl.common.plugin.WrappedTransform.lambda$transform$5(WrappedTransform.java:90) ~[cdap-etl-core-6.1.0.jar:na]
	at io.cdap.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[cdap-etl-core-6.1.0.jar:na]
	at io.cdap.cdap.etl.common.plugin.WrappedTransform.transform(WrappedTransform.java:89) ~[cdap-etl-core-6.1.0.jar:na]
	at io.cdap.cdap.etl.common.TrackedTransform.transform(TrackedTransform.java:74) ~[cdap-etl-core-6.1.0.jar:na]
	at io.cdap.cdap.etl.spark.function.TransformFunction.call(TransformFunction.java:50) ~[hydrator-spark-core2_2.11-6.1.0.jar:na]
	at io.cdap.cdap.etl.spark.Compat$FlatMapAdapter.call(Compat.java:126) ~[hydrator-spark-core2_2.11-6.1.0.jar:na]
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.3.jar:2.3.3]
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.3.jar:2.3.3]
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) ~[scala-library-2.11.8.jar:na]
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) ~[scala-library-2.11.8.jar:na]
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:128) ~[spark-core_2.11-2.3.3.jar:2.3.3]
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127) ~[spark-core_2.11-2.3.3.jar:2.3.3]
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415) ~[spark-core_2.11-2.3.3.jar:2.3.3]
	at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139) [spark-core_2.11-2.3.3.jar:2.3.3]
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83) [spark-core_2.11-2.3.3.jar:2.3.3]
	at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78) [spark-core_2.11-2.3.3.jar:2.3.3]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) [spark-core_2.11-2.3.3.jar:2.3.3]
	at org.apache.spark.scheduler.Task.run(Task.scala:109) [spark-core_2.11-2.3.3.jar:2.3.3]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) [spark-core_2.11-2.3.3.jar:2.3.3]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_232]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_232]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_232]
Caused by: io.cdap.wrangler.api.RecipeException: Problem converting into output record. Reason : Unable to decode array 'items'
	at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:102) ~[wrangler-core-4.1.3.jar:na]
	at io.cdap.wrangler.Wrangler.transform(Wrangler.java:384) ~[1576661389534-0/:na]
	... 25 common frames omitted
Caused by: io.cdap.wrangler.utils.RecordConvertorException: Unable to decode array 'items'
	at io.cdap.wrangler.utils.RecordConvertor.decodeArray(RecordConvertor.java:382) ~[wrangler-core-4.1.3.jar:na]
	at io.cdap.wrangler.utils.RecordConvertor.decode(RecordConvertor.java:142) ~[wrangler-core-4.1.3.jar:na]
	at io.cdap.wrangler.utils.RecordConvertor.decodeUnion(RecordConvertor.java:368) ~[wrangler-core-4.1.3.jar:na]
	at io.cdap.wrangler.utils.RecordConvertor.decode(RecordConvertor.java:152) ~[wrangler-core-4.1.3.jar:na]
	at io.cdap.wrangler.utils.RecordConvertor.decodeRecord(RecordConvertor.java:85) ~[wrangler-core-4.1.3.jar:na]
	at io.cdap.wrangler.utils.RecordConvertor.toStructureRecord(RecordConvertor.java:56) ~[wrangler-core-4.1.3.jar:na]
	at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:99) ~[wrangler-core-4.1.3.jar:na]
	... 26 common frames omitted


1 Answer


Adding my comment as an answer.

Regarding debugging the error: the easiest way to check what the error might be is to navigate from Wrangler. You can do this with the following steps:

  1. Go to the Wrangler connections: /cdap/ns/default/connections
  2. Click the BQ source (or create a BigQuery connection)
  3. Navigate to the BQ table and click it.
  4. This takes you to a Wrangler workspace (tabbed view)
  5. From there you can apply all the transformations and click "Create a Pipeline"

After this, you should see your source and Wrangler transform already configured. You can then add a sink and run a preview to test whether there is a problem.

To address your other point: Wrangler only has support for array types from a BQ source; it does not support reading the STRUCT type from BigQuery. My guess is that this is why you are seeing this issue. issues.cask.co/browse/CDAP-15665

Answered 2019-12-23T18:28:16.160