AWS Glue ジョブにデータを書き込む前または後に、Amazon Redshift テーブルで SQL コマンドを実行する方法を教えてください。

所要時間3分
0

Amazon Redshift テーブルにデータを読み込む AWS Glue ジョブがあります。この AWS Glue ジョブが完了する前または後に、Amazon Redshift で SQL コマンドを実行したいです。

解決策

AWS Glue の DynamicFrameWriter クラス に次のパラメータを渡して認証を行います。

  • aws_iam_role: 別の AWS リソースのデータにアクセスすることを許可します。このパラメータを、Amazon Redshift クラスターにアタッチされている AWS Identity and Access Management (IAM) ロールの完全指定 ARN と併用します。たとえば、arn:aws:iam::123456789012:role/redshift_iam_role を使用します。詳しくは、「認証パラメータ」を参照してください。

AWS Glue の DynamicFrameWriter クラスに、次のパラメータを 1 つ以上渡します。

  • preactions: COPY コマンドの前に実行する SQL コマンドをセミコロンで区切ったリスト。コマンドが失敗すると、Amazon Redshift は例外をスローします。
    注: 前処理パラメータには、改行文字を含めることはできません。
  • postactions: COPY コマンドが成功した後に実行する SQL コマンドをセミコロンで区切ったリスト。コマンドが失敗すると、Amazon Redshift は例外をスローします。
    注: 後処理パラメータには改行文字を含めることはできません。
  • extracopyoptions: Amazon Redshift の COPY コマンドがデータをロードするときに追加する、追加オプションのリスト。たとえば、TRUNCATECOLUMNS や MAXERROR を使用できます。

シナリオ例

AWS Glue にレコードを挿入する前に Amazon Redshift テーブルを切り捨てる

preactions パラメータを使用します。

Python での例

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Scala での例

val options = JsonOptions(Map(   "dbtable" -> "schema.target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table schema.target_table;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)

上記のコマンドでは、次の値を置き換えます。

  • test_red: 使用するカタログ接続。
  • schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル。
  • s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス。

接続オプションで IAM ロールを使用する

認証情報は 1 時間後に期限切れになります。長時間の接続でのエラーを防ぐために、接続オプションでは IAM ロールを使用してください。

Python での例

glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name",  table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

Scala での例

val connectionOptions = JsonOptions(Map(      "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database",
      "dbtable" -> "schema.table",
      "user" -> "redshift_user",
      "password" -> "redshift_password",
      "tempdir" -> "s3://temp_bucket/temp",
      "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" ))

val dyf = glueContext.getSource("redshift", connectionOptions)
          .getDynamicFrame()

Amazon Redshift テーブルを AWS Glue にマージする (upsert)

データをステージングテーブルに読み込んだ後に、マージクエリを作成します。

注: マージクエリを機能させるには、Amazon Redshift データベースに target_table を配置済みである必要があります。

Python での例

post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Scala での例

val options = JsonOptions(Map(   "dbtable" -> "schema.stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)

上記のコマンドでは、次の値を置き換えます。

  • schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル。
  • test_red: 使用するカタログ接続。
  • schema.stage_table: Amazon Redshift データベースのスキーマと Amazon Redshift のステージングテーブル。
  • s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス。

有効でない行を無視する

extracopyoptions パラメータで、MAXERROR により高い値を指定します。

Python での例

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "schema.load_table", "database": "reddb","extracopyoptions":"MAXERROR 2"},redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Scala での例

val options = JsonOptions(Map(   "dbtable" -> "schema.load_table",
   "database" -> "reddb",
   "extracopyoptions" -> "MAXERROR 2"
   ))

上記の例では、次の値を置き換えます。

  • schema.target_table: Amazon Redshift データベースのスキーマと Amazon Redshift テーブル。
  • schema.stage_table: Amazon Redshift データベースのスキーマと Amazon Redshift のステージングテーブル。
  • test: 使用するカタログ接続。
  • testalblog2: データの読み込み先となる Amazon Redshift テーブル。
  • reddb: Amazon Redshift データベース。
  • emp1: Amazon Redshift テーブル。データが testalblog2 にロードされた後に、このテーブルからはデータが削除されます。
  • s3://s3path: Amazon Redshift テーブルの一時ディレクトリのパス。

追加情報

AWS Glue 4.0 ETL ジョブを使用する場合は、Amazon Redshift Spark コネクタ (redshift-jdbc42-2.1.0.9) を使用します。このコネクタには次の特性があります。

  • IAM ベースの JDBC URL がサポートされています。
  • autopushdownautopushdown.s3_result_cacheunload_s3_format など、パフォーマンス向上に関するオプションが含まれています。
  • 一時フォルダのデータに使用できる SSE_KMS 暗号化オプションが含まれています。AWS Glue は、Amazon Redshift テーブルから読み込むときにこのデータを使用します。

関連情報

COPY

TRUNCATE

データロード操作

AWS公式
AWS公式更新しました 3ヶ月前
コメントはありません