(1)有一个 BigQuery 源表,例如 ...
column_name | is_nullable | data_type
OrderId | YES | STRING
items | NO | ARRAY<STRUCT<articleId STRING, quantity FLOAT64INT64>>
从理性表的角度来看,“OrderId”应该是键。
(2)现在我想将 ARRAY/STRUCT 记录规范化为单独的表。为了实现这一点,我使用了变换“牧马人”。
注意:它是 Data Fusion 工作室传输部分的“牧马人”!当尝试通过汉堡菜单打开“牧马人”并选择 BQ 源表时,它告诉您:不支持 BigQuery 类型 STRUCT。
源表的输出链接到 Wrangler 的输入。
在牧马人我定义...
- 输入字段名称:*
- 前提条件:假
- 指令/配方:保持 combiOrderId,items,articleId,quantity
- Output Schema (Name | Type | Null): -- (根据源表,下面附上JSON)
combiOrderId | string | yes items | array | no record [ {articleId | string | yes}, {quantity | float | yes} ]
(3) BQ sink table 以 Wrangler Output 作为 Input Schema,我将最终 schema 定义为 (Name | Type | Null)
combiOrderId | string | yes
articleId | string | yes
quantity | float | yes
现在,在运行管道(预览模式)时,将记录以下错误消息:
转换为输出记录的问题。原因:无法解码数组“项目”
(下面的完整消息)
任何提示或替代解决方案都将受到欢迎:-)
谢谢你。
牧马人输出模式的 JSON:
[
{
"name": "etlSchemaBody",
"schema": {
"type": "record",
"name": "etlSchemaBody",
"fields": [
{
"name": "combiOrderId",
"type": [
"string",
"null"
]
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "a6adafef5943d4757b2fad43a10732952",
"fields": [
{
"name": "articleId",
"type": [
"string",
"null"
]
},
{
"name": "quantity",
"type": [
"float",
"null"
]
}
]
}
}
}
]
}
}
]
完整(第一个)错误日志:
java.lang.Exception: Stage:Normalize-items - Reached error threshold 1, terminating processing due to error : Problem converting into output record. Reason : Unable to decode array 'items'
at io.cdap.wrangler.Wrangler.transform(Wrangler.java:412) ~[1576661389534-0/:na]
at io.cdap.wrangler.Wrangler.transform(Wrangler.java:94) ~[1576661389534-0/:na]
at io.cdap.cdap.etl.common.plugin.WrappedTransform.lambda$transform$5(WrappedTransform.java:90) ~[cdap-etl-core-6.1.0.jar:na]
at io.cdap.cdap.etl.common.plugin.Caller$1.call(Caller.java:30) ~[cdap-etl-core-6.1.0.jar:na]
at io.cdap.cdap.etl.common.plugin.WrappedTransform.transform(WrappedTransform.java:89) ~[cdap-etl-core-6.1.0.jar:na]
at io.cdap.cdap.etl.common.TrackedTransform.transform(TrackedTransform.java:74) ~[cdap-etl-core-6.1.0.jar:na]
at io.cdap.cdap.etl.spark.function.TransformFunction.call(TransformFunction.java:50) ~[hydrator-spark-core2_2.11-6.1.0.jar:na]
at io.cdap.cdap.etl.spark.Compat$FlatMapAdapter.call(Compat.java:126) ~[hydrator-spark-core2_2.11-6.1.0.jar:na]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:125) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:128) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415) ~[spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.scheduler.Task.run(Task.scala:109) [spark-core_2.11-2.3.3.jar:2.3.3]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) [spark-core_2.11-2.3.3.jar:2.3.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_232]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_232]
Caused by: io.cdap.wrangler.api.RecipeException: Problem converting into output record. Reason : Unable to decode array 'items'
at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:102) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.Wrangler.transform(Wrangler.java:384) ~[1576661389534-0/:na]
... 25 common frames omitted
Caused by: io.cdap.wrangler.utils.RecordConvertorException: Unable to decode array 'items'
at io.cdap.wrangler.utils.RecordConvertor.decodeArray(RecordConvertor.java:382) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.decode(RecordConvertor.java:142) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.decodeUnion(RecordConvertor.java:368) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.decode(RecordConvertor.java:152) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.decodeRecord(RecordConvertor.java:85) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.utils.RecordConvertor.toStructureRecord(RecordConvertor.java:56) ~[wrangler-core-4.1.3.jar:na]
at io.cdap.wrangler.executor.RecipePipelineExecutor.execute(RecipePipelineExecutor.java:99) ~[wrangler-core-4.1.3.jar:na]
... 26 common frames omitted