Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 9 additions & 50 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,41 +268,13 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] = conf("spark.comet.memoryOverhead")
val COMET_ONHEAP_MEMORY_OVERHEAD: ConfigEntry[Long] = conf("spark.comet.memoryOverhead")
.category(CATEGORY_TESTING)
.doc(
"The amount of additional memory to be allocated per executor process for Comet, in MiB, " +
"when running Spark in on-heap mode. " +
"This config is optional. If this is not specified, it will be set to " +
s"`spark.comet.memory.overhead.factor` * `spark.executor.memory`. $TUNING_GUIDE.")
.internal()
"when running Spark in on-heap mode.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add the default value to the doc?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generated docs show the default value:

def

.bytesConf(ByteUnit.MiB)
.createOptional

val COMET_MEMORY_OVERHEAD_FACTOR: ConfigEntry[Double] =
conf("spark.comet.memory.overhead.factor")
.category(CATEGORY_TESTING)
.doc(
"Fraction of executor memory to be allocated as additional memory for Comet " +
"when running Spark in on-heap mode. " +
s"$TUNING_GUIDE.")
.internal()
.doubleConf
.checkValue(
factor => factor > 0,
"Ensure that Comet memory overhead factor is a double greater than 0")
.createWithDefault(0.2)

val COMET_MEMORY_OVERHEAD_MIN_MIB: ConfigEntry[Long] = conf("spark.comet.memory.overhead.min")
.category(CATEGORY_TESTING)
.doc("Minimum amount of additional memory to be allocated per executor process for Comet, " +
s"in MiB, when running Spark in on-heap mode. $TUNING_GUIDE.")
.internal()
.bytesConf(ByteUnit.MiB)
.checkValue(
_ >= 0,
"Ensure that Comet memory overhead min is a long greater than or equal to 0")
.createWithDefault(384)
.createWithDefault(1024)

val COMET_EXEC_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.enabled")
Expand Down Expand Up @@ -425,18 +397,8 @@ object CometConf extends ShimCometConf {
.intConf
.createWithDefault(Int.MaxValue)

val COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE: OptionalConfigEntry[Long] =
conf("spark.comet.columnar.shuffle.memorySize")
.internal()
.category(CATEGORY_TESTING)
.doc("Amount of memory to reserve for columnar shuffle when running in on-heap mode. " +
s"$TUNING_GUIDE.")
.bytesConf(ByteUnit.MiB)
.createOptional

val COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
val COMET_ONHEAP_SHUFFLE_MEMORY_FACTOR: ConfigEntry[Double] =
conf("spark.comet.columnar.shuffle.memory.factor")
.internal()
.category(CATEGORY_TESTING)
.doc("Fraction of Comet memory to be allocated per executor process for columnar shuffle " +
s"when running in on-heap mode. $TUNING_GUIDE.")
Expand Down Expand Up @@ -523,7 +485,6 @@ object CometConf extends ShimCometConf {
.category(CATEGORY_EXEC_EXPLAIN)
.doc("When this setting is enabled, Comet will log all plan transformations performed " +
"in physical optimizer rules. Default: false")
.internal()
.booleanConf
.createWithDefault(false)

Expand Down Expand Up @@ -558,15 +519,14 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_ENABLE_ONHEAP_MODE: ConfigEntry[Boolean] =
val COMET_ONHEAP_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.onHeap.enabled")
.category(CATEGORY_TESTING)
.doc("Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests.")
.internal()
.booleanConf
.createWithDefault(sys.env.getOrElse("ENABLE_COMET_ONHEAP", "false").toBoolean)

val COMET_EXEC_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
val COMET_OFFHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] =
conf("spark.comet.exec.memoryPool")
.category(CATEGORY_TUNING)
.doc(
Expand All @@ -576,19 +536,18 @@ object CometConf extends ShimCometConf {
.stringConf
.createWithDefault("fair_unified")

val COMET_EXEC_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
val COMET_ONHEAP_MEMORY_POOL_TYPE: ConfigEntry[String] = conf(
"spark.comet.exec.onHeap.memoryPool")
.category(CATEGORY_TUNING)
.category(CATEGORY_TESTING)
.doc(
"The type of memory pool to be used for Comet native execution " +
"when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, " +
"`greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, " +
"and `unbounded`.")
.internal()
.stringConf
.createWithDefault("greedy_task_shared")

val COMET_EXEC_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
val COMET_OFFHEAP_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
conf("spark.comet.exec.memoryPool.fraction")
.category(CATEGORY_TUNING)
.doc(
Expand Down
13 changes: 13 additions & 0 deletions docs/source/user-guide/latest/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ These settings can be used to determine which parts of the plan are accelerated
| Config | Description | Default Value |
|--------|-------------|---------------|
| `spark.comet.explain.native.enabled` | When this setting is enabled, Comet will provide a tree representation of the native query plan before execution and again after execution, with metrics. | false |
| `spark.comet.explain.rules` | When this setting is enabled, Comet will log all plan transformations performed in physical optimizer rules. Default: false | false |
| `spark.comet.explain.verbose.enabled` | When this setting is enabled, Comet's extended explain output will provide the full query plan annotated with fallback reasons as well as a summary of how much of the plan was accelerated by Comet. When this setting is disabled, a list of fallback reasons will be provided instead. | false |
| `spark.comet.explainFallback.enabled` | When this setting is enabled, Comet will provide logging explaining the reason(s) why a query stage cannot be executed natively. Set this to false to reduce the amount of logging. | false |
| `spark.comet.logFallbackReasons.enabled` | When this setting is enabled, Comet will log warnings for all fallback reasons. | false |
Expand Down Expand Up @@ -119,6 +120,18 @@ These settings can be used to determine which parts of the plan are accelerated
| `spark.comet.tracing.enabled` | Enable fine-grained tracing of events and memory usage. For more information, refer to the [Comet Tracing Guide](https://datafusion.apache.org/comet/user-guide/tracing.html). | false |
<!--END:CONFIG_TABLE-->

## Development & Testing Settings

<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
<!--BEGIN:CONFIG_TABLE[testing]-->
| Config | Description | Default Value |
|--------|-------------|---------------|
| `spark.comet.columnar.shuffle.memory.factor` | Fraction of Comet memory to be allocated per executor process for columnar shuffle when running in on-heap mode. For more information, refer to the [Comet Tuning Guide](https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
| `spark.comet.exec.onHeap.enabled` | Whether to allow Comet to run in on-heap mode. Required for running Spark SQL tests. | false |
| `spark.comet.exec.onHeap.memoryPool` | The type of memory pool to be used for Comet native execution when running Spark in on-heap mode. Available pool types are `greedy`, `fair_spill`, `greedy_task_shared`, `fair_spill_task_shared`, `greedy_global`, `fair_spill_global`, and `unbounded`. | greedy_task_shared |
| `spark.comet.memoryOverhead` | The amount of additional memory to be allocated per executor process for Comet, in MiB, when running Spark in on-heap mode. | 1024 MiB |
<!--END:CONFIG_TABLE-->

## Enabling or Disabling Individual Operators

<!-- WARNING! DO NOT MANUALLY MODIFY CONTENT BETWEEN THE BEGIN AND END TAGS -->
Expand Down
6 changes: 3 additions & 3 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,10 @@ object CometExecIterator extends Logging {
if (offHeapMode) {
// in off-heap mode, Comet uses unified memory management to share off-heap memory with Spark
val offHeapSize = ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
val memoryFraction = CometConf.COMET_EXEC_MEMORY_POOL_FRACTION.get()
val memoryFraction = CometConf.COMET_OFFHEAP_MEMORY_POOL_FRACTION.get()
val memoryLimit = (offHeapSize * memoryFraction).toLong
val memoryLimitPerTask = (memoryLimit.toDouble * coresPerTask / numCores).toLong
val memoryPoolType = COMET_EXEC_OFFHEAP_MEMORY_POOL_TYPE.get()
val memoryPoolType = COMET_OFFHEAP_MEMORY_POOL_TYPE.get()
logInfo(
s"memoryPoolType=$memoryPoolType, " +
s"offHeapSize=${toMB(offHeapSize)}, " +
Expand All @@ -291,7 +291,7 @@ object CometExecIterator extends Logging {
// example 16GB maxMemory * 16 cores with 4 cores per task results
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
val memoryLimitPerTask = (memoryLimit.toDouble * coresPerTask / numCores).toLong
val memoryPoolType = COMET_EXEC_ONHEAP_MEMORY_POOL_TYPE.get()
val memoryPoolType = COMET_ONHEAP_MEMORY_POOL_TYPE.get()
logInfo(
s"memoryPoolType=$memoryPoolType, " +
s"memoryLimit=${toMB(memoryLimit)}, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,46 +243,25 @@ object CometSparkSessionExtensions extends Logging {
}

/**
* Calculates required memory overhead in MB per executor process for Comet when running in
* Determines required memory overhead in MB per executor process for Comet when running in
* on-heap mode.
*
* If `COMET_MEMORY_OVERHEAD` is defined then that value will be used, otherwise the overhead
* will be calculated by multiplying executor memory (`spark.executor.memory`) by
* `COMET_MEMORY_OVERHEAD_FACTOR`.
*
* In either case, a minimum value of `COMET_MEMORY_OVERHEAD_MIN_MIB` will be returned.
*/
def getCometMemoryOverheadInMiB(sparkConf: SparkConf): Long = {
if (isOffHeapEnabled(sparkConf)) {
// when running in off-heap mode we use unified memory management to share
// off-heap memory with Spark so do not add overhead
return 0
}

// `spark.executor.memory` default value is 1g
val baseMemoryMiB = ConfigHelpers
.byteFromString(sparkConf.get("spark.executor.memory", "1024MB"), ByteUnit.MiB)

val cometMemoryOverheadMinAsString = sparkConf.get(
COMET_MEMORY_OVERHEAD_MIN_MIB.key,
COMET_MEMORY_OVERHEAD_MIN_MIB.defaultValueString)

val minimum = ConfigHelpers.byteFromString(cometMemoryOverheadMinAsString, ByteUnit.MiB)
val overheadFactor = getDoubleConf(sparkConf, COMET_MEMORY_OVERHEAD_FACTOR)

val overHeadMemFromConf = sparkConf
.getOption(COMET_MEMORY_OVERHEAD.key)
.map(ConfigHelpers.byteFromString(_, ByteUnit.MiB))

overHeadMemFromConf.getOrElse(math.max((overheadFactor * baseMemoryMiB).toLong, minimum))
ConfigHelpers.byteFromString(
sparkConf.get(
COMET_ONHEAP_MEMORY_OVERHEAD.key,
COMET_ONHEAP_MEMORY_OVERHEAD.defaultValueString),
ByteUnit.MiB)
}

/**
 * Looks up a boolean setting in `conf` under the entry's key, passing the entry's declared
 * default value as the fallback.
 *
 * NOTE(review): `entry.defaultValue.get` presumably fails when the entry has no default, so
 * this looks intended only for entries created with a default -- confirm against `ConfigEntry`.
 */
private def getBooleanConf(conf: SparkConf, entry: ConfigEntry[Boolean]) = {
  val key = entry.key
  val fallback = entry.defaultValue.get
  conf.getBoolean(key, fallback)
}

private def getDoubleConf(conf: SparkConf, entry: ConfigEntry[Double]) =
conf.getDouble(entry.key, entry.defaultValue.get)

/**
* Calculates required memory overhead in bytes per executor process for Comet when running in
* on-heap mode.
Expand All @@ -300,11 +279,9 @@ object CometSparkSessionExtensions extends Logging {

val cometMemoryOverhead = getCometMemoryOverheadInMiB(sparkConf)

val overheadFactor = COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.get(conf)
val cometShuffleMemoryFromConf = COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.get(conf)
val overheadFactor = COMET_ONHEAP_SHUFFLE_MEMORY_FACTOR.get(conf)

val shuffleMemorySize =
cometShuffleMemoryFromConf.getOrElse((overheadFactor * cometMemoryOverhead).toLong)
val shuffleMemorySize = (overheadFactor * cometMemoryOverhead).toLong
if (shuffleMemorySize > cometMemoryOverhead) {
logWarning(
s"Configured shuffle memory size $shuffleMemorySize is larger than Comet memory overhead " +
Expand Down
9 changes: 8 additions & 1 deletion spark/src/main/scala/org/apache/comet/GenerateDocs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.catalyst.expressions.Cast

import org.apache.comet.CometConf.COMET_ONHEAP_MEMORY_OVERHEAD
import org.apache.comet.expressions.{CometCast, CometEvalMode}
import org.apache.comet.serde.{Compatible, Incompatible, QueryPlanSerde}

Expand Down Expand Up @@ -78,7 +79,13 @@ object GenerateDocs {
if (conf.defaultValue.isEmpty) {
w.write(s"| `${conf.key}` | $doc | |\n".getBytes)
} else {
w.write(s"| `${conf.key}` | $doc | ${conf.defaultValueString} |\n".getBytes)
val isBytesConf = conf.key == COMET_ONHEAP_MEMORY_OVERHEAD.key
if (isBytesConf) {
val bytes = conf.defaultValue.get.asInstanceOf[Long]
w.write(s"| `${conf.key}` | $doc | $bytes MiB |\n".getBytes)
} else {
w.write(s"| `${conf.key}` | $doc | ${conf.defaultValueString} |\n".getBytes)
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions spark/src/main/scala/org/apache/spark/Plugins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
import org.apache.spark.sql.internal.StaticSQLConf

import org.apache.comet.CometConf.COMET_ENABLE_ONHEAP_MODE
import org.apache.comet.CometConf.COMET_ONHEAP_ENABLED
import org.apache.comet.CometSparkSessionExtensions

/**
Expand All @@ -49,7 +49,7 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
logInfo("CometDriverPlugin init")

if (!CometSparkSessionExtensions.isOffHeapEnabled(sc.getConf) &&
!sc.getConf.getBoolean(COMET_ENABLE_ONHEAP_MODE.key, false)) {
!sc.getConf.getBoolean(COMET_ONHEAP_ENABLED.key, false)) {
logWarning("Comet plugin is disabled because Spark is not running in off-heap mode.")
return Collections.emptyMap[String, String]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,47 +76,21 @@ class CometSparkSessionExtensionsSuite extends CometTestBase {

/** Converts a size expressed in mebibytes into bytes (1 MiB = 1024 * 1024 bytes). */
def getBytesFromMib(mib: Long): Long = {
  val bytesPerMib = 1024L * 1024L
  mib * bytesPerMib
}

test("Minimum Comet memory overhead") {
test("Default Comet memory overhead") {
val conf = new SparkConf()
assert(getCometMemoryOverhead(conf) == getBytesFromMib(384))
}

test("Comet memory overhead factor with executor memory") {
val sparkConf = new SparkConf()
sparkConf.set("spark.executor.memory", "16g")
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD_FACTOR.key, "0.5")

assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(8 * 1024))
}

test("Comet memory overhead factor with default executor memory") {
val sparkConf = new SparkConf()
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD_FACTOR.key, "0.5")
assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(512))
assert(getCometMemoryOverhead(conf) == getBytesFromMib(1024))
}

test("Comet memory overhead") {
val sparkConf = new SparkConf()
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "10g")
sparkConf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "10g")
assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(1024 * 10))
assert(shouldOverrideMemoryConf(sparkConf))
}

test("Comet memory overhead (min)") {
val sparkConf = new SparkConf()
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD_MIN_MIB.key, "2g")
assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(1024 * 2))
}

test("Comet memory overhead (factor)") {
val sparkConf = new SparkConf()
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD_FACTOR.key, "0.5")
assert(getCometMemoryOverhead(sparkConf) == getBytesFromMib(512))
}

test("Comet memory overhead (off heap)") {
val sparkConf = new SparkConf()
sparkConf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "64g")
sparkConf.set(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key, "64g")
sparkConf.set("spark.memory.offHeap.enabled", "true")
sparkConf.set("spark.memory.offHeap.size", "10g")
assert(getCometMemoryOverhead(sparkConf) == 0)
Expand All @@ -127,46 +101,11 @@ class CometSparkSessionExtensionsSuite extends CometTestBase {
val conf = new SparkConf()

val sqlConf = new SQLConf
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_FACTOR.key, "0.2")
sqlConf.setConfString(CometConf.COMET_ONHEAP_SHUFFLE_MEMORY_FACTOR.key, "0.2")

// Default Comet memory overhead is 1024 MiB
assert(
getCometShuffleMemorySize(conf, sqlConf) ==
getBytesFromMib((384 * 0.2).toLong))

conf.set(CometConf.COMET_MEMORY_OVERHEAD_FACTOR.key, "0.5")
assert(
getCometShuffleMemorySize(conf, sqlConf) ==
getBytesFromMib((1024 * 0.5 * 0.2).toLong))
}

test("Comet shuffle memory") {
val conf = new SparkConf()
val sqlConf = new SQLConf
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "512m")

assert(getCometShuffleMemorySize(conf, sqlConf) == getBytesFromMib(512))
}

test("Comet shuffle memory (off-heap)") {
val conf = new SparkConf()
val sqlConf = new SQLConf
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
conf.set("spark.memory.offHeap.enabled", "true")
conf.set("spark.memory.offHeap.size", "10g")
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "512m")

assertThrows[AssertionError] {
getCometShuffleMemorySize(conf, sqlConf)
}
}

test("Comet shuffle memory cannot be larger than Comet memory overhead") {
val conf = new SparkConf()
val sqlConf = new SQLConf
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "1g")
sqlConf.setConfString(CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key, "10g")
assert(getCometShuffleMemorySize(conf, sqlConf) == getBytesFromMib(1024))
getBytesFromMib((1024 * 0.2).toLong))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ abstract class CometColumnarShuffleSuite extends CometTestBase with AdaptiveSpar
CometConf.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD.key -> numElementsForceSpillThreshold.toString,
CometConf.COMET_EXEC_ENABLED.key -> "false",
CometConf.COMET_SHUFFLE_MODE.key -> "jvm",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_COLUMNAR_SHUFFLE_MEMORY_SIZE.key -> "1536m") {
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
testFun
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ class CometExecSuite extends CometTestBase {
}

test("spill sort with (multiple) dictionaries") {
withSQLConf(CometConf.COMET_MEMORY_OVERHEAD.key -> "15MB") {
withSQLConf(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "15MB") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
makeRawTimeParquetFileColumns(path, dictionaryEnabled = true, n = 1000, rowGroupSize = 10)
Expand All @@ -1270,7 +1270,7 @@ class CometExecSuite extends CometTestBase {
}

test("spill sort with (multiple) dictionaries on mixed columns") {
withSQLConf(CometConf.COMET_MEMORY_OVERHEAD.key -> "15MB") {
withSQLConf(CometConf.COMET_ONHEAP_MEMORY_OVERHEAD.key -> "15MB") {
withTempDir { dir =>
val path = new Path(dir.toURI.toString, "part-r-0.parquet")
makeRawTimeParquetFile(path, dictionaryEnabled = true, n = 1000, rowGroupSize = 10)
Expand Down
Loading
Loading