Skip to content

Commit f366d95

Browse files
authored
Fix calculation of health check threshold for SchedulerJob (#31277)
The change ##30302 split Job from JobRunner, but it missed the fact that SchedulerJob had a special case of checking the threshold - instead of using the standard grace multiplier, it used whatever has been defined in the `scheduler_helth_check_threshold`. The `is_alive` method in SchedulerJobRunner has remained unused, and default 2.1 grace multiplier has been used for both /health endpoint and `airflow jobs check`. This PR brings the exception for SchedulerJob back and clarifies that the same treshold is also used for airflow jobs check in the documentation. Fixes: #31200
1 parent 3193857 commit f366d95

File tree

6 files changed

+40
-25
lines changed

6 files changed

+40
-25
lines changed

airflow/config_templates/config.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2294,7 +2294,8 @@ scheduler:
22942294
description: |
22952295
If the last scheduler heartbeat happened more than scheduler_health_check_threshold
22962296
ago (in seconds), scheduler is considered unhealthy.
2297-
This is used by the health check in the "/health" endpoint
2297+
This is used by the health check in the "/health" endpoint and in `airflow jobs check` CLI
2298+
for SchedulerJob.
22982299
version_added: 1.10.2
22992300
type: string
23002301
example: ~

airflow/config_templates/default_airflow.cfg

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1183,7 +1183,8 @@ pool_metrics_interval = 5.0
11831183

11841184
# If the last scheduler heartbeat happened more than scheduler_health_check_threshold
11851185
# ago (in seconds), scheduler is considered unhealthy.
1186-
# This is used by the health check in the "/health" endpoint
1186+
# This is used by the health check in the "/health" endpoint and in `airflow jobs check` CLI
1187+
# for SchedulerJob.
11871188
scheduler_health_check_threshold = 30
11881189

11891190
# When you start a scheduler, airflow starts a tiny web server

airflow/jobs/job.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,13 @@ def is_alive(self, grace_multiplier=2.1):
130130
:param grace_multiplier: multiplier of heartrate to require heart beat
131131
within
132132
"""
133+
if self.job_type == "SchedulerJob":
134+
health_check_threshold: int = conf.getint("scheduler", "scheduler_health_check_threshold")
135+
else:
136+
health_check_threshold: int = self.heartrate * grace_multiplier
133137
return (
134138
self.state == State.RUNNING
135-
and (timezone.utcnow() - self.latest_heartbeat).total_seconds()
136-
< self.heartrate * grace_multiplier
139+
and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < health_check_threshold
137140
)
138141

139142
@provide_session

airflow/jobs/scheduler_job_runner.py

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -264,27 +264,6 @@ def _debug_dump(self, signum: int, frame: FrameType | None) -> None:
264264
self.job.executor.debug_dump()
265265
self.log.info("-" * 80)
266266

267-
def is_alive(self, grace_multiplier: float | None = None) -> bool:
268-
"""
269-
Whether the SchedulerJob is alive.
270-
271-
We define alive as in a state of running and a heartbeat within the
272-
threshold defined in the ``scheduler_health_check_threshold`` config
273-
setting.
274-
275-
``grace_multiplier`` is accepted for compatibility with the parent class.
276-
277-
"""
278-
if grace_multiplier is not None:
279-
# Accept the same behaviour as superclass
280-
return self.job.is_alive(grace_multiplier=grace_multiplier)
281-
scheduler_health_check_threshold: int = conf.getint("scheduler", "scheduler_health_check_threshold")
282-
return (
283-
self.job.state == State.RUNNING
284-
and (timezone.utcnow() - self.job.latest_heartbeat).total_seconds()
285-
< scheduler_health_check_threshold
286-
)
287-
288267
def __get_concurrency_maps(self, states: Iterable[TaskInstanceState], session: Session) -> ConcurrencyMap:
289268
"""
290269
Get the concurrency maps.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
Clarifications of the external Health Check mechanism and using ``Job`` classes.
2+
3+
In the past SchedulerJob and other ``*Job`` classes are known to have been used to perform
4+
external health checks for Airflow components. Those are, however, Airflow DB ORM related classes.
5+
The DB models and database structure of Airflow are considered as internal implementation detail, following
6+
`public interface <https://airflow.apache.org/docs/apache-airflow/stable/public-airflow-interface.html>`_).
7+
Therefore, they should not be used for external health checks. Instead, you should use the
8+
``airflow jobs check`` CLI command (introduced in Airflow 2.1) for that purpose.

tests/jobs/test_base_job.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,29 @@ def test_is_alive(self):
148148
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
149149
assert job.is_alive() is False, "Completed jobs even with recent heartbeat should not be alive"
150150

151+
def test_is_alive_scheduler(self):
152+
job = Job(heartrate=10, state=State.RUNNING, job_type="SchedulerJob")
153+
assert job.is_alive() is True
154+
155+
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20)
156+
assert job.is_alive() is True
157+
158+
# default health-check grace period for scheduler job is not heartrate*2.1 but 30 seconds
159+
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=21)
160+
assert job.is_alive() is True
161+
162+
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31)
163+
assert job.is_alive() is False
164+
165+
# test because .seconds was used before instead of total_seconds
166+
# internal repr of datetime is (days, seconds)
167+
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1)
168+
assert job.is_alive() is False
169+
170+
job.state = State.SUCCESS
171+
job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
172+
assert job.is_alive() is False, "Completed jobs even with recent heartbeat should not be alive"
173+
151174
@patch("airflow.jobs.job.create_session")
152175
def test_heartbeat_failed(self, mock_create_session):
153176
when = timezone.utcnow() - datetime.timedelta(seconds=60)

0 commit comments

Comments
 (0)