Skip to content

Commit 6bfdd68

Browse files
committed
resolve comments
Signed-off-by: sophiely <[email protected]>
1 parent e0a0a5f commit 6bfdd68

File tree

4 files changed

+96
-63
lines changed

4 files changed

+96
-63
lines changed

api/src/main/java/marquez/db/LineageDao.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -114,15 +114,9 @@ AND ds.uuid IN (<dsUuids>)""")
114114
FROM datasets_view ds
115115
LEFT JOIN dataset_versions dv on dv.uuid = ds.current_version_uuid
116116
LEFT JOIN dataset_symlinks dsym ON dsym.namespace_uuid = ds.namespace_uuid and dsym.name = ds.name
117-
INNER JOIN (
118-
SELECT uuid
119-
FROM datasets_view as u
120-
WHERE
121-
u.name = :datasetName
122-
AND u.namespace_name = :namespaceName
123-
) as u
124-
on u.uuid = ds.uuid
125-
WHERE dsym.is_primary is true""")
117+
INNER JOIN datasets_view AS d ON d.uuid = ds.uuid
118+
WHERE dsym.is_primary is true
119+
AND CAST((:namespaceName, :datasetName) AS DATASET_NAME) = ANY(d.dataset_symlinks)""")
126120
DatasetData getDatasetData(String namespaceName, String datasetName);
127121

128122
@SqlQuery(
Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,33 @@
11
DROP VIEW IF EXISTS datasets_view;
2-
CREATE VIEW datasets_view
3-
AS
2+
CREATE VIEW datasets_view AS
43
SELECT d.uuid,
5-
d.type,
6-
d.created_at,
7-
d.updated_at,
8-
d.namespace_uuid,
9-
d.source_uuid,
10-
d.name,
11-
array_agg(CAST((namespaces.name, symlinks.name) AS DATASET_NAME)) AS dataset_symlinks,
12-
d.physical_name,
13-
d.description,
14-
d.current_version_uuid,
15-
d.last_modified_at,
16-
d.namespace_name,
17-
d.source_name,
18-
d.is_deleted
19-
FROM datasets d
20-
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
21-
INNER JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
22-
WHERE d.is_hidden IS FALSE
23-
GROUP BY d.uuid;
4+
d.type,
5+
d.created_at,
6+
d.updated_at,
7+
CASE
8+
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_uuid
9+
ELSE namespaces.uuid
10+
END
11+
AS namespace_uuid ,
12+
d.source_uuid,
13+
CASE
14+
WHEN (d.namespace_name = namespaces.name and d.name = symlinks.name) THEN d.name
15+
ELSE symlinks.name
16+
END
17+
AS name,
18+
array(SELECT ROW(namespaces.name::character varying(255), symlinks.name::character varying(255))::dataset_name) AS dataset_symlinks,
19+
d.physical_name,
20+
d.description,
21+
d.current_version_uuid,
22+
d.last_modified_at,
23+
CASE
24+
WHEN (d.namespace_name = namespaces.name AND d.name = symlinks.name) THEN d.namespace_name
25+
ELSE namespaces.name
26+
END
27+
AS namespace_name,
28+
d.source_name,
29+
d.is_deleted
30+
FROM datasets d
31+
JOIN dataset_symlinks symlinks ON d.uuid = symlinks.dataset_uuid
32+
JOIN namespaces ON symlinks.namespace_uuid = namespaces.uuid
33+
WHERE d.is_hidden is false;

api/src/main/resources/marquez/db/migration/V68__alter_datasets_view_to_keep_symlinks.sql

Lines changed: 0 additions & 33 deletions
This file was deleted.

api/src/test/java/marquez/service/LineageServiceTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313

1414
import java.util.Arrays;
1515
import java.util.Collections;
16+
import java.util.HashMap;
1617
import java.util.LinkedList;
1718
import java.util.List;
19+
import java.util.Map;
1820
import java.util.Optional;
1921
import java.util.UUID;
2022
import java.util.stream.Collectors;
@@ -588,4 +590,64 @@ public void testLineageForOrphanedDataset() {
588590
private boolean jobNameEquals(Node node, String writeJob) {
589591
return node.getId().asJobId().getName().getValue().equals(writeJob);
590592
}
593+
594+
@Test
595+
public void testSymlinkDatasetLineage() {
596+
// (1) Create symlink facet for our main dataset
597+
Map<String, Object> symlink = new HashMap<>();
598+
Map<String, Object> symlinkInfo = new HashMap<>();
599+
Map<String, Object> symlinkIdentifiers = new HashMap<>();
600+
symlinkIdentifiers.put("name", "symlinkDataset");
601+
symlinkIdentifiers.put("namespace", NAMESPACE);
602+
symlinkIdentifiers.put("type", "DB_TABLE");
603+
symlinkInfo.put("producer", "https://github.com/OpenLineage/producer/");
604+
symlinkInfo.put("schemaURL", "https://openlineage.io/schema/url/");
605+
symlinkInfo.put("identifiers", symlinkIdentifiers);
606+
symlink.put("symlinks", symlinkInfo);
607+
608+
// (2) Create main dataset with a symlink
609+
Dataset mainDataset =
610+
new Dataset(
611+
NAMESPACE,
612+
"mainDataset",
613+
newDatasetFacet(symlink, new SchemaField("firstname", "string", "the first name")));
614+
615+
// (3) Create the symlink dataset
616+
Dataset symlinkDataset =
617+
new Dataset(
618+
NAMESPACE,
619+
"symlinkDataset",
620+
newDatasetFacet(new SchemaField("firstname", "string", "the first name")));
621+
622+
// (4) Create a job with the main dataset
623+
UpdateLineageRow firstJob =
624+
LineageTestUtils.createLineageRow(
625+
openLineageDao,
626+
"firstJob",
627+
"COMPLETE",
628+
jobFacet,
629+
Arrays.asList(mainDataset),
630+
Arrays.asList());
631+
632+
// (5) Create a job with the symlink dataset
633+
UpdateLineageRow secondJob =
634+
LineageTestUtils.createLineageRow(
635+
openLineageDao,
636+
"secondJob",
637+
"COMPLETE",
638+
jobFacet,
639+
Arrays.asList(symlinkDataset),
640+
Arrays.asList());
641+
642+
// (6) We expect the first and second job linked together because the main
643+
// and symlink dataset are in fact the same dataset
644+
Lineage lineage =
645+
lineageService.lineage(
646+
NodeId.of(
647+
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("symlinkDataset"))),
648+
5,
649+
true);
650+
651+
assertThat(lineage.getGraph()).hasSize(3);
652+
}
591653
}

0 commit comments

Comments
 (0)