我创建了一个将数据从 S3(csv 文件)复制到 Redshift 的 Glue 作业。它工作并填充所需的表。
但是,我需要在此过程中清除表,因为在该过程完成后我留下了重复的记录。
我正在寻找一种方法来将此清除添加到 Glue 过程中。任何意见,将不胜感激。
谢谢。
我创建了一个将数据从 S3(csv 文件)复制到 Redshift 的 Glue 作业。它工作并填充所需的表。
但是,我需要在此过程中清除表,因为在该过程完成后我留下了重复的记录。
我正在寻找一种方法来将此清除添加到 Glue 过程中。任何意见,将不胜感激。
谢谢。
@frobinrobin 提供的链接已过时,我多次尝试即使您提供错误的语法也会跳过 preactions 语句,并出现重复的行(插入操作已执行!)
尝试这个:
只需替换
glueContext.write_dynamic_frame.from_jdbc_conf()
上面链接中的语法即可glueContext.write_dynamic_frame_from_jdbc_conf()
!
至少这对我有帮助(AWS Glue 作业只需将数据插入 Redshift 而不执行截断表操作)
您可以更改 Glue 脚本以在插入之前执行“预操作”,如下所述:
https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
例如,对于主要基于默认值的脚本,我在最后一个 DataSink 之前插入了一个新的 DataSink(我用 {things} 替换了我的一些细节):
## @type: DataSink
## @args: [catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{DBTABLE}", "database": "{DBNAME}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift-data-live", connection_options = {"preactions":"truncate table {TABLENAME};","dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
## @type: DataSink
## @args: [catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink5
## @inputs: [frame = datasink4]
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasink4, catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
您需要修改 Glue 提供的自动生成代码。使用 spark jdbc 连接连接到 redshift 并执行清除查询。
在 redshift VPC 中启动 Glue 容器;在粘合作业中指定连接,以获得对红移集群的访问权限。
希望这可以帮助。
您看过Glue 中的作业书签吗?这是保持高水位标记的功能,仅适用于 s3。我不是 100% 确定,但它可能需要分区。
您可以使用 spark/Pyspark databricks 库在表的截断表之后进行追加(这比覆盖性能更好):
preactions = "TRUNCATE table <schema.table>"
df.write\
.format("com.databricks.spark.redshift")\
.option("url", redshift_url)\
.option("dbtable", redshift_table)\
.option("user", user)\
.option("password", readshift_password)\
.option("aws_iam_role", redshift_copy_role)\
.option("tempdir", args["TempDir"])\
.option("preactions", preactions)\
.mode("append")\
.save()