Skip to content

Commit d8547b3

Browse files
Fixing downstream column-lineage bug
1 parent 75f056a commit d8547b3

File tree

2 files changed

+219
-171
lines changed

2 files changed

+219
-171
lines changed

api/src/main/java/marquez/db/ColumnLineageDao.java

Lines changed: 91 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -253,14 +253,13 @@ Set<ColumnLineageNodeData> getLineageRowsForDatasets(
253253
List<Pair<String, String>> datasets);
254254

255255
/**
256-
* Fetch all of the column lineage nodes that are directly connected to the input dataset fields.
257-
* This returns a single layer of lineage using column lineage as edges. Fields that have
258-
* no input or output lineage will have no results.
256+
* Fetch upstream lineage nodes that directly produce data TO the input dataset fields.
257+
* This returns fields that are direct producers of the given fields.
258+
* Only follows INPUT edges to the given fields (direct upstream producers).
259259
*
260-
* @param datasetFieldUuids The UUIDs of the dataset fields to get lineage for
261-
* @param withDownstream Whether to include downstream lineage
260+
* @param datasetFieldUuids The UUIDs of the dataset fields to get upstream producers for
262261
* @param createdAtUntil The point in time to get lineage for
263-
* @return Set of ColumnLineageNodeData representing the direct lineage
262+
* @return Set of ColumnLineageNodeData representing the direct upstream producer fields
264263
*/
265264
@SqlQuery("""
266265
WITH dataset_fields_view AS (
@@ -274,15 +273,74 @@ WITH dataset_fields_view AS (
274273
FROM dataset_fields df
275274
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
276275
),
277-
-- For upstream lineage: find direct input fields
278-
upstream_input_fields AS (
279-
SELECT DISTINCT input_dataset_field_uuid
280-
FROM tmp_column_lineage_latest
281-
WHERE output_dataset_field_uuid IN (<datasetFieldUuids>)
282-
AND created_at <= :createdAtUntil
276+
-- Find upstream producer fields: fields that produce data TO our target fields
277+
upstream_producers AS (
278+
SELECT DISTINCT
279+
input_fields.namespace_name,
280+
input_fields.dataset_name,
281+
input_fields.field_name,
282+
input_fields.type,
283+
ARRAY[]::text[][] AS inputFields,
284+
ARRAY_AGG(DISTINCT ARRAY[
285+
output_fields.namespace_name,
286+
output_fields.dataset_name,
287+
CAST(cl.output_dataset_version_uuid AS VARCHAR),
288+
output_fields.field_name,
289+
cl.transformation_description,
290+
cl.transformation_type
291+
]) AS outputFields,
292+
cl.input_dataset_version_uuid as dataset_version_uuid
293+
FROM tmp_column_lineage_latest cl
294+
INNER JOIN dataset_fields_view input_fields
295+
ON cl.input_dataset_field_uuid = input_fields.uuid
296+
INNER JOIN dataset_symlinks ds_input
297+
ON ds_input.namespace_uuid = input_fields.namespace_uuid
298+
AND ds_input.name = input_fields.dataset_name
299+
INNER JOIN dataset_fields_view output_fields
300+
ON cl.output_dataset_field_uuid = output_fields.uuid
301+
INNER JOIN dataset_symlinks ds_output
302+
ON ds_output.namespace_uuid = output_fields.namespace_uuid
303+
AND ds_output.name = output_fields.dataset_name
304+
WHERE cl.output_dataset_field_uuid IN (<datasetFieldUuids>)
305+
AND cl.created_at <= :createdAtUntil
306+
AND ds_input.is_primary is true
307+
AND ds_output.is_primary is true
308+
GROUP BY
309+
input_fields.namespace_name,
310+
input_fields.dataset_name,
311+
input_fields.field_name,
312+
input_fields.type,
313+
cl.input_dataset_version_uuid
314+
)
315+
SELECT * FROM upstream_producers
316+
""")
317+
Set<ColumnLineageNodeData> getUpstreamColumnLineage(
318+
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
319+
Instant createdAtUntil);
320+
321+
/**
322+
* Fetch downstream lineage nodes that directly consume data FROM the input dataset fields.
323+
* This returns fields that are direct consumers of the given fields.
324+
* Only follows OUTPUT edges from the given fields (direct downstream consumers).
325+
*
326+
* @param datasetFieldUuids The UUIDs of the dataset fields to get downstream consumers for
327+
* @param createdAtUntil The point in time to get lineage for
328+
* @return Set of ColumnLineageNodeData representing the direct downstream consumer fields
329+
*/
330+
@SqlQuery("""
331+
WITH dataset_fields_view AS (
332+
SELECT
333+
d.namespace_name as namespace_name,
334+
d.name as dataset_name,
335+
df.name as field_name,
336+
df.type,
337+
df.uuid,
338+
d.namespace_uuid
339+
FROM dataset_fields df
340+
INNER JOIN datasets_view d ON d.uuid = df.dataset_uuid
283341
),
284-
-- For upstream lineage: get direct upstream nodes
285-
upstream_lineage AS (
342+
-- Find downstream consumer fields: fields that consume data FROM our target fields
343+
downstream_consumers AS (
286344
SELECT DISTINCT
287345
output_fields.namespace_name,
288346
output_fields.dataset_name,
@@ -304,76 +362,38 @@ upstream_lineage AS (
304362
INNER JOIN dataset_symlinks ds_output
305363
ON ds_output.namespace_uuid = output_fields.namespace_uuid
306364
AND ds_output.name = output_fields.dataset_name
307-
INNER JOIN upstream_input_fields uif
308-
ON cl.input_dataset_field_uuid = uif.input_dataset_field_uuid
309365
INNER JOIN dataset_fields_view input_fields
310366
ON cl.input_dataset_field_uuid = input_fields.uuid
311367
INNER JOIN dataset_symlinks ds_input
312368
ON ds_input.namespace_uuid = input_fields.namespace_uuid
313369
AND ds_input.name = input_fields.dataset_name
314-
WHERE output_fields.uuid IN (<datasetFieldUuids>)
315-
AND ds_output.is_primary is true
370+
WHERE cl.input_dataset_field_uuid IN (<datasetFieldUuids>)
371+
AND cl.created_at <= :createdAtUntil
372+
AND ds_output.is_primary is true
316373
AND ds_input.is_primary is true
317374
GROUP BY
318375
output_fields.namespace_name,
319376
output_fields.dataset_name,
320377
output_fields.field_name,
321378
output_fields.type,
322379
cl.output_dataset_version_uuid
323-
),
324-
-- For downstream lineage: find direct output fields
325-
downstream_output_fields AS (
326-
SELECT DISTINCT output_dataset_field_uuid
327-
FROM tmp_column_lineage_latest
328-
WHERE input_dataset_field_uuid IN (<datasetFieldUuids>)
329-
AND created_at <= :createdAtUntil
330-
),
331-
-- For downstream lineage: get direct downstream nodes
332-
downstream_lineage AS (
333-
SELECT DISTINCT
334-
input_fields.namespace_name,
335-
input_fields.dataset_name,
336-
input_fields.field_name,
337-
input_fields.type,
338-
ARRAY[]::text[][] AS inputFields,
339-
ARRAY_AGG(DISTINCT ARRAY[
340-
output_fields.namespace_name,
341-
output_fields.dataset_name,
342-
CAST(cl.output_dataset_version_uuid AS VARCHAR),
343-
output_fields.field_name,
344-
cl.transformation_description,
345-
cl.transformation_type
346-
]) AS outputFields,
347-
cl.output_dataset_version_uuid as dataset_version_uuid
348-
FROM tmp_column_lineage_latest cl
349-
INNER JOIN dataset_fields_view input_fields
350-
ON cl.input_dataset_field_uuid = input_fields.uuid
351-
INNER JOIN dataset_symlinks ds_input
352-
ON ds_input.namespace_uuid = input_fields.namespace_uuid
353-
AND ds_input.name = input_fields.dataset_name
354-
INNER JOIN downstream_output_fields dof
355-
ON cl.output_dataset_field_uuid = dof.output_dataset_field_uuid
356-
INNER JOIN dataset_fields_view output_fields
357-
ON cl.output_dataset_field_uuid = output_fields.uuid
358-
INNER JOIN dataset_symlinks ds_output
359-
ON ds_output.namespace_uuid = output_fields.namespace_uuid
360-
AND ds_output.name = output_fields.dataset_name
361-
WHERE input_fields.uuid IN (<datasetFieldUuids>)
362-
AND :withDownstream
363-
AND ds_input.is_primary is true
364-
AND ds_output.is_primary is true
365-
GROUP BY
366-
input_fields.namespace_name,
367-
input_fields.dataset_name,
368-
input_fields.field_name,
369-
input_fields.type,
370-
cl.output_dataset_version_uuid
371380
)
372-
-- Combine results
373-
SELECT * FROM upstream_lineage
374-
UNION ALL
375-
SELECT * FROM downstream_lineage
376-
""")
381+
SELECT * FROM downstream_consumers
382+
""")
383+
Set<ColumnLineageNodeData> getDownstreamColumnLineage(
384+
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
385+
Instant createdAtUntil);
386+
387+
/**
388+
* Fetch all of the column lineage nodes that are directly connected to the input dataset fields.
389+
* This returns a single layer of lineage using column lineage as edges. Fields that have
390+
* no input or output lineage will have no results.
391+
*
392+
* @param datasetFieldUuids The UUIDs of the dataset fields to get lineage for
393+
* @param withDownstream Whether to include downstream lineage
394+
* @param createdAtUntil The point in time to get lineage for
395+
* @return Set of ColumnLineageNodeData representing the direct lineage
396+
*/
377397
Set<ColumnLineageNodeData> getDirectColumnLineage(
378398
@BindList(onEmpty = NULL_STRING) List<UUID> datasetFieldUuids,
379399
boolean withDownstream,

0 commit comments

Comments
 (0)