
Commit e785922

Rename JobRunner modules to *_job_runner and base_job* to job
PR #30255 introduced the "JobRunner" concept and decoupled the job logic from the polymorphic ORM *Job objects. That change was implemented in a way that minimised the review effort needed, so it avoided renaming the runner modules (from `*_job` to `*_job_runner`). Also, since BaseJob lost its "polymorphism" properties, its module and class can be renamed to simply `job` and `Job`. This PR completes the introduction of the JobRunner concept by applying those renames.

Closes: #30296
1 parent 6cd19b0 · commit e785922
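
To illustrate the pattern these renames complete, here is a minimal sketch of the new composition, based only on the imports and calls visible in the diffs below: the logic lives in a `*JobRunner`, while the `Job` ORM object tracks state and heartbeats. The `capacity` value is illustrative.

from airflow.jobs.job import Job  # formerly airflow.jobs.base_job.BaseJob
from airflow.jobs.triggerer_job_runner import TriggererJobRunner  # formerly airflow.jobs.triggerer_job

# Compose the runner (logic) with the Job record (state/heartbeat bookkeeping),
# mirroring what triggerer_command.py does in the diff below.
runner = TriggererJobRunner(capacity=100)  # illustrative capacity
job = Job(job_runner=runner)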


56 files changed · +531 −560 lines changed

airflow/api_connexion/endpoints/health_endpoint.py

Lines changed: 2 additions & 2 deletions
@@ -18,8 +18,8 @@

 from airflow.api_connexion.schemas.health_schema import health_schema
 from airflow.api_connexion.types import APIResponse
-from airflow.jobs.scheduler_job import SchedulerJobRunner
-from airflow.jobs.triggerer_job import TriggererJobRunner
+from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
+from airflow.jobs.triggerer_job_runner import TriggererJobRunner

 HEALTHY = "healthy"
 UNHEALTHY = "unhealthy"

airflow/api_connexion/schemas/job_schema.py

Lines changed: 2 additions & 2 deletions
@@ -19,7 +19,7 @@

 from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field

-from airflow.jobs.base_job import BaseJob
+from airflow.jobs.job import Job


 class JobSchema(SQLAlchemySchema):
@@ -28,7 +28,7 @@ class JobSchema(SQLAlchemySchema):
     class Meta:
         """Meta."""

-        model = BaseJob
+        model = Job

     id = auto_field(dump_only=True)
     dag_id = auto_field(dump_only=True)

airflow/cli/commands/dag_command.py

Lines changed: 4 additions & 6 deletions
@@ -35,7 +35,7 @@
 from airflow.cli.simple_table import AirflowConsole
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
-from airflow.jobs.base_job import BaseJob
+from airflow.jobs.job import Job
 from airflow.models import DagBag, DagModel, DagRun, TaskInstance
 from airflow.models.dag import DAG
 from airflow.models.serialized_dag import SerializedDagModel
@@ -391,15 +391,13 @@ def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION)

         if not dag:
             raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
-        queries.append(BaseJob.dag_id == args.dag_id)
+        queries.append(Job.dag_id == args.dag_id)

     if args.state:
-        queries.append(BaseJob.state == args.state)
+        queries.append(Job.state == args.state)

     fields = ["dag_id", "state", "job_type", "start_date", "end_date"]
-    all_jobs = (
-        session.query(BaseJob).filter(*queries).order_by(BaseJob.start_date.desc()).limit(args.limit).all()
-    )
+    all_jobs = session.query(Job).filter(*queries).order_by(Job.start_date.desc()).limit(args.limit).all()
     all_jobs = [{f: str(job.__getattribute__(f)) for f in fields} for job in all_jobs]

     AirflowConsole().print_as(

airflow/cli/commands/dag_processor_command.py

Lines changed: 3 additions & 3 deletions
@@ -26,14 +26,14 @@

 from airflow import settings
 from airflow.configuration import conf
-from airflow.jobs.base_job import BaseJob
+from airflow.jobs.job import Job
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging

 log = logging.getLogger(__name__)


-def _create_dag_processor_job(args: Any) -> BaseJob:
+def _create_dag_processor_job(args: Any) -> Job:
     """Creates DagFileProcessorProcess instance."""
     from airflow.dag_processing.manager import DagFileProcessorManager

@@ -47,7 +47,7 @@ def _create_dag_processor_job(args: Any) -> Job:
         dag_ids=[],
         pickle_dags=args.do_pickle,
     )
-    return BaseJob(
+    return Job(
         job_runner=processor.job_runner,
     )

airflow/cli/commands/jobs_command.py

Lines changed: 6 additions & 11 deletions
@@ -18,7 +18,7 @@

 from sqlalchemy.orm import Session

-from airflow.jobs.base_job import BaseJob
+from airflow.jobs.job import Job
 from airflow.utils.net import get_hostname
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.state import State
@@ -32,22 +32,17 @@ def check(args, session: Session = NEW_SESSION) -> None:
     if args.hostname and args.local:
         raise SystemExit("You can't use --hostname and --local at the same time")

-    query = (
-        session.query(BaseJob)
-        .filter(BaseJob.state == State.RUNNING)
-        .order_by(BaseJob.latest_heartbeat.desc())
-    )
+    query = session.query(Job).filter(Job.state == State.RUNNING).order_by(Job.latest_heartbeat.desc())
     if args.job_type:
-        query = query.filter(BaseJob.job_type == args.job_type)
+        query = query.filter(Job.job_type == args.job_type)
     if args.hostname:
-        query = query.filter(BaseJob.hostname == args.hostname)
+        query = query.filter(Job.hostname == args.hostname)
     if args.local:
-        query = query.filter(BaseJob.hostname == get_hostname())
+        query = query.filter(Job.hostname == get_hostname())
     if args.limit > 0:
         query = query.limit(args.limit)

-    jobs: list[BaseJob] = query.all()
-    alive_jobs = [job for job in jobs if job.is_alive()]
+    alive_jobs: list[Job] = [job for job in query.all() if job.is_alive()]

     count_alive_jobs = len(alive_jobs)
     if count_alive_jobs == 0:

airflow/cli/commands/scheduler_command.py

Lines changed: 4 additions & 4 deletions
@@ -28,14 +28,14 @@
 from airflow.api_internal.internal_api_call import InternalApiConfig
 from airflow.configuration import conf
 from airflow.executors.executor_loader import ExecutorLoader
-from airflow.jobs.base_job import BaseJob
-from airflow.jobs.scheduler_job import SchedulerJobRunner
+from airflow.jobs.job import Job
+from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import process_subdir, setup_locations, setup_logging, sigint_handler, sigquit_handler
 from airflow.utils.scheduler_health import serve_health_check


-def _run_scheduler_job(job: BaseJob, *, skip_serve_logs: bool) -> None:
+def _run_scheduler_job(job: Job, *, skip_serve_logs: bool) -> None:
     InternalApiConfig.force_database_direct_access()
     enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
     with _serve_logs(skip_serve_logs), _serve_health_check(enable_health_check):
@@ -47,7 +47,7 @@ def scheduler(args):
     """Starts Airflow Scheduler."""
     print(settings.HEADER)

-    job = BaseJob(
+    job = Job(
         job_runner=SchedulerJobRunner(
             subdir=process_subdir(args.subdir),
             num_runs=args.num_runs,

airflow/cli/commands/standalone_command.py

Lines changed: 4 additions & 4 deletions
@@ -30,10 +30,10 @@
 from airflow.configuration import AIRFLOW_HOME, conf, make_group_other_inaccessible
 from airflow.executors import executor_constants
 from airflow.executors.executor_loader import ExecutorLoader
-from airflow.jobs.base_job import most_recent_job
-from airflow.jobs.job_runner import BaseJobRunner
-from airflow.jobs.scheduler_job import SchedulerJobRunner
-from airflow.jobs.triggerer_job import TriggererJobRunner
+from airflow.jobs.base_job_runner import BaseJobRunner
+from airflow.jobs.job import most_recent_job
+from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
+from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 from airflow.utils import db

airflow/cli/commands/task_command.py

Lines changed: 3 additions & 3 deletions
@@ -37,8 +37,8 @@
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, DagRunNotFound, TaskInstanceNotFound
 from airflow.executors.executor_loader import ExecutorLoader
-from airflow.jobs.base_job import BaseJob
-from airflow.jobs.local_task_job import LocalTaskJobRunner
+from airflow.jobs.job import Job
+from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
 from airflow.listeners.listener import get_listener_manager
 from airflow.models import DagPickle, TaskInstance
 from airflow.models.dag import DAG
@@ -260,7 +260,7 @@ def _run_task_by_local_task_job(args, ti: TaskInstance) -> TaskReturnCode | None
         pool=args.pool,
         external_executor_id=_extract_external_executor_id(args),
     )
-    run_job = BaseJob(
+    run_job = Job(
         job_runner=local_task_job_runner,
         dag_id=ti.dag_id,
     )

airflow/cli/commands/triggerer_command.py

Lines changed: 3 additions & 3 deletions
@@ -28,8 +28,8 @@

 from airflow import settings
 from airflow.configuration import conf
-from airflow.jobs.base_job import BaseJob
-from airflow.jobs.triggerer_job import TriggererJobRunner
+from airflow.jobs.job import Job
+from airflow.jobs.triggerer_job_runner import TriggererJobRunner
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import setup_locations, setup_logging, sigint_handler, sigquit_handler
 from airflow.utils.serve_logs import serve_logs
@@ -56,7 +56,7 @@ def triggerer(args):
     settings.MASK_SECRETS_IN_LOGS = True
     print(settings.HEADER)
     triggerer_job_runner = TriggererJobRunner(capacity=args.capacity)
-    job = BaseJob(job_runner=triggerer_job_runner)
+    job = Job(job_runner=triggerer_job_runner)

     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations(

airflow/dag_processing/manager.py

Lines changed: 3 additions & 3 deletions
@@ -46,7 +46,7 @@
 from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
 from airflow.configuration import conf
 from airflow.dag_processing.processor import DagFileProcessorProcess
-from airflow.jobs.base_job import perform_heartbeat
+from airflow.jobs.job import perform_heartbeat
 from airflow.models import errors
 from airflow.models.dag import DagModel
 from airflow.models.dagwarning import DagWarning
@@ -68,7 +68,7 @@
 from airflow.utils.sqlalchemy import prohibit_commit, skip_locked, with_row_locks

 if TYPE_CHECKING:
-    from airflow.jobs.dag_processor_job import DagProcessorJobRunner
+    from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner


 class DagParsingStat(NamedTuple):
@@ -385,7 +385,7 @@ def __init__(
         signal_conn: MultiprocessingConnection | None = None,
         async_mode: bool = True,
     ):
-        from airflow.jobs.dag_processor_job import DagProcessorJobRunner
+        from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner

         super().__init__()
         # known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly
