Skip to content

Commit 316fcbd

Browse files
committed
Put AIP-52 setup/teardown tasks behind feature flag
We aren't going to land AIP-52 in time for 2.6, so put the authoring api behind a feature flag. I've chosen to put it in `airflow.settings` so users can set it in `airflow_local_settings`, or set it via env var.
1 parent f17eace commit 316fcbd

File tree

14 files changed

+108
-57
lines changed

14 files changed

+108
-57
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ env:
4646
USE_SUDO: "true"
4747
INCLUDE_SUCCESS_OUTPUTS: "true"
4848
AIRFLOW_ENABLE_AIP_44: "true"
49+
AIRFLOW_ENABLE_AIP_52: "true"
4950

5051
concurrency:
5152
group: ci-${{ github.event.pull_request.number || github.ref }}

airflow/decorators/setup_teardown.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@
2222
from airflow import AirflowException
2323
from airflow.decorators import python_task
2424
from airflow.decorators.task_group import _TaskGroupFactory
25+
from airflow.settings import _ENABLE_AIP_52
2526

2627

2728
def setup_task(func: Callable) -> Callable:
29+
if not _ENABLE_AIP_52:
30+
raise AirflowException("AIP-52 Setup tasks are disabled.")
31+
2832
# Using FunctionType here since _TaskDecorator is also a callable
2933
if isinstance(func, types.FunctionType):
3034
func = python_task(func)
@@ -35,6 +39,9 @@ def setup_task(func: Callable) -> Callable:
3539

3640

3741
def teardown_task(_func=None, *, on_failure_fail_dagrun: bool = False) -> Callable:
42+
if not _ENABLE_AIP_52:
43+
raise AirflowException("AIP-52 Teardown tasks are disabled.")
44+
3845
def teardown(func: Callable) -> Callable:
3946
# Using FunctionType here since _TaskDecorator is also a callable
4047
if isinstance(func, types.FunctionType):

airflow/example_dags/example_setup_teardown.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,25 @@
2222

2323
from airflow.models.dag import DAG
2424
from airflow.operators.bash import BashOperator
25+
from airflow.settings import _ENABLE_AIP_52
2526
from airflow.utils.task_group import TaskGroup
2627

27-
with DAG(
28-
dag_id="example_setup_teardown",
29-
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
30-
catchup=False,
31-
tags=["example"],
32-
) as dag:
33-
BashOperator.as_setup(task_id="root_setup", bash_command="echo 'Hello from root_setup'")
34-
normal = BashOperator(task_id="normal", bash_command="echo 'I am just a normal task'")
35-
BashOperator.as_teardown(task_id="root_teardown", bash_command="echo 'Goodbye from root_teardown'")
28+
if _ENABLE_AIP_52:
29+
with DAG(
30+
dag_id="example_setup_teardown",
31+
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
32+
catchup=False,
33+
tags=["example"],
34+
) as dag:
35+
BashOperator.as_setup(task_id="root_setup", bash_command="echo 'Hello from root_setup'")
36+
normal = BashOperator(task_id="normal", bash_command="echo 'I am just a normal task'")
37+
BashOperator.as_teardown(task_id="root_teardown", bash_command="echo 'Goodbye from root_teardown'")
3638

37-
with TaskGroup("section_1") as section_1:
38-
BashOperator.as_setup(task_id="taskgroup_setup", bash_command="echo 'Hello from taskgroup_setup'")
39-
BashOperator(task_id="normal", bash_command="echo 'I am just a normal task'")
40-
BashOperator.as_setup(
41-
task_id="taskgroup_teardown", bash_command="echo 'Hello from taskgroup_teardown'"
42-
)
39+
with TaskGroup("section_1") as section_1:
40+
BashOperator.as_setup(task_id="taskgroup_setup", bash_command="echo 'Hello from taskgroup_setup'")
41+
BashOperator(task_id="normal", bash_command="echo 'I am just a normal task'")
42+
BashOperator.as_setup(
43+
task_id="taskgroup_teardown", bash_command="echo 'Hello from taskgroup_teardown'"
44+
)
4345

44-
normal >> section_1
46+
normal >> section_1

airflow/example_dags/example_setup_teardown_taskflow.py

Lines changed: 38 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -22,49 +22,51 @@
2222

2323
from airflow.decorators import setup, task, task_group, teardown
2424
from airflow.models.dag import DAG
25+
from airflow.settings import _ENABLE_AIP_52
2526

26-
with DAG(
27-
dag_id="example_setup_teardown_taskflow",
28-
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
29-
catchup=False,
30-
tags=["example"],
31-
) as dag:
32-
# You can use the setup and teardown decorators to add setup and teardown tasks at the DAG level
33-
@setup
34-
@task
35-
def root_setup():
36-
print("Hello from root_setup")
37-
38-
@teardown
39-
@task
40-
def root_teardown():
41-
print("Goodbye from root_teardown")
42-
43-
@task
44-
def normal():
45-
print("I am just a normal task")
46-
47-
@task_group
48-
def section_1():
49-
# You can also have setup and teardown tasks at the task group level
27+
if _ENABLE_AIP_52:
28+
with DAG(
29+
dag_id="example_setup_teardown_taskflow",
30+
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
31+
catchup=False,
32+
tags=["example"],
33+
) as dag:
34+
# You can use the setup and teardown decorators to add setup and teardown tasks at the DAG level
5035
@setup
5136
@task
52-
def my_setup():
53-
print("I set up")
37+
def root_setup():
38+
print("Hello from root_setup")
5439

5540
@teardown
5641
@task
57-
def my_teardown():
58-
print("I tear down")
42+
def root_teardown():
43+
print("Goodbye from root_teardown")
5944

6045
@task
61-
def hello():
62-
print("I say hello")
46+
def normal():
47+
print("I am just a normal task")
48+
49+
@task_group
50+
def section_1():
51+
# You can also have setup and teardown tasks at the task group level
52+
@setup
53+
@task
54+
def my_setup():
55+
print("I set up")
56+
57+
@teardown
58+
@task
59+
def my_teardown():
60+
print("I tear down")
61+
62+
@task
63+
def hello():
64+
print("I say hello")
6365

64-
my_setup()
65-
hello()
66-
my_teardown()
66+
my_setup()
67+
hello()
68+
my_teardown()
6769

68-
root_setup()
69-
normal() >> section_1()
70-
root_teardown()
70+
root_setup()
71+
normal() >> section_1()
72+
root_teardown()

airflow/models/baseoperator.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,12 +925,22 @@ def __init__(
925925

926926
@classmethod
927927
def as_setup(cls, *args, **kwargs):
928+
from airflow.settings import _ENABLE_AIP_52
929+
930+
if not _ENABLE_AIP_52:
931+
raise AirflowException("AIP-52 Setup tasks are disabled.")
932+
928933
op = cls(*args, **kwargs)
929934
op._is_setup = True
930935
return op
931936

932937
@classmethod
933938
def as_teardown(cls, *args, **kwargs):
939+
from airflow.settings import _ENABLE_AIP_52
940+
941+
if not _ENABLE_AIP_52:
942+
raise AirflowException("AIP-52 Teardown tasks are disabled.")
943+
934944
on_failure_fail_dagrun = kwargs.pop("on_failure_fail_dagrun", False)
935945
op = cls(*args, **kwargs)
936946
op._is_teardown = True

airflow/settings.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,7 @@ def initialize():
610610
# AIP-52: setup/teardown (experimental)
611611
# This feature is not complete yet, so we disable it by default.
612612
_ENABLE_AIP_52 = os.environ.get("AIRFLOW_ENABLE_AIP_52", "false").lower() in {"true", "t", "yes", "y", "1"}
613+
613614
# AIP-44: internal_api (experimental)
614615
# This feature is not complete yet, so we disable it by default.
615-
_ENABLE_AIP_44 = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in ("true", "t", "yes", "y", "1")
616+
_ENABLE_AIP_44 = os.environ.get("AIRFLOW_ENABLE_AIP_44", "false").lower() in {"true", "t", "yes", "y", "1"}

tests/always/test_example_dags.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import pytest
2424

2525
from airflow.models import DagBag
26+
from airflow.settings import _ENABLE_AIP_52
2627
from airflow.utils import yaml
2728
from tests.test_utils.asserts import assert_queries_count
2829

@@ -59,8 +60,12 @@ def example_not_suspended_dags():
5960
for example_dir in example_dirs:
6061
candidates = glob(f"{AIRFLOW_SOURCES_ROOT.as_posix()}/{example_dir}", recursive=True)
6162
for candidate in candidates:
62-
if not any(candidate.startswith(s) for s in suspended_providers_folders):
63-
yield candidate
63+
if any(candidate.startswith(s) for s in suspended_providers_folders):
64+
continue
65+
# we will also suspend AIP-52 DAGs unless it is enabled
66+
if not _ENABLE_AIP_52 and "example_setup_teardown" in candidate:
67+
continue
68+
yield candidate
6469

6570

6671
def example_dags_except_db_exception():

tests/decorators/test_external_python.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import pytest
2929

3030
from airflow.decorators import setup, task, teardown
31+
from airflow.settings import _ENABLE_AIP_52
3132
from airflow.utils import timezone
3233

3334
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
@@ -126,6 +127,7 @@ def f(_):
126127

127128
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
128129

130+
@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
129131
def test_marking_external_python_task_as_setup(self, dag_maker, venv_python):
130132
@setup
131133
@task.external_python(python=venv_python)
@@ -140,6 +142,7 @@ def f():
140142
assert setup_task._is_setup
141143
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
142144

145+
@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
143146
def test_marking_external_python_task_as_teardown(self, dag_maker, venv_python):
144147
@teardown
145148
@task.external_python(python=venv_python)
@@ -154,6 +157,7 @@ def f():
154157
assert teardown_task._is_teardown
155158
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
156159

160+
@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
157161
@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
158162
def test_marking_external_python_task_as_teardown_with_on_failure_fail(
159163
self, dag_maker, on_failure_fail_dagrun, venv_python

tests/decorators/test_python_virtualenv.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import pytest
2525

2626
from airflow.decorators import setup, task, teardown
27+
from airflow.settings import _ENABLE_AIP_52
2728
from airflow.utils import timezone
2829

2930
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
@@ -177,6 +178,7 @@ def f(_):
177178

178179
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
179180

181+
@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
180182
def test_marking_virtualenv_python_task_as_setup(self, dag_maker):
181183
@setup
182184
@task.virtualenv
@@ -191,6 +193,7 @@ def f():
191193
assert setup_task._is_setup
192194
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
193195

196+
@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
194197
def test_marking_virtualenv_python_task_as_teardown(self, dag_maker):
195198
@teardown
196199
@task.virtualenv
@@ -205,6 +208,7 @@ def f():
205208
assert teardown_task._is_teardown
206209
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
207210

211+
@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
208212
@pytest.mark.parametrize("on_failure_fail_dagrun", [True, False])
209213
def test_marking_virtualenv_python_task_as_teardown_with_on_failure_fail(
210214
self, dag_maker, on_failure_fail_dagrun

tests/decorators/test_setup_teardown.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
from airflow import AirflowException
2323
from airflow.decorators import setup, task, task_group, teardown
2424
from airflow.operators.bash import BashOperator
25+
from airflow.settings import _ENABLE_AIP_52
2526

2627

28+
@pytest.mark.skipif(not _ENABLE_AIP_52, reason="AIP-52 is disabled")
2729
class TestSetupTearDownTask:
2830
def test_marking_functions_as_setup_task(self, dag_maker):
2931
@setup

0 commit comments

Comments
 (0)