Skip to content

Commit 2afc493

Browse files
Fix task completion time to use airflow task end time instead of dag end time (#869)
Signed-off-by: henneberger <[email protected]> Co-authored-by: Willy Lulciuc <[email protected]>
1 parent 3c6cde6 commit 2afc493

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

integrations/airflow/marquez_airflow/dag.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,13 @@ def _report_task_instance(self, ti, dagrun, run_args, session):
169169
self.log.debug(f'Setting task state: {ti.state}'
170170
f' for {ti.task_id}')
171171
if ti.state in {State.SUCCESS, State.SKIPPED}:
172-
self._marquez.complete_run(run_id)
172+
self._marquez.complete_run(
173+
run_id,
174+
DagUtils.to_iso_8601(ti.end_date))
173175
else:
174-
self._marquez.fail_run(run_id)
176+
self._marquez.fail_run(
177+
run_id,
178+
DagUtils.to_iso_8601(ti.end_date))
175179

176180
def _extract_metadata(self, dagrun, task, ti=None):
177181
extractor = self._get_extractor(task)

integrations/airflow/marquez_airflow/marquez.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,13 +98,13 @@ def _register_dataset(self, step_dataset, run_id=None):
9898
inputs = self._to_dataset_ids(step_dataset)
9999
return inputs
100100

101-
def complete_run(self, run_id):
101+
def complete_run(self, run_id, at):
102102
self.get_or_create_marquez_client(). \
103-
mark_job_run_as_completed(run_id=run_id)
103+
mark_job_run_as_completed(run_id=run_id, at=at)
104104

105-
def fail_run(self, run_id):
105+
def fail_run(self, run_id, at):
106106
self.get_or_create_marquez_client().mark_job_run_as_failed(
107-
run_id=run_id)
107+
run_id=run_id, at=at)
108108

109109
def start_run(self, run_id, start):
110110
self.get_or_create_marquez_client() \

integrations/airflow/tests/test_marquez_dag.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ def test_marquez_dag(mock_get_or_create_marquez_client, mock_uuid,
204204
)
205205

206206
mock_marquez_client.mark_job_run_as_completed.assert_called_once_with(
207-
run_id=run_id_completed
207+
run_id=run_id_completed,
208+
at=mock.ANY
208209
)
209210

210211
# When a task run completes, the task outputs are also updated in order
@@ -215,7 +216,8 @@ def test_marquez_dag(mock_get_or_create_marquez_client, mock_uuid,
215216

216217
dag.handle_callback(dagrun, success=False, session=session)
217218
mock_marquez_client.mark_job_run_as_failed.assert_called_once_with(
218-
run_id=run_id_failed
219+
run_id=run_id_failed,
220+
at=mock.ANY
219221
)
220222

221223
# Assert an attempt to version the outputs of a task is not made when
@@ -455,7 +457,8 @@ def test_marquez_dag_with_extractor(mock_get_or_create_marquez_client,
455457
kwargs['context'].get('extract') == 'extract'
456458

457459
mock_marquez_client.mark_job_run_as_completed.assert_called_once_with(
458-
run_id=run_id
460+
run_id=run_id,
461+
at=mock.ANY
459462
)
460463

461464
# When a task run completes, the task outputs are also updated in order
@@ -676,7 +679,8 @@ def test_marquez_dag_with_extract_on_complete(
676679
kwargs['context'].get('extract_on_complete') == 'extract_on_complete'
677680

678681
mock_marquez_client.mark_job_run_as_completed.assert_called_once_with(
679-
run_id=run_id
682+
run_id=run_id,
683+
at=mock.ANY
680684
)
681685

682686
# When a task run completes, the task outputs are also updated in order

0 commit comments

Comments
 (0)