Global outage event
If you’re experiencing issues with your AWS services, then please refer to the AWS Health Dashboard. You can find the overall status of ongoing outages, the health of AWS services, and the latest updates from AWS engineers.
在 AWS Glue 作业中写入数据之前或之后如何在 Amazon Redshift 表上运行 SQL 命令?
我有一个 AWS Glue 作业,用于将数据加载到 Amazon Redshift 表中。我想在 AWS Glue 作业完成之前或之后在 Amazon Redshift 上运行 SQL 命令。
解决方法
在 AWS Glue DynamicFrameWriter 类中传递以下参数以进行授权:
- **aws_iam_role:**提供访问另一个 AWS 资源中的数据的授权。将此参数与附加到 Amazon Redshift 集群的 AWS Identity and Access Management (IAM) 角色的完全指定 ARN 一起使用。例如,使用 arn:aws:iam::123456789012:role/redshift_iam_role。有关详细信息,请参阅授权参数。
在 AWS Glue DynamicFrameWriter 类中传递以下一个或多个参数:
- **preactions:**在 COPY 命令之前运行的 SQL 命令的分号分隔列表。如果命令失败,Amazon Redshift 将抛出异常。
**注意:**preaction 参数不能包含换行符。 - **postactions:**在成功运行 COPY 命令后运行的 SQL 命令的分号分隔列表。如果命令失败,Amazon Redshift 将抛出异常。
**注意:**postaction 参数不能包含换行符。 - **extracopyoptions:**在 Amazon Redshift COPY 命令加载数据时附加到该命令的其他选项列表。例如,您可以使用 TRUNCATECOLUMNS 或 MAXERROR。
示例场景
截断一个 Amazon Redshift 表后再将记录插入到 AWS Glue 中
使用 preactions 参数。
Python 示例:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 示例:
val options = JsonOptions(Map( "dbtable" -> "schema.target_table", "database" -> "redshiftdb", "preactions" -> "truncate table schema.target_table;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)
在以上示例中,请替换以下值:
- **test_red:**要使用的目录连接。
- **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表。
- **s3://s3path:**Amazon Redshift 表的临时目录路径。
在连接选项中使用 IAM 角色
凭证将在 1 小时后过期。请在连接选项中使用 IAM 角色,以便长时间运行的连接不会失败。
Python 示例:
glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name", table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})
Scala 示例:
val connectionOptions = JsonOptions(Map( "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database", "dbtable" -> "schema.table", "user" -> "redshift_user", "password" -> "redshift_password", "tempdir" -> "s3://temp_bucket/temp", "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" )) val dyf = glueContext.getSource("redshift", connectionOptions) .getDynamicFrame()
将 Amazon Redshift 表合并到 AWS Glue 中 (upsert)
将数据加载到暂存表后,创建合并查询。
**注意:**要使合并查询生效,target_table 必须已经存在于您的 Amazon Redshift 数据库中。
Python 示例:
post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 示例:
val options = JsonOptions(Map( "dbtable" -> "schema.stage_table", "database" -> "redshiftdb", "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;", "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)
在以上示例中,请替换以下值:
- **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表。
- **test_red:**要使用的目录连接。
- **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表。
- **s3://s3path:**Amazon Redshift 表的临时目录路径。
忽略无效的行
使用 extracopyoptions 参数指定更高的 MAXERROR 值。
Python 示例:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "schema.load_table", "database": "reddb","extracopyoptions":"MAXERROR 2"},redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 示例:
val options = JsonOptions(Map( "dbtable" -> "schema.load_table", "database" -> "reddb", "extracopyoptions" -> "MAXERROR 2" ))
在以上示例中,请替换以下值:
- **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表。
- **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表。
- **test:**要使用的目录连接。
- **testalblog2:**要将数据加载到的 Amazon Redshift 表。
- **reddb:**Amazon Redshift 数据库。
- **emp1:**当数据加载到 testalblog2 后,要从中删除数据的 Amazon Redshift 表。
- **s3://s3path:**Amazon Redshift 表的临时目录路径。
其他信息
在使用 AWS Glue 4.0 ETL 作业时,您可以使用 Amazon Redshift Spark 连接器 (redshift-jdbc42-2.1.0.9)。此连接器具有以下属性:
- 支持基于 IAM 的 JDBC URL。
- 包括性能改进选项,例如 autopushdown、autopushdown.s3_result_cache 和 unload_s3_format。
- 包括可用于临时文件夹中的数据的 SSE_KMS 加密选项。AWS Glue 在读取 Amazon Redshift 表时会使用此数据。
