Knowledge Center Monthly Newsletter - March 2025
Stay up to date with the latest from the Knowledge Center. See all new and updated Knowledge Center articles published in the last month and re:Post’s top contributors.
AWS Glue ジョブにデータを書き込む前または後に、Amazon Redshift テーブルで SQL コマンドを実行する方法を教えてください。
Amazon Redshift テーブルにデータを読み込む AWS Glue ジョブがあります。この AWS Glue ジョブが完了する前または後に、Amazon Redshift で SQL コマンドを実行したいです。
解決策
AWS Glue の DynamicFrameWriter クラス に次のパラメータを渡して認証を行います。
- aws_iam_role: 別の AWS リソースのデータにアクセスすることを許可します。このパラメータを、Amazon Redshift クラスターにアタッチされている AWS Identity and Access Management (IAM) ロールの完全指定 ARN と併用します。たとえば、arn:aws:iam::123456789012:role/redshift_iam_role を使用します。詳しくは、「認証パラメータ」を参照してください。
AWS Glue の DynamicFrameWriter クラスに、次のパラメータを 1 つ以上渡します。
- preactions: COPY コマンドの前に実行する SQL コマンドをセミコロンで区切ったリスト。コマンドが失敗すると、Amazon Redshift は例外をスローします。
注: 前処理パラメータには、改行文字を含めることはできません。 - postactions: COPY コマンドが成功した後に実行する SQL コマンドをセミコロンで区切ったリスト。コマンドが失敗すると、Amazon Redshift は例外をスローします。
注: 後処理パラメータには改行文字を含めることはできません。 - extracopyoptions: Amazon Redshift の COPY コマンドがデータをロードするときに追加する、追加オプションのリスト。たとえば、TRUNCATECOLUMNS や MAXERROR を使用できます。
シナリオ例
AWS Glue にレコードを挿入する前に Amazon Redshift テーブルを切り捨てる
preactions パラメータを使用します。
Python での例
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala での例
val options = JsonOptions(Map( "dbtable" -> "schema.target_table", "database" -> "redshiftdb", "preactions" -> "truncate table schema.target_table;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)
上記のコマンドでは、次の値を置き換えます。
- test_red: 使用するカタログ接続。
- schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル。
- s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス。
接続オプションで IAM ロールを使用する
認証情報は 1 時間後に期限切れになります。長時間の接続でのエラーを防ぐために、接続オプションでは IAM ロールを使用してください。
Python での例
glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name", table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})
Scala での例
val connectionOptions = JsonOptions(Map( "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database", "dbtable" -> "schema.table", "user" -> "redshift_user", "password" -> "redshift_password", "tempdir" -> "s3://temp_bucket/temp", "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" )) val dyf = glueContext.getSource("redshift", connectionOptions) .getDynamicFrame()
Amazon Redshift テーブルを AWS Glue にマージする (upsert)
データをステージングテーブルに読み込んだ後に、マージクエリを作成します。
注: マージクエリを機能させるには、Amazon Redshift データベースに target_table を配置済みである必要があります。
Python での例
post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala での例
val options = JsonOptions(Map( "dbtable" -> "schema.stage_table", "database" -> "redshiftdb", "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;", "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)
上記のコマンドでは、次の値を置き換えます。
- schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル。
- test_red: 使用するカタログ接続。
- schema.stage_table: Amazon Redshift データベースのスキーマと Amazon Redshift のステージングテーブル。
- s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス。
有効でない行を無視する
extracopyoptions パラメータで、MAXERROR により高い値を指定します。
Python での例
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "schema.load_table", "database": "reddb","extracopyoptions":"MAXERROR 2"},redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala での例
val options = JsonOptions(Map( "dbtable" -> "schema.load_table", "database" -> "reddb", "extracopyoptions" -> "MAXERROR 2" ))
上記の例では、次の値を置き換えます。
- schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル。
- schema.stage_table: Amazon Redshift データベースのスキーマと Amazon Redshift のステージングテーブル。
- test: 使用するカタログ接続。
- testalblog2: データの読み込み先となる Amazon Redshift テーブル。
- reddb: Amazon Redshift データベース。
- emp1: Amazon Redshift テーブル。データが testalblog2 にロードされた後に、このテーブルからはデータが削除されます。
- s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス。
追加情報
AWS Glue 4.0 ETL ジョブを使用する場合は、Amazon Redshift Spark コネクタ (redshift-jdbc42-2.1.0.9) を使用します。このコネクタには次の特性があります。
- IAM ベースの JDBC URL がサポートされています。
- autopushdown、autopushdown.s3_result_cache、unload_s3_format など、パフォーマンス向上に関するオプションが含まれています。
- 一時フォルダのデータに使用できる SSE_KMS 暗号化オプションが含まれています。AWS Glue は、Amazon Redshift テーブルから読み込むときにこのデータを使用します。
関連情報
関連するコンテンツ
- 質問済み 4ヶ月前lg...
- 質問済み 2年前lg...
- 質問済み 1年前lg...
- 質問済み 2ヶ月前lg...
- AWS公式更新しました 3ヶ月前
- AWS公式更新しました 3年前