I have an AWS Glue job that loads data into an Amazon Redshift table. I want to run SQL commands on Amazon Redshift before or after the AWS Glue job completes.
Resolution
Pass the following parameter in the AWS Glue DynamicFrameWriter class for authorization:
- aws_iam_role: Provides authorization to access data in another AWS resource. Use this parameter with the fully specified ARN of the AWS Identity and Access Management (IAM) role that's attached to the Amazon Redshift cluster. For example, use arn:aws:iam::123456789012:role/redshift_iam_role. For more information, see Authorization parameters.
Pass one or more of the following parameters in the AWS Glue DynamicFrameWriter class:
- preactions: A semicolon-delimited list of SQL commands that are run before the COPY command. If the commands fail, then Amazon Redshift throws an exception.
Note: The preactions parameter can't contain newline characters.
- postactions: A semicolon-delimited list of SQL commands that are run after a successful COPY command. If the commands fail, then Amazon Redshift throws an exception.
Note: The postactions parameter can't contain newline characters.
- extracopyoptions: A list of additional options to append to the Amazon Redshift COPY command when it loads data. For example, you might use TRUNCATECOLUMNS or MAXERROR. For an example that combines these parameters, see the sketch after this list.
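The following is a minimal sketch that shows how these parameters can be combined in a single connection_options map. It assumes that datasource0 is a DynamicFrame that you already created; the connection name (test_red), schema, table, database, role ARN, and S3 path are placeholders that you replace with your own values.
Python example (sketch):
# Sketch only: combines aws_iam_role, preactions, postactions, and extracopyoptions.
# All names, ARNs, and paths below are placeholders.
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = datasource0,
    catalog_connection = "test_red",
    connection_options = {
        "dbtable": "schema.target_table",
        "database": "redshiftdb",
        "aws_iam_role": "arn:aws:iam::123456789012:role/redshift_iam_role",
        "preactions": "truncate table schema.target_table;",
        "postactions": "analyze schema.target_table;",
        "extracopyoptions": "TRUNCATECOLUMNS MAXERROR 2"
    },
    redshift_tmp_dir = "s3://s3path",
    transformation_ctx = "datasink")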
Example scenarios
Truncate an Amazon Redshift table before inserting records in AWS Glue
Use the preactions parameter.
Python example:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = datasource0,
    catalog_connection = "test_red",
    connection_options = {
        "preactions": "truncate table schema.target_table;",
        "dbtable": "schema.target_table",
        "database": "redshiftdb"
    },
    redshift_tmp_dir = "s3://s3path",
    transformation_ctx = "datasink4")
Scala example:
val options = JsonOptions(Map( "dbtable" -> "schema.target_table",
"database" -> "redshiftdb",
"preactions" -> "truncate table schema.target_table;"
))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)
In the preceding examples, replace the following values:
- test_red: The catalog connection to use.
- schema.target_table: The Amazon Redshift database's schema and the Amazon Redshift table.
- s3://s3path: The path of the Amazon Redshift table's temporary directory.
Use an IAM role in the connection options
Credentials expire after 1 hour. Use an IAM role in the connection options so that your long-running connections don't fail.
Python example:
glueContext.create_dynamic_frame.from_catalog(
    database = "redshift-database-name",
    table_name = "redshift-table-name",
    redshift_tmp_dir = args["TempDir"],
    additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})
Scala example:
val connectionOptions = JsonOptions(Map(
    "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database",
    "dbtable" -> "schema.table",
    "user" -> "redshift_user",
    "password" -> "redshift_password",
    "tempdir" -> "s3://temp_bucket/temp",
    "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name"
))
val dyf = glueContext.getSource("redshift", connectionOptions).getDynamicFrame()
Merge an Amazon Redshift table in AWS Glue (upsert)
After you load the data into a staging table, create a merge query.
Note: For your merge query to work, target_table must already exist in your Amazon Redshift database.
Python example:
post_query = "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = datasource0,
    catalog_connection = "test_red",
    connection_options = {
        "preactions": "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
        "dbtable": "schema.stage_table",
        "database": "redshiftdb",
        "postactions": post_query
    },
    redshift_tmp_dir = "s3://s3path",
    transformation_ctx = "datasink4")
Scala example:
val options = JsonOptions(Map( "dbtable" -> "schema.stage_table",
"database" -> "redshiftdb",
"preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
"postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)
In the preceding examples, replace the following values:
- schema.target_table: The Amazon Redshift database's schema and the Amazon Redshift table.
- test_red: The catalog connection to use.
- schema.stage_table: The Amazon Redshift database's schema and the Amazon Redshift staging table.
- s3://s3path: The path of the Amazon Redshift table's temporary directory.
Ignore rows that aren't valid
Use the extracopyoptions parameter to specify a higher MAXERROR value.
Python example:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = persons_DyF,
    catalog_connection = "test",
    connection_options = {
        "dbtable": "schema.load_table",
        "database": "reddb",
        "extracopyoptions": "MAXERROR 2"
    },
    redshift_tmp_dir = "s3://s3path",
    transformation_ctx = "datasink4")
Scala example:
val options = JsonOptions(Map( "dbtable" -> "schema.load_table",
"database" -> "reddb",
"extracopyoptions" -> "MAXERROR 2"
))
In the preceding examples, replace the following values:
- schema.load_table: The Amazon Redshift database's schema and the Amazon Redshift table to load data into.
- test: The catalog connection to use.
- reddb: The Amazon Redshift database.
- s3://s3path: The path of the Amazon Redshift table's temporary directory.
Additional information
You can use the Amazon Redshift Spark connector (redshift-jdbc42-2.1.0.9) when you use AWS Glue 4.0 ETL jobs. This connector has the following properties:
- Supports IAM-based JDBC URLs.
- Includes performance improvement options like autopushdown, autopushdown.s3_result_cache, and unload_s3_format.
- Includes the SSE_KMS encryption option that can be used for data in the temporary folder. AWS Glue uses this data when it reads from Amazon Redshift tables.
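As a hedged illustration only, the following sketch shows how some of these connector options might be passed when an AWS Glue 4.0 job reads from Amazon Redshift. The option keys autopushdown and unload_s3_format come from the list above; the connection_type value, JDBC URL, table, role ARN, and temporary directory are placeholder assumptions, so confirm the exact option names against the AWS Glue and connector documentation for your version.
Python example (sketch):
# Sketch only: assumes these keys are forwarded to the Amazon Redshift Spark connector.
# The cluster endpoint, table, role ARN, and S3 path are placeholders.
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type = "redshift",
    connection_options = {
        # IAM-based JDBC URL (one of the connector properties listed above)
        "url": "jdbc:redshift:iam://example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/redshiftdb",
        "dbtable": "schema.source_table",
        "aws_iam_role": "arn:aws:iam::123456789012:role/redshift_iam_role",
        "redshiftTmpDir": "s3://s3path/temp/",
        # Performance options named above
        "autopushdown": "true",
        "unload_s3_format": "PARQUET"
    })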
Related information
COPY
TRUNCATE
Data load operations