在 AWS Glue 作业中写入数据之前或之后如何在 Amazon Redshift 表上运行 SQL 命令?
3 分钟阅读
0
我有一个 AWS Glue 作业,用于将数据加载到 Amazon Redshift 表中。我想在 AWS Glue 作业完成之前或之后在 Amazon Redshift 上运行 SQL 命令。
解决方法
在 AWS Glue DynamicFrameWriter 类中传递以下参数以进行授权:
- **aws_iam_role:**提供访问另一个 AWS 资源中的数据的授权。将此参数与附加到 Amazon Redshift 集群的 AWS Identity and Access Management (IAM) 角色的完全指定 ARN 一起使用。例如,使用 arn:aws:iam::123456789012:role/redshift_iam_role。有关详细信息,请参阅授权参数。
在 AWS Glue DynamicFrameWriter 类中传递以下一个或多个参数:
- **preactions:**在 COPY 命令之前运行的 SQL 命令的分号分隔列表。如果命令失败,Amazon Redshift 将抛出异常。
**注意:**preaction 参数不能包含换行符。 - **postactions:**在成功运行 COPY 命令后运行的 SQL 命令的分号分隔列表。如果命令失败,Amazon Redshift 将抛出异常。
**注意:**postaction 参数不能包含换行符。 - **extracopyoptions:**在 Amazon Redshift COPY 命令加载数据时附加到该命令的其他选项列表。例如,您可以使用 TRUNCATECOLUMNS 或 MAXERROR。
示例场景
截断一个 Amazon Redshift 表后再将记录插入到 AWS Glue 中
使用 preactions 参数。
Python 示例:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 示例:
val options = JsonOptions(Map( "dbtable" -> "schema.target_table", "database" -> "redshiftdb", "preactions" -> "truncate table schema.target_table;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)
在以上示例中,请替换以下值:
- **test_red:**要使用的目录连接。
- **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表。
- **s3://s3path:**Amazon Redshift 表的临时目录路径。
在连接选项中使用 IAM 角色
凭证将在 1 小时后过期。请在连接选项中使用 IAM 角色,以便长时间运行的连接不会失败。
Python 示例:
glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name", table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})
Scala 示例:
val connectionOptions = JsonOptions(Map( "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database", "dbtable" -> "schema.table", "user" -> "redshift_user", "password" -> "redshift_password", "tempdir" -> "s3://temp_bucket/temp", "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" )) val dyf = glueContext.getSource("redshift", connectionOptions) .getDynamicFrame()
将 Amazon Redshift 表合并到 AWS Glue 中 (upsert)
将数据加载到暂存表后,创建合并查询。
**注意:**要使合并查询生效,target_table 必须已经存在于您的 Amazon Redshift 数据库中。
Python 示例:
post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 示例:
val options = JsonOptions(Map( "dbtable" -> "schema.stage_table", "database" -> "redshiftdb", "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;", "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)
在以上示例中,请替换以下值:
- **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表。
- **test_red:**要使用的目录连接。
- **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表。
- **s3://s3path:**Amazon Redshift 表的临时目录路径。
忽略无效的行
使用 extracopyoptions 参数指定更高的 MAXERROR 值。
Python 示例:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "schema.load_table", "database": "reddb","extracopyoptions":"MAXERROR 2"},redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 示例:
val options = JsonOptions(Map( "dbtable" -> "schema.load_table", "database" -> "reddb", "extracopyoptions" -> "MAXERROR 2" ))
在以上示例中,请替换以下值:
- **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表。
- **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表。
- **test:**要使用的目录连接。
- **testalblog2:**要将数据加载到的 Amazon Redshift 表。
- **reddb:**Amazon Redshift 数据库。
- **emp1:**当数据加载到 testalblog2 后,要从中删除数据的 Amazon Redshift 表。
- **s3://s3path:**Amazon Redshift 表的临时目录路径。
其他信息
在使用 AWS Glue 4.0 ETL 作业时,您可以使用 Amazon Redshift Spark 连接器 (redshift-jdbc42-2.1.0.9)。此连接器具有以下属性:
- 支持基于 IAM 的 JDBC URL。
- 包括性能改进选项,例如 autopushdown、autopushdown.s3_result_cache 和 unload_s3_format。
- 包括可用于临时文件夹中的数据的 SSE_KMS 加密选项。AWS Glue 在读取 Amazon Redshift 表时会使用此数据。
相关信息
没有评论
相关内容
- AWS 官方已更新 4 个月前
- AWS 官方已更新 3 个月前