How To Connect Airflow To Redshift Serverless


Hi, I'm trying to connect Airflow to Redshift Serverless. I have RedshiftFullAccess, and I'm trying to copy files from S3 to Redshift using S3ToRedshiftOperator, but I get a ClusterNotFound error. I'm stuck on connecting Airflow to Redshift Serverless; I've checked the Airflow docs, but they're not clear to me. I'd appreciate any resource or tip. Code below:

copy_to_redshift = S3ToRedshiftOperator(
    task_id='copy_to_redshift',
    aws_conn_id='redshift_default',
    schema='db_schema',
    table='table_name',
    s3_bucket=S3_BUCKET_NAME,
    s3_key='folder/output_folder',
    copy_options=["FORMAT AS PARQUET"],
    dag=dag,
)

Connection created in Airflow UI

Connection Id: redshift_default
Connection Type: Amazon Redshift
Host: WORKGROUP_NAME.ACCOUNT_ID.REGION.redshift-serverless.amazonaws.com
Database: DbName
User: my-username
Password: (empty)
Port: 5439
Extra: {
    "iam": true,
    "is_serverless": true,
    "serverless_token_duration_seconds": 3600,
    "port": 5439,
    "region": "REGION",
    "database": "DBName",
    "profile": "default"
}

Error message

UserWarning: AWS Connection (conn_id='redshift_default', conn_type='redshift') expected connection type 'aws', got 'redshift'. This connection might not work correctly. Please use Amazon Web Services Connection type.
{connection_wrapper.py:378} INFO - AWS Connection (conn_id='redshift_default', conn_type='redshift') credentials retrieved from login and password.
{logging_mixin.py:154} WARNING - <string>:9 UserWarning: Found 'profile' without specifying 's3_config_file' in AWS Connection (conn_id='redshift_default', conn_type='redshift') extra. If required profile from AWS Shared Credentials please set 'profile_name' in AWS Connection (conn_id='redshift_default', conn_type='redshift') extra.
[2024-01-21, 12:32:34 UTC] {logging_mixin.py:154} WARNING - <string>:9 AirflowProviderDeprecationWarning: Host WORKGROUP_NAME.ACCOUNT_ID.REGION.redshift-serverless.amazonaws.com specified in the connection is not used. Please, set it on extra['endpoint_url'] instead
[2024-01-21, 12:32:35 UTC] {s3_to_redshift.py:192} INFO - Executing COPY command...
[2024-01-21, 12:32:35 UTC] {base.py:73} INFO - Using connection ID 'redshift_default' for task execution.
[2024-01-21, 12:32:35 UTC] {base_aws.py:581} WARNING - Unable to find AWS Connection ID 'aws_default', switching to empty.
[2024-01-21, 12:32:35 UTC] {base_aws.py:161} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name=None). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2024-01-21, 12:32:36 UTC] {credentials.py:1052} INFO - Found credentials from IAM Role: AttachedRole-role
[2024-01-21, 12:32:36 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/common/sql/hooks/sql.py", line 385, in run
    with closing(self.get_conn()) as conn:
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/amazon/aws/hooks/redshift_sql.py", line 173, in get_conn
    conn_params = self._get_conn_params()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/amazon/aws/hooks/redshift_sql.py", line 84, in _get_conn_params
    conn.login, conn.password, conn.port = self.get_iam_token(conn)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/amazon/aws/hooks/redshift_sql.py", line 115, in get_iam_token
    cluster_creds = redshift_client.get_cluster_credentials(
  File "/home/airflow/.local/lib/python3.8/site-packages/botocore/client.py", line 535, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/botocore/client.py", line 980, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ClusterNotFoundFault: An error occurred (ClusterNotFound) when calling the GetClusterCredentials operation: Cluster WORKGROUP_NAME not found.
[2024-01-21, 12:32:36 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=redshift-pipe, task_id=copy_to_redshift, execution_date=20240121T123224, start_date=20240121T123234, end_date=20240121T123236
[2024-01-21, 12:32:36 UTC] {standard_task_runner.py:104} ERROR - Failed to execute job 95 for task copy_to_redshift (An error occurred (ClusterNotFound) when calling the GetClusterCredentials operation: Cluster WORKGROUP_NAME not found.; 12865)
[2024-01-21, 12:32:36 UTC] {local_task_job_runner.py:228} INFO - Task exited with return code 1
[2024-01-21, 12:32:36 UTC] {taskinstance.py:2778} INFO - 0 downstream tasks scheduled from follow-on schedule check

I found this GitHub issue and the docs vague: https://github.com/apache/airflow/issues/35805 and https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/redshift.html .

2 Answers
Accepted Answer

Referencing the docs page https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/connections/redshift.html , I see it says there: "If you want to use IAM with Amazon Redshift Serverless, you need to set is_serverless to true and provide serverless_work_group."

Have you set up the serverless_work_group parameter?
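
If not, something along these lines in the connection Extra (your workgroup name substituted; a rough sketch based on that doc page, not tested against your setup) should make the hook fetch temporary serverless credentials instead of calling GetClusterCredentials, which is what is failing in your log:

{
    "iam": true,
    "is_serverless": true,
    "serverless_work_group": "WORKGROUP_NAME",
    "serverless_token_duration_seconds": 3600,
    "region": "REGION"
}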

AWS
answered 3 months ago

Thanks, the error was an Airflow problem (answered in awsdevelopers.slack.com), and I added the serverless_work_group parameter. After upgrading to Airflow 2.8.1 I hit a timeout error. What worked was attaching the S3FullAccess policy to the IAM role under the default namespace's security and encryption tab, then switching to the recommended Data API operator (RedshiftDataOperator), using the dataset from this blog. Working DAG below:

with DAG(dag_id="redshift", start_date=datetime(2021, 1, 1), schedule_interval=None, tags=['example']) as dag:
    setup__task_create_table2 = RedshiftDataOperator(
        task_id='setup__create_table2', region='us-east-1', workgroup_name='default-workgroup',
        database='dev',
        sql="""
	CREATE TABLE public.yellow2_201601(vendorid bigint, tpep_pickup_datetime timestamp,
	tpep_dropoff_datetime timestamp, passenger_count double precision, trip_distance double precision, 
	ratecodeid double precision, store_and_fwd_flag varchar(12), Pulocationid bigint, dolocationid bigint, 
	payment_type bigint, fare_amount double precision, extra double precision, mta_tax double precision, 
	tip_amount double precision, tolls_amount double precision, improvement_surcharge double precision, 
	total_amount double precision, congestion_surcharge double precision, airport_fee double precision);
        """)

    task_get_all_table_data2 = RedshiftDataOperator(
        task_id='task_get_all_table_data2', database='dev', region='us-east-1', 
	workgroup_name='default-workgroup',  
        sql="COPY public.yellow2_201601 FROM 's3://yellow2-taxi/Parquet/' REGION 'us-east-1' IAM_ROLE 'arn:aws:iam::123456789012:role/service-role/AmazonRedshift-CommandsAccessRole-01234567T0123567' FORMAT AS PARQUET;"
        )

    setup__task_create_table2 >> task_get_all_table_data2
Awwal
answered 3 months ago
