Knowledge Center Monthly Newsletter - June 2025
Stay up to date with the latest from the Knowledge Center. See all new Knowledge Center articles published in the last month, and re:Post's top contributors.
AWS Glue 작업에서 데이터를 쓰기 전이나 후에 Amazon Redshift 테이블에서 SQL 명령을 실행하려면 어떻게 해야 합니까?
Amazon Redshift 테이블에 데이터를 로드하는 AWS Glue 작업이 있습니다. AWS Glue 작업이 완료되기 전이나 후에 Amazon Redshift에서 SQL 명령을 실행하려고 합니다.
해결 방법
권한 부여를 위해 AWS Glue DynamicFrameWriter 클래스에 다음 파라미터를 전달하십시오.
- aws_iam_role: 다른 AWS 리소스의 데이터에 액세스할 수 있는 권한을 부여합니다. Amazon Redshift 클러스터에 연결된 AWS Identity and Access Management(IAM) 역할의 완전히 지정된 ARN과 함께 이 파라미터를 사용하십시오. 예를 들어 arn:aws:iam::123456789012:role/redshift_iam_role과 같이 사용합니다. 자세한 내용은 권한 부여 파라미터를 참조하십시오.
AWS Glue DynamicFrameWriter 클래스에 다음 파라미터 중 하나 이상을 전달하십시오.
- preactions: COPY 명령 전에 실행되는 SQL 명령 목록으로, 세미콜론으로 구분됩니다. 명령이 실패하면 Amazon Redshift에서 예외가 발생합니다.
참고: preaction 파라미터에는 개행 문자를 포함할 수 없습니다. - postactions: 성공적인 COPY 명령 후에 실행되는 SQL 명령 목록으로, 세미콜론으로 구분됩니다. 명령이 실패하면 Amazon Redshift에서 예외가 발생합니다.
참고: postaction 파라미터에는 개행 문자를 포함할 수 없습니다. - extracopyoptions: 데이터를 로드할 때 Amazon Redshift COPY 명령에 추가할 추가적인 옵션 목록입니다. 예를 들어 TRUNCATECOLUMNS 또는 MAXERROR를 사용할 수 있습니다.
예제 시나리오
AWS Glue에서 레코드를 삽입하기 전에 Amazon Redshift 테이블 잘라내기
preactions 파라미터를 사용하십시오.
Python 예:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 예:
val options = JsonOptions(Map( "dbtable" -> "schema.target_table", "database" -> "redshiftdb", "preactions" -> "truncate table schema.target_table;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)
위의 예에서 다음 값을 바꾸십시오.
- test_red: 사용할 카탈로그 연결입니다.
- schema.target_table: Amazon Redshift 데이터베이스의 스키마와 Amazon Redshift 테이블입니다.
- s3://s3path: Amazon Redshift 테이블의 임시 디렉터리 경로입니다.
연결 옵션에서 IAM 역할 사용
자격 증명은 1시간 후에 만료됩니다. 장기 실행 연결이 실패하지 않도록 연결 옵션에서 IAM 역할을 사용하십시오.
Python 예:
glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name", table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})
Scala 예:
val connectionOptions = JsonOptions(Map( "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database", "dbtable" -> "schema.table", "user" -> "redshift_user", "password" -> "redshift_password", "tempdir" -> "s3://temp_bucket/temp", "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" )) val dyf = glueContext.getSource("redshift", connectionOptions) .getDynamicFrame()
Amazon Redshift 테이블을 AWS Glue에 병합(upsert)
스테이징 테이블에 데이터를 로드한 후 병합 쿼리를 생성합니다.
참고: 병합 쿼리가 작동하려면 target_table이 Amazon Redshift 데이터베이스에 이미 있어야 합니다.
Python 예:
post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 예:
val options = JsonOptions(Map( "dbtable" -> "schema.stage_table", "database" -> "redshiftdb", "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;", "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;" )) glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)
위의 예에서 다음 값을 바꾸십시오.
- schema.target_table: Amazon Redshift 데이터베이스의 스키마와 Amazon Redshift 테이블입니다.
- test_red: 사용할 카탈로그 연결입니다.
- schema.stage_table: Amazon Redshift 데이터베이스의 스키마와 Amazon Redshift 스테이징 테이블입니다.
- s3://s3path: Amazon Redshift 테이블의 임시 디렉터리 경로입니다.
유효하지 않은 행 무시
더 높은 MAXERROR 값을 지정하려면 extracopyoptions 파라미터를 사용하십시오.
Python 예:
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "schema.load_table", "database": "reddb","extracopyoptions":"MAXERROR 2"},redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
Scala 예:
val options = JsonOptions(Map( "dbtable" -> "schema.load_table", "database" -> "reddb", "extracopyoptions" -> "MAXERROR 2" ))
위의 예에서 다음 값을 바꾸십시오.
- schema.target_table: Amazon Redshift 데이터베이스의 스키마와 Amazon Redshift 테이블입니다.
- schema.stage_table: Amazon Redshift 데이터베이스의 스키마와 Amazon Redshift 스테이징 테이블입니다.
- test: 사용할 카탈로그 연결입니다.
- testalblog2: 데이터를 로드할 Amazon Redshift 테이블입니다.
- reddb: Amazon Redshift 데이터베이스입니다.
- emp1: 데이터가 testalblog2에 로드된 후 데이터를 삭제할 Amazon Redshift 테이블입니다.
- s3://s3path: Amazon Redshift 테이블의 임시 디렉터리 경로입니다.
추가 정보
AWS Glue 4.0 ETL 작업을 사용할 경우 Amazon Redshift Spark 커넥터(redshift-jdbc42-2.1.0.9)를 사용할 수 있습니다. 이 커넥터에는 다음과 같은 특성이 있습니다.
- IAM 기반 JDBC URL을 지원합니다.
- autopushdown, autopushdown.s3_result_cache 및 unload_s3_format과 같은 성능 개선 옵션이 포함되어 있습니다.
- 임시 폴더의 데이터에 사용할 수 있는 SSE_KMS 암호화 옵션이 포함되어 있습니다. AWS Glue는 Amazon Redshift 테이블에서 읽을 때 이 데이터를 사용합니다.