Deploy DAG zip package


Hi all!

There seems to be a problem with DAG zip package deployment in MWAA (see https://airflow.apache.org/docs/apache-airflow/stable/concepts.html?highlight=zip#packaged-dags). I created a simple "hello world" DAG and placed the Python file in dags; MWAA picked it up without any problem. I then zipped the same Python file, uploaded the zip to dags, and got the following error in the DAGProcessing log:

[2021-01-28 21:12:44,726] {{logging_mixin.py:112}} WARNING - /usr/local/lib64/python3.7/site-packages/sqlalchemy/sql/sqltypes.py:270: SAWarning: Unicode type received non-unicode bind param value "b'# -- coding: utf-8 --n#'...". (this warning may be suppressed after 10 occurrences) (util.ellipses_string(value),),
[2021-01-28 21:12:44,759] {{scheduler_job.py:167}} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib64/python3.7/site-packages/pgdb.py", line 1084, in execute
    return self.executemany(operation, [parameters])
  File "/usr/local/lib64/python3.7/site-packages/pgdb.py", line 1109, in executemany
    rows = self._src.execute(sql)
pg.ProgrammingError: ERROR: syntax error at or near ":"
LINE 1: ...:12:44.721535_00:00'::timestamptz, 'b'# -- coding: utf-8 -...
        ^
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
    pickle_dags)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(**args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file
    dag.sync_to_db()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(**args, kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1538, in sync_to_db
    DagCode.bulk_sync_to_db([orm_dag.fileloc])
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(args, kwargs)
  File "/usr/lib64/python3.7/contextlib.py", line 119, in exit
    next(self.gen)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 45, in create_session
    session.commit()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
    self.transaction.commit()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 2523, in flush
    self._flush(objects)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 2664, in _flush
    transaction.rollback(capture_exception=True)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 69, in exit
    exc_value, with_traceback=exc_tb,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise
    raise exception
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 2624, in _flush
    flush_context.execute()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
    c = cached_connections[connection].execute(statement, multiparams)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1014, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1133, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1318, in execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1512, in handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from=e
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise
    raise exception
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib64/python3.7/site-packages/pgdb.py", line 1084, in execute
    return self.executemany(operation, [parameters])
  File "/usr/local/lib64/python3.7/site-packages/pgdb.py", line 1109, in executemany
    rows = self._src.execute(sql)
sqlalchemy.exc.ProgrammingError: (pg.ProgrammingError) ERROR: syntax error at or near ":"
LINE 1: ...:12:44.721535_00:00'::timestamptz, 'b'# -- coding: utf-8 -...
        ^
[SQL: INSERT INTO dag_code (fileloc_hash, fileloc, last_updated, source_code) VALUES (%(fileloc_hash)s, %(fileloc)s, %(last_updated)s, %(source_code)s)]
[parameters: {'fileloc_hash': 57638855142936464, 'fileloc': '/usr/local/airflow/dags/test-dag.zip/test-dag.py', 'last_updated': datetime.datetime(2021, 1, 28, 21, 12, 44, 721535, tzinfo=<Timezone [UTC]>), 'source_code': b'# -- coding: utf-8 -*-\n#\n# Licensed to the Apache Software Foundation (ASF) under one\n# or more contributor license agreements. See the NOTICE ... (3380 characters truncated) ... plated_command,\n params={'my_param': 'Parameter I passed in'},\n dag=dag,\n)\n# [END jinja_template]\n\nt1 >> [t2, t3]\n# [END tutorial]\n'}]
(Background on this error at: http://sqlalche.me/e/13/f405)

Apparently, if the DAG comes from a zip, MWAA tries to put the DAG source code into the metadata database as binary, and Postgres rejects it.
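To illustrate the suspected cause, here is a minimal sketch using only the standard library. It is not Airflow's actual code; the helper names (build_zip, read_member_raw, read_member_text) and the sample source are made up for the example. It shows that a file read out of a zip archive comes back as bytes unless it is explicitly decoded, which matches the b'...' value visible in the log above.

```python
import io
import zipfile

# Hypothetical stand-in for a DAG file's contents.
DAG_SOURCE = "# -*- coding: utf-8 -*-\nprint('hello world')\n"


def build_zip(name: str, source: str) -> bytes:
    """Build an in-memory zip archive holding a single file."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr(name, source)
    return buf.getvalue()


def read_member_raw(zip_bytes: bytes, name: str) -> bytes:
    """Read an archive member as-is; zipfile always returns bytes here."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        with zf.open(name) as member:
            return member.read()


def read_member_text(zip_bytes: bytes, name: str, encoding: str = "utf-8") -> str:
    """Read an archive member and decode it, giving a str suitable for a text column."""
    return read_member_raw(zip_bytes, name).decode(encoding)


archive = build_zip("test-dag.py", DAG_SOURCE)
raw = read_member_raw(archive, "test-dag.py")
text = read_member_text(archive, "test-dag.py")
print(type(raw).__name__, type(text).__name__)
```

If the undecoded bytes value is what ends up bound to the source_code parameter, that would explain the 'b'# ... fragment in the failing INSERT.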

Has anyone tried deploying zip packages and resolved this problem?

Thanks!
Lev.

levahim
asked 3 years ago, 748 views
2 Answers

OK, so the problem is a bug in Airflow: when the DagCode model saves the DAG's source code in the metadata database, Airflow reads the content as binary if the DAG comes from a zip package (it reads it as text when it's not a zip archive). I've submitted a PR to Airflow to fix the problem (https://github.com/apache/airflow/pull/13962). Until MWAA starts using a newer version of Airflow that includes the fix (or patches the 1.10.12 version it currently uses), the workaround is to disable saving the DAG source code. To do that, add the core.store_dag_code=false configuration option to your MWAA environment.
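For anyone who prefers to script the workaround rather than use the console, here is a rough sketch with boto3. The environment name "my-mwaa-environment" and the two helper functions are placeholders of mine, and I have not checked how update_environment treats configuration options that are already set on the environment, so verify that against the boto3 documentation before running it.

```python
from typing import Dict


def build_config_update(environment_name: str) -> Dict[str, object]:
    """Build the request arguments that set core.store_dag_code to false."""
    return {
        "Name": environment_name,
        "AirflowConfigurationOptions": {"core.store_dag_code": "false"},
    }


def apply_config_update(environment_name: str) -> None:
    """Send the update to MWAA. Needs boto3 and valid AWS credentials."""
    import boto3  # imported lazily so the rest of the sketch runs without it

    client = boto3.client("mwaa")
    client.update_environment(**build_config_update(environment_name))


# Placeholder environment name; replace with your own before calling
# apply_config_update().
request = build_config_update("my-mwaa-environment")
print(request)
```

The same option can also be added in the console under the environment's Airflow configuration options.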

levahim
answered 3 years ago

Hi!

Note that requirements and plugins are not copied to the web server component in MWAA, which is why store_dag_code is set to True by default. Disabling serialized DAGs will most likely cause other issues.

https://airflow.apache.org/docs/apache-airflow/1.10.12/dag-serialization.html

MWAA will add support for packaged DAGs in an update to be released soon.

Thanks!

AWS
John_J
answered 3 years ago
