Skip to content

Commit 5e63ebb

Browse files
Flink fix terminal streaming events
Signed-off-by: Pawel Leszczynski <[email protected]>
1 parent 78a191b commit 5e63ebb

File tree

5 files changed

+118
-2
lines changed

5 files changed

+118
-2
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22

33
## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.45.0...HEAD)
44

5+
### Fixed
6+
7+
* Streaming API: fix behaviour for `COMPLETE`/`FAIL` events within streaming jobs [`#2768`](https://github.com/MarquezProject/marquez/pull/2768) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
8+
*New `job_version` is not created for a streaming job terminal event with no dataset information and existing version is kept.*
9+
510
## [0.45.0](https://github.com/MarquezProject/marquez/compare/0.44.0...0.45.0) - 2024-03-07
611

712
### Added

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
374374
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
375375
insertInputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
376376
}
377-
} else {
377+
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
378378
// mark job_versions_io_mapping as obsolete
379379
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.INPUT);
380380
}
@@ -390,7 +390,7 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
390390
insertDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
391391
insertOutputDatasetFacets(daos, dataset, record, runUuid, event.getEventType(), now);
392392
}
393-
} else {
393+
} else if (!event.isTerminalEventForStreamingJobWithNoDatasets()) {
394394
// mark job_versions_io_mapping as obsolete
395395
daos.getJobVersionDao().markInputOrOutputDatasetAsPreviousFor(job.getUuid(), IoType.OUTPUT);
396396
}
@@ -791,6 +791,10 @@ default void updateMarquezOnStreamingJob(
791791
jobVersionDao.loadJobRowRunDetails(
792792
updateLineageRow.getJob(), updateLineageRow.getRun().getUuid());
793793

794+
if (event.isTerminalEventForStreamingJobWithNoDatasets()) {
795+
return;
796+
}
797+
794798
if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) {
795799
// need to insert new job version
796800
BagOfJobVersionInfo bagOfJobVersionInfo =

api/src/main/java/marquez/service/models/LineageEvent.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,20 @@ public class LineageEvent extends BaseEvent {
5252
@Valid @NotNull private String producer;
5353
@Valid private URI schemaURL;
5454

55+
@JsonIgnore
56+
public boolean isTerminalEvent() {
57+
return (eventType != null)
58+
&& (eventType.equalsIgnoreCase("COMPLETE") || eventType.equalsIgnoreCase("FAIL"));
59+
}
60+
61+
@JsonIgnore
62+
public boolean isTerminalEventForStreamingJobWithNoDatasets() {
63+
return isTerminalEvent()
64+
&& (job != null && job.isStreamingJob())
65+
&& (outputs == null || outputs.isEmpty())
66+
&& (inputs == null || inputs.isEmpty());
67+
}
68+
5569
@AllArgsConstructor
5670
@NoArgsConstructor
5771
@Setter

api/src/test/java/marquez/service/LineageServiceTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,41 @@ public void testGetLineageForRunningStreamingJob() {
555555
assertThat(lineageFromInput.getGraph()).isEqualTo(lineageFromOutput.getGraph());
556556
}
557557

558+
@Test
559+
public void testGetLineageForCompleteStreamingJob() {
560+
Dataset input = Dataset.builder().name("input-dataset").namespace(NAMESPACE).build();
561+
Dataset output = Dataset.builder().name("output-dataset").namespace(NAMESPACE).build();
562+
563+
LineageTestUtils.createLineageRow(
564+
openLineageDao,
565+
"streamingjob",
566+
"RUNNING",
567+
JobFacet.builder()
568+
.jobType(JobTypeJobFacet.builder().processingType("STREAMING").build())
569+
.build(),
570+
Arrays.asList(input),
571+
Arrays.asList(output));
572+
573+
LineageTestUtils.createLineageRow(
574+
openLineageDao,
575+
"streamingjob",
576+
"COMPLETE",
577+
JobFacet.builder()
578+
.jobType(JobTypeJobFacet.builder().processingType("STREAMING").build())
579+
.build(),
580+
Collections.emptyList(),
581+
Collections.emptyList());
582+
583+
Lineage lineage =
584+
lineageService.lineage(
585+
NodeId.of(
586+
new DatasetId(new NamespaceName(NAMESPACE), new DatasetName("output-dataset"))),
587+
5,
588+
true);
589+
590+
assertThat(lineage.getGraph()).hasSize(3); // 1 job + 2 datasets
591+
}
592+
558593
@Test
559594
public void testLineageForOrphanedDataset() {
560595
UpdateLineageRow writeJob =

api/src/test/java/marquez/service/models/LineageEventTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
package marquez.service.models;
77

88
import static org.assertj.core.api.Assertions.assertThat;
9+
import static org.mockito.Mockito.mock;
10+
import static org.mockito.Mockito.when;
911

1012
import com.fasterxml.jackson.databind.JsonNode;
1113
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -21,10 +23,12 @@
2123
import java.time.temporal.TemporalAccessor;
2224
import java.time.temporal.TemporalQueries;
2325
import java.util.Arrays;
26+
import java.util.Collections;
2427
import java.util.List;
2528
import marquez.common.Utils;
2629
import marquez.common.models.FlexibleDateTimeDeserializer;
2730
import marquez.service.models.LineageEvent.JobTypeJobFacet;
31+
import marquez.service.models.LineageEvent.LineageEventBuilder;
2832
import org.junit.Test;
2933
import org.junit.jupiter.params.ParameterizedTest;
3034
import org.junit.jupiter.params.provider.MethodSource;
@@ -109,4 +113,58 @@ public void testJobTypeJobFacetSerialization() throws IOException {
109113
assertThat(facet.getIntegration()).isEqualTo("FLINK");
110114
assertThat(facet.getProcessingType()).isEqualTo("STREAMING");
111115
}
116+
117+
@Test
118+
public void testIsTerminalEvent() {
119+
LineageEventBuilder builder = LineageEvent.builder();
120+
121+
assertThat(builder.eventType("compleTe").build().isTerminalEvent()).isTrue();
122+
assertThat(builder.eventType("Fail").build().isTerminalEvent()).isTrue();
123+
assertThat(builder.eventType("start").build().isTerminalEvent()).isFalse();
124+
}
125+
126+
@Test
127+
public void testSsTerminalEventForStreamingJobWithNoDatasets() {
128+
LineageEvent.Job streamingJob = mock(LineageEvent.Job.class);
129+
when(streamingJob.isStreamingJob()).thenReturn(true);
130+
LineageEventBuilder builder = LineageEvent.builder().job(streamingJob);
131+
132+
assertThat(builder.eventType("complete").build().isTerminalEventForStreamingJobWithNoDatasets())
133+
.isTrue();
134+
135+
assertThat(builder.eventType("start").build().isTerminalEventForStreamingJobWithNoDatasets())
136+
.isFalse();
137+
138+
assertThat(
139+
builder
140+
.eventType("complete")
141+
.inputs(Collections.emptyList())
142+
.build()
143+
.isTerminalEventForStreamingJobWithNoDatasets())
144+
.isTrue();
145+
146+
assertThat(
147+
builder
148+
.eventType("complete")
149+
.outputs(Collections.emptyList())
150+
.build()
151+
.isTerminalEventForStreamingJobWithNoDatasets())
152+
.isTrue();
153+
154+
assertThat(
155+
builder
156+
.eventType("complete")
157+
.outputs(Collections.singletonList(mock(LineageEvent.Dataset.class)))
158+
.build()
159+
.isTerminalEventForStreamingJobWithNoDatasets())
160+
.isFalse();
161+
162+
assertThat(
163+
builder
164+
.eventType("complete")
165+
.job(mock(LineageEvent.Job.class))
166+
.build()
167+
.isTerminalEventForStreamingJobWithNoDatasets())
168+
.isFalse();
169+
}
112170
}

0 commit comments

Comments
 (0)