MWAA Airflow and Redshift Connection Setup


Hey, I have been trying to set up a connection between Airflow and redshift using redshift_default connection but I have been unable to do so. I am attempting to use the S3 to Redshift Operator to push data present in an S3 bucket to a Redshift Table that I already tested manually uploading with the same data. Can anyone please tell me if there is anything wrong with my DAG or is able to guide me how to properly set up the redshift_default connection set up in the admin tab of airflow? I have many different things based on documentations. Currently, when i run my dag it immediately fails, and I am unable to see any task error logs either.

My dag is as below:

from datetime import datetime, timedelta from airflow import DAG from import S3ToRedshiftOperator from airflow.operators.dummy_operator import DummyOperator

REDSHIFT_CONNECTION_ID = 'redshift_default' S3_BUCKET = 'raw-appdata' REDSHIFT_SCHEMA = 'gold'

default_args = { 'owner': 'Airflow', 'start_date': datetime(2023, 12, 6), 'retries': 0, 'catchup' : False, 'retry_delay': timedelta(minutes=5), }

dag = DAG( 's3_to_redshift_test_dag2', default_args=default_args,
schedule_interval=None, # This DAG is manually triggered )

load_s3_to_redshift_task = S3ToRedshiftOperator( task_id='load_s3_to_redshift_task', s3_bucket=S3_BUCKET, s3_key= 'Test/Test_data.csv', schema=REDSHIFT_SCHEMA, table='test_data', copy_options=['CSV', 'IGNOREHEADER 1'], redshift_conn_id=REDSHIFT_CONNECTION_ID, dag=dag, )

end_task = DummyOperator(task_id='end_task', dag=dag)

load_s3_to_redshift_task >> end_task

1 Answer
Accepted Answer


From your above description, I understand that you are facing issue while trying to connect from your MWAA environment to Redshift cluster and you are using DAG to transfer the data from S3 to Redshift table. Very first, you can troubleshoot why logs are not visible because once logs are visible it would help to understand the issue. Please verify that:

  1. You have enabled task logs at the INFO level for your environment. For more information, see Viewing Airflow logs in Amazon CloudWatch.

  2. Verify that the environment execution role has the correct permission policies.

Also check if you are unable to view the logs of any of the others DAG or you are facing this issue with this DAG only. You can also refer the below AWS documentation to troubleshoot:

I am sharing the below sample operator example for your reference:

transfer_s3_to_redshift = S3ToRedshiftOperator( task_id="transfer_s3_to_redshift", redshift_conn_id=conn_id_name, s3_bucket=bucket_name, s3_key=S3_KEY_2, schema="PUBLIC", table=REDSHIFT_TABLE, copy_options=["csv"], )

Additionally you can refer this third party documents

I hope you would find above information helpful.

profile pictureAWS
answered 2 months ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions