AWS Glue ジョブでデータを書き込む前後に、SQL コマンドを Amazon Redshift テーブルで実行するにはどうすればよいですか?
Amazon Redshift テーブルにデータをロードする AWS Glue ジョブがあります。AWS Glue ジョブが完了する前か後に Amazon Redshift で SQL コマンドを実行したいと考えています。
解決方法
AWS Glue の DynamicFrameWriter クラスで、以下に示すパラメータのいずれかを渡します。
- aws_iam_role: 別の AWS リソースにあるデータへのアクセスを認可します。Amazon Redshift クラスターにアタッチされている AWS Identity and Access Management (IAM) ロールの、完全に指定された ARN と共にこのパラメータを使用します。例えば、arn:aws:iam::123456789012:role/redshift_iam_role を使用します。詳細については、「認証パラメータ」を参照してください。
- preactions: COPY コマンドに先立って実行される SQL コマンドのセミコロン区切りリスト。コマンドが失敗した場合、Amazon Redshift は例外をスローします。
注: preaction パラメータに改行文字が含まれていないことを確認してください。 - postactions: COPY コマンドが完了した後に実行される SQL コマンドのセミコロン区切りリスト。コマンドが失敗した場合、Amazon Redshift は例外をスローします。
注: postaction パラメータに改行文字が含まれていないことを確認してください。 - extracopyoptions: データをロードするときに Amazon Redshift の COPY コマンドに追加する追加オプションのリストです。例えば、TRUNCATECOLUMNS や MAXERROR を使用できます。
サンプルシナリオ
AWS Glue でレコードを挿入する前に Amazon Redshift テーブルを切り詰める
preactions パラメータを使用します。
次の Python の例を参照してください。
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
次の Scala の例を参照してください。
val options = JsonOptions(Map( "dbtable" -> "schema.target_table", "database" -> "redshiftdb", "preactions" -> "truncate table schema.target_table;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)
これらの例では、必ず次の値を置き換えてください。
- test_red: 使用するカタログ接続
- schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル
- s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス
接続オプションで IAM ロールを使用する
認証情報は 1 時間後に期限切れになるため、接続オプションで IAM ロールを使用して、長時間実行されている接続が失敗しないようにしてください。
次の Python の例を参照してください。
glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name", table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})
次の Scala の例を参照してください。
val connectionOptions = JsonOptions(Map( "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database", "dbtable" -> "schema.table", "user" -> "redshift_user", "password" -> "redshift_password", "tempdir" -> "s3://temp_bucket/temp", "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" )) val dyf = glueContext.getSource("redshift", connectionOptions) .getDynamicFrame()
AWS Glue で Amazon Redshift テーブルをマージする (アップサート) データをステージングテーブルにロードした後、マージクエリを作成します。
**注:**マージクエリが機能するためには、 target_table が既に Amazon Redshift データベースに存在している必要があります。
次の Python の例を参照してください。
post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
次の Scala の例を参照してください。
val options = JsonOptions(Map( "dbtable" -> "schema.stage_table", "database" -> "redshiftdb", "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;", "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)
これらの例では、必ず次の値を置き換えてください。
- schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル
- test_red: 使用するカタログ接続
- schema.stage_table: Amazon Redshift データベースのスキーマと Amazon Redshift ステージングテーブル
- s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス
詳細については、「ステージングテーブルを使用したマージ (アップサート) の実行」を参照してください。
有効でない行を無視する
extracopyoptions パラメータを使用して、より大きい MAXERROR 値を指定します。
次の Python の例を参照してください。
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "testalblog2", "database": "reddb","postactions":"delete from emp1;","extracopyoptions":"MAXERROR 2"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
次の Scala の例を参照してください。
val options = JsonOptions(Map( "dbtable" -> "testalblog2", "database" -> "reddb", "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;", "postactions" -> "delete from emp1;", "extracopyoptions" -> "MAXERROR 2" )) glueContext.getJDBCSink(catalogConnection = "test", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(persons_DyF)
これらの例では、必ず次の値を置き換えてください。
- schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル
- schema.stage_table: Amazon Redshift データベースのスキーマと Amazon Redshift ステージングテーブル
- test: 使用するカタログ接続
- testalblog2: データを読み込む Amazon Redshift テーブル
- reddb: 当該の Amazon Redshift データベース
- emp1: testalblog2 にデータをロードした後、データを削除する Amazon Redshift テーブル
- s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス
追加情報
AWS Glue 4.0 ETL ジョブを使用する場合は、Amazon Redshift Spark コネクタ (redshift-jdbc42-2.1.0.9) を使用できます。このコネクタには次のプロパティがあります。
- IAM ベースの JDBC URL をサポートします。
- autopushdown、autopushdown.s3_result_cache や unload_s3_format などのパフォーマンス改善オプションが含まれます。
- 一時フォルダのデータに使用できる SSE_KMS 暗号化オプションが含まれます。AWS Glue は Amazon Redshift テーブルから読み取るときにこのデータを使用します。
関連情報
「COPY」
「TRUNCATE」
関連するコンテンツ
- 質問済み 4ヶ月前lg...
- 質問済み 1ヶ月前lg...
- 質問済み 6年前lg...
- 質問済み 5年前lg...
- AWS公式更新しました 2年前
- AWS公式更新しました 2年前
- AWS公式更新しました 1年前