Airflow EmrHook fails when adding a step to an EMR cluster


Hope everyone is doing well!

Here is the context of the issue I'm facing: I work at a company that still runs a very old Airflow version. These are the versions of Airflow and the related packages:

airflow=1.10.1=py36_0
python=3.6.2=0
botocore=1.12.226=py_0
awscli=1.16.236=py36_0
boto3=1.9.199=py_0
boto=2.49.0=py36_0

For the past couple of days, a DAG that is supposed to add a step to an EMR cluster has been failing with the following error:

Traceback (most recent call last):
  File "/home/conda/.conda/envs/airflow36/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/src/src/dags/data_conversion/operators/emr.py", line 69, in execute
    response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)
  File "/home/conda/.conda/envs/airflow36/lib/python3.6/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/conda/.conda/envs/airflow36/lib/python3.6/site-packages/botocore/client.py", line 661, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ValidationException) when calling the AddJobFlowSteps operation: A job flow that is shutting down, terminated, or finished may not be modified.

And here is the part of the code affected:

        job_flow_id = context['task_instance'].xcom_pull(task_ids=self.cluster_creator_operator_name)[0]

        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()

        logging.info('Adding steps to %s', job_flow_id)
        response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException('Adding steps failed: %s' % response)

        logging.info('Steps %s added to JobFlow', response['StepIds'])
        return response['StepIds']
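The ValidationException text says the cluster itself was already shutting down or terminated when the step was submitted, so one way to narrow this down is to inspect the cluster state right before calling `add_job_flow_steps`. The following is a minimal sketch, not the operator's actual code: `cluster_status` and `FakeEmrClient` are hypothetical names, and the "Steps completed" message is only an illustrative value. `describe_cluster` and the `Cluster.Status.State` / `StateChangeReason` fields are part of the boto3 EMR client response.

```python
# States in which an EMR cluster still accepts new steps.
ACTIVE_STATES = {"STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"}


def cluster_status(emr, job_flow_id):
    """Return (accepts_steps, state, reason) for the given cluster.

    `emr` is expected to be a boto3 EMR client, for example the object
    returned by EmrHook(aws_conn_id=...).get_conn().
    """
    status = emr.describe_cluster(ClusterId=job_flow_id)["Cluster"]["Status"]
    state = status["State"]
    # StateChangeReason can be empty while the cluster is healthy.
    reason = status.get("StateChangeReason", {}).get("Message", "")
    return state in ACTIVE_STATES, state, reason


class FakeEmrClient:
    """Hypothetical stand-in for a boto3 EMR client, for offline runs only."""

    def __init__(self, state, message=""):
        self._status = {"State": state, "StateChangeReason": {"Message": message}}

    def describe_cluster(self, ClusterId):
        return {"Cluster": {"Id": ClusterId, "Status": self._status}}


if __name__ == "__main__":
    # Illustrative values only; a real run would pass the client from EmrHook.
    fake = FakeEmrClient("TERMINATED", "Steps completed")
    print(cluster_status(fake, "j-EXAMPLE"))
```

Inside `execute`, the state and reason could be logged before the `add_job_flow_steps` call. If the state is TERMINATING or TERMINATED at that point, the failure comes from the cluster lifecycle rather than from the client library.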

Based on my research, I found a Stack Overflow post (https://stackoverflow.com/questions/64634755/mrjob-im-having-a-client-error-while-using-emr) which mentions that the botocore package is deprecated. The GitHub repository also states that it no longer supports the Python version we are using.

Is this the correct analysis of the issue? I also found this question (https://stackoverflow.com/questions/65595398/mrjob-in-emr-is-running-only-1-mrstep-out-of-3-mrsteps-and-cluster-is-shutting-d), which suggests keeping the EMR cluster persistent, but I'm not sure that will help, because I suspect the Airflow classes invoke the botocore package internally and I would be unable to override that.
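For reference, persisting the cluster is controlled by the cluster configuration rather than by botocore: the EMR RunJobFlow API has an `Instances.KeepJobFlowAliveWhenNoSteps` flag. A rough sketch of setting it is below. It assumes the cluster-creating operator passes a RunJobFlow-style dict to EMR, which the code shown above does not confirm; `with_keep_alive` and the `base` config are hypothetical.

```python
import copy


def with_keep_alive(job_flow_overrides):
    """Return a copy of a RunJobFlow-style config that keeps the cluster alive.

    The input dict is left untouched so the original config can be reused.
    """
    config = copy.deepcopy(job_flow_overrides)
    # With this flag set, the cluster waits for more steps instead of
    # terminating once its step queue is empty.
    config.setdefault("Instances", {})["KeepJobFlowAliveWhenNoSteps"] = True
    return config


# Hypothetical, trimmed-down config for illustration only.
base = {
    "Name": "data-conversion",
    "Instances": {"InstanceCount": 3, "KeepJobFlowAliveWhenNoSteps": False},
}
persistent = with_keep_alive(base)
```

A cluster created this way has to be terminated explicitly once the steps finish, otherwise it keeps running.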

Thanks a lot.
