Skip to content

Commit 17810f2

Browse files
job-mapping rename job_versions_io_mapping to job_io_mapping
Signed-off-by: Pawel Leszczynski <[email protected]>
1 parent 60d7d90 commit 17810f2

13 files changed

+636
-90
lines changed

api/src/main/java/marquez/db/DbRetention.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ CREATE OR REPLACE FUNCTION delete_datasets_older_than_x_days()
338338
BEGIN
339339
CREATE TEMPORARY TABLE used_datasets_as_io_in_x_days AS (
340340
SELECT dataset_uuid
341-
FROM job_versions_io_mapping AS jvio INNER JOIN job_versions AS jv
341+
FROM job_io_mapping AS jvio INNER JOIN job_versions AS jv
342342
ON jvio.job_version_uuid = jv.uuid
343343
WHERE jv.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days'
344344
);
@@ -621,7 +621,7 @@ CREATE OR REPLACE FUNCTION estimate_number_of_rows_older_than_x_days(retention_q
621621
"""
622622
CREATE TEMPORARY TABLE used_datasets_as_input_in_x_days AS (
623623
SELECT dataset_uuid
624-
FROM job_versions_io_mapping AS jvio INNER JOIN job_versions AS jv
624+
FROM job_io_mapping AS jvio INNER JOIN job_versions AS jv
625625
ON jvio.job_version_uuid = jv.uuid
626626
WHERE jv.created_at >= CURRENT_TIMESTAMP - INTERVAL '${retentionDays} days'
627627
AND jvio.io_type = 'INPUT'

api/src/main/java/marquez/db/JobVersionDao.java

Lines changed: 74 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ WITH job_version_io AS (
6969
JSON_AGG(json_build_object('namespace', ds.namespace_name,
7070
'name', ds.name))
7171
FILTER (WHERE io.io_type = 'OUTPUT') AS output_datasets
72-
FROM job_versions_io_mapping io
72+
FROM job_io_mapping io
7373
INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid
7474
INNER JOIN datasets_view ds ON ds.uuid = io.dataset_uuid
7575
INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
@@ -192,40 +192,73 @@ ExtendedJobVersionRow upsertJobVersion(
192192
String namespaceName);
193193

194194
/**
195-
* Used to link an input dataset to a given job version.
195+
* Used to upsert an input or output dataset to a given job version.
196196
*
197197
* @param jobVersionUuid The unique ID of the job version.
198-
* @param inputDatasetUuid The unique ID of the input dataset.
198+
* @param datasetUuid The unique ID of the output dataset
199+
* @param ioType The {@link IoType} of the dataset.
200+
* @param jobUuid The unique ID of the job.
199201
*/
200-
default void upsertInputDatasetFor(UUID jobVersionUuid, UUID inputDatasetUuid) {
201-
upsertInputOrOutputDatasetFor(jobVersionUuid, inputDatasetUuid, IoType.INPUT);
202-
}
202+
@SqlUpdate(
203+
"""
204+
INSERT INTO job_io_mapping (
205+
job_version_uuid, dataset_uuid, io_type, job_uuid, symlink_target_job_uuid, is_job_version_current)
206+
VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE)
207+
ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_job_version_current = TRUE
208+
""")
209+
void upsertCurrentInputOrOutputDatasetFor(
210+
UUID jobVersionUuid,
211+
UUID datasetUuid,
212+
UUID jobUuid,
213+
UUID symlinkTargetJobUuid,
214+
IoType ioType);
215+
216+
@SqlUpdate(
217+
"""
218+
UPDATE job_io_mapping
219+
SET is_job_version_current = FALSE
220+
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
221+
AND job_version_uuid != :jobVersionUuid
222+
AND io_type = :ioType
223+
AND is_job_version_current = TRUE;
224+
""")
225+
void markVersionIOMappingObsolete(UUID jobVersionUuid, UUID jobUuid, IoType ioType);
226+
227+
@SqlUpdate(
228+
"""
229+
UPDATE job_io_mapping
230+
SET is_job_version_current = FALSE
231+
WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
232+
AND io_type = :ioType
233+
AND is_job_version_current = TRUE;
234+
""")
235+
void markVersionIOMappingObsolete(UUID jobUuid, IoType ioType);
203236

204237
/**
205-
* Used to link an output dataset to a given job version.
238+
* Used to link an input dataset to a given job version.
206239
*
207-
* @param jobVersionUuid The unique ID of the job version.
208-
* @param outputDatasetUuid The unique ID of the output dataset.
240+
* @param inputDatasetUuid The unique ID of the input dataset.
241+
* @param jobUuid The unique ID of the job.
209242
*/
210-
default void upsertOutputDatasetFor(UUID jobVersionUuid, UUID outputDatasetUuid) {
211-
upsertInputOrOutputDatasetFor(jobVersionUuid, outputDatasetUuid, IoType.OUTPUT);
243+
default void upsertInputDatasetFor(
244+
UUID jobVersionUuid, UUID inputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) {
245+
markVersionIOMappingObsolete(jobVersionUuid, jobUuid, IoType.INPUT);
246+
upsertCurrentInputOrOutputDatasetFor(
247+
jobVersionUuid, inputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.INPUT);
212248
}
213249

214250
/**
215-
* Used to upsert an input or output dataset to a given job version.
251+
* Used to link an output dataset to a given job version.
216252
*
217-
* @param jobVersionUuid The unique ID of the job version.
218-
* @param datasetUuid The unique ID of the output dataset
219-
* @param ioType The {@link IoType} of the dataset.
253+
* @param outputDatasetUuid The unique ID of the output dataset.
254+
* @param jobUuid The unique ID of the job.
220255
*/
221-
@SqlUpdate(
222-
"""
223-
INSERT INTO job_versions_io_mapping (
224-
job_version_uuid, dataset_uuid, io_type)
225-
VALUES (:jobVersionUuid, :datasetUuid, :ioType)
226-
ON CONFLICT DO NOTHING
227-
""")
228-
void upsertInputOrOutputDatasetFor(UUID jobVersionUuid, UUID datasetUuid, IoType ioType);
256+
default void upsertOutputDatasetFor(
257+
UUID jobVersionUuid, UUID outputDatasetUuid, UUID jobUuid, UUID symlinkTargetJobUuid) {
258+
markVersionIOMappingObsolete(jobVersionUuid, jobUuid, IoType.OUTPUT);
259+
upsertCurrentInputOrOutputDatasetFor(
260+
jobVersionUuid, outputDatasetUuid, jobUuid, symlinkTargetJobUuid, IoType.OUTPUT);
261+
}
229262

230263
/**
231264
* Returns the input datasets to a given job version.
@@ -256,7 +289,7 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
256289
@SqlQuery(
257290
"""
258291
SELECT dataset_uuid
259-
FROM job_versions_io_mapping
292+
FROM job_io_mapping
260293
WHERE job_version_uuid = :jobVersionUuid
261294
AND io_type = :ioType
262295
""")
@@ -265,7 +298,7 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
265298
@SqlQuery(
266299
"""
267300
SELECT d.namespace_name, d.name, io.io_type
268-
FROM job_versions_io_mapping io
301+
FROM job_io_mapping io
269302
INNER JOIN jobs_view j ON j.current_version_uuid = io.job_version_uuid
270303
INNER JOIN datasets_view d on d.uuid = io.dataset_uuid
271304
WHERE j.name = :jobName AND j.namespace_name=:jobNamespace
@@ -366,14 +399,20 @@ default BagOfJobVersionInfo upsertRunlessJobVersion(
366399
inputs.forEach(
367400
i -> {
368401
jobVersionDao.upsertInputDatasetFor(
369-
jobVersionRow.getUuid(), i.getDatasetVersionRow().getDatasetUuid());
402+
jobVersionRow.getUuid(),
403+
i.getDatasetVersionRow().getDatasetUuid(),
404+
jobVersionRow.getJobUuid(),
405+
jobRow.getSymlinkTargetId());
370406
});
371407

372408
// Link the output datasets to the job version.
373409
outputs.forEach(
374410
o -> {
375411
jobVersionDao.upsertOutputDatasetFor(
376-
jobVersionRow.getUuid(), o.getDatasetVersionRow().getDatasetUuid());
412+
jobVersionRow.getUuid(),
413+
o.getDatasetVersionRow().getDatasetUuid(),
414+
jobVersionRow.getJobUuid(),
415+
jobRow.getSymlinkTargetId());
377416
});
378417

379418
jobDao.updateVersionFor(jobRow.getUuid(), jobRow.getCreatedAt(), jobVersionRow.getUuid());
@@ -468,14 +507,20 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
468507
jobVersionInputs.forEach(
469508
jobVersionInput -> {
470509
jobVersionDao.upsertInputDatasetFor(
471-
jobVersionRow.getUuid(), jobVersionInput.getDatasetUuid());
510+
jobVersionRow.getUuid(),
511+
jobVersionInput.getDatasetUuid(),
512+
jobVersionRow.getJobUuid(),
513+
jobRow.getSymlinkTargetId());
472514
});
473515

474516
// Link the output datasets to the job version.
475517
jobVersionOutputs.forEach(
476518
jobVersionOutput -> {
477519
jobVersionDao.upsertOutputDatasetFor(
478-
jobVersionRow.getUuid(), jobVersionOutput.getDatasetUuid());
520+
jobVersionRow.getUuid(),
521+
jobVersionOutput.getDatasetUuid(),
522+
jobVersionRow.getJobUuid(),
523+
jobRow.getSymlinkTargetId());
479524
});
480525

481526
// Link the job version to the run.

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -56,43 +56,45 @@ public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary inpu
5656
@SqlQuery(
5757
"""
5858
WITH RECURSIVE
59-
-- Find the current version of a job or its symlink target if the target has no
60-
-- current_version_uuid. This ensures that we don't lose lineage for a job after it is
61-
-- symlinked to another job but before that target job has run successfully.
62-
job_current_version AS (
63-
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
64-
COALESCE(s.current_version_uuid, j.current_version_uuid) AS job_version_uuid
65-
FROM jobs j
66-
LEFT JOIN jobs s ON s.uuid=j.symlink_target_uuid
67-
WHERE s.current_version_uuid IS NULL
68-
),
69-
job_io AS (
70-
SELECT j.job_uuid,
71-
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
72-
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
73-
FROM job_versions_io_mapping io
74-
INNER JOIN job_current_version j ON io.job_version_uuid=j.job_version_uuid
75-
GROUP BY j.job_uuid
76-
),
77-
lineage(job_uuid, inputs, outputs) AS (
78-
SELECT v.job_uuid AS job_uuid,
79-
COALESCE(inputs, Array[]::uuid[]) AS inputs,
80-
COALESCE(outputs, Array[]::uuid[]) AS outputs,
81-
0 AS depth
82-
FROM jobs j
83-
INNER JOIN job_current_version v ON (j.symlink_target_uuid IS NULL AND j.uuid=v.job_uuid) OR v.job_uuid=j.symlink_target_uuid
84-
LEFT JOIN job_io io ON io.job_uuid=v.job_uuid
85-
WHERE j.uuid IN (<jobIds>) OR j.symlink_target_uuid IN (<jobIds>)
86-
UNION
87-
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
88-
FROM job_io io,
89-
lineage l
90-
WHERE io.job_uuid != l.job_uuid AND
91-
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
92-
AND depth < :depth)
93-
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
94-
FROM lineage l2
95-
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid;
59+
job_io AS (
60+
SELECT
61+
io.job_uuid AS job_uuid,
62+
io.symlink_target_job_uuid AS symlink_target_job_uuid,
63+
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs,
64+
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs
65+
FROM job_io_mapping io
66+
WHERE io.is_job_version_current = TRUE
67+
GROUP BY io.symlink_target_job_uuid, io.job_uuid
68+
),
69+
lineage(job_uuid, symlink_target_job_uuid, inputs, outputs) AS (
70+
SELECT job_uuid,
71+
symlink_target_job_uuid,
72+
COALESCE(inputs, Array[]::uuid[]) AS inputs,
73+
COALESCE(outputs, Array[]::uuid[]) AS outputs,
74+
0 AS depth
75+
FROM job_io
76+
WHERE job_uuid IN (<jobIds>) OR symlink_target_job_uuid IN (<jobIds>)
77+
UNION
78+
SELECT io.job_uuid, io.symlink_target_job_uuid, io.inputs, io.outputs, l.depth + 1
79+
FROM job_io io, lineage l
80+
WHERE (io.job_uuid != l.job_uuid) AND
81+
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
82+
AND depth < :depth),
83+
lineage_outside_job_io(job_uuid) AS (
84+
SELECT
85+
param_jobs.param_job_uuid as job_uuid,
86+
j.symlink_target_uuid,
87+
Array[]::uuid[] AS inputs,
88+
Array[]::uuid[] AS outputs,
89+
0 AS depth
90+
FROM (SELECT unnest(ARRAY[<jobIds>]::UUID[]) AS param_job_uuid) param_jobs
91+
LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid
92+
INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid
93+
WHERE l.job_uuid IS NULL
94+
)
95+
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
96+
FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2
97+
INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.symlink_target_job_uuid)
9698
""")
9799
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);
98100

@@ -116,7 +118,7 @@ WHERE ds.uuid IN (<dsUuids>)""")
116118
"""
117119
SELECT j.uuid FROM jobs j
118120
INNER JOIN job_versions jv ON jv.job_uuid = j.uuid
119-
INNER JOIN job_versions_io_mapping io ON io.job_version_uuid = jv.uuid
121+
INNER JOIN job_io_mapping io ON io.job_version_uuid = jv.uuid
120122
INNER JOIN datasets_view ds ON ds.uuid = io.dataset_uuid
121123
WHERE ds.name = :datasetName AND ds.namespace_name = :namespaceName
122124
ORDER BY io_type DESC, jv.created_at DESC

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import marquez.common.models.SourceType;
3434
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
3535
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
36+
import marquez.db.JobVersionDao.IoType;
3637
import marquez.db.RunDao.RunUpsert;
3738
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
3839
import marquez.db.mappers.LineageEventMapper;
@@ -362,27 +363,33 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
362363

363364
// RunInput list uses null as a sentinel value
364365
List<DatasetRecord> datasetInputs = null;
365-
if (event.getInputs() != null) {
366+
if (event.getInputs() != null && !event.getInputs().isEmpty()) {
366367
datasetInputs = new ArrayList<>();
367368
for (Dataset dataset : event.getInputs()) {
368369
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, true);
369370
datasetInputs.add(record);
370371
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
371372
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
372373
}
374+
} else {
375+
// mark job_io_mapping as obsolete
376+
daos.getJobVersionDao().markVersionIOMappingObsolete(job.getUuid(), IoType.INPUT);
373377
}
374378
bag.setInputs(Optional.ofNullable(datasetInputs));
375379

376380
// RunInput list uses null as a sentinel value
377381
List<DatasetRecord> datasetOutputs = null;
378-
if (event.getOutputs() != null) {
382+
if (event.getOutputs() != null && !event.getOutputs().isEmpty()) {
379383
datasetOutputs = new ArrayList<>();
380384
for (Dataset dataset : event.getOutputs()) {
381385
DatasetRecord record = upsertLineageDataset(daos, dataset, now, runUuid, false);
382386
datasetOutputs.add(record);
383387
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
384388
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
385389
}
390+
} else {
391+
// mark job_io_mapping as obsolete
392+
daos.getJobVersionDao().markVersionIOMappingObsolete(job.getUuid(), IoType.OUTPUT);
386393
}
387394

388395
bag.setOutputs(Optional.ofNullable(datasetOutputs));
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2018-2023 contributors to the Marquez project
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package marquez.db.migrations;
7+
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.flywaydb.core.api.MigrationVersion;
10+
import org.flywaydb.core.api.migration.Context;
11+
import org.flywaydb.core.api.migration.JavaMigration;
12+
import org.jdbi.v3.core.Jdbi;
13+
14+
@Slf4j
15+
public class V67_2_JobVersionsIOMappingBackfillJob implements JavaMigration {
16+
17+
public static final String UPDATE_QUERY =
18+
"""
19+
UPDATE job_io_mapping
20+
SET
21+
job_uuid = j.uuid,
22+
symlink_target_job_uuid = j.symlink_target_uuid,
23+
is_job_version_current = (jv.uuid = j.current_version_uuid)::BOOLEAN
24+
FROM job_versions jv
25+
INNER JOIN jobs_view j ON j.uuid = jv.job_uuid
26+
WHERE jv.uuid = job_io_mapping.job_version_uuid
27+
""";
28+
29+
@Override
30+
public MigrationVersion getVersion() {
31+
return MigrationVersion.fromVersion("67.2");
32+
}
33+
34+
@Override
35+
public void migrate(Context context) throws Exception {
36+
Jdbi jdbi = Jdbi.create(context.getConnection());
37+
jdbi.withHandle(h -> h.createUpdate(UPDATE_QUERY).execute());
38+
}
39+
40+
@Override
41+
public String getDescription() {
42+
return "Back fill job_uuid and is_job_version_current in job_io_mapping table";
43+
}
44+
45+
@Override
46+
public Integer getChecksum() {
47+
return null;
48+
}
49+
50+
@Override
51+
public boolean isUndo() {
52+
return false;
53+
}
54+
55+
@Override
56+
public boolean canExecuteInTransaction() {
57+
return false;
58+
}
59+
60+
@Override
61+
public boolean isBaselineMigration() {
62+
return false;
63+
}
64+
}

0 commit comments

Comments
 (0)