
Questions tagged with Amazon Managed Workflows for Apache Airflow (MWAA)



0 answers · 0 votes · 1 view · redec · asked 2 days ago

Airflow 2.0 database initialisation error

Hello, we were testing the new Airflow 2.0 implementation, but the new v2.0.2 version fails during the database initialisation (webserver logs below). Do we need to update any settings other than the Airflow version when upgrading from Airflow v1 to v2? We already tried destroying and re-applying. On our local docker-compose setup (the official v2 docker-compose), everything works. Thank you for your support!

Our configuration:

```
resource "aws_mwaa_environment" "airflow" {
  execution_role_arn   = aws_iam_role.airflow.arn
  name                 = "<fancyname>"
  airflow_version      = "2.0.2"
  source_bucket_arn    = aws_s3_bucket.airflow.arn
  dag_s3_path          = aws_s3_bucket_object.dags.key
  requirements_s3_path = "requirements.txt"
  plugins_s3_path      = "plugins.zip"

  network_configuration {
    security_group_ids = [aws_security_group.airflow.id]
    subnet_ids         = var.private_subnets
  }

  airflow_configuration_options = {
    "secrets.backend" = "airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend"
  }

  webserver_access_mode = "PUBLIC_ONLY"
}
```

And the logs (CloudWatch ingestion timestamps and ANSI colour codes stripped for readability):

```
WARNING:root:Failed to log action with (psycopg2.errors.UndefinedTable) relation "log" does not exist
LINE 1: INSERT INTO log (dttm, dag_id, task_id, event, execution_dat...
        ^
[SQL: INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (%(dttm)s, %(dag_id)s, %(task_id)s, %(event)s, %(execution_date)s, %(owner)s, %(extra)s) RETURNING log.id]
[parameters: {'dttm': datetime.datetime(2021, 5, 28, 20, 43, 41, 872768, tzinfo=Timezone('UTC')), 'dag_id': None, 'task_id': None, 'event': 'cli_webserver', 'execution_date': None, 'owner': 'airflow', 'extra': '{"host_name": "ip-10-0-28-2.eu-central-1.compute.internal", "full_command": "[\'/usr/local/bin/airflow\', \'webserver\']"}'}]
(Background on this error at: http://sqlalche.me/e/13/f405)
/usr/local/lib/python3.7/site-packages/airflow/settings.py:385 DeprecationWarning: `session_lifetime_days` option from `[webserver]` section has been renamed to `session_lifetime_minutes`. The new option allows to configure session lifetime in minutes. The `force_log_out_after` option has been removed from `[webserver]` section. Please update your configuration.
[... Airflow ASCII-art banner ...]
[2021-05-28 20:43:42,064] {dagbag.py:451} INFO - Filling up the DagBag from /dev/null
[2021-05-28 20:43:42,067] {logging_config.py:46} INFO - Successfully imported user-defined logging config from log_config.LOGGING_CONFIG
WARNING:flask_appbuilder.security.manager:No user yet created, use flask fab command to do it.
Traceback (most recent call last):
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UndefinedTable: relation "dag" does not exist
LINE 2: FROM dag
             ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.7/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/cli/commands/webserver_command.py", line 360, in webserver
    app = cached_app(None)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/app.py", line 145, in cached_app
    app = create_app(config=config, testing=testing)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/app.py", line 133, in create_app
    sync_appbuilder_roles(flask_app)
  File "/usr/local/lib/python3.7/site-packages/airflow/www/app.py", line 65, in sync_appbuilder_roles
    security_manager.sync_roles()
  File "/usr/local/lib/python3.7/site-packages/airflow/www/security.py", line 574, in sync_roles
    self.create_dag_specific_permissions()
  File "/usr/local/lib/python3.7/site-packages/airflow/www/security.py", line 526, in create_dag_specific_permissions
    .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/query.py", line 3373, in all
    return list(self)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/query.py", line 3535, in __iter__
    return self._execute_and_instances(context)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/query.py", line 3560, in _execute_and_instances
    result = conn.execute(querycontext.statement, self._params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1011, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1130, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1317, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1511, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/util/compat.py", line 182, in raise_
    raise exception
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/default.py", line 608, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "dag" does not exist
LINE 2: FROM dag
             ^

[SQL: SELECT dag.dag_id AS dag_dag_id
FROM dag
WHERE dag.is_active OR dag.is_paused]
(Background on this error at: http://sqlalche.me/e/13/f405)
```
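A side note on the configuration above, independent of the missing-table error: the `secrets.backend` value uses the Airflow 1.x `airflow.contrib.*` module path, and the contrib namespace was removed in Airflow 2.0. In Airflow 2.x the Secrets Manager backend lives in the Amazon provider package. A hedged sketch of the updated option (verify the path against the provider version your MWAA image ships):

```
airflow_configuration_options = {
  # Airflow 2.x module path; airflow.contrib.* no longer exists in 2.0
  "secrets.backend" = "airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend"
}
```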
2 answers · 0 votes · 3 views · capca5 · asked 8 months ago

Can't run airflow backfill via cli

I am trying to run the equivalent of `backfill -t TASKNAME DAGNAME`. I copied the script from <https://docs.aws.amazon.com/mwaa/latest/userguide/access-airflow-ui.html#call-mwaa-apis-cli>, but it looks as if the environment running the CLI does not have the same packages loaded as my requirements.txt (Airflow itself can parse the DAGs). The result is this:

```
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 37, in <module>
    args.func(args)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 76, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 187, in backfill
    dag = dag or get_dag(args)
  File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 164, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: sentinel-hourly. Either the dag did not exist or it failed to parse.
[2021-04-01 21:40:27,020] {__init__.py:50} INFO - Using executor CeleryExecutor
[2021-04-01 21:40:27,020] {dagbag.py:417} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2021-04-01 21:40:27,023] {dagbag.py:259} ERROR - Failed to import: /usr/local/airflow/dags/test.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 256, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/lib64/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/airflow/dags/test.py", line 3, in <module>
    from airflow.providers.amazon.aws.operators.ecs import ECSOperator
ModuleNotFoundError: No module named 'airflow.providers'
[2021-04-01 21:40:27,026] {dagbag.py:259} ERROR - Failed to import: /usr/local/airflow/dags/sentinel-hourly.py
Traceback (most recent call last):
  ... (identical frames to the traceback above) ...
  File "/usr/local/airflow/dags/sentinel-hourly.py", line 3, in <module>
    from airflow.providers.amazon.aws.operators.ecs import ECSOperator
ModuleNotFoundError: No module named 'airflow.providers'
```
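The `ModuleNotFoundError` suggests the interpreter parsing the DAGs here is running Airflow 1.10.x, where the ECS operator had not yet moved into the `airflow.providers` namespace (in 1.10 it lived under `airflow.contrib.operators.ecs_operator`). One hedged workaround, assuming both module paths behave as described on your environment, is a fallback import at the top of the DAG file:

```python
# Try the Airflow 2.x location first, then fall back to the 1.10.x location.
# The final fallback to None exists only so this sketch runs even where
# Airflow is not installed at all; a real DAG file would let it raise.
try:
    from airflow.providers.amazon.aws.operators.ecs import ECSOperator  # Airflow 2.x
except ModuleNotFoundError:
    try:
        from airflow.contrib.operators.ecs_operator import ECSOperator  # Airflow 1.10.x
    except ModuleNotFoundError:
        ECSOperator = None  # Airflow not available in this interpreter
```

This keeps the same DAG source parseable on both runtimes, which matters here because the CLI environment and the scheduler environment apparently disagree on installed packages.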
2 answers · 0 votes · 0 views · tth-pl · asked 9 months ago

Fetching logs in Airflow UI results in a timeout

Recently, we cannot see the logs in the Airflow UI anymore, because the corresponding request ends in a 504 (Gateway Timeout):

https://<uuid>.c3.eu-central-1.airflow.amazonaws.com/get_logs_with_metadata?dag_id=s3_example_dag&task_id=write-s3-task&execution_date=2021-02-24T08%3A58%3A58.485414%2B00%3A00&try_number=1&metadata=null

Is there a permission required for this? The user I log into the UI with has Admin permissions, so that should not be the issue. We use the following Airflow configuration:

```
resource "aws_security_group" "airflow" {
  vpc_id = var.vpc_id

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  ingress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
}

resource "aws_mwaa_environment" "airflow" {
  execution_role_arn   = aws_iam_role.airflow.arn
  name                 = "<name>"
  source_bucket_arn    = aws_s3_bucket.airflow.arn
  dag_s3_path          = aws_s3_bucket_object.dags.key
  requirements_s3_path = "requirements.txt"

  network_configuration {
    security_group_ids = [aws_security_group.airflow.id]
    subnet_ids         = var.private_subnets
  }

  airflow_configuration_options = {
    "secrets.backend" = "airflow.contrib.secrets.aws_systems_manager.SystemsManagerParameterStoreBackend"
  }

  webserver_access_mode = "PUBLIC_ONLY"
}
```

The logs are correctly written to CloudWatch, and the tasks shown in the UI are being executed. Only displaying the logs in the Airflow UI ends in a timeout. Is there any configuration we are missing? Thanks!

Edited by: capca5 on Feb 24, 2021 4:41 AM
Edited by: capca5 on Feb 24, 2021 4:42 AM
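One thing worth ruling out before digging into networking: whether the environment's logging configuration actually enables the log types the UI tries to fetch. The Terraform `aws_mwaa_environment` resource exposes this through a `logging_configuration` block. A sketch, not a known fix for this 504 (the log levels shown are illustrative):

```
logging_configuration {
  task_logs {
    enabled   = true
    log_level = "INFO"
  }

  webserver_logs {
    enabled   = true
    log_level = "WARNING"
  }
}
```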
2 answers · 0 votes · 2 views · capca5 · asked a year ago

Deploy DAG zip package

Hi all! There seems to be a problem with DAG zip package deployment in MWAA (see https://airflow.apache.org/docs/apache-airflow/stable/concepts.html?highlight=zip#packaged-dags). I created a simple "hello world" DAG and placed the Python file in dags; MWAA picked it up, no problem, all good. Then I zipped the same Python file and uploaded the zip to dags, and got the following error in the DAGProcessing log:

```
[2021-01-28 21:12:44,726] {logging_mixin.py:112} WARNING - /usr/local/lib64/python3.7/site-packages/sqlalchemy/sql/sqltypes.py:270: SAWarning: Unicode type received non-unicode bind param value "b'# -*- coding: utf-8 -*-\n#'...". (this warning may be suppressed after 10 occurrences)
  (util.ellipses_string(value),),
[2021-01-28 21:12:44,759] {scheduler_job.py:167} ERROR - Got an exception! Propagating...
Traceback (most recent call last):
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib64/python3.7/site-packages/pgdb.py", line 1084, in execute
    return self.executemany(operation, [parameters])
  File "/usr/local/lib64/python3.7/site-packages/pgdb.py", line 1109, in executemany
    rows = self._src.execute(sql)
pg.ProgrammingError: ERROR: syntax error at or near ":"
LINE 1: ...:12:44.721535_00:00'::timestamptz, 'b'# -*- coding: utf-8 -*...
                                              ^

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
    pickle_dags)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file
    dag.sync_to_db()
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dag.py", line 1538, in sync_to_db
    DagCode.bulk_sync_to_db([orm_dag.fileloc])
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/lib64/python3.7/contextlib.py", line 119, in __exit__
    next(self.gen)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 45, in create_session
    session.commit()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 1042, in commit
    self.transaction.commit()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 504, in commit
    self._prepare_impl()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 483, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 2523, in flush
    self._flush(objects)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 2664, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 69, in __exit__
    exc_value, with_traceback=exc_tb,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/session.py", line 2624, in _flush
    flush_context.execute()
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1083, in _emit_insert_statements
    c = cached_connections[connection].execute(statement, multiparams)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1014, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1133, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1318, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1512, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib64/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib64/python3.7/site-packages/pgdb.py", line 1084, in execute
    return self.executemany(operation, [parameters])
  File "/usr/local/lib64/python3.7/site-packages/pgdb.py", line 1109, in executemany
    rows = self._src.execute(sql)
sqlalchemy.exc.ProgrammingError: (pg.ProgrammingError) ERROR: syntax error at or near ":"
LINE 1: ...:12:44.721535_00:00'::timestamptz, 'b'# -*- coding: utf-8 -*...
                                              ^
[SQL: INSERT INTO dag_code (fileloc_hash, fileloc, last_updated, source_code) VALUES (%(fileloc_hash)s, %(fileloc)s, %(last_updated)s, %(source_code)s)]
[parameters: {'fileloc_hash': 57638855142936464, 'fileloc': '/usr/local/airflow/dags/test-dag.zip/test-dag.py', 'last_updated': datetime.datetime(2021, 1, 28, 21, 12, 44, 721535, tzinfo=<Timezone [UTC]>), 'source_code': b'# -*- coding: utf-8 -*-\n#\n# Licensed to the Apache Software Foundation (ASF) under one\n# or more contributor license agreements. See the NOTICE ... (3380 characters truncated) ... plated_command,\n params={\'my_param\': \'Parameter I passed in\'},\n dag=dag,\n)\n# [END jinja_template]\n\nt1 >> [t2, t3]\n# [END tutorial]\n'}]
(Background on this error at: http://sqlalche.me/e/13/f405)
```

Apparently, if the DAG comes from a zip, MWAA tries to put the DAG source code into the metadata database as binary, and Postgres doesn't like it. Has anyone tried deploying zip packages and resolved this problem? Thanks! Lev.
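For what it's worth, the failing INSERT shows `source_code` being bound as a `bytes` value (`b'# -*- coding: utf-8 ...'`), which matches that diagnosis: reading a member out of a zip archive yields bytes, and the `pgdb` driver renders that bytes literal straight into the SQL, producing the syntax error. A minimal stdlib sketch of the bytes-vs-str difference (file names are illustrative, not MWAA internals):

```python
import io
import zipfile

# Build an in-memory zip holding a tiny DAG-like file.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("test-dag.py", "# -*- coding: utf-8 -*-\nprint('hello')\n")

# ZipFile.read() returns bytes, not str -- this is the kind of value that
# ends up bound to the text column dag_code.source_code in the traceback above.
with zipfile.ZipFile(buf) as zf:
    raw = zf.read("test-dag.py")

text = raw.decode("utf-8")  # decoding first is what a text column expects
print(type(raw).__name__, type(text).__name__)  # bytes str
```

By contrast, a plain `.py` file is read as text by the scheduler, so the same INSERT succeeds, which is consistent with the unzipped DAG working fine.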
2 answers · 0 votes · 0 views · levahim · asked a year ago