Skip to content

Commit 2c8613a

Browse files
Support streaming jobs in Marquez (#2682)
* Support streaming jobs in Marquez Signed-off-by: Pawel Leszczynski <[email protected]> * write job versions on running Signed-off-by: Pawel Leszczynski <[email protected]> * refactor isStreamingJob method Signed-off-by: Pawel Leszczynski <[email protected]> --------- Signed-off-by: Pawel Leszczynski <[email protected]>
1 parent b73fb15 commit 2c8613a

27 files changed

+634
-171
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
*Save into Marquez model datasets sent via `DatasetEvent` event type
88
* API: support `JobEvent` [`#2661`](https://github.com/MarquezProject/marquez/pull/2661) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
99
*Save into Marquez model jobs and datasets sent via `JobEvent` event type.
10+
* API: support streaming jobs [`#2682`](https://github.com/MarquezProject/marquez/pull/2682) [@pawel-big-lebowski]( https://github.com/pawel-big-lebowski)
11+
*Creates job version and reference rows at the beginning of the job instead of on complete. Updates job version within the run if anything changes.
1012

1113
## [0.42.0](https://github.com/MarquezProject/marquez/compare/0.41.0...0.42.0) - 2023-10-17
1214
### Added

api/src/main/java/marquez/db/JobVersionDao.java

Lines changed: 77 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,14 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
280280
return findInputOrOutputDatasetsFor(jobVersionUuid, IoType.OUTPUT);
281281
}
282282

283+
/**
284+
* Verifies if a job with a specified job version is present in table.
285+
*
286+
* @param version Version identifier
287+
*/
288+
@SqlQuery("SELECT EXISTS (SELECT 1 FROM job_versions WHERE version = :version)")
289+
boolean versionExists(UUID version);
290+
283291
/**
284292
* Returns the input or output datasets for a given job version.
285293
*
@@ -447,98 +455,73 @@ private static ExtendedDatasetVersionRow toExtendedDatasetVersionRow(DatasetReco
447455
* and context. A version for a given job is created <i>only</i> when a {@link Run} transitions
448456
* into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state.
449457
*
450-
* @param jobRow The job.
451-
* @param runUuid The unique ID of the run associated with the job version.
458+
* @param jobRowRunDetails The job row with run details.
452459
* @param runState The current run state.
453460
* @param transitionedAt The timestamp of the run state transition.
454461
* @return A {@link BagOfJobVersionInfo} object.
455462
*/
456463
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
457-
@NonNull JobRow jobRow,
458-
@NonNull UUID runUuid,
464+
@NonNull JobRowRunDetails jobRowRunDetails,
459465
@NonNull RunState runState,
460-
@NonNull Instant transitionedAt) {
466+
@NonNull Instant transitionedAt,
467+
boolean linkJobToJobVersion) {
461468
// Get the job.
462469
final JobDao jobDao = createJobDao();
463470

464-
// Get the inputs and outputs dataset versions for the run associated with the job version.
465-
final DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
466-
final List<ExtendedDatasetVersionRow> jobVersionInputs =
467-
datasetVersionDao.findInputDatasetVersionsFor(runUuid);
468-
final List<ExtendedDatasetVersionRow> jobVersionOutputs =
469-
datasetVersionDao.findOutputDatasetVersionsFor(runUuid);
470-
471-
// Get the namespace for the job.
472-
final NamespaceRow namespaceRow =
473-
createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get();
474-
475-
// Generate the version for the job; the version may already exist.
476-
final Version jobVersion =
477-
Utils.newJobVersionFor(
478-
NamespaceName.of(jobRow.getNamespaceName()),
479-
JobName.of(
480-
Optional.ofNullable(jobRow.getParentJobName())
481-
.map(pn -> pn + "." + jobRow.getSimpleName())
482-
.orElse(jobRow.getName())),
483-
toDatasetIds(
484-
jobVersionInputs.stream()
485-
.map(i -> (DatasetVersionRow) i)
486-
.collect(Collectors.toList())),
487-
toDatasetIds(
488-
jobVersionOutputs.stream()
489-
.map(o -> (DatasetVersionRow) o)
490-
.collect(Collectors.toList())),
491-
jobRow.getLocation());
492-
493471
// Add the job version.
494472
final JobVersionDao jobVersionDao = createJobVersionDao();
473+
495474
final JobVersionRow jobVersionRow =
496475
jobVersionDao.upsertJobVersion(
497476
UUID.randomUUID(),
498477
transitionedAt, // Use the timestamp of when the run state transitioned.
499-
jobRow.getUuid(),
500-
jobRow.getLocation(),
501-
jobVersion.getValue(),
502-
jobRow.getName(),
503-
namespaceRow.getUuid(),
504-
jobRow.getNamespaceName());
478+
jobRowRunDetails.jobRow.getUuid(),
479+
jobRowRunDetails.jobRow.getLocation(),
480+
jobRowRunDetails.jobVersion.getValue(),
481+
jobRowRunDetails.jobRow.getName(),
482+
jobRowRunDetails.namespaceRow.getUuid(),
483+
jobRowRunDetails.jobRow.getNamespaceName());
505484

506485
// Link the input datasets to the job version.
507-
jobVersionInputs.forEach(
486+
jobRowRunDetails.jobVersionInputs.forEach(
508487
jobVersionInput -> {
509488
jobVersionDao.upsertInputDatasetFor(
510489
jobVersionRow.getUuid(),
511490
jobVersionInput.getDatasetUuid(),
512491
jobVersionRow.getJobUuid(),
513-
jobRow.getSymlinkTargetId());
492+
jobRowRunDetails.jobRow.getSymlinkTargetId());
514493
});
515494

516495
// Link the output datasets to the job version.
517-
jobVersionOutputs.forEach(
496+
jobRowRunDetails.jobVersionOutputs.forEach(
518497
jobVersionOutput -> {
519498
jobVersionDao.upsertOutputDatasetFor(
520499
jobVersionRow.getUuid(),
521500
jobVersionOutput.getDatasetUuid(),
522501
jobVersionRow.getJobUuid(),
523-
jobRow.getSymlinkTargetId());
502+
jobRowRunDetails.jobRow.getSymlinkTargetId());
524503
});
525504

526505
// Link the job version to the run.
527-
createRunDao().updateJobVersion(runUuid, jobVersionRow.getUuid());
506+
createRunDao().updateJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid());
528507

529508
// Link the run to the job version; multiple run instances may be linked to a job version.
530-
jobVersionDao.updateLatestRunFor(jobVersionRow.getUuid(), transitionedAt, runUuid);
509+
jobVersionDao.updateLatestRunFor(
510+
jobVersionRow.getUuid(), transitionedAt, jobRowRunDetails.runUuid);
531511

532512
// Link the job facets to this job version
533-
jobVersionDao.linkJobFacetsToJobVersion(runUuid, jobVersionRow.getUuid());
513+
jobVersionDao.linkJobFacetsToJobVersion(jobRowRunDetails.runUuid, jobVersionRow.getUuid());
534514

535-
// Link the job version to the job only if the run is marked done and has transitioned into one
536-
// of the following states: COMPLETED, ABORTED, or FAILED.
537-
if (runState.isDone()) {
538-
jobDao.updateVersionFor(jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid());
515+
if (linkJobToJobVersion) {
516+
jobDao.updateVersionFor(
517+
jobRowRunDetails.jobRow.getUuid(), transitionedAt, jobVersionRow.getUuid());
539518
}
540519

541-
return new BagOfJobVersionInfo(jobRow, jobVersionRow, jobVersionInputs, jobVersionOutputs);
520+
return new BagOfJobVersionInfo(
521+
jobRowRunDetails.jobRow,
522+
jobVersionRow,
523+
jobRowRunDetails.jobVersionInputs,
524+
jobRowRunDetails.jobVersionOutputs);
542525
}
543526

544527
/** Returns the specified {@link ExtendedDatasetVersionRow}s as {@link DatasetId}s. */
@@ -556,6 +539,40 @@ private DatasetId toDatasetId(DatasetVersionRow dataset) {
556539
NamespaceName.of(dataset.getNamespaceName()), DatasetName.of(dataset.getDatasetName()));
557540
}
558541

542+
default JobRowRunDetails loadJobRowRunDetails(JobRow jobRow, UUID runUuid) {
543+
// Get the inputs and outputs dataset versions for the run associated with the job version.
544+
final DatasetVersionDao datasetVersionDao = createDatasetVersionDao();
545+
final List<ExtendedDatasetVersionRow> jobVersionInputs =
546+
datasetVersionDao.findInputDatasetVersionsFor(runUuid);
547+
final List<ExtendedDatasetVersionRow> jobVersionOutputs =
548+
datasetVersionDao.findOutputDatasetVersionsFor(runUuid);
549+
550+
// Get the namespace for the job.
551+
final NamespaceRow namespaceRow =
552+
createNamespaceDao().findNamespaceByName(jobRow.getNamespaceName()).get();
553+
554+
// Generate the version for the job; the version may already exist.
555+
final Version jobVersion =
556+
Utils.newJobVersionFor(
557+
NamespaceName.of(jobRow.getNamespaceName()),
558+
JobName.of(
559+
Optional.ofNullable(jobRow.getParentJobName())
560+
.map(pn -> pn + "." + jobRow.getSimpleName())
561+
.orElse(jobRow.getName())),
562+
toDatasetIds(
563+
jobVersionInputs.stream()
564+
.map(i -> (DatasetVersionRow) i)
565+
.collect(Collectors.toList())),
566+
toDatasetIds(
567+
jobVersionOutputs.stream()
568+
.map(o -> (DatasetVersionRow) o)
569+
.collect(Collectors.toList())),
570+
jobRow.getLocation());
571+
572+
return new JobRowRunDetails(
573+
jobRow, runUuid, namespaceRow, jobVersionInputs, jobVersionOutputs, jobVersion);
574+
}
575+
559576
/** A container class for job version info. */
560577
@Value
561578
class BagOfJobVersionInfo {
@@ -567,6 +584,14 @@ class BagOfJobVersionInfo {
567584

568585
record JobDataset(String namespace, String name, IoType ioType) {}
569586

587+
record JobRowRunDetails(
588+
JobRow jobRow,
589+
UUID runUuid,
590+
NamespaceRow namespaceRow,
591+
List<ExtendedDatasetVersionRow> jobVersionInputs,
592+
List<ExtendedDatasetVersionRow> jobVersionOutputs,
593+
Version jobVersion) {}
594+
570595
class JobDatasetMapper implements RowMapper<JobDataset> {
571596
@Override
572597
public JobDataset map(ResultSet rs, StatementContext ctx) throws SQLException {

api/src/main/java/marquez/db/OpenLineageDao.java

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@
2727
import marquez.common.models.DatasetId;
2828
import marquez.common.models.DatasetName;
2929
import marquez.common.models.DatasetType;
30-
import marquez.common.models.JobType;
3130
import marquez.common.models.NamespaceName;
3231
import marquez.common.models.RunState;
3332
import marquez.common.models.SourceType;
3433
import marquez.db.DatasetFieldDao.DatasetFieldMapping;
3534
import marquez.db.JobVersionDao.BagOfJobVersionInfo;
3635
import marquez.db.JobVersionDao.IoType;
36+
import marquez.db.JobVersionDao.JobRowRunDetails;
3737
import marquez.db.RunDao.RunUpsert;
3838
import marquez.db.RunDao.RunUpsert.RunUpsertBuilder;
3939
import marquez.db.mappers.LineageEventMapper;
@@ -167,9 +167,13 @@ SELECT count(*)
167167
default UpdateLineageRow updateMarquezModel(LineageEvent event, ObjectMapper mapper) {
168168
UpdateLineageRow updateLineageRow = updateBaseMarquezModel(event, mapper);
169169
RunState runState = getRunState(event.getEventType());
170-
if (event.getEventType() != null && runState.isDone()) {
170+
171+
if (event.getJob() != null && event.getJob().isStreamingJob()) {
172+
updateMarquezOnStreamingJob(event, updateLineageRow, runState);
173+
} else if (event.getEventType() != null && runState.isDone()) {
171174
updateMarquezOnComplete(event, updateLineageRow, runState);
172175
}
176+
173177
return updateLineageRow;
174178
}
175179

@@ -559,7 +563,7 @@ private JobRow buildJobFromEvent(
559563
jobDao.upsertJob(
560564
UUID.randomUUID(),
561565
parent.getUuid(),
562-
getJobType(job),
566+
job.type(),
563567
now,
564568
namespace.getUuid(),
565569
namespace.getName(),
@@ -572,7 +576,7 @@ private JobRow buildJobFromEvent(
572576
() ->
573577
jobDao.upsertJob(
574578
UUID.randomUUID(),
575-
getJobType(job),
579+
job.type(),
576580
now,
577581
namespace.getUuid(),
578582
namespace.getName(),
@@ -680,7 +684,7 @@ private JobRow createParentJobRunRecord(
680684
createJobDao()
681685
.upsertJob(
682686
UUID.randomUUID(),
683-
getJobType(job),
687+
job.type(),
684688
now,
685689
namespace.getUuid(),
686690
namespace.getName(),
@@ -745,16 +749,58 @@ default Set<DatasetId> toDatasetId(List<Dataset> datasets) {
745749

746750
default void updateMarquezOnComplete(
747751
LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
752+
final JobVersionDao jobVersionDao = createJobVersionDao();
753+
// Link the job version to the job only if the run is marked done and has transitioned into one
754+
// of the following states: COMPLETED, ABORTED, or FAILED.
755+
final boolean linkJobToJobVersion = runState.isDone();
756+
748757
BagOfJobVersionInfo bagOfJobVersionInfo =
749-
createJobVersionDao()
750-
.upsertJobVersionOnRunTransition(
751-
updateLineageRow.getJob(),
752-
updateLineageRow.getRun().getUuid(),
753-
runState,
754-
event.getEventTime().toInstant());
758+
jobVersionDao.upsertJobVersionOnRunTransition(
759+
jobVersionDao.loadJobRowRunDetails(
760+
updateLineageRow.getJob(), updateLineageRow.getRun().getUuid()),
761+
runState,
762+
event.getEventTime().toInstant(),
763+
linkJobToJobVersion);
755764
updateLineageRow.setJobVersionBag(bagOfJobVersionInfo);
756765
}
757766

767+
/**
768+
* A separate method is used as the logic to update Marquez model differs for streaming and batch.
769+
* The assumption for batch is that the job version is created when task is done and cumulative
770+
* list of input and output datasets from all the events is used to compute the job version UUID.
771+
* However, this wouldn't make sense for streaming jobs, which are mostly long living and produce
772+
* output before completing.
773+
*
774+
* <p>In this case, a job version is created based on the list of input and output datasets
775+
* referenced by this job. If a job starts with inputs:{A,B} and outputs:{C}, new job version is
776+
* created immediately at job start. If a following event produces inputs:{A}, outputs:{C}, then
777+
* the union of all datasets registered within this job does not change, and thus job version does
778+
* not get modified. In case of receiving another event with no inputs nor outputs, job version
779+
* still will not get modified as its hash is evaluated based on the datasets attached to the run.
780+
*
781+
* <p>However, in case of event with inputs:{A,B,D} and outputs:{C}, new hash gets computed and
782+
* new job version row is inserted into the table.
783+
*
784+
* @param event
785+
* @param updateLineageRow
786+
* @param runState
787+
*/
788+
default void updateMarquezOnStreamingJob(
789+
LineageEvent event, UpdateLineageRow updateLineageRow, RunState runState) {
790+
final JobVersionDao jobVersionDao = createJobVersionDao();
791+
JobRowRunDetails jobRowRunDetails =
792+
jobVersionDao.loadJobRowRunDetails(
793+
updateLineageRow.getJob(), updateLineageRow.getRun().getUuid());
794+
795+
if (!jobVersionDao.versionExists(jobRowRunDetails.jobVersion().getValue())) {
796+
// need to insert new job version
797+
BagOfJobVersionInfo bagOfJobVersionInfo =
798+
jobVersionDao.upsertJobVersionOnRunTransition(
799+
jobRowRunDetails, runState, event.getEventTime().toInstant(), true);
800+
updateLineageRow.setJobVersionBag(bagOfJobVersionInfo);
801+
}
802+
}
803+
758804
default String getUrlOrNull(String uri) {
759805
try {
760806
return new URI(uri).toASCIIString();
@@ -772,10 +818,6 @@ default String formatNamespaceName(String namespace) {
772818
return namespace.replaceAll("[^a-z:/A-Z0-9\\-_.@+]", "_");
773819
}
774820

775-
default JobType getJobType(Job job) {
776-
return JobType.BATCH;
777-
}
778-
779821
default DatasetRecord upsertLineageDataset(
780822
ModelDaos daos, Dataset ds, Instant now, UUID runUuid, boolean isInput) {
781823
daos.initBaseDao(this);

0 commit comments

Comments
 (0)