MWAA Airflow and Redshift Connection Setup


Hey, I have been trying to set up a connection between Airflow and Redshift using the redshift_default connection, but I have been unable to do so. I am attempting to use the S3ToRedshiftOperator to push data from an S3 bucket into a Redshift table that I have already tested by manually uploading the same data. Can anyone tell me if there is anything wrong with my DAG, or guide me on how to properly set up the redshift_default connection in the Admin tab of Airflow? I have tried many different things based on the documentation. Currently, when I run my DAG it immediately fails, and I am unable to see any task error logs either.

My DAG is below:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.operators.dummy_operator import DummyOperator

REDSHIFT_CONNECTION_ID = 'redshift_default'
S3_BUCKET = 'raw-appdata'
REDSHIFT_SCHEMA = 'gold'

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2023, 12, 6),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    's3_to_redshift_test_dag2',
    default_args=default_args,
    schedule_interval=None,  # This DAG is manually triggered
    catchup=False,  # catchup is a DAG-level argument, not a default_arg
)

load_s3_to_redshift_task = S3ToRedshiftOperator(
    task_id='load_s3_to_redshift_task',
    s3_bucket=S3_BUCKET,
    s3_key='Test/Test_data.csv',
    schema=REDSHIFT_SCHEMA,
    table='test_data',
    copy_options=['CSV', 'IGNOREHEADER 1'],
    redshift_conn_id=REDSHIFT_CONNECTION_ID,
    dag=dag,
)

end_task = DummyOperator(task_id='end_task', dag=dag)

load_s3_to_redshift_task >> end_task

1 Answer

Accepted Answer

Hello,

From your description, I understand that you are facing an issue connecting from your MWAA environment to your Redshift cluster, and that you are using a DAG to transfer data from S3 to a Redshift table. First, troubleshoot why the task logs are not visible, because once the logs are visible they will help you understand the underlying issue. Please verify that:

  1. You have enabled task logs at the INFO level for your environment (a sketch of how to check this follows this list). For more information, see Viewing Airflow logs in Amazon CloudWatch:
    https://docs.aws.amazon.com/mwaa/latest/userguide/monitoring-airflow.html

  2. The environment execution role has the correct permission policies attached: https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-create-role.html
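
For item 1, you can check and update the task-log level through the MWAA API (the same setting is available in the MWAA console). Below is a minimal sketch using boto3; "MyAirflowEnvironment" is a placeholder for your own environment's name, and note that update_environment triggers an environment update that takes some time to apply:

import boto3

mwaa = boto3.client("mwaa")

# Inspect the current task-log configuration for the environment.
env = mwaa.get_environment(Name="MyAirflowEnvironment")["Environment"]
print(env["LoggingConfiguration"].get("TaskLogs"))

# Enable task logs at the INFO level.
mwaa.update_environment(
    Name="MyAirflowEnvironment",
    LoggingConfiguration={"TaskLogs": {"Enabled": True, "LogLevel": "INFO"}},
)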

Also check whether you are unable to view logs for any of your other DAGs, or whether you are facing this issue with this DAG only. You can also refer to the following AWS documentation to troubleshoot: https://docs.aws.amazon.com/mwaa/latest/userguide/t-cloudwatch-cloudtrail-logs.html#t-task-logs

I am sharing the sample operator below for your reference:

transfer_s3_to_redshift = S3ToRedshiftOperator(
    task_id="transfer_s3_to_redshift",
    redshift_conn_id=conn_id_name,
    s3_bucket=bucket_name,
    s3_key=S3_KEY_2,
    schema="PUBLIC",
    table=REDSHIFT_TABLE,
    copy_options=["csv"],
)
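
On the redshift_default connection itself: redshift_conn_id expects a connection of type Amazon Redshift pointing at your cluster endpoint. Below is a minimal sketch of the required fields, expressed with Airflow's Connection model; the endpoint, database, user, and password values are placeholders. In MWAA you would enter the same fields under Admin > Connections in the UI, or store the connection URI in your configured secrets backend:

from airflow.models import Connection

# Placeholder values; substitute your own cluster endpoint, database, and credentials.
redshift_default = Connection(
    conn_id="redshift_default",
    conn_type="redshift",  # shown as "Amazon Redshift" in the Admin > Connections UI
    host="my-cluster.abc123.us-east-1.redshift.amazonaws.com",  # cluster endpoint, without port
    port=5439,             # default Redshift port
    schema="dev",          # the database name, not the table schema
    login="awsuser",
    password="my_password",
)

# URI form of the same connection, usable with a secrets backend or environment variable.
print(redshift_default.get_uri())

Note that the connection's Schema field holds the database name; the table schema (gold in your DAG) is passed to the operator separately.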

Additionally, you can refer to third-party documentation on this topic.

I hope you find the above information helpful.

AWS Support Engineer
Answered 5 months ago
