Skip to content

Commit 332a041

Browse files
committed
Merge pull request #595 from Califax/add-mark-job-successful-api-call
add markJobSuccessful api call
2 parents fc2567d + ab5172c commit 332a041

File tree

5 files changed

+131
-12
lines changed

5 files changed

+131
-12
lines changed

docs/docs/api.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,19 @@ You can manually start a job by issuing an HTTP request.
9696
* Example: `curl -L -X PUT chronos-node:8080/scheduler/job/job_name?arguments=-debug`
9797
* Response: HTTP 204
9898

99+
## Marking a job as successful
100+
101+
You can manually mark a job as successful by issuing an HTTP request. If a job is marked successful, the success count
102+
of the job is incremented, the latest successful run time is updated, and all downstream dependencies are handled as if
103+
the job had completed executing the code in a standard run.
104+
the job normally runs.
105+
106+
* Endpoint: ___/scheduler/job/success/<jobname>
107+
* Method: __PUT__
108+
* Query string parameters: `arguments` - jobname to be marked success
109+
* Example: `curl -L -X PUT chronos-node:8080/scheduler/job/success/request_event_counter_hourly`
110+
* Response: boolean (true or false depending on success of request)
111+
99112
## Adding a Scheduled Job
100113

101114
The heart of job scheduling is a JSON POST request.

src/main/scala/org/apache/mesos/chronos/scheduler/api/JobManagementResource.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,28 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
164164
}
165165
}
166166

167+
/**
168+
* Mark Job successful
169+
*/
170+
@Path(PathConstants.jobSuccessPath)
171+
@PUT
172+
@Timed
173+
def markJobSuccessful(@PathParam("jobName") jobName: String): Response = {
174+
try {
175+
val success = jobScheduler.markJobSuccessAndFireOffDependencies(jobName)
176+
Response.ok("marked job %s as successful: %b".format(jobName, success)).build()
177+
} catch {
178+
case ex: IllegalArgumentException =>
179+
log.log(Level.INFO, "Bad Request", ex)
180+
Response.status(Response.Status.BAD_REQUEST).entity(ex.getMessage)
181+
.build()
182+
case ex: Exception =>
183+
log.log(Level.WARNING, "Exception while serving request", ex)
184+
Response.serverError().build
185+
}
186+
}
187+
188+
167189
/**
168190
* Allows an user to update the elements processed count for a job that
169191
* supports data tracking. The processed count has to be non-negative.

src/main/scala/org/apache/mesos/chronos/scheduler/api/PathConstants.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ object PathConstants {
1515
final val allStatsPath = "stats/{percentile}"
1616
final val jobStatsPatternPath = "job/stat/{jobName}"
1717
final val jobTaskProgressPath = "job/{jobName}/task/{taskId}/progress"
18+
final val jobSuccessPath = "job/success/{jobName}"
1819
final val graphBasePath = "/graph"
1920
final val jobGraphDotPath = "dot"
2021
final val jobGraphCsvPath = "csv"

src/main/scala/org/apache/mesos/chronos/scheduler/jobs/JobScheduler.scala

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -267,18 +267,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
267267
val job = jobOption.get
268268
jobsObserver.apply(JobFinished(job, taskStatus, attempt))
269269

270-
val newJob = job match {
271-
case job: ScheduleBasedJob =>
272-
job.copy(successCount = job.successCount + 1,
273-
errorsSinceLastSuccess = 0,
274-
lastSuccess = DateTime.now(DateTimeZone.UTC).toString)
275-
case job: DependencyBasedJob =>
276-
job.copy(successCount = job.successCount + 1,
277-
errorsSinceLastSuccess = 0,
278-
lastSuccess = DateTime.now(DateTimeZone.UTC).toString)
279-
case _ =>
280-
throw new IllegalArgumentException("Cannot handle unknown task type")
281-
}
270+
val newJob = getNewSuccessfulJob(job)
282271
replaceJob(job, newJob)
283272
processDependencies(jobName, taskDate)
284273

@@ -310,6 +299,43 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
310299
}
311300
}
312301

302+
/**
303+
* Mark job by job name as successful. Trigger any dependent children jobs that should be run as a result
304+
*/
305+
def markJobSuccessAndFireOffDependencies(jobName : String): Boolean = {
306+
val optionalJob = jobGraph.getJobForName(jobName)
307+
if (optionalJob.isEmpty) {
308+
log.warning("%s not found in job graph, not marking success".format(jobName))
309+
return false
310+
} else {
311+
val job = optionalJob.get
312+
jobMetrics.updateJobStatus(jobName, success = true)
313+
val newJob = getNewSuccessfulJob(job)
314+
replaceJob(job, newJob)
315+
log.info("Resetting dependency invocations for %s".format(newJob))
316+
jobGraph.resetDependencyInvocations(jobName)
317+
log.info("Processing dependencies for %s".format(jobName))
318+
processDependencies(jobName, Option(DateTime.parse(newJob.lastSuccess)))
319+
}
320+
true
321+
}
322+
323+
private def getNewSuccessfulJob(job: BaseJob): BaseJob = {
324+
val newJob = job match {
325+
case job: ScheduleBasedJob =>
326+
job.copy(successCount = job.successCount + 1,
327+
errorsSinceLastSuccess = 0,
328+
lastSuccess = DateTime.now(DateTimeZone.UTC).toString)
329+
case job: DependencyBasedJob =>
330+
job.copy(successCount = job.successCount + 1,
331+
errorsSinceLastSuccess = 0,
332+
lastSuccess = DateTime.now(DateTimeZone.UTC).toString)
333+
case _ =>
334+
throw new scala.IllegalArgumentException("Cannot handle unknown task type")
335+
}
336+
newJob
337+
}
338+
313339
def replaceJob(oldJob: BaseJob, newJob: BaseJob) {
314340
lock.synchronized {
315341
jobGraph.replaceVertex(oldJob, newJob)

src/test/scala/org/apache/mesos/chronos/scheduler/jobs/JobSchedulerIntegrationTest.scala

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,63 @@ class JobSchedulerIntegrationTest extends SpecificationWithJUnit with Mockito {
7070
there was one(mockJobsObserver).apply(JobFailed(Right(job3), TaskUtils.getTaskStatus(job1, DateTime.parse("2012-01-03T00:00:01.000Z"), 0), 0))
7171
}
7272

73+
"Marking a job successful updates the success and error counts and triggers children" in {
74+
val epsilon = Minutes.minutes(20).toPeriod
75+
val job1 = new ScheduleBasedJob(schedule = "R/2012-01-01T00:00:00.000Z/PT1D",
76+
name = "job1", command = "fooo", epsilon = epsilon, retries = 0)
77+
val dependentJob = new DependencyBasedJob(Set("job1", "job3"), name = "dependentJob", command = "CMD", disabled = false)
78+
val job3 = new ScheduleBasedJob(schedule = "R/2012-01-01T00:00:00.000Z/PT1D",
79+
name = "job3", command = "fooo", epsilon = epsilon, retries = 0)
80+
81+
val horizon = Minutes.minutes(5).toPeriod
82+
val mockTaskManager = mock[TaskManager]
83+
val jobGraph = new JobGraph()
84+
val mockPersistenceStore = mock[PersistenceStore]
85+
val mockJobsObserver = mockFullObserver
86+
87+
val scheduler = mockScheduler(horizon, mockTaskManager, jobGraph, mockPersistenceStore, mockJobsObserver)
88+
val date = DateTime.parse("2011-01-01T00:05:01.000Z")
89+
90+
val edgeInvocationCount = jobGraph.edgeInvocationCount
91+
92+
scheduler.leader.set(true)
93+
scheduler.registerJob(job1, persist = true, date)
94+
scheduler.registerJob(job3, persist = true, date)
95+
scheduler.registerJob(dependentJob, persist = true, date)
96+
97+
scheduler.run(() => {
98+
date
99+
})
100+
101+
val failedDate = date.plusMinutes(1)
102+
val passingDate = date.plusMinutes(1)
103+
104+
scheduler.handleFailedTask(TaskUtils.getTaskStatus(job1, failedDate, 0))
105+
scheduler.handleFinishedTask(TaskUtils.getTaskStatus(job3, passingDate, 0))
106+
val failedJob = jobGraph.lookupVertex("job1").get
107+
failedJob.errorCount must_== 1
108+
failedJob.successCount must_== 0
109+
failedJob.errorsSinceLastSuccess must_== 1
110+
111+
scheduler.markJobSuccessAndFireOffDependencies("job1")
112+
val jobMarkedSuccess = jobGraph.lookupVertex("job1").get
113+
jobMarkedSuccess.errorCount must_== 1
114+
jobMarkedSuccess.successCount must_== 1
115+
jobMarkedSuccess.errorsSinceLastSuccess must_== 0
116+
val lastSuccess = DateTime.parse(jobMarkedSuccess.lastSuccess)
117+
there was one(mockTaskManager).enqueue(TaskUtils.getTaskId(dependentJob, lastSuccess, 0),
118+
highPriority = false)
119+
scheduler.handleStartedTask(TaskUtils.getTaskStatus(dependentJob, lastSuccess, 0))
120+
scheduler.handleFinishedTask(TaskUtils.getTaskStatus(dependentJob, lastSuccess, 0))
121+
edgeInvocationCount.get(jobGraph.dag.getEdge("job1", "dependentJob")) must_== Some(0L)
122+
jobGraph.lookupVertex("dependentJob").get.successCount must_== 1
123+
124+
scheduler.handleFinishedTask(TaskUtils.getTaskStatus(job1, passingDate, 0))
125+
scheduler.markJobSuccessAndFireOffDependencies("dependentJob")
126+
jobGraph.lookupVertex("dependentJob").get.successCount must_== 2
127+
edgeInvocationCount.get(jobGraph.dag.getEdge("job1", "dependentJob")) must_== Some(0L)
128+
}
129+
73130
"Tests that a disabled job does not run and does not execute dependant children." in {
74131
val epsilon = Minutes.minutes(20).toPeriod
75132
val job1 = new ScheduleBasedJob(schedule = "R/2012-01-01T00:00:00.000Z/PT1M",

0 commit comments

Comments
 (0)