Skip to content

Commit 5d0556b

Browse files
pan3793dongjoon-hyun
authored andcommitted
[SPARK-52902][K8S] Support SPARK_VERSION placeholder in container image names
### What changes were proposed in this pull request? This PR allows users to use `{{SPARK_VERSION}}` in the following three configs ``` spark.kubernetes.container.image spark.kubernetes.driver.container.image spark.kubernetes.executor.container.image ``` ### Why are the changes needed? Simplify Spark on K8s configuration. As an administrator of the Spark platform in our corp, I need to refresh the `spark-defaults.conf` on each version upgrade, for example, from ``` spark.kubernetes.container.image=foo.com/spark:4.0.0-1-corp ``` to ``` spark.kubernetes.container.image=foo.com/spark:4.0.0-2-corp ``` If I miss touch the `spark-defauls.conf`, then it may cause Spark Client and Driver to use different jars in client mode, thus causing potential issues. After this patch, I can write it using a pattern like ``` spark.kubernetes.container.image=foo.com/spark:{{SPARK_VERSION}} ``` ### Does this PR introduce _any_ user-facing change? This is a new feature. ### How was this patch tested? UT is added. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51592 from pan3793/SPARK-52902. Authored-by: Cheng Pan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 27dcbcd commit 5d0556b

File tree

7 files changed

+46
-15
lines changed

7 files changed

+46
-15
lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ import org.apache.logging.log4j.core.config.LoggerConfig
6666
import org.eclipse.jetty.util.MultiException
6767
import org.slf4j.Logger
6868

69-
import org.apache.spark._
69+
import org.apache.spark.{SPARK_VERSION, _}
7070
import org.apache.spark.deploy.SparkHadoopUtil
7171
import org.apache.spark.internal.{Logging, MDC, MessageWithContext}
7272
import org.apache.spark.internal.LogKeys
@@ -2909,6 +2909,13 @@ private[spark] object Utils
29092909
opt.replace("{{APP_ID}}", appId)
29102910
}
29112911

2912+
/**
2913+
* Replaces all the {{SPARK_VERSION}} occurrences with the Spark version.
2914+
*/
2915+
def substituteSparkVersion(opt: String): String = {
2916+
opt.replace("{{SPARK_VERSION}}", SPARK_VERSION)
2917+
}
2918+
29122919
def createSecret(conf: SparkConf): String = {
29132920
val bits = conf.get(AUTH_SECRET_BIT_LENGTH)
29142921
val rnd = new SecureRandom()

docs/running-on-kubernetes.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -643,23 +643,26 @@ See the [configuration page](configuration.html) for information on Spark config
643643
Container image to use for the Spark application.
644644
This is usually of the form <code>example.com/repo/spark:v1.0.0</code>.
645645
This configuration is required and must be provided by the user, unless explicit
646-
images are provided for each different container type.
646+
images are provided for each different container type. Note that <code>{{SPARK_VERSION}}</code>
647+
is the built-in variable that will be substituted with current Spark's version.
647648
</td>
648649
<td>2.3.0</td>
649650
</tr>
650651
<tr>
651652
<td><code>spark.kubernetes.driver.container.image</code></td>
652653
<td><code>(value of spark.kubernetes.container.image)</code></td>
653654
<td>
654-
Custom container image to use for the driver.
655+
Custom container image to use for the driver. Note that <code>{{SPARK_VERSION}}</code>
656+
is the built-in variable that will be substituted with current Spark's version.
655657
</td>
656658
<td>2.3.0</td>
657659
</tr>
658660
<tr>
659661
<td><code>spark.kubernetes.executor.container.image</code></td>
660662
<td><code>(value of spark.kubernetes.container.image)</code></td>
661663
<td>
662-
Custom container image to use for executors.
664+
Custom container image to use for executors. Note that <code>{{SPARK_VERSION}}</code>
665+
is the built-in variable that will be substituted with current Spark's version.
663666
</td>
664667
<td>2.3.0</td>
665668
</tr>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,20 +131,23 @@ private[spark] object Config extends Logging {
131131
ConfigBuilder("spark.kubernetes.container.image")
132132
.doc("Container image to use for Spark containers. Individual container types " +
133133
"(e.g. driver or executor) can also be configured to use different images if desired, " +
134-
"by setting the container type-specific image name.")
134+
"by setting the container type-specific image name. Note that `{{SPARK_VERSION}}` is " +
135+
"the built-in variable that will be substituted with current Spark's version.")
135136
.version("2.3.0")
136137
.stringConf
137138
.createOptional
138139

139140
val DRIVER_CONTAINER_IMAGE =
140141
ConfigBuilder("spark.kubernetes.driver.container.image")
141-
.doc("Container image to use for the driver.")
142+
.doc("Container image to use for the driver. Note that `{{SPARK_VERSION}}` is " +
143+
"the built-in variable that will be substituted with current Spark's version.")
142144
.version("2.3.0")
143145
.fallbackConf(CONTAINER_IMAGE)
144146

145147
val EXECUTOR_CONTAINER_IMAGE =
146148
ConfigBuilder("spark.kubernetes.executor.container.image")
147-
.doc("Container image to use for the executors.")
149+
.doc("Container image to use for the executors. Note that `{{SPARK_VERSION}}` is " +
150+
"the built-in variable that will be substituted with current Spark's version.")
148151
.version("2.3.0")
149152
.fallbackConf(CONTAINER_IMAGE)
150153

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import java.util.{Locale, UUID}
2121
import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
2222
import org.apache.commons.lang3.StringUtils
2323

24-
import org.apache.spark.{SPARK_VERSION, SparkConf}
24+
import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
2525
import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
2626
import org.apache.spark.deploy.k8s.Config._
2727
import org.apache.spark.deploy.k8s.Constants._
@@ -46,6 +46,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
4646
def volumes: Seq[KubernetesVolumeSpec]
4747
def schedulerName: Option[String]
4848
def appId: String
49+
def image: String
4950

5051
def appName: String = get("spark.app.name", "spark")
5152

@@ -173,6 +174,12 @@ class KubernetesDriverConf(
173174
override def schedulerName: Option[String] = {
174175
Option(get(KUBERNETES_DRIVER_SCHEDULER_NAME).getOrElse(get(KUBERNETES_SCHEDULER_NAME).orNull))
175176
}
177+
178+
override def image: String = {
179+
get(DRIVER_CONTAINER_IMAGE).map(Utils.substituteSparkVersion).getOrElse {
180+
throw new SparkException("Must specify the driver container image")
181+
}
182+
}
176183
}
177184

178185
private[spark] class KubernetesExecutorConf(
@@ -237,6 +244,12 @@ private[spark] class KubernetesExecutorConf(
237244
Option(get(KUBERNETES_EXECUTOR_SCHEDULER_NAME).getOrElse(get(KUBERNETES_SCHEDULER_NAME).orNull))
238245
}
239246

247+
override def image: String = {
248+
get(EXECUTOR_CONTAINER_IMAGE).map(Utils.substituteSparkVersion).getOrElse {
249+
throw new SparkException("Must specify the executor container image")
250+
}
251+
}
252+
240253
private def checkExecutorEnvKey(key: String): Boolean = {
241254
// Pattern for matching an executorEnv key, which meets certain naming rules.
242255
val executorEnvRegex = "[-._a-zA-Z][-._a-zA-Z0-9]*".r

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import scala.jdk.CollectionConverters._
2121

2222
import io.fabric8.kubernetes.api.model._
2323

24-
import org.apache.spark.SparkException
2524
import org.apache.spark.deploy.k8s._
2625
import org.apache.spark.deploy.k8s.Config._
2726
import org.apache.spark.deploy.k8s.Constants._
@@ -37,9 +36,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
3736
.get(KUBERNETES_DRIVER_POD_NAME)
3837
.getOrElse(s"${conf.resourceNamePrefix}-driver")
3938

40-
private val driverContainerImage = conf
41-
.get(DRIVER_CONTAINER_IMAGE)
42-
.getOrElse(throw new SparkException("Must specify the driver container image"))
39+
private val driverContainerImage = conf.image
4340

4441
// CPU settings
4542
private val driverCpuCores = conf.get(DRIVER_CORES)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ private[spark] class BasicExecutorFeatureStep(
3939
extends KubernetesFeatureConfigStep with Logging {
4040

4141
// Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf
42-
private val executorContainerImage = kubernetesConf
43-
.get(EXECUTOR_CONTAINER_IMAGE)
44-
.getOrElse(throw new SparkException("Must specify the executor container image"))
42+
private val executorContainerImage = kubernetesConf.image
4543
private val blockManagerPort = kubernetesConf
4644
.sparkConf
4745
.getInt(BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,4 +276,14 @@ class KubernetesConfSuite extends SparkFunSuite {
276276
assert(KubernetesConf.getResourceNamePrefix(appName).matches("[a-z]([-a-z0-9]*[a-z0-9])?"))
277277
}
278278
}
279+
280+
test("SPARK-52902: K8s image configs support {{SPARK_VERSION}} placeholder") {
281+
val sparkConf = new SparkConf(false)
282+
sparkConf.set(CONTAINER_IMAGE, "apache/spark:{{SPARK_VERSION}}")
283+
sparkConf.set(EXECUTOR_CONTAINER_IMAGE, Some("foo.com/spark:{{SPARK_VERSION}}-corp"))
284+
val driverUnsetConf = KubernetesTestConf.createDriverConf(sparkConf)
285+
val execUnsetConf = KubernetesTestConf.createExecutorConf(sparkConf)
286+
assert(driverUnsetConf.image === s"apache/spark:$SPARK_VERSION")
287+
assert(execUnsetConf.image === s"foo.com/spark:$SPARK_VERSION-corp")
288+
}
279289
}

0 commit comments

Comments
 (0)