Airflow EMR Hook failing while requesting to add a step


Hope everyone is doing well!

Here's the context of the issue I'm facing: I work at a company that still supports a really old Airflow version. These are the versions of Airflow and some related components:

airflow=1.10.1=py36_0
python=3.6.2=0
botocore=1.12.226=py_0
awscli=1.16.236=py36_0
boto3=1.9.199=py_0
boto=2.49.0=py36_0

For the past couple of days, a DAG that adds a step to an EMR cluster has been failing with the following error:

Traceback (most recent call last):
  File "/home/conda/.conda/envs/airflow36/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/src/src/dags/data_conversion/operators/emr.py", line 69, in execute
    response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)
  File "/home/conda/.conda/envs/airflow36/lib/python3.6/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/conda/.conda/envs/airflow36/lib/python3.6/site-packages/botocore/client.py", line 661, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the AddJobFlowSteps operation: A job flow that is shutting down, terminated, or finished may not be modified.
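The ValidationException comes back from the EMR API itself, and its message says the target cluster (job flow) is already shutting down, terminated, or finished at the moment AddJobFlowSteps is called. One way to confirm this is to look up the cluster's state and its state-change reason right before adding the steps. A minimal sketch, assuming `emr` is the boto3 EMR client returned by `EmrHook(aws_conn_id=...).get_conn()`; the helper name `cluster_status` is hypothetical:

```python
def cluster_status(emr, job_flow_id):
    """Return (state, reason_code, reason_message) for an EMR cluster.

    `emr` is assumed to be a boto3 EMR client, e.g. the one returned by
    EmrHook(aws_conn_id=...).get_conn(). The job flow ID is the cluster ID.
    """
    status = emr.describe_cluster(ClusterId=job_flow_id)["Cluster"]["Status"]
    # StateChangeReason explains why the cluster reached its current state.
    reason = status.get("StateChangeReason", {})
    return status["State"], reason.get("Code"), reason.get("Message")
```

Logging the result just before `add_job_flow_steps` (for example `logging.info('Cluster %s is %s (%s: %s)', job_flow_id, *cluster_status(emr, job_flow_id))`) should show which state the cluster is in. A state of `TERMINATED` with reason code `ALL_STEPS_COMPLETED`, for instance, would mean the cluster shut itself down after finishing its earlier steps.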

And here is the affected part of the code:

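        # Job flow (cluster) ID from the cluster creator task's XCom value; [0] takes its first element.
        job_flow_id = context['task_instance'].xcom_pull(task_ids=self.cluster_creator_operator_name)[0]

        # EmrHook.get_conn() returns a boto3 EMR client.
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

        logging.info('Adding steps to %s', job_flow_id)
        response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)

        # botocore raises ClientError for error responses (as in the traceback above),
        # so this check is only a fallback.
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException('Adding steps failed: %s' % response)
        else:
            logging.info('Steps %s added to JobFlow', response['StepIds'])
            return response['StepIds']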
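A guarded variant of the snippet above could check the cluster state first and fail with the recorded reason instead of the generic ValidationException. This is a sketch, not a fix: `add_steps_if_active` and `ACTIVE_STATES` are hypothetical names, `emr` is assumed to be the boto3 EMR client from `EmrHook(...).get_conn()`, and `RuntimeError` stands in for `AirflowException` so the example has no Airflow dependency.

```python
import logging

# EMR cluster states in which new steps are still accepted.
ACTIVE_STATES = {"STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"}


def add_steps_if_active(emr, job_flow_id, steps):
    """Add steps only if the cluster is still active; otherwise raise with the reason.

    `emr` is assumed to be a boto3 EMR client. RuntimeError stands in for
    AirflowException to keep the sketch self-contained.
    """
    status = emr.describe_cluster(ClusterId=job_flow_id)["Cluster"]["Status"]
    state = status["State"]
    if state not in ACTIVE_STATES:
        reason = status.get("StateChangeReason", {})
        raise RuntimeError(
            "Cannot add steps to %s: cluster is %s (%s: %s)"
            % (job_flow_id, state, reason.get("Code"), reason.get("Message"))
        )
    logging.info("Adding steps to %s (state %s)", job_flow_id, state)
    response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)
    return response["StepIds"]
```

The state check does not remove the race entirely: a cluster can still start terminating between `describe_cluster` and `add_job_flow_steps`. It mainly turns the failure into a message that says why the cluster was gone.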

While researching, I found a Stack Overflow post (https://stackoverflow.com/questions/64634755/mrjob-im-having-a-client-error-while-using-emr) which mentions that the botocore package is deprecated, and the botocore GitHub repository also states that it no longer supports the Python version we are using.

Is this the correct analysis of the issue? I also found this post (https://stackoverflow.com/questions/65595398/mrjob-in-emr-is-running-only-1-mrstep-out-of-3-mrsteps-and-cluster-is-shutting-d), which suggests persisting the EMR cluster, but I'm not sure it will help, because I suspect the Airflow classes call the botocore package directly and I would be unable to override that.
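The linked post is about mrjob, but at the EMR API level, persisting a cluster corresponds to the `KeepJobFlowAliveWhenNoSteps` field of `Instances` in the `run_job_flow` request. If the cluster is created with it set to False, it terminates once its current steps finish, and later AddJobFlowSteps calls are rejected. This setting lives in the cluster-creation config rather than in botocore, so it should not require overriding Airflow classes. A minimal sketch, assuming the cluster creator task passes a boto3 `run_job_flow` config dict (for example through the `job_flow_overrides` of Airflow's `EmrCreateJobFlowOperator`); `with_keep_alive` is a hypothetical helper:

```python
import copy


def with_keep_alive(job_flow_config):
    """Return a copy of a boto3 run_job_flow config with KeepJobFlowAliveWhenNoSteps=True.

    Merges into an existing "Instances" dict instead of replacing it: a top-level
    dict.update() with {"Instances": {...}} would drop keys such as Ec2SubnetId.
    """
    config = copy.deepcopy(job_flow_config)  # leave the caller's dict untouched
    config.setdefault("Instances", {})["KeepJobFlowAliveWhenNoSteps"] = True
    return config
```

With this setting the cluster keeps running (and incurring cost) after its steps complete, so the DAG would need to terminate it explicitly, for example with a final terminate task such as Airflow's `EmrTerminateJobFlowOperator`.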

Thanks a lot.

Asked a year ago; 61 views.