Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.8.1
What happened?
We are using deferred operators to execute jobs in Databricks. These jobs use a common database, so we use a task pool (with Include Deferred enabled) to limit concurrency to 1 task. In some cases we see task timeouts even though the deferred task finished successfully. In the logs below, roughly 1.5 hours pass between the trigger event firing and the task being scheduled again:
[2024-12-06, 14:01:10 CET] {{taskinstance.py:2344}} INFO - Pausing task as DEFERRED. dag_id=my-dag, task_id=my-task, execution_date=20241205T130000, start_date=20241206T130108
[2024-12-06, 14:01:10 CET] {{local_task_job_runner.py:231}} INFO - Task exited with return code 100 (task deferral)
[2024-12-06, 14:01:11 CET] {{base.py:83}} INFO - Using connection ID 'databricks' for task execution.
[2024-12-06, 14:01:11 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'PENDING', 'result_state': '', 'state_message': 'Waiting for cluster'}. sleeping for 30 seconds
...
[2024-12-06, 14:09:42 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': 'In run'}. sleeping for 30 seconds
[2024-12-06, 14:10:12 CET] {{databricks.py:94}} INFO - run-id 847717920033451 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': 'In run'}. sleeping for 30 seconds
[2024-12-06, 14:10:42 CET] {{triggerer_job_runner.py:602}} INFO - Trigger my-dag/scheduled__2024-12-05T13:00:00+00:00/my-task/-1/1 (ID 10030) fired: TriggerEvent<{'run_id': 847717920033451, 'run_page_url': '...', 'run_state': '{"life_cycle_state": "TERMINATED", "result_state": "SUCCESS", "state_message": ""}'}>
[2024-12-06, 15:38:27 CET] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: my-dag.my-task scheduled__2024-12-05T13:00:00+00:00 [queued]>
...
[2024-12-06, 15:38:27 CET] {{taskinstance.py:2698}} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 425, in _execute_task
raise AirflowTaskTimeout()
airflow.exceptions.AirflowTaskTimeout
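For context, the tasks are defined roughly like the sketch below. This is a simplified illustration rather than our production code; the operator, job id, pool name, and timeout value are placeholders.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

with DAG(
    dag_id="my-dag",
    start_date=datetime(2024, 1, 1),
    schedule="0 13 * * *",
    catchup=False,
):
    # Deferrable operator: while the Databricks run is in progress the task is
    # handed off to the triggerer instead of occupying a worker slot.
    DatabricksRunNowOperator(
        task_id="my-task",
        job_id=12345,                        # placeholder job id
        databricks_conn_id="databricks",
        deferrable=True,
        pool="shared_db_pool",               # 1 slot, "Include Deferred" enabled
        execution_timeout=timedelta(hours=1, minutes=30),  # illustrative value
    )
```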
Our assumption of what happens is the following:
- Many tasks are waiting to be executed but are limited by the pool
- Task starts running and is deferred (pool slot is consumed)
- Deferred task is running in the triggerer (pool slot is consumed)
- Deferred task emits trigger event and stops (pool slot is released)
- As the pool slot is released, another task starts running (pool slot is consumed again)
- The post-deferral task for our previous task is scheduled, but cannot run due to unavailable pool slots.
- After the task that got scheduled in between finishes and the pool slot is released, the post-deferral task finally runs and times out immediately (see the sketch of the timeout logic below).
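Our reading of `_execute_task` in taskinstance.py (paraphrased below, not a literal copy of the source) is that when a task resumes from deferral, the remaining timeout is computed from the original start_date, so the time spent deferred and then queued behind the pool counts against execution_timeout and the task can fail the moment it finally runs:

```python
# Paraphrase of our understanding of airflow/models/taskinstance.py (_execute_task),
# the code raising AirflowTaskTimeout at line 425 in the traceback above.
if task_to_execute.execution_timeout:
    if self.next_method:  # resuming from deferral
        # Remaining budget is measured from the original start_date, i.e. it
        # includes the time spent deferred *and* the time spent queued afterwards
        # waiting for a pool slot.
        timeout_seconds = (
            task_to_execute.execution_timeout - (timezone.utcnow() - self.start_date)
        ).total_seconds()
    else:
        timeout_seconds = task_to_execute.execution_timeout.total_seconds()
    if timeout_seconds <= 0:
        raise AirflowTaskTimeout()
```

In our logs this means: the task started at 14:01, the trigger fired at 14:10, but the resumed task only got a pool slot at 15:38, by which point the execution_timeout budget was already exhausted.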
What you think should happen instead?
I see multiple things that could improve this behavior:
- Tasks waking up after deferral do not consume slots within task pools.
- Tasks waking up have priority over other tasks when making scheduling decisions.
- Tasks waking up get their own timeout for the post-deferral part of execution, so that time spent deferred and queued does not count against execution_timeout.
How to reproduce
- Create a DAG with many deferrable tasks sharing a single task pool.
- Reduce the pool capacity to 1 and enable "Include Deferred".
- Observe that sometimes a new task is scheduled before the post-deferral task is scheduled (a minimal reproducer sketch follows).
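A minimal reproducer sketch, independent of Databricks, using a deferrable sensor. The DAG id, pool name, and durations are arbitrary; it assumes a pool named shared_db_pool with 1 slot and "Include Deferred" enabled has been created beforehand (e.g. via the Pools UI).

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id="deferrable_pool_timeout_repro",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    # Several deferrable tasks compete for a 1-slot pool that also counts deferred
    # tasks. When one task's trigger fires, a sibling can grab the freed slot and
    # defer, so the resumed task waits in the queue until its execution_timeout
    # (measured from its original start) has already expired.
    for i in range(5):
        TimeDeltaSensorAsync(
            task_id=f"wait_{i}",
            delta=timedelta(minutes=5),
            pool="shared_db_pool",                   # 1 slot, "Include Deferred" enabled
            execution_timeout=timedelta(minutes=6),  # slightly longer than the deferral
        )
```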
Operating System
Amazon Linux 2
Versions of Apache Airflow Providers
No response
Deployment
Amazon (AWS) MWAA
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct