
Commit 9066492

upstream run level lineage implementation (MarquezProject#2658)

Signed-off-by: Julien Le Dem <[email protected]>

1 parent 31b6ec5 commit 9066492

File tree: 9 files changed, +388 −27 lines


CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -11,6 +11,8 @@
   *Improves experience with large graphs by adding a new tab to move between graph elements without looking at the graph itself.*
 * Web: add hover-over Tag tooltip to datasets [`#2630`](https://github.com/MarquezProject/marquez/pull/2630) [@davidsharp7](https://github.com/davidsharp7)
   *For parity with columns in the GUI, this adds a Tag tooltip to datasets.*
+* API: upstream run-level lineage API [`#2658`](https://github.com/MarquezProject/marquez/pull/2658) [@julienledem](https://github.com/julienledem)
+  *When troubleshooting an issue and doing root-cause analysis, it is useful to get the upstream run-level lineage to know exactly what version of each job and dataset a run depends on.*

 ### Changed
 * Docker: upgrade to Docker Compose V2 [`#2644`](https://github.com/MarquezProject/marquez/pull/2644) [@merobi-hub](https://github.com/merobi-hub)

api/src/main/java/marquez/api/OpenLineageResource.java

Lines changed: 23 additions & 0 deletions
@@ -36,6 +36,7 @@
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import marquez.api.models.SortDirection;
+import marquez.common.models.RunId;
 import marquez.db.OpenLineageDao;
 import marquez.service.ServiceFactory;
 import marquez.service.models.BaseEvent;
@@ -136,6 +137,28 @@ public Response getLineageEvents(
     return Response.ok(new Events(events, totalCount)).build();
   }
 
+  /**
+   * Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
+   * -> the run that produced it
+   *
+   * @param runId the run to get upstream lineage from
+   * @param depth the maximum depth of the upstream lineage
+   * @return the upstream lineage for that run up to `depth` levels
+   */
+  @Timed
+  @ResponseMetered
+  @ExceptionMetered
+  @GET
+  @Consumes(APPLICATION_JSON)
+  @Produces(APPLICATION_JSON)
+  @Path("/runlineage/upstream")
+  public Response getRunLineageUpstream(
+      @QueryParam("runId") @NotNull RunId runId,
+      @QueryParam("depth") @DefaultValue(DEFAULT_DEPTH) int depth) {
+    throwIfNotExists(runId);
+    return Response.ok(lineageService.upstream(runId, depth)).build();
+  }
+
   @Value
   static class Events {
     @NonNull
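
This resource wires the new `/runlineage/upstream` endpoint to `LineageService.upstream`. As a rough sketch of how a client might call it, assuming a local Marquez API at `localhost:5000` under the usual `/api/v1` prefix (the host, port, and run UUID below are illustrative assumptions, not part of this commit):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UpstreamLineageClientExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical run UUID and local Marquez endpoint; adjust for your deployment.
    String runId = "0176a8c2-d6ca-4a57-9e4a-1d4a1b2c3d4e";
    URI uri =
        URI.create(
            "http://localhost:5000/api/v1/runlineage/upstream?runId=" + runId + "&depth=10");

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(uri).GET().build();

    // The body is the UpstreamRunLineage record serialized as JSON: a list of runs
    // ordered by depth, each with its job, run summary, and input dataset versions.
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}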

api/src/main/java/marquez/db/Columns.java

Lines changed: 16 additions & 27 deletions
@@ -150,9 +150,7 @@ public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException {
   }
 
   public static UUID uuidOrThrow(final ResultSet results, final String column) throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getObject(column, UUID.class);
   }
 
@@ -166,9 +164,7 @@ public static Instant timestampOrNull(final ResultSet results, final String column)
 
   public static Instant timestampOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getTimestamp(column).toInstant();
   }
 
@@ -182,9 +178,7 @@ public static String stringOrNull(final ResultSet results, final String column)
 
   public static String stringOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getString(column);
   }
 
@@ -199,40 +193,30 @@ public static boolean booleanOrDefault(
 
   public static boolean booleanOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getBoolean(column);
   }
 
   public static int intOrThrow(final ResultSet results, final String column) throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getInt(column);
   }
 
   public static PGInterval pgIntervalOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return new PGInterval(results.getString(column));
   }
 
   public static BigDecimal bigDecimalOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return results.getBigDecimal(column);
   }
 
   public static List<UUID> uuidArrayOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return Arrays.asList((UUID[]) results.getArray(column).getArray());
   }
 
@@ -246,9 +230,7 @@ public static List<UUID> uuidArrayOrEmpty(final ResultSet results, final String column)
 
   public static List<String> stringArrayOrThrow(final ResultSet results, final String column)
       throws SQLException {
-    if (results.getObject(column) == null) {
-      throw new IllegalArgumentException();
-    }
+    checkNotNull(results, column);
     return Arrays.asList((String[]) results.getArray(column).getArray());
   }
 
@@ -311,4 +293,11 @@ public static PGobject toPgObject(@NonNull final Object object) {
     }
     return jsonObject;
   }
+
+  private static void checkNotNull(final ResultSet results, final String column)
+      throws SQLException {
+    if (results.getObject(column) == null) {
+      throw new IllegalArgumentException(column + " not found in result");
+    }
+  }
 }
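
All eight `*OrThrow` helpers previously duplicated the same null check with a bare `IllegalArgumentException`; the new private `checkNotNull` helper deduplicates them and names the missing column in the exception message. A minimal test sketch of the new behavior, assuming JUnit 5 and Mockito are on the classpath (the test class and column name are hypothetical, not part of this commit):

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.sql.ResultSet;
import marquez.db.Columns;
import org.junit.jupiter.api.Test;

class ColumnsNullCheckTest {
  @Test
  void stringOrThrowRejectsNullColumn() throws Exception {
    ResultSet results = mock(ResultSet.class);
    when(results.getObject("missing_column")).thenReturn(null);

    // The helper now reports which column was absent instead of throwing a bare exception.
    IllegalArgumentException e =
        assertThrows(
            IllegalArgumentException.class,
            () -> Columns.stringOrThrow(results, "missing_column"));
    assertEquals("missing_column not found in result", e.getMessage());
  }
}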

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 64 additions & 0 deletions
@@ -5,15 +5,22 @@
 
 package marquez.db;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import javax.validation.constraints.NotNull;
+import marquez.common.models.DatasetName;
+import marquez.common.models.JobName;
+import marquez.common.models.NamespaceName;
+import marquez.common.models.RunId;
 import marquez.db.mappers.DatasetDataMapper;
 import marquez.db.mappers.JobDataMapper;
 import marquez.db.mappers.JobRowMapper;
 import marquez.db.mappers.RunMapper;
+import marquez.db.mappers.UpstreamRunRowMapper;
 import marquez.service.models.DatasetData;
 import marquez.service.models.JobData;
 import marquez.service.models.Run;
 
@@ -25,8 +32,18 @@
 @RegisterRowMapper(JobDataMapper.class)
 @RegisterRowMapper(RunMapper.class)
 @RegisterRowMapper(JobRowMapper.class)
+@RegisterRowMapper(UpstreamRunRowMapper.class)
 public interface LineageDao {
 
+  public record JobSummary(NamespaceName namespace, JobName name, UUID version) {}
+
+  public record RunSummary(RunId id, Instant start, Instant end, String status) {}
+
+  public record DatasetSummary(
+      NamespaceName namespace, DatasetName name, UUID version, RunId producedByRunId) {}
+
+  public record UpstreamRunRow(JobSummary job, RunSummary run, DatasetSummary input) {}
+
   /**
    * Fetch all of the jobs that consume or produce the datasets that are consumed or produced by the
    * input jobIds. This returns a single layer from the BFS using datasets as edges. Jobs that have
 
@@ -154,4 +171,51 @@ SELECT DISTINCT on(r.job_name, r.namespace_name) r.*, jv.version as job_version
       WHERE j.uuid in (<jobUuid>) OR j.symlink_target_uuid IN (<jobUuid>)
       ORDER BY r.job_name, r.namespace_name, created_at DESC""")
   List<Run> getCurrentRuns(@BindList Collection<UUID> jobUuid);
+
+  @SqlQuery(
+      """
+      WITH RECURSIVE
+        upstream_runs(
+          r_uuid, -- run uuid
+          dataset_uuid, dataset_version_uuid, dataset_namespace, dataset_name, -- input dataset version to the run
+          u_r_uuid, -- upstream run that produced that dataset version
+          depth -- current depth of traversal
+        ) AS (
+
+          -- initial case: find the inputs of the initial runs
+          select r.uuid,
+            dv.dataset_uuid, dv."version", dv.namespace_name, dv.dataset_name,
+            dv.run_uuid,
+            0 AS depth -- starts at 0
+          FROM (SELECT :runId::uuid AS uuid) r -- initial run
+          LEFT JOIN runs_input_mapping rim ON rim.run_uuid = r.uuid
+          LEFT JOIN dataset_versions dv ON dv.uuid = rim.dataset_version_uuid
+
+          UNION
+
+          -- recursion: find the inputs of the inputs found on the previous iteration and increase depth to know when to stop
+          SELECT
+            ur.u_r_uuid,
+            dv2.dataset_uuid, dv2."version", dv2.namespace_name, dv2.dataset_name,
+            dv2.run_uuid,
+            ur.depth + 1 AS depth -- increase depth to check end condition
+          FROM upstream_runs ur
+          LEFT JOIN runs_input_mapping rim2 ON rim2.run_uuid = ur.u_r_uuid
+          LEFT JOIN dataset_versions dv2 ON dv2.uuid = rim2.dataset_version_uuid
+          -- end condition of the recursion: no input or depth is over the maximum set
+          -- also avoid following cycles (ex: merge statement)
+          WHERE ur.u_r_uuid IS NOT NULL AND ur.u_r_uuid <> ur.r_uuid AND depth < :depth
+        )
+
+      -- present the result: use DISTINCT as we may have traversed the same edge multiple times if there are diamonds in the graph.
+      SELECT * FROM ( -- we need the extra statement to sort after the DISTINCT
+        SELECT DISTINCT ON (upstream_runs.r_uuid, upstream_runs.dataset_version_uuid, upstream_runs.u_r_uuid)
+          upstream_runs.*,
+          r.started_at, r.ended_at, r.current_run_state as state,
+          r.job_uuid, r.job_version_uuid, r.namespace_name as job_namespace, r.job_name
+        FROM upstream_runs, runs r WHERE upstream_runs.r_uuid = r.uuid
+      ) sub
+      ORDER BY depth ASC, job_name ASC;
+      """)
+  List<UpstreamRunRow> getUpstreamRuns(@NotNull UUID runId, int depth);
 }
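
For illustration, a sketch of exercising `getUpstreamRuns` directly through Jdbi. The connection settings and run UUID are placeholders, and the plugin setup is an assumption mirroring how SqlObject DAOs are typically wired, not code from this commit:

import java.util.List;
import java.util.UUID;
import marquez.db.LineageDao;
import marquez.db.LineageDao.UpstreamRunRow;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.postgres.PostgresPlugin;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

public class UpstreamRunsQueryExample {
  public static void main(String[] args) {
    // Placeholder connection details; binding the unannotated runId/depth parameters
    // by name assumes the project is compiled with -parameters, as Marquez's build is.
    Jdbi jdbi =
        Jdbi.create("jdbc:postgresql://localhost:5432/marquez", "marquez", "marquez")
            .installPlugin(new SqlObjectPlugin())
            .installPlugin(new PostgresPlugin());

    LineageDao lineageDao = jdbi.onDemand(LineageDao.class);

    // Walk at most 10 levels up from a hypothetical run. Each returned row is one
    // traversed edge: a run, one dataset version it read (null for runs without
    // inputs, via the LEFT JOIN), and the upstream run that produced that version.
    UUID runId = UUID.fromString("0176a8c2-d6ca-4a57-9e4a-1d4a1b2c3d4e");
    List<UpstreamRunRow> rows = lineageDao.getUpstreamRuns(runId, 10);
    rows.forEach(row -> System.out.println(row.run().id() + " <- " + row.input()));
  }
}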

api/src/main/java/marquez/db/mappers/UpstreamRunRowMapper.java

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-2023 contributors to the Marquez project
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package marquez.db.mappers;
+
+import static marquez.db.Columns.stringOrNull;
+import static marquez.db.Columns.stringOrThrow;
+import static marquez.db.Columns.timestampOrNull;
+import static marquez.db.Columns.uuidOrThrow;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+import java.util.UUID;
+import lombok.NonNull;
+import marquez.common.models.DatasetName;
+import marquez.common.models.JobName;
+import marquez.common.models.NamespaceName;
+import marquez.common.models.RunId;
+import marquez.db.Columns;
+import marquez.db.LineageDao.DatasetSummary;
+import marquez.db.LineageDao.JobSummary;
+import marquez.db.LineageDao.RunSummary;
+import marquez.db.LineageDao.UpstreamRunRow;
+import org.jdbi.v3.core.mapper.RowMapper;
+import org.jdbi.v3.core.statement.StatementContext;
+
+/** Maps the upstream query result set to an UpstreamRunRow */
+public final class UpstreamRunRowMapper implements RowMapper<UpstreamRunRow> {
+  @Override
+  public UpstreamRunRow map(@NonNull ResultSet results, @NonNull StatementContext context)
+      throws SQLException {
+    return new UpstreamRunRow(
+        new JobSummary(
+            new NamespaceName(stringOrThrow(results, "job_namespace")),
+            new JobName(stringOrThrow(results, "job_name")),
+            Optional.ofNullable(stringOrNull(results, "job_version_uuid"))
+                .map(UUID::fromString)
+                .orElse(null)),
+        new RunSummary(
+            new RunId(uuidOrThrow(results, "r_uuid")),
+            timestampOrNull(results, Columns.STARTED_AT),
+            timestampOrNull(results, Columns.ENDED_AT),
+            stringOrThrow(results, Columns.STATE)),
+        results.getObject("dataset_name") == null
+            ? null
+            : new DatasetSummary(
+                new NamespaceName(stringOrThrow(results, "dataset_namespace")),
+                new DatasetName(stringOrThrow(results, "dataset_name")),
+                UUID.fromString(stringOrThrow(results, "dataset_version_uuid")),
+                new RunId(UUID.fromString(stringOrThrow(results, "u_r_uuid")))));
+  }
+}

api/src/main/java/marquez/service/LineageService.java

Lines changed: 43 additions & 0 deletions
@@ -5,13 +5,17 @@
 
 package marquez.service;
 
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.toList;
+
 import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Maps;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -21,14 +25,20 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import javax.validation.constraints.NotNull;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import marquez.common.models.DatasetId;
 import marquez.common.models.JobId;
+import marquez.common.models.RunId;
 import marquez.db.JobDao;
 import marquez.db.LineageDao;
+import marquez.db.LineageDao.DatasetSummary;
+import marquez.db.LineageDao.JobSummary;
+import marquez.db.LineageDao.RunSummary;
 import marquez.db.models.JobRow;
 import marquez.service.DelegatingDaos.DelegatingLineageDao;
+import marquez.service.LineageService.UpstreamRunLineage;
 import marquez.service.models.DatasetData;
 import marquez.service.models.Edge;
 import marquez.service.models.Graph;
 
@@ -41,6 +51,11 @@
 
 @Slf4j
 public class LineageService extends DelegatingLineageDao {
+
+  public record UpstreamRunLineage(List<UpstreamRun> runs) {}
+
+  public record UpstreamRun(JobSummary job, RunSummary run, List<DatasetSummary> inputs) {}
+
   private final JobDao jobDao;
 
   public LineageService(LineageDao delegate, JobDao jobDao) {
 
@@ -252,4 +267,32 @@ public Optional<UUID> getJobUuid(NodeId nodeId) {
           String.format("Node '%s' must be of type dataset or job!", nodeId.getValue()));
     }
   }
+
+  /**
+   * Returns the upstream lineage for a given run. Recursively: run -> dataset version it read from
+   * -> the run that produced it
+   *
+   * @param runId the run to get upstream lineage from
+   * @param depth the maximum depth of the upstream lineage
+   * @return the upstream lineage for that run up to `depth` levels
+   */
+  public UpstreamRunLineage upstream(@NotNull RunId runId, int depth) {
+    List<UpstreamRunRow> upstreamRuns = getUpstreamRuns(runId.getValue(), depth);
+    Map<RunId, List<UpstreamRunRow>> collect =
+        upstreamRuns.stream().collect(groupingBy(r -> r.run().id(), LinkedHashMap::new, toList()));
+    List<UpstreamRun> runs =
+        collect.entrySet().stream()
+            .map(
+                row -> {
+                  UpstreamRunRow upstreamRunRow = row.getValue().get(0);
+                  List<DatasetSummary> inputs =
+                      row.getValue().stream()
+                          .map(UpstreamRunRow::input)
+                          .filter(i -> i != null)
+                          .collect(toList());
+                  return new UpstreamRun(upstreamRunRow.job(), upstreamRunRow.run(), inputs);
+                })
+            .collect(toList());
+    return new UpstreamRunLineage(runs);
+  }
 }
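
Since the SQL returns one row per (run, input dataset version) edge, `upstream` regroups those rows into one `UpstreamRun` per run while preserving the depth-ordered iteration. A self-contained sketch of that grouping step, using simplified stand-in records rather than the real Marquez types:

import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GroupUpstreamRowsExample {
  // Simplified stand-ins for LineageDao.UpstreamRunRow and its summaries.
  record Row(String runId, String input) {}

  record Grouped(String runId, List<String> inputs) {}

  public static void main(String[] args) {
    // Two rows for run1 (two inputs); one row for run2 whose input is null,
    // as produced by the LEFT JOIN when a run read no datasets.
    List<Row> rows =
        List.of(new Row("run1", "datasetA"), new Row("run1", "datasetB"), new Row("run2", null));

    // Group rows by run, keeping depth order with a LinkedHashMap, then drop the
    // null inputs -- the same shape as LineageService.upstream.
    Map<String, List<Row>> byRun =
        rows.stream().collect(groupingBy(Row::runId, LinkedHashMap::new, toList()));
    List<Grouped> grouped =
        byRun.entrySet().stream()
            .map(
                e ->
                    new Grouped(
                        e.getKey(),
                        e.getValue().stream()
                            .map(Row::input)
                            .filter(Objects::nonNull)
                            .collect(toList())))
            .collect(toList());

    // Prints: [Grouped[runId=run1, inputs=[datasetA, datasetB]], Grouped[runId=run2, inputs=[]]]
    System.out.println(grouped);
  }
}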
