11
11
import java .util .stream .Collectors ;
12
12
import lombok .NonNull ;
13
13
import lombok .extern .slf4j .Slf4j ;
14
- import marquez .service .models .LineageKindsModels .*;
15
14
import marquez .service .models .Lineage ;
15
+ import marquez .service .models .LineageKindsModels .CentralNodeInfo ;
16
+ import marquez .service .models .LineageKindsModels .DataGovernance ;
17
+ import marquez .service .models .LineageKindsModels .DataObjectNodeSpec ;
18
+ import marquez .service .models .LineageKindsModels .KindMetadata ;
19
+ import marquez .service .models .LineageKindsModels .LineageGraphKind ;
20
+ import marquez .service .models .LineageKindsModels .LineageGraphKindList ;
21
+ import marquez .service .models .LineageKindsModels .LineageGraphSpec ;
22
+ import marquez .service .models .LineageKindsModels .ListMetadata ;
16
23
import marquez .service .models .Node ;
17
24
import marquez .service .models .NodeId ;
18
25
@@ -25,112 +32,98 @@ public LineageKindsService(@NonNull LineageService lineageService) {
25
32
this .lineageService = lineageService ;
26
33
}
27
34
28
- /**
29
- * Convert traditional Marquez lineage to LineageGraph kind format
30
- */
35
+ /** Convert traditional Marquez lineage to LineageGraph kind format */
31
36
public LineageGraphKind convertToLineageGraphKind (
32
- @ NonNull String nodeIdValue ,
33
- int depth ,
34
- boolean includeMetadata ) throws NodeIdNotFoundException {
35
-
37
+ @ NonNull String nodeIdValue , int depth , boolean includeMetadata )
38
+ throws NodeIdNotFoundException {
39
+
36
40
log .debug ("Converting lineage to kinds format for nodeId: {}, depth: {}" , nodeIdValue , depth );
37
-
41
+
38
42
// 1. Get traditional lineage using existing directLineage service
39
43
NodeId nodeId = NodeId .of (nodeIdValue );
40
44
Lineage traditionalLineage = lineageService .directLineage (nodeId , depth );
41
-
45
+
42
46
// 2. Convert to kinds format
43
47
return convertToLineageGraphKind (traditionalLineage , nodeId , depth , includeMetadata );
44
48
}
45
49
46
- /**
47
- * Get a LineageGraph kind by user-friendly name
48
- */
49
- public LineageGraphKind getLineageGraphByName (
50
- @ NonNull String name ,
51
- int depth ) throws NodeIdNotFoundException {
52
-
50
+ /** Get a LineageGraph kind by user-friendly name */
51
+ public LineageGraphKind getLineageGraphByName (@ NonNull String name , int depth )
52
+ throws NodeIdNotFoundException {
53
+
53
54
log .debug ("Getting lineage graph by name: {}, depth: {}" , name , depth );
54
-
55
+
55
56
NodeId nodeId = deriveNodeIdFromName (name );
56
57
Lineage traditionalLineage = lineageService .directLineage (nodeId , depth );
57
-
58
+
58
59
return convertToLineageGraphKind (traditionalLineage , nodeId , depth , true );
59
60
}
60
61
61
- /**
62
- * List LineageGraph kinds with optional filtering
63
- */
62
+ /** List LineageGraph kinds with optional filtering */
64
63
public LineageGraphKindList listLineageGraphKinds (String labelSelector , int limit ) {
65
64
log .debug ("Listing lineage graphs with labelSelector: {}, limit: {}" , labelSelector , limit );
66
-
65
+
67
66
// This is a simplified implementation
68
67
// In practice, you'd query your metadata store based on labels
69
68
List <LineageGraphKind > items = findLineageGraphsByLabels (labelSelector , limit );
70
-
69
+
71
70
return LineageGraphKindList .builder ()
72
71
.apiVersion ("graphs/v1alpha1" )
73
72
.kind ("LineageGraphList" )
74
- .metadata (ListMetadata .builder ()
75
- .totalCount (items .size ())
76
- .build ())
73
+ .metadata (ListMetadata .builder ().totalCount (items .size ()).build ())
77
74
.items (items )
78
75
.build ();
79
76
}
80
77
81
- /**
82
- * Convert traditional Marquez lineage to LineageGraph kind
83
- */
78
+ /** Convert traditional Marquez lineage to LineageGraph kind */
84
79
private LineageGraphKind convertToLineageGraphKind (
85
- Lineage traditionalLineage ,
86
- NodeId centralNodeId ,
87
- int depth ,
88
- boolean includeMetadata ) {
89
-
80
+ Lineage traditionalLineage , NodeId centralNodeId , int depth , boolean includeMetadata ) {
81
+
90
82
// Extract central node information
91
83
CentralNodeInfo centralNode = extractCentralNodeInfo (traditionalLineage , centralNodeId );
92
-
84
+
93
85
// Build metadata
94
- KindMetadata .KindMetadataBuilder metadataBuilder = KindMetadata .builder ()
95
- .name (generateLineageGraphName (centralNodeId ))
96
- .graphDepth (depth )
97
- .centralNode (centralNode )
98
- .createdAt (Instant .now ());
99
-
86
+ KindMetadata .KindMetadataBuilder metadataBuilder =
87
+ KindMetadata .builder ()
88
+ .name (generateLineageGraphName (centralNodeId ))
89
+ .graphDepth (depth )
90
+ .centralNode (centralNode )
91
+ .createdAt (Instant .now ());
92
+
100
93
if (includeMetadata ) {
101
94
// Add governance-related labels and annotations
102
95
metadataBuilder
103
- .labels (Map .of (
104
- "data-domain" , centralNode .getDataGovernance ().getDataDomain (),
105
- "data-subdomain" , centralNode .getDataGovernance ().getDataSubdomain (),
106
- "geo" , centralNode .getDataGovernance ().getGeo (),
107
- "source-system" , centralNode .getSourceSystem () != null ? centralNode .getSourceSystem () : "unknown"
108
- ))
109
- .annotations (Map .of (
110
- "marquez.source-endpoint" , "/api/v1/lineage/direct" ,
111
- "conversion.timestamp" , Instant .now ().toString (),
112
- "conversion.depth" , String .valueOf (depth )
113
- ));
96
+ .labels (
97
+ Map .of (
98
+ "data-domain" , centralNode .getDataGovernance ().getDataDomain (),
99
+ "data-subdomain" , centralNode .getDataGovernance ().getDataSubdomain (),
100
+ "geo" , centralNode .getDataGovernance ().getGeo (),
101
+ "source-system" ,
102
+ centralNode .getSourceSystem () != null
103
+ ? centralNode .getSourceSystem ()
104
+ : "unknown" ))
105
+ .annotations (
106
+ Map .of (
107
+ "marquez.source-endpoint" , "/api/v1/lineage/direct" ,
108
+ "conversion.timestamp" , Instant .now ().toString (),
109
+ "conversion.depth" , String .valueOf (depth )));
114
110
}
115
-
111
+
116
112
// Convert nodes
117
- List <DataObjectNodeSpec > nodes = traditionalLineage .getGraph ().stream ()
118
- .map (this ::convertToDataObjectNodeSpec )
119
- .collect (Collectors .toList ());
120
-
113
+ List <DataObjectNodeSpec > nodes =
114
+ traditionalLineage .getGraph ().stream ()
115
+ .map (this ::convertToDataObjectNodeSpec )
116
+ .collect (Collectors .toList ());
117
+
121
118
return LineageGraphKind .builder ()
122
119
.apiVersion ("graphs/v1alpha1" )
123
120
.kind ("LineageGraph" )
124
121
.metadata (metadataBuilder .build ())
125
- .spec (LineageGraphSpec .builder ()
126
- .nodes (nodes )
127
- .build ())
122
+ .spec (LineageGraphSpec .builder ().nodes (nodes ).build ())
128
123
.build ();
129
124
}
130
125
131
- /**
132
- * Convert a traditional graph node to DataObjectNodeSpec
133
- */
126
+ /** Convert a traditional graph node to DataObjectNodeSpec */
134
127
private DataObjectNodeSpec convertToDataObjectNodeSpec (Node traditionalNode ) {
135
128
return DataObjectNodeSpec .builder ()
136
129
.nurn (generateNuRN (traditionalNode ))
@@ -198,18 +191,19 @@ private String extractVersion(Node node) {
198
191
199
192
private CentralNodeInfo extractCentralNodeInfo (Lineage lineage , NodeId centralNodeId ) {
200
193
String centralIdValue = centralNodeId .getValue ();
201
-
194
+
202
195
// Extract namespace and name from nodeId (format: "dataset:namespace:name")
203
196
String [] parts = centralIdValue .split (":" , 3 );
204
197
String namespace = parts .length > 1 ? parts [1 ] : "default" ;
205
198
String name = parts .length > 2 ? parts [2 ] : centralIdValue ;
206
-
199
+
207
200
return CentralNodeInfo .builder ()
208
- .dataGovernance (DataGovernance .builder ()
209
- .geo ("DATA" )
210
- .dataDomain (namespace )
211
- .dataSubdomain ("datasets" )
212
- .build ())
201
+ .dataGovernance (
202
+ DataGovernance .builder ()
203
+ .geo ("DATA" )
204
+ .dataDomain (namespace )
205
+ .dataSubdomain ("datasets" )
206
+ .build ())
213
207
.nurn ("nurn:nu:data:metapod:" + centralIdValue .replace (":" , "/" ))
214
208
.name (namespace + "/" + name )
215
209
.type ("dataset" )
@@ -232,4 +226,4 @@ private List<LineageGraphKind> findLineageGraphsByLabels(String labelSelector, i
232
226
// For now, return empty list
233
227
return List .of ();
234
228
}
235
- }
229
+ }
0 commit comments