Skip to content

Commit ab5172c

Browse files
committed
update successful call of job to reset dependencies as a job that started normally would
1 parent ae24179 commit ab5172c

File tree

2 files changed

+28
-11
lines changed

2 files changed

+28
-11
lines changed

src/main/scala/org/apache/mesos/chronos/scheduler/jobs/JobScheduler.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,8 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
312312
jobMetrics.updateJobStatus(jobName, success = true)
313313
val newJob = getNewSuccessfulJob(job)
314314
replaceJob(job, newJob)
315+
log.info("Resetting dependency invocations for %s".format(newJob))
316+
jobGraph.resetDependencyInvocations(jobName)
315317
log.info("Processing dependencies for %s".format(jobName))
316318
processDependencies(jobName, Option(DateTime.parse(newJob.lastSuccess)))
317319
}

src/test/scala/org/apache/mesos/chronos/scheduler/jobs/JobSchedulerIntegrationTest.scala

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -72,44 +72,59 @@ class JobSchedulerIntegrationTest extends SpecificationWithJUnit with Mockito {
7272

7373
"Marking a job successful updates the success and error counts and triggers children" in {
7474
val epsilon = Minutes.minutes(20).toPeriod
75-
val job1 = new ScheduleBasedJob(schedule = "R/2012-01-01T00:00:00.000Z/PT1M",
75+
val job1 = new ScheduleBasedJob(schedule = "R/2012-01-01T00:00:00.000Z/PT1D",
7676
name = "job1", command = "fooo", epsilon = epsilon, retries = 0)
77-
val job2 = new DependencyBasedJob(Set("job1"), name = "job2", command = "CMD", disabled = false)
77+
val dependentJob = new DependencyBasedJob(Set("job1", "job3"), name = "dependentJob", command = "CMD", disabled = false)
78+
val job3 = new ScheduleBasedJob(schedule = "R/2012-01-01T00:00:00.000Z/PT1D",
79+
name = "job3", command = "fooo", epsilon = epsilon, retries = 0)
7880

7981
val horizon = Minutes.minutes(5).toPeriod
8082
val mockTaskManager = mock[TaskManager]
81-
val graph = new JobGraph()
83+
val jobGraph = new JobGraph()
8284
val mockPersistenceStore = mock[PersistenceStore]
8385
val mockJobsObserver = mockFullObserver
8486

85-
val scheduler = mockScheduler(horizon, mockTaskManager, graph, mockPersistenceStore, mockJobsObserver)
87+
val scheduler = mockScheduler(horizon, mockTaskManager, jobGraph, mockPersistenceStore, mockJobsObserver)
8688
val date = DateTime.parse("2011-01-01T00:05:01.000Z")
89+
90+
val edgeInvocationCount = jobGraph.edgeInvocationCount
91+
8792
scheduler.leader.set(true)
8893
scheduler.registerJob(job1, persist = true, date)
89-
scheduler.registerJob(job2, persist = true, date)
94+
scheduler.registerJob(job3, persist = true, date)
95+
scheduler.registerJob(dependentJob, persist = true, date)
96+
9097
scheduler.run(() => {
9198
date
9299
})
93100

94101
val failedDate = date.plusMinutes(1)
102+
val passingDate = date.plusMinutes(1)
95103

96104
scheduler.handleFailedTask(TaskUtils.getTaskStatus(job1, failedDate, 0))
97-
val failedJob = graph.lookupVertex("job1").get
105+
scheduler.handleFinishedTask(TaskUtils.getTaskStatus(job3, passingDate, 0))
106+
val failedJob = jobGraph.lookupVertex("job1").get
98107
failedJob.errorCount must_== 1
99108
failedJob.successCount must_== 0
100109
failedJob.errorsSinceLastSuccess must_== 1
101110

102111
scheduler.markJobSuccessAndFireOffDependencies("job1")
103-
val jobMarkedSuccess = graph.lookupVertex("job1").get
112+
val jobMarkedSuccess = jobGraph.lookupVertex("job1").get
104113
jobMarkedSuccess.errorCount must_== 1
105114
jobMarkedSuccess.successCount must_== 1
106115
jobMarkedSuccess.errorsSinceLastSuccess must_== 0
107116
val lastSuccess = DateTime.parse(jobMarkedSuccess.lastSuccess)
108-
there was one(mockTaskManager).enqueue(TaskUtils.getTaskId(job2, lastSuccess, 0),
117+
there was one(mockTaskManager).enqueue(TaskUtils.getTaskId(dependentJob, lastSuccess, 0),
109118
highPriority = false)
110-
scheduler.handleFinishedTask(TaskUtils.getTaskStatus(job2, lastSuccess, 0))
111-
val dependentJob = graph.lookupVertex("job2").get
112-
dependentJob.successCount must_== 1
119+
scheduler.handleStartedTask(TaskUtils.getTaskStatus(dependentJob, lastSuccess, 0))
120+
scheduler.handleFinishedTask(TaskUtils.getTaskStatus(dependentJob, lastSuccess, 0))
121+
edgeInvocationCount.get(jobGraph.dag.getEdge("job1", "dependentJob")) must_== Some(0L)
122+
jobGraph.lookupVertex("dependentJob").get.successCount must_== 1
123+
124+
scheduler.handleFinishedTask(TaskUtils.getTaskStatus(job1, passingDate, 0))
125+
scheduler.markJobSuccessAndFireOffDependencies("dependentJob")
126+
jobGraph.lookupVertex("dependentJob").get.successCount must_== 2
127+
edgeInvocationCount.get(jobGraph.dag.getEdge("job1", "dependentJob")) must_== Some(0L)
113128
}
114129

115130
"Tests that a disabled job does not run and does not execute dependant children." in {

0 commit comments

Comments
 (0)