Skip to content

Commit 0074433

Browse files
Flink fix terminal streaming events
Signed-off-by: Pawel Leszczynski <[email protected]>
1 parent 78a191b commit 0074433

File tree

4 files changed

+60
-2
lines changed

4 files changed

+60
-2
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.45.0...HEAD)
44

5+
### Fixed
6+
7+
* Streaming API: fix behaviour for `COMPLETE`/`FAIL` events within streaming jobs [`#2768`](https://github.com/MarquezProject/marquez/pull/2768) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
8+
*New `job_version` is not created for a streaming job terminal event with no dataset information and existing version is kept.*
9+
510
## [0.45.0](https://github.com/MarquezProject/marquez/compare/0.44.0...0.45.0) - 2024-03-07
611

712
### Added

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
374374
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
375375
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
376376
}
377-
} else {
377+
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
378378
// mark job_versions_io_mapping as obsolete
379379
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.INPUT);
380380
}
@@ -390,7 +390,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
390390
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
391391
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
392392
}
393-
} else {
393+
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
394394
// mark job_versions_io_mapping as obsolete
395395
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.OUTPUT);
396396
}
@@ -791,6 +791,10 @@ default void updateMarquezOnStreamingJob(
791791
jobVersionDao.loadJobRowRunDetails(
792792
updateLineageRow.getJob(), updateLineageRow.getRun().getUuid());
793793

794+
if (event.isTerminalEventForStreamingJobWithNoDatasets()) {
795+
return;
796+
}
797+
794798
if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) {
795799
// need to insert new job version
796800
BagOfJobVersionInfo bagOfJobVersionInfo =

api/src/main/java/marquez/service/models/LineageEvent.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,20 @@ public class LineageEvent extends BaseEvent {
5252
@Valid @NotNull private String producer;
5353
@Valid private URI schemaURL;
5454

55+
@JsonIgnore
56+
public boolean isTerminalEvent() {
57+
return (eventType != null)
58+
&& (eventType.equalsIgnoreCase("COMPLETE") || eventType.equalsIgnoreCase("FAIL"));
59+
}
60+
61+
@JsonIgnore
62+
public boolean isTerminalEventForStreamingJobWithNoDatasets() {
63+
return isTerminalEvent()
64+
&& (job != null && job.isStreamingJob())
65+
&& (outputs == null || outputs.isEmpty())
66+
&& (inputs == null || inputs.isEmpty());
67+
}
68+
5569
@AllArgsConstructor
5670
@NoArgsConstructor
5771
@Setter

api/src/test/java/marquez/service/LineageServiceTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,41 @@ public void testGetLineageForRunningStreamingJob() {
555555
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
556556
}
557557

558+
@Test
559+
public void testGetLineageForCompleteStreamingJob() {
560+
Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();
561+
Dataset output = Dataset.builder().name("output-dataset").namespace(NAMESPACE).build();
562+
563+
LineageTestUtils.createLineageRow(
564+
openLineageDao,
565+
"streamingjob",
566+
"RUNNING",
567+
JobFacet.builder()
568+
.jobType(JobTypeJobFacet.builder().processingType("STREAMING").build())
569+
.build(),
570+
Arrays.asList(input),
571+
Arrays.asList(output));
572+
573+
LineageTestUtils.createLineageRow(
574+
openLineageDao,
575+
"streamingjob",
576+
"COMPLETE",
577+
JobFacet.builder()
578+
.jobType(JobTypeJobFacet.builder().processingType("STREAMING").build())
579+
.build(),
580+
Collections.emptyList(),
581+
Collections.emptyList());
582+
583+
Lineage lineage =
584+
lineageService.lineage(
585+
NodeId.of(
586+
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
587+
5,
588+
true);
589+
590+
assertThat(lineage.getGraph()).hasSize(3); // 1 job + 2 datasets
591+
}
592+
558593
@Test
559594
public void testLineageForOrphanedDataset() {
560595
UpdateLineageRow writeJob =

0 commit comments

Comments
 (0)