Commit 0408c2b

Rename bounded_order_preserving_variants config to prefer_existing_sort and update docs (#7723)
* Improve documentation for bounded_order_preserving_variants config
* update docs
* fmt
* update config
* fix typo :facepalm
* prettier
* Reword for clarity
1 parent f16bc8b commit 0408c2b

6 files changed: +28 -23 lines changed

datafusion/common/src/config.rs

Lines changed: 7 additions & 5 deletions
@@ -453,11 +453,13 @@ config_namespace! {
 /// ```
 pub repartition_sorts: bool, default = true
 
-/// When true, DataFusion will opportunistically remove sorts by replacing
-/// `RepartitionExec` with `SortPreservingRepartitionExec`, and
-/// `CoalescePartitionsExec` with `SortPreservingMergeExec`,
-/// even when the query is bounded.
-pub bounded_order_preserving_variants: bool, default = false
+/// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
+/// (i.e. setting `preserve_order` to true on `RepartitionExec` and
+/// using `SortPreservingMergeExec`)
+///
+/// When false, DataFusion will maximize plan parallelism using
+/// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
+pub prefer_existing_sort: bool, default = false
 
 /// When set to true, the logical plan optimizer will produce warning
 /// messages if any optimization rules produce errors and then proceed to the next

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 5 additions & 6 deletions
@@ -1228,7 +1228,7 @@ fn ensure_distribution(
 // - it is desired according to config
 // - when plan is unbounded
 let order_preserving_variants_desirable =
-is_unbounded || config.optimizer.bounded_order_preserving_variants;
+is_unbounded || config.optimizer.prefer_existing_sort;
 
 if dist_context.plan.children().is_empty() {
 return Ok(Transformed::No(dist_context));
@@ -2085,8 +2085,7 @@ mod tests {
 config.optimizer.enable_round_robin_repartition = false;
 config.optimizer.repartition_file_scans = false;
 config.optimizer.repartition_file_min_size = 1024;
-config.optimizer.bounded_order_preserving_variants =
-bounded_order_preserving_variants;
+config.optimizer.prefer_existing_sort = bounded_order_preserving_variants;
 ensure_distribution(distribution_context, &config).map(|item| item.into().plan)
 }
 
@@ -2124,7 +2123,7 @@ mod tests {
 config.execution.target_partitions = $TARGET_PARTITIONS;
 config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS;
 config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
-config.optimizer.bounded_order_preserving_variants = $BOUNDED_ORDER_PRESERVING_VARIANTS;
+config.optimizer.prefer_existing_sort = $BOUNDED_ORDER_PRESERVING_VARIANTS;
 
 // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
 // because they were written prior to the separation of `BasicEnforcement` into
@@ -4516,7 +4515,7 @@ mod tests {
 let mut config = ConfigOptions::new();
 config.execution.target_partitions = 10;
 config.optimizer.enable_round_robin_repartition = true;
-config.optimizer.bounded_order_preserving_variants = false;
+config.optimizer.prefer_existing_sort = false;
 let distribution_plan =
 EnforceDistribution::new().optimize(physical_plan, &config)?;
 assert_plan_txt!(expected, distribution_plan);
@@ -4558,7 +4557,7 @@ mod tests {
 let mut config = ConfigOptions::new();
 config.execution.target_partitions = 10;
 config.optimizer.enable_round_robin_repartition = true;
-config.optimizer.bounded_order_preserving_variants = false;
+config.optimizer.prefer_existing_sort = false;
 let distribution_plan =
 EnforceDistribution::new().optimize(physical_plan, &config)?;
 assert_plan_txt!(expected, distribution_plan);
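
The first hunk in this file keeps the same boolean as before: order-preserving variants are treated as desirable when the plan is unbounded or when `prefer_existing_sort` is set. A minimal, self-contained sketch of that decision is below. It is a simplification, not the optimizer's code: `OptimizerOptions` here is a hypothetical one-field stand-in, and `RepartitionStrategy` and `choose_strategy` are illustrative names that do not exist in DataFusion. The real rules weigh more than this one flag.

```rust
/// Hypothetical stand-in for the optimizer options touched by this commit.
/// Only the renamed flag is modelled.
#[derive(Debug, Clone, Copy, Default)]
struct OptimizerOptions {
    /// Mirrors `datafusion.optimizer.prefer_existing_sort` (default: false).
    prefer_existing_sort: bool,
}

/// Illustrative outcome for a repartition over input that is already sorted.
#[derive(Debug, PartialEq, Eq)]
enum RepartitionStrategy {
    /// Keep the existing order (order-preserving repartition plus merge).
    PreserveOrder,
    /// Repartition for parallelism and re-sort afterwards if needed.
    RepartitionThenSort,
}

/// Same boolean as in the diff: `is_unbounded || prefer_existing_sort`.
fn choose_strategy(is_unbounded: bool, options: OptimizerOptions) -> RepartitionStrategy {
    let order_preserving_variants_desirable = is_unbounded || options.prefer_existing_sort;
    if order_preserving_variants_desirable {
        RepartitionStrategy::PreserveOrder
    } else {
        RepartitionStrategy::RepartitionThenSort
    }
}

fn main() {
    let default_options = OptimizerOptions::default();
    let prefer_sort = OptimizerOptions { prefer_existing_sort: true };

    // Walk through the four combinations of the two inputs.
    for (is_unbounded, options) in [
        (false, default_options),
        (false, prefer_sort),
        (true, default_options),
        (true, prefer_sort),
    ] {
        println!(
            "unbounded={is_unbounded}, prefer_existing_sort={} -> {:?}",
            options.prefer_existing_sort,
            choose_strategy(is_unbounded, options)
        );
    }
}
```

In this sketch the flag only matters for bounded plans; an unbounded plan takes the order-preserving branch regardless of the setting.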

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 2 additions & 2 deletions
@@ -251,7 +251,7 @@ pub(crate) fn replace_with_order_preserving_variants(
 // any case, as doing so helps fix the pipeline.
 // Also do the replacement if opted-in via config options.
 let use_order_preserving_variant =
-config.optimizer.bounded_order_preserving_variants || unbounded_output(plan);
+config.optimizer.prefer_existing_sort || unbounded_output(plan);
 let updated_sort_input = get_updated_plan(
 exec_tree,
 is_spr_better || use_order_preserving_variant,
@@ -336,7 +336,7 @@ mod tests {
 
 // Run the rule top-down
 // let optimized_physical_plan = physical_plan.transform_down(&replace_repartition_execs)?;
-let config = SessionConfig::new().with_bounded_order_preserving_variants($ALLOW_BOUNDED);
+let config = SessionConfig::new().with_prefer_existing_sort($ALLOW_BOUNDED);
 let plan_with_pipeline_fixer = OrderPreservationContext::new(physical_plan);
 let parallel = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| replace_with_order_preserving_variants(plan_with_pipeline_fixer, false, false, config.options()))?;
 let optimized_physical_plan = parallel.plan;

datafusion/execution/src/config.rs

Lines changed: 12 additions & 8 deletions
@@ -145,10 +145,12 @@ impl SessionConfig {
 self.options.optimizer.repartition_sorts
 }
 
-/// Remove sorts by replacing with order-preserving variants of operators,
-/// even when query is bounded?
-pub fn bounded_order_preserving_variants(&self) -> bool {
-self.options.optimizer.bounded_order_preserving_variants
+/// Prefer existing sort (true) or maximize parallelism (false). See
+/// [prefer_existing_sort] for more details
+///
+/// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
+pub fn prefer_existing_sort(&self) -> bool {
+self.options.optimizer.prefer_existing_sort
 }
 
 /// Are statistics collected during execution?
@@ -221,10 +223,12 @@ impl SessionConfig {
 self
 }
 
-/// Enables or disables the use of order-preserving variants of `CoalescePartitions`
-/// and `RepartitionExec` operators, even when the query is bounded
-pub fn with_bounded_order_preserving_variants(mut self, enabled: bool) -> Self {
-self.options.optimizer.bounded_order_preserving_variants = enabled;
+/// Prefer existing sort (true) or maximize parallelism (false). See
+/// [prefer_existing_sort] for more details
+///
+/// [prefer_existing_sort]: datafusion_common::config::OptimizerOptions::prefer_existing_sort
+pub fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
+self.options.optimizer.prefer_existing_sort = enabled;
 self
 }
 
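
The renamed getter and `with_` setter follow a consuming-builder shape: the setter takes `mut self`, writes the nested option, and returns `Self` so calls can be chained. The sketch below mirrors that shape with local mock types so it compiles without the `datafusion` crate. The three structs are assumptions that copy only the field path visible in the diff (`options.optimizer.prefer_existing_sort`); they are not the real DataFusion definitions.

```rust
// Mock types for illustration only; the real structs carry many more fields.
#[derive(Debug, Default)]
struct OptimizerOptions {
    prefer_existing_sort: bool, // defaults to false, as in the commit
}

#[derive(Debug, Default)]
struct ConfigOptions {
    optimizer: OptimizerOptions,
}

#[derive(Debug, Default)]
struct SessionConfig {
    options: ConfigOptions,
}

impl SessionConfig {
    fn new() -> Self {
        Self::default()
    }

    /// Getter: prefer existing sort (true) or maximize parallelism (false).
    fn prefer_existing_sort(&self) -> bool {
        self.options.optimizer.prefer_existing_sort
    }

    /// Builder-style setter: consumes `self` and returns it for chaining.
    fn with_prefer_existing_sort(mut self, enabled: bool) -> Self {
        self.options.optimizer.prefer_existing_sort = enabled;
        self
    }
}

fn main() {
    // The default leaves the flag off.
    let config = SessionConfig::new();
    println!("default: {}", config.prefer_existing_sort());

    // Opting in through the builder method.
    let config = config.with_prefer_existing_sort(true);
    println!("after builder call: {}", config.prefer_existing_sort());
}
```

Callers that used `with_bounded_order_preserving_variants` would switch to `with_prefer_existing_sort`, as the test macro in `replace_with_order_preserving_variants.rs` does in this commit.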

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 1 addition & 1 deletion
@@ -183,12 +183,12 @@ datafusion.explain.logical_plan_only false
 datafusion.explain.physical_plan_only false
 datafusion.explain.show_statistics false
 datafusion.optimizer.allow_symmetric_joins_without_pruning true
-datafusion.optimizer.bounded_order_preserving_variants false
 datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.enable_topk_aggregation true
 datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
 datafusion.optimizer.max_passes 3
+datafusion.optimizer.prefer_existing_sort false
 datafusion.optimizer.prefer_hash_join true
 datafusion.optimizer.repartition_aggregations true
 datafusion.optimizer.repartition_file_min_size 10485760

docs/source/user-guide/configs.md

Lines changed: 1 addition & 1 deletion
@@ -87,7 +87,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.optimizer.repartition_file_scans | true | When set to `true`, file groups will be repartitioned to achieve maximum parallelism. Currently Parquet and CSV formats are supported. If set to `true`, all files will be repartitioned evenly (i.e., a single large file might be partitioned into smaller chunks) for parallel scanning. If set to `false`, different files will be read in parallel, but repartitioning won't happen within a single file. |
 | datafusion.optimizer.repartition_windows | true | Should DataFusion repartition data using the partitions keys to execute window functions in parallel using the provided `target_partitions` level |
 | datafusion.optimizer.repartition_sorts | true | Should DataFusion execute sorts in a per-partition fashion and merge afterwards instead of coalescing first and sorting globally. With this flag is enabled, plans in the form below `text "SortExec: [a@0 ASC]", " CoalescePartitionsExec", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` would turn into the plan below which performs better in multithreaded environments `text "SortPreservingMergeExec: [a@0 ASC]", " SortExec: [a@0 ASC]", " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1", ` |
-| datafusion.optimizer.bounded_order_preserving_variants | false | When true, DataFusion will opportunistically remove sorts by replacing `RepartitionExec` with `SortPreservingRepartitionExec`, and `CoalescePartitionsExec` with `SortPreservingMergeExec`, even when the query is bounded. |
+| datafusion.optimizer.prefer_existing_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting `preserve_order` to true on `RepartitionExec` and using `SortPreservingMergeExec`) When false, DataFusion will maximize plan parallelism using `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`. |
 | datafusion.optimizer.skip_failed_rules | false | When set to true, the logical plan optimizer will produce warning messages if any optimization rules produce errors and then proceed to the next rule. When set to false, any rules that produce errors will cause the query to fail |
 | datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
 | datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
