2

考虑scala代码:

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.{GlueArgParser, Job, JsonOptions}
import org.apache.spark.SparkContext

import scala.collection.JavaConverters.mapAsJavaMapConverter

object MyGlueJob {

  def main(sysArgs: Array[String]) {
    val spark: SparkContext = SparkContext.getOrCreate()
    val glueContext: GlueContext = new GlueContext(spark)

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val input = glueContext
      .getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table")
      .getDynamicFrame()

    val processed = input.applyMapping(
      Seq(
        ("id",                                        "string", "id", "string"),
        ("my_date",                                   "string", "my_date", "string")
      ))
    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
      format = "orc", transformationContext = ""
    ).writeDynamicFrame(processed)
    Job.commit
  }
}

输入是按日期列分区的具有 gzip 压缩的分区 json 文件。一切正常- 数据以 json 格式读取并用 orc 编写。

但是当尝试使用相同的数据运行作业时,它会再次读取它并写入重复的数据。此作业中启用了书签。方法Job.initJob.commit被调用。怎么了?

更新

我在and中添加了一个transformationContext参数:getCatalogSourcegetSinkWithFormat

        val input = glueContext
      .getCatalogSource(database = "my_data_base", tableName = "my_json_gz_partition_table", transformationContext = "transformationContext1")
      .getDynamicFrame()

和:

    glueContext.getSinkWithFormat(
      connectionType = "s3",
      options = JsonOptions(Map("path" -> "s3://my_path", "partitionKeys" -> List("my_date"))),
      format = "orc", transformationContext = "transformationContext2"
    ).writeDynamicFrame(processed)

现在魔术以这种方式“起作用”:

  1. 第一次运行 - 好的
  2. 第二次运行(使用相同的数据或相同的数据和新的数据) - 它失败并出现错误(稍后)

在第二次(和后续)运行之后再次发生错误。该消息也Skipping Partition {"my_date": "2017-10-10"}出现在日志中。

ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType(); org.apache.spark.sql.AnalysisException: Partition column my_date not found in schema StructType();
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1$$anonfun$apply$11.apply(PartitioningUtils.scala:439)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:438)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$partitionColumnsSchema$1.apply(PartitioningUtils.scala:437)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.partitionColumnsSchema(PartitioningUtils.scala:437)
at org.apache.spark.sql.execution.datasources.PartitioningUtils$.validatePartitionColumn(PartitioningUtils.scala:420)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:443)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:123)
at MobileArcToRaw$.main(script_2018-01-18-08-14-38.scala:99)

胶水书签到底是怎么回事???哦

4

3 回答 3

1

您是否尝试将transformationContext源和接收器的值设置为相同?它们当前在您上次更新中设置为不同的值。

transformationContext = "transformationContext1"

transformationContext = "transformationContext2"

我也一直在努力解决这个问题,使用胶水和书签。我正在尝试执行一项类似的任务,在其中读取按年、月和日分区的分区 JSON 文件,每天都有新文件到达。我的工作运行转换以提取数据子集,然后沉入 S3 上的分区 Parquet 文件中。

我正在使用 Python,所以我的 DynamicFrame 的初始实例如下所示:

dyf = glue_context.create_dynamic_frame.from_catalog(database="dev-db", table_name="raw", transformation_ctx="raw")

最后像这样接收到 S3:

glue_context.write_dynamic_frame.from_options( frame=select_out, connection_type='s3', connection_options={'path': output_dir, 'partitionKeys': ['year', 'month', 'day']}, format='parquet', transformation_ctx="dev-transactions" )

最初,我运行了这项工作,并在启用书签的情况下正确生成了 Parquet。然后我添加了新的一天数据,更新了输入表上的分区并重新运行。第二个作业将失败并出现如下错误:

pyspark.sql.utils.AnalysisException: u"cannot resolve 'year' given input columns: [];;\n'Project ['year, 'month, 'day, 'data']

将 更改为transformation_ctx相同(dev-transactions在我的情况下)使该过程仅在处理增量分区并为新分区生成 Parquet 时才能正常工作。

关于书签的一般文档以及如何使用转换上下文变量的文档非常稀少。

Python 文档只是说:(https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html):

transformation_ctx – 要使用的转换上下文(可选)。

Scala 文档说(https://docs.aws.amazon.com/glue/latest/dg/glue-etl-scala-apis-glue-gluecontext.html):

transformationContext — 与作业书签要使用的接收器关联的转换上下文。默认设置为空。

我能观察到的最好的事情是,由于文档解释得不好,转换上下文用于在已处理的源数据和接收器数据之间形成联系,并且定义了不同的上下文会阻止书签按预期工作。

于 2018-03-13T17:19:57.550 回答
0

JobBookmarks 使用转换上下文来关闭给定 ETL 操作(主要是源)的状态。目前将它们放在水槽中没有任何影响。

启用作业书签时作业失败的原因之一是因为它们只处理增量数据(新文件),如果没有新数据,脚本会像没有数据时一样运行,这可能是火花分析异常举个例子。

因此,您不应在不同的 ETL 运算符中使用相同的转换上下文。

对于第一次运行后的测试,请尝试将新数据复制到源位置并再次运行作业,只应处理新数据。

于 2018-08-10T22:34:08.600 回答
0

看起来该作业第二次运行时,没有为您的目录找到新数据

val input = glueContext.getCatalogSource(...)
input.count
# Returns 0, your dynamic frame has no Schema associated to it
# hence the `Partition column my_date not found in schema StructType()`

input.schema.containsField("my_field")我建议在尝试映射/写入之前检查 DynamicFrame 的大小,或者您的分区字段是否存在于 DynamicFrame 的架构中。那时,您可以提交或不提交工作。

此外,如果您确定新数据将进入新分区上的该目录,您可以考虑运行 Crawler 来选择这些新分区,或者如果您不期望任何架构更改,则可以通过 API 创建它们。

希望这可以帮助。

于 2018-01-18T14:54:21.683 回答