Skip to content

Commit 42cadbb

Browse files
Runless events - consume dataset event (#2641)
* Runless events - consume dataset event Signed-off-by: Pawel Leszczynski <[email protected]> * Runless events - add container to store daos Signed-off-by: Pawel Leszczynski <[email protected]> * Runless events - extract common methods Signed-off-by: Pawel Leszczynski <[email protected]> * Runless events - run upsert builder Signed-off-by: Pawel Leszczynski <[email protected]> * Runless events - review feedback Signed-off-by: Pawel Leszczynski <[email protected]> * fix daos container - speeds up tests twice Signed-off-by: Pawel Leszczynski <[email protected]> --------- Signed-off-by: Pawel Leszczynski <[email protected]> Co-authored-by: Willy Lulciuc <[email protected]>
1 parent 7c19162 commit 42cadbb

14 files changed

+646
-268
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
* Web: fix Unix epoch time display for null `endedAt` values [`#2647`](https://github.com/MarquezProject/marquez/pull/2647) [@merobi-hub](https://github.com/merobi-hub)
2727
*Fixes the issue of the GUI displaying Unix epoch time (midnight on January 1, 1970) in the case of running jobs/null `endedAt` values.*
2828

29+
### Added
30+
* API: support `DatasetEvent` [`#2641`](https://github.com/MarquezProject/marquez/pull/2641) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
31+
*Save into Marquez model datasets sent via `DatasetEvent` event type
32+
2933
## [0.41.0](https://github.com/MarquezProject/marquez/compare/0.40.0...0.41.0) - 2023-09-20
3034
### Added
3135
* API: add support for the following parameters in the `SearchDao` [`#2556`](https://github.com/MarquezProject/marquez/pull/2556) [@tati](https://github.com/tati) [@wslulciuc](https://github.com/wslulciuc)

api/src/main/java/marquez/api/OpenLineageResource.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import marquez.db.OpenLineageDao;
4040
import marquez.service.ServiceFactory;
4141
import marquez.service.models.BaseEvent;
42+
import marquez.service.models.DatasetEvent;
4243
import marquez.service.models.LineageEvent;
4344
import marquez.service.models.NodeId;
4445

@@ -67,15 +68,11 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon
6768
if (event instanceof LineageEvent) {
6869
openLineageService
6970
.createAsync((LineageEvent) event)
70-
.whenComplete(
71-
(result, err) -> {
72-
if (err != null) {
73-
log.error("Unexpected error while processing request", err);
74-
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
75-
} else {
76-
asyncResponse.resume(Response.status(201).build());
77-
}
78-
});
71+
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
72+
} else if (event instanceof DatasetEvent) {
73+
openLineageService
74+
.createAsync((DatasetEvent) event)
75+
.whenComplete((result, err) -> onComplete(result, err, asyncResponse));
7976
} else {
8077
log.warn("Unsupported event type {}. Skipping without error", event.getClass().getName());
8178

@@ -84,6 +81,15 @@ public void create(@Valid @NotNull BaseEvent event, @Suspended final AsyncRespon
8481
}
8582
}
8683

84+
private void onComplete(Void result, Throwable err, AsyncResponse asyncResponse) {
85+
if (err != null) {
86+
log.error("Unexpected error while processing request", err);
87+
asyncResponse.resume(Response.status(determineStatusCode(err)).build());
88+
} else {
89+
asyncResponse.resume(Response.status(201).build());
90+
}
91+
}
92+
8793
private int determineStatusCode(Throwable e) {
8894
if (e instanceof CompletionException) {
8995
return determineStatusCode(e.getCause());

api/src/main/java/marquez/db/DatasetFacetsDao.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.Spliterators;
1313
import java.util.UUID;
1414
import java.util.stream.StreamSupport;
15+
import javax.annotation.Nullable;
1516
import lombok.NonNull;
1617
import marquez.common.Utils;
1718
import marquez.service.models.LineageEvent;
@@ -126,9 +127,9 @@ void insertDatasetFacet(
126127
default void insertDatasetFacetsFor(
127128
@NonNull UUID datasetUuid,
128129
@NonNull UUID datasetVersionUuid,
129-
@NonNull UUID runUuid,
130+
@Nullable UUID runUuid,
130131
@NonNull Instant lineageEventTime,
131-
@NonNull String lineageEventType,
132+
@Nullable String lineageEventType,
132133
@NonNull LineageEvent.DatasetFacets datasetFacets) {
133134
final Instant now = Instant.now();
134135

0 commit comments

Comments
 (0)