在 AWS Glue 作业中写入数据之前或之后如何在 Amazon Redshift 表上运行 SQL 命令?

3 分钟阅读
0

我有一个 AWS Glue 作业,用于将数据加载到 Amazon Redshift 表中。我想在 AWS Glue 作业完成之前或之后在 Amazon Redshift 上运行 SQL 命令。

解决方法

在 AWS Glue DynamicFrameWriter 类中传递以下参数以进行授权:

  • **aws_iam_role:**提供访问另一个 AWS 资源中的数据的授权。将此参数与附加到 Amazon Redshift 集群的 AWS Identity and Access Management (IAM) 角色的完全指定 ARN 一起使用。例如,使用 arn:aws:iam::123456789012:role/redshift_iam_role。有关详细信息,请参阅授权参数

在 AWS Glue DynamicFrameWriter 类中传递以下一个或多个参数:

  • **preactions:**在 COPY 命令之前运行的 SQL 命令的分号分隔列表。如果命令失败,Amazon Redshift 将抛出异常。
    **注意:**preaction 参数不能包含换行符。
  • **postactions:**在成功运行 COPY 命令后运行的 SQL 命令的分号分隔列表。如果命令失败,Amazon Redshift 将抛出异常。
    **注意:**postaction 参数不能包含换行符。
  • **extracopyoptions:**在 Amazon Redshift COPY 命令加载数据时附加到该命令的其他选项列表。例如,您可以使用 TRUNCATECOLUMNS 或 MAXERROR。

示例场景

截断一个 Amazon Redshift 表后再将记录插入到 AWS Glue 中

使用 preactions 参数。

Python 示例:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Scala 示例:

val options = JsonOptions(Map(   "dbtable" -> "schema.target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table schema.target_table;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)

在以上示例中,请替换以下值:

  • **test_red:**要使用的目录连接。
  • **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表。
  • **s3://s3path:**Amazon Redshift 表的临时目录路径。

在连接选项中使用 IAM 角色

凭证将在 1 小时后过期。请在连接选项中使用 IAM 角色,以便长时间运行的连接不会失败。

Python 示例:

glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name",  table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

Scala 示例:

val connectionOptions = JsonOptions(Map(      "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database",
      "dbtable" -> "schema.table",
      "user" -> "redshift_user",
      "password" -> "redshift_password",
      "tempdir" -> "s3://temp_bucket/temp",
      "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" ))

val dyf = glueContext.getSource("redshift", connectionOptions)
          .getDynamicFrame()

将 Amazon Redshift 表合并到 AWS Glue 中 (upsert)

将数据加载到暂存表后,创建合并查询

**注意:**要使合并查询生效,target_table 必须已经存在于您的 Amazon Redshift 数据库中。

Python 示例:

post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Scala 示例:

val options = JsonOptions(Map(   "dbtable" -> "schema.stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)

在以上示例中,请替换以下值:

  • **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表。
  • **test_red:**要使用的目录连接。
  • **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表。
  • **s3://s3path:**Amazon Redshift 表的临时目录路径。

忽略无效的行

使用 extracopyoptions 参数指定更高的 MAXERROR 值。

Python 示例:

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "schema.load_table", "database": "reddb","extracopyoptions":"MAXERROR 2"},redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Scala 示例:

val options = JsonOptions(Map(   "dbtable" -> "schema.load_table",
   "database" -> "reddb",
   "extracopyoptions" -> "MAXERROR 2"
   ))

在以上示例中,请替换以下值:

  • **schema.target_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 表。
  • **schema.stage_table:**Amazon Redshift 数据库的架构和 Amazon Redshift 暂存表。
  • **test:**要使用的目录连接。
  • **testalblog2:**要将数据加载到的 Amazon Redshift 表。
  • **reddb:**Amazon Redshift 数据库。
  • **emp1:**当数据加载到 testalblog2 后,要从中删除数据的 Amazon Redshift 表。
  • **s3://s3path:**Amazon Redshift 表的临时目录路径。

其他信息

在使用 AWS Glue 4.0 ETL 作业时,您可以使用 Amazon Redshift Spark 连接器 (redshift-jdbc42-2.1.0.9)。此连接器具有以下属性:

  • 支持基于 IAM 的 JDBC URL。
  • 包括性能改进选项,例如 autopushdownautopushdown.s3_result_cacheunload_s3_format
  • 包括可用于临时文件夹中的数据的 SSE_KMS 加密选项。AWS Glue 在读取 Amazon Redshift 表时会使用此数据。

相关信息

COPY

TRUNCATE

数据加载操作

AWS 官方
AWS 官方已更新 5 个月前