Skip to content

Concurrency in columnlineage get #57

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/src/main/java/marquez/api/ColumnLineageResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ public Response getLineage(
return Response.status(400, "Node version cannot be specified when withDownstream is true")
.build();
}
return Response.ok(columnLineageService.lineage(nodeId, depth, withDownstream)).build();
return Response.ok(columnLineageService.directColumnLineage(nodeId, depth, withDownstream)).build();
}
}
36 changes: 27 additions & 9 deletions api/src/main/java/marquez/service/ColumnLineageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -49,23 +50,40 @@ public ColumnLineageService(ColumnLineageDao dao, DatasetFieldDao datasetFieldDa
this.datasetFieldDao = datasetFieldDao;
}

public Lineage lineage(NodeId nodeId, int depth, boolean withDownstream) {
public Lineage directColumnLineage(NodeId nodeId, int depth, boolean withDownstream) {
// 1. Get initial column nodes
ColumnNodes columnNodes = getColumnNodes(nodeId);
if (columnNodes.nodeIds.isEmpty()) {
throw new NodeIdNotFoundException("Could not find node");
}

// 2. Fetch upstream and downstream lineage separately (like LineageService.directLineage)
Set<ColumnLineageNodeData> upstreamLineage = fetchDirectColumnLineage(
new HashSet<>(columnNodes.nodeIds), depth, true, columnNodes.createdAtUntil);

Set<ColumnLineageNodeData> allLineageData = new HashSet<>(upstreamLineage);
// 2. Fetch upstream and downstream lineage with concurrency when both are needed
Set<ColumnLineageNodeData> allLineageData;

if (withDownstream) {
Set<ColumnLineageNodeData> downstreamLineage = fetchDirectColumnLineage(
new HashSet<>(columnNodes.nodeIds), depth, false, columnNodes.createdAtUntil);
allLineageData.addAll(downstreamLineage);
// Run upstream and downstream lineage fetching concurrently
CompletableFuture<Set<ColumnLineageNodeData>> upstreamFuture = CompletableFuture.supplyAsync(() ->
fetchDirectColumnLineage(new HashSet<>(columnNodes.nodeIds), depth, true, columnNodes.createdAtUntil));

CompletableFuture<Set<ColumnLineageNodeData>> downstreamFuture = CompletableFuture.supplyAsync(() ->
fetchDirectColumnLineage(new HashSet<>(columnNodes.nodeIds), depth, false, columnNodes.createdAtUntil));

try {
// Wait for both to complete and combine results
Set<ColumnLineageNodeData> upstreamLineage = upstreamFuture.get();
Set<ColumnLineageNodeData> downstreamLineage = downstreamFuture.get();

allLineageData = new HashSet<>(upstreamLineage);
allLineageData.addAll(downstreamLineage);
} catch (Exception e) {
log.error("Error during concurrent lineage fetching", e);
throw new RuntimeException("Failed to fetch lineage data concurrently", e);
}
} else {
// Only fetch upstream lineage
Set<ColumnLineageNodeData> upstreamLineage = fetchDirectColumnLineage(
new HashSet<>(columnNodes.nodeIds), depth, true, columnNodes.createdAtUntil);
allLineageData = new HashSet<>(upstreamLineage);
}

log.debug("Completed lineage traversal with {} total nodes", allLineageData.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ColumnLineageResourceTest {
ColumnLineageResourceTest.class.getResourceAsStream("/column_lineage/node.json"),
new TypeReference<>() {});
LINEAGE = new Lineage(ImmutableSortedSet.of(testNode));
when(lineageService.lineage(any(NodeId.class), eq(20), eq(false))).thenReturn(LINEAGE);
when(lineageService.directColumnLineage(any(NodeId.class), eq(20), eq(false))).thenReturn(LINEAGE);

ServiceFactory serviceFactory =
ApiTestUtils.mockServiceFactory(Map.of(ColumnLineageService.class, lineageService));
Expand Down
Loading
Loading