4

我创建了一个将数据从 S3(csv 文件)复制到 Redshift 的 Glue 作业。它工作并填充所需的表。

但是,我需要在此过程中清除表,因为在该过程完成后我留下了重复的记录。

我正在寻找一种方法来将此清除添加到 Glue 过程中。任何意见,将不胜感激。

谢谢。

4

5 回答 5

2

@frobinrobin 提供的链接已过时,我多次尝试即使您提供错误的语法也会跳过 preactions 语句,并出现重复的行(插入操作已执行!)

尝试这个:

只需替换 glueContext.write_dynamic_frame.from_jdbc_conf()上面链接中的语法即可glueContext.write_dynamic_frame_from_jdbc_conf()

至少这对我有帮助(AWS Glue 作业只需​​将数据插入 Redshift 而不执行截断表操作)

于 2020-12-29T01:44:31.483 回答
2

您可以更改 Glue 脚本以在插入之前执行“预操作”,如下所述:

https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

例如,对于主要基于默认值的脚本,我在最后一个 DataSink 之前插入了一个新的 DataSink(我用 {things} 替换了我的一些细节):

## @type: DataSink
## @args: [catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{DBTABLE}", "database": "{DBNAME}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift-data-live", connection_options = {"preactions":"truncate table {TABLENAME};","dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
## @type: DataSink
## @args: [catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink5
## @inputs: [frame = datasink4]
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasink4, catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
于 2020-12-01T22:35:29.683 回答
0

您需要修改 Glue 提供的自动生成代码。使用 spark jdbc 连接连接到 redshift 并执行清除查询。

在 redshift VPC 中启动 Glue 容器;在粘合作业中指定连接,以获得对红移集群的访问权限。

希望这可以帮助。

于 2018-05-04T17:20:37.293 回答
0

您看过Glue 中的作业书签吗?这是保持高水位标记的功能,仅适用于 s3。我不是 100% 确定,但它可能需要分区。

于 2018-04-13T18:17:22.177 回答
-1

您可以使用 spark/Pyspark databricks 库在表的截断表之后进行追加(这比覆盖性能更好):

preactions = "TRUNCATE table <schema.table>" 
df.write\
  .format("com.databricks.spark.redshift")\
  .option("url", redshift_url)\
  .option("dbtable", redshift_table)\
  .option("user", user)\
  .option("password", readshift_password)\
  .option("aws_iam_role", redshift_copy_role)\
  .option("tempdir", args["TempDir"])\
  .option("preactions", preactions)\
  .mode("append")\
  .save()

您可以在此处查看 databricks 文档

于 2018-11-15T17:39:03.090 回答