Skip to content

Commit bd00b61

Browse files
Added a job's run time and attempt as environment variables exposed to the job
1 parent b9ddbaa commit bd00b61

File tree

2 files changed

+131
-27
lines changed

2 files changed

+131
-27
lines changed

src/main/scala/org/apache/mesos/chronos/scheduler/mesos/MesosTaskBuilder.scala

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import java.util.logging.Logger
44
import javax.inject.Inject
55

66
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
7-
import org.apache.mesos.chronos.scheduler.jobs.{Fetch, BaseJob}
7+
import org.apache.mesos.chronos.scheduler.jobs.{TaskUtils, EnvironmentVariable, Fetch, BaseJob}
8+
89
import com.google.common.base.Charsets
910
import com.google.protobuf.ByteString
1011
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
@@ -50,38 +51,42 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
5051
builder.build()
5152
}
5253

54+
def envs(taskIdStr: String, job: BaseJob, offer: Offer): Environment.Builder = {
55+
val (_, start, attempt, _) = TaskUtils.parseTaskId(taskIdStr)
56+
val baseEnv = Map(
57+
"mesos_task_id" -> taskIdStr,
58+
"CHRONOS_JOB_OWNER" -> job.owner,
59+
"CHRONOS_JOB_NAME" -> job.name,
60+
"HOST" -> offer.getHostname,
61+
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
62+
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
63+
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
64+
"CHRONOS_JOB_RUN_TIME" -> start.toString,
65+
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
66+
)
67+
68+
// If the job defines custom environment variables, add them to the builder
69+
// Don't add them if they already exist to prevent overwriting the defaults
70+
val finalEnv =
71+
if (job.environmentVariables != null && job.environmentVariables.nonEmpty) {
72+
job.environmentVariables.foldLeft(baseEnv)((envs, env) =>
73+
if (envs.contains(env.name)) envs else envs + (env.name -> env.value)
74+
)
75+
} else {
76+
baseEnv
77+
}
78+
79+
finalEnv.foldLeft(Environment.newBuilder())((builder, env) =>
80+
builder.addVariables(Variable.newBuilder().setName(env._1).setValue(env._2)))
81+
}
82+
5383
def getMesosTaskInfoBuilder(taskIdStr: String, job: BaseJob, offer: Offer): TaskInfo.Builder = {
5484
//TODO(FL): Allow adding more fine grained resource controls.
5585
val taskId = TaskID.newBuilder().setValue(taskIdStr).build()
5686
val taskInfo = TaskInfo.newBuilder()
5787
.setName(taskNameTemplate.format(job.name))
5888
.setTaskId(taskId)
59-
val environment = Environment.newBuilder()
60-
.addVariables(Variable.newBuilder()
61-
.setName("mesos_task_id").setValue(taskIdStr))
62-
.addVariables(Variable.newBuilder()
63-
.setName("CHRONOS_JOB_OWNER").setValue(job.owner))
64-
.addVariables(Variable.newBuilder()
65-
.setName("CHRONOS_JOB_NAME").setValue(job.name))
66-
.addVariables(Variable.newBuilder()
67-
.setName("HOST").setValue(offer.getHostname))
68-
.addVariables(Variable.newBuilder()
69-
.setName("CHRONOS_RESOURCE_MEM").setValue(job.mem.toString))
70-
.addVariables(Variable.newBuilder()
71-
.setName("CHRONOS_RESOURCE_CPU").setValue(job.cpus.toString))
72-
.addVariables(Variable.newBuilder()
73-
.setName("CHRONOS_RESOURCE_DISK").setValue(job.disk.toString))
74-
75-
// If the job defines custom environment variables, add them to the builder
76-
// Don't add them if they already exist to prevent overwriting the defaults
77-
val builtinEnvNames = environment.getVariablesList.asScala.map(_.getName).toSet
78-
if (job.environmentVariables != null && job.environmentVariables.nonEmpty) {
79-
job.environmentVariables.foreach(env =>
80-
if (!builtinEnvNames.contains(env.name)) {
81-
environment.addVariables(Variable.newBuilder().setName(env.name).setValue(env.value))
82-
}
83-
)
84-
}
89+
val environment = envs(taskIdStr, job, offer)
8590

8691
val fetch = job.fetch ++ job.uris.map { Fetch(_) }
8792
val uriCommand = fetch.map { f =>
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package org.apache.mesos.chronos.scheduler.mesos
2+
3+
import scala.collection.JavaConversions._
4+
import org.apache.mesos.Protos._
5+
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
6+
import org.apache.mesos.chronos.scheduler.jobs.Parameter
7+
import org.apache.mesos.chronos.scheduler.jobs.Volume
8+
import org.apache.mesos.chronos.scheduler.jobs._
9+
import org.apache.mesos.chronos.scheduler.jobs.constraints.{LikeConstraint, EqualsConstraint}
10+
import org.joda.time.Minutes
11+
import org.specs2.mock.Mockito
12+
import org.specs2.mutable.SpecificationWithJUnit
13+
14+
15+
class MesosTaskBuilderSpec extends SpecificationWithJUnit with Mockito {
16+
17+
val taskId = "ct:1454467003926:0:test2Execution:run"
18+
19+
val (_, start, attempt, _) = TaskUtils.parseTaskId(taskId)
20+
21+
val offer = Offer.newBuilder().mergeFrom(Offer.getDefaultInstance)
22+
.setHostname("localport")
23+
.setId(OfferID.newBuilder().setValue("123").build())
24+
.setFrameworkId(FrameworkID.newBuilder().setValue("123").build())
25+
.setSlaveId(SlaveID.newBuilder().setValue("123").build())
26+
.build()
27+
28+
val job = {
29+
val volumes = Seq(
30+
Volume(Option("/host/dir"), "container/dir", Option(VolumeMode.RW)),
31+
Volume(None, "container/dir", None)
32+
)
33+
34+
var parameters = scala.collection.mutable.ListBuffer[Parameter]()
35+
36+
val container = DockerContainer("dockerImage", volumes, parameters, NetworkMode.HOST, true)
37+
38+
val constraints = Seq(
39+
EqualsConstraint("rack", "rack-1"),
40+
LikeConstraint("rack", "rack-[1-3]")
41+
)
42+
43+
new ScheduleBasedJob("FOO/BAR/BAM", "AJob", "noop", Minutes.minutes(5).toPeriod, 10L, 20L,
44+
"fooexec", "fooflags", "none", 7, "[email protected]", "Foo", "Test schedule-based job", "TODAY",
45+
"YESTERDAY", true, cpus = 2, disk = 3, mem = 5, container = container, environmentVariables = Seq(),
46+
shell = true, arguments = Seq(), softError = true, constraints = constraints)
47+
}
48+
49+
val defaultEnv = Map(
50+
"mesos_task_id" -> taskId,
51+
"CHRONOS_JOB_OWNER" -> job.owner,
52+
"CHRONOS_JOB_NAME" -> job.name,
53+
"HOST" -> offer.getHostname,
54+
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
55+
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
56+
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
57+
"CHRONOS_JOB_RUN_TIME" -> start.toString,
58+
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
59+
)
60+
61+
def toMap(envs: Environment): Map[String, String] =
62+
envs.getVariablesList.foldLeft(Map[String, String]())((m, v) => m + (v.getName -> v.getValue))
63+
64+
"MesosTaskBuilder" should {
65+
"Setup all the default environment variables" in {
66+
val target = new MesosTaskBuilder(mock[SchedulerConfiguration])
67+
68+
defaultEnv must_== toMap(target.envs(taskId, job, offer).build())
69+
}
70+
}
71+
72+
"MesosTaskBuilder" should {
73+
"Setup all the default environment variables and job environment variables" in {
74+
val target = new MesosTaskBuilder(mock[SchedulerConfiguration])
75+
76+
val testJob = job.copy(environmentVariables = Seq(
77+
EnvironmentVariable("FOO", "BAR"),
78+
EnvironmentVariable("TOM", "JERRY")
79+
))
80+
81+
val finalEnv = defaultEnv ++ Map("FOO" -> "BAR", "TOM" -> "JERRY")
82+
83+
finalEnv must_== toMap(target.envs(taskId, testJob, offer).build())
84+
}
85+
}
86+
87+
"MesosTaskBuilder" should {
88+
"Should not allow job environment variables to overwrite any default environment variables" in {
89+
val target = new MesosTaskBuilder(mock[SchedulerConfiguration])
90+
91+
val testJob = job.copy(environmentVariables = Seq(
92+
EnvironmentVariable("CHRONOS_RESOURCE_MEM", "10000"),
93+
EnvironmentVariable("CHRONOS_RESOURCE_DISK", "40000")
94+
))
95+
96+
defaultEnv must_== toMap(target.envs(taskId, testJob, offer).build())
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)