Skip to content

Commit 0c492e1

Browse files
committed
Rework with_preserve_order usage
1 parent ee55ec3 commit 0c492e1

File tree

6 files changed

+81
-69
lines changed

6 files changed

+81
-69
lines changed

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -929,14 +929,12 @@ fn add_roundrobin_on_top(
929929
// - Preserving ordering is not helpful in terms of satisfying ordering requirements
930930
// - Usage of order preserving variants is not desirable
931931
// (determined by flag `config.optimizer.bounded_order_preserving_variants`)
932-
let should_preserve_ordering = input.output_ordering().is_some();
933-
934932
let partitioning = Partitioning::RoundRobinBatch(n_target);
935-
let repartition = RepartitionExec::try_new(input, partitioning)?;
936-
let new_plan = Arc::new(repartition.with_preserve_order(should_preserve_ordering))
937-
as Arc<dyn ExecutionPlan>;
933+
let repartition =
934+
RepartitionExec::try_new(input, partitioning)?.with_preserve_order();
938935

939936
// update distribution onward with new operator
937+
let new_plan = Arc::new(repartition) as Arc<dyn ExecutionPlan>;
940938
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
941939
Ok(new_plan)
942940
} else {
@@ -999,7 +997,6 @@ fn add_hash_on_top(
999997
// requirements.
1000998
// - Usage of order preserving variants is not desirable (per the flag
1001999
// `config.optimizer.bounded_order_preserving_variants`).
1002-
let should_preserve_ordering = input.output_ordering().is_some();
10031000
let mut new_plan = if repartition_beneficial_stats {
10041001
// Since hashing benefits from partitioning, add a round-robin repartition
10051002
// before it:
@@ -1008,9 +1005,10 @@ fn add_hash_on_top(
10081005
input
10091006
};
10101007
let partitioning = Partitioning::Hash(hash_exprs, n_target);
1011-
let repartition = RepartitionExec::try_new(new_plan, partitioning)?;
1012-
new_plan =
1013-
Arc::new(repartition.with_preserve_order(should_preserve_ordering)) as _;
1008+
let repartition = RepartitionExec::try_new(new_plan, partitioning)?
1009+
// preserve any ordering if possible
1010+
.with_preserve_order();
1011+
new_plan = Arc::new(repartition) as _;
10141012

10151013
// update distribution onward with new operator
10161014
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
@@ -1159,11 +1157,11 @@ fn replace_order_preserving_variants_helper(
11591157
if let Some(repartition) = exec_tree.plan.as_any().downcast_ref::<RepartitionExec>() {
11601158
if repartition.preserve_order() {
11611159
return Ok(Arc::new(
1160+
// new RepartitionExec don't preserve order
11621161
RepartitionExec::try_new(
11631162
updated_children.swap_remove(0),
11641163
repartition.partitioning().clone(),
1165-
)?
1166-
.with_preserve_order(false),
1164+
)?,
11671165
));
11681166
}
11691167
}

datafusion/core/src/physical_optimizer/enforce_sorting.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -703,11 +703,11 @@ fn remove_corresponding_sort_from_sub_plan(
703703
} else if let Some(repartition) = plan.as_any().downcast_ref::<RepartitionExec>()
704704
{
705705
Arc::new(
706+
// By default, RepartitionExec does not preserve order
706707
RepartitionExec::try_new(
707708
children.swap_remove(0),
708709
repartition.partitioning().clone(),
709-
)?
710-
.with_preserve_order(false),
710+
)?,
711711
)
712712
} else {
713713
plan.clone().with_new_children(children)?
@@ -844,7 +844,6 @@ mod tests {
844844
};
845845
}
846846

847-
848847
#[tokio::test]
849848
async fn test_remove_unnecessary_sort() -> Result<()> {
850849
let schema = create_test_schema()?;

datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,9 @@ fn get_updated_plan(
176176
// a `SortPreservingRepartitionExec` if appropriate:
177177
if is_repartition(&plan) && !plan.maintains_input_order()[0] && is_spr_better {
178178
let child = plan.children().swap_remove(0);
179-
let repartition = RepartitionExec::try_new(child, plan.output_partitioning())?;
180-
plan = Arc::new(repartition.with_preserve_order(true)) as _
179+
let repartition = RepartitionExec::try_new(child, plan.output_partitioning())?
180+
.with_preserve_order();
181+
plan = Arc::new(repartition) as _
181182
}
182183
// When the input of a `CoalescePartitionsExec` has an ordering, replace it
183184
// with a `SortPreservingMergeExec` if appropriate:

datafusion/core/src/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ pub fn spr_repartition_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionP
328328
Arc::new(
329329
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(10))
330330
.unwrap()
331-
.with_preserve_order(true),
331+
.with_preserve_order(),
332332
)
333333
}
334334

datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ mod sp_repartition_fuzz_tests {
140140
Arc::new(
141141
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
142142
.unwrap()
143-
.with_preserve_order(true),
143+
.with_preserve_order(),
144144
)
145145
}
146146

@@ -159,7 +159,7 @@ mod sp_repartition_fuzz_tests {
159159
Arc::new(
160160
RepartitionExec::try_new(input, Partitioning::Hash(hash_expr, 2))
161161
.unwrap()
162-
.with_preserve_order(true),
162+
.with_preserve_order(),
163163
)
164164
}
165165

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 64 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,34 +24,34 @@ use std::sync::Arc;
2424
use std::task::{Context, Poll};
2525
use std::{any::Any, vec};
2626

27+
use arrow::array::{ArrayRef, UInt64Builder};
28+
use arrow::datatypes::SchemaRef;
29+
use arrow::record_batch::RecordBatch;
30+
use futures::stream::Stream;
31+
use futures::{FutureExt, StreamExt};
32+
use hashbrown::HashMap;
33+
use log::trace;
34+
use parking_lot::Mutex;
35+
use tokio::task::JoinHandle;
36+
37+
use datafusion_common::{not_impl_err, DataFusionError, Result};
38+
use datafusion_execution::memory_pool::MemoryConsumer;
39+
use datafusion_execution::TaskContext;
40+
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
41+
2742
use crate::common::transpose;
2843
use crate::hash_utils::create_hashes;
2944
use crate::metrics::BaselineMetrics;
3045
use crate::repartition::distributor_channels::{channels, partition_aware_channels};
3146
use crate::sorts::streaming_merge;
3247
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics};
3348

34-
use self::distributor_channels::{DistributionReceiver, DistributionSender};
35-
3649
use super::common::{AbortOnDropMany, AbortOnDropSingle, SharedMemoryReservation};
3750
use super::expressions::PhysicalSortExpr;
3851
use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
3952
use super::{DisplayAs, RecordBatchStream, SendableRecordBatchStream};
4053

41-
use arrow::array::{ArrayRef, UInt64Builder};
42-
use arrow::datatypes::SchemaRef;
43-
use arrow::record_batch::RecordBatch;
44-
use datafusion_common::{not_impl_err, DataFusionError, Result};
45-
use datafusion_execution::memory_pool::MemoryConsumer;
46-
use datafusion_execution::TaskContext;
47-
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
48-
49-
use futures::stream::Stream;
50-
use futures::{FutureExt, StreamExt};
51-
use hashbrown::HashMap;
52-
use log::trace;
53-
use parking_lot::Mutex;
54-
use tokio::task::JoinHandle;
54+
use self::distributor_channels::{DistributionReceiver, DistributionSender};
5555

5656
mod distributor_channels;
5757

@@ -428,9 +428,12 @@ impl ExecutionPlan for RepartitionExec {
428428
self: Arc<Self>,
429429
mut children: Vec<Arc<dyn ExecutionPlan>>,
430430
) -> Result<Arc<dyn ExecutionPlan>> {
431-
let repartition =
432-
RepartitionExec::try_new(children.swap_remove(0), self.partitioning.clone());
433-
repartition.map(|r| Arc::new(r.with_preserve_order(self.preserve_order)) as _)
431+
let mut repartition =
432+
RepartitionExec::try_new(children.swap_remove(0), self.partitioning.clone())?;
433+
if self.preserve_order {
434+
repartition = repartition.with_preserve_order();
435+
}
436+
Ok(Arc::new(repartition))
434437
}
435438

436439
/// Specifies whether this plan generates an infinite stream of records.
@@ -628,7 +631,9 @@ impl ExecutionPlan for RepartitionExec {
628631
}
629632

630633
impl RepartitionExec {
631-
/// Create a new RepartitionExec
634+
/// Create a new RepartitionExec, that produces output `partitioning`, and
635+
/// does not preserve the order of the input (see [`Self::with_preserve_order`]
636+
/// for more details)
632637
pub fn try_new(
633638
input: Arc<dyn ExecutionPlan>,
634639
partitioning: Partitioning,
@@ -652,8 +657,8 @@ impl RepartitionExec {
652657
///
653658
/// If the input is not ordered, or has only one partition, this is a no op,
654659
/// and the node remains a `RepartitionExec`.
655-
pub fn with_preserve_order(mut self, preserve_order: bool) -> Self {
656-
self.preserve_order = preserve_order &&
660+
pub fn with_preserve_order(mut self) -> Self {
661+
self.preserve_order =
657662
// If the input isn't ordered, there is no ordering to preserve
658663
self.input.output_ordering().is_some() &&
659664
// if there is only one input partition, merging is not required
@@ -918,7 +923,19 @@ impl RecordBatchStream for PerPartitionStream {
918923

919924
#[cfg(test)]
920925
mod tests {
921-
use super::*;
926+
use std::collections::HashSet;
927+
928+
use arrow::array::{ArrayRef, StringArray};
929+
use arrow::datatypes::{DataType, Field, Schema};
930+
use arrow::record_batch::RecordBatch;
931+
use arrow_array::UInt32Array;
932+
use futures::FutureExt;
933+
use tokio::task::JoinHandle;
934+
935+
use datafusion_common::cast::as_string_array;
936+
use datafusion_common::{assert_batches_sorted_eq, exec_err};
937+
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
938+
922939
use crate::{
923940
test::{
924941
assert_is_pending,
@@ -929,16 +946,8 @@ mod tests {
929946
},
930947
{collect, expressions::col, memory::MemoryExec},
931948
};
932-
use arrow::array::{ArrayRef, StringArray};
933-
use arrow::datatypes::{DataType, Field, Schema};
934-
use arrow::record_batch::RecordBatch;
935-
use arrow_array::UInt32Array;
936-
use datafusion_common::cast::as_string_array;
937-
use datafusion_common::{assert_batches_sorted_eq, exec_err};
938-
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
939-
use futures::FutureExt;
940-
use std::collections::HashSet;
941-
use tokio::task::JoinHandle;
949+
950+
use super::*;
942951

943952
#[tokio::test]
944953
async fn one_to_many_round_robin() -> Result<()> {
@@ -1443,11 +1452,14 @@ mod tests {
14431452
#[cfg(test)]
14441453
mod test {
14451454
use arrow_schema::{DataType, Field, Schema, SortOptions};
1455+
14461456
use datafusion_physical_expr::expressions::col;
1457+
14471458
use crate::memory::MemoryExec;
1448-
use super::*;
14491459
use crate::union::UnionExec;
14501460

1461+
use super::*;
1462+
14511463
/// Asserts that the plan is as expected
14521464
///
14531465
/// `$EXPECTED_PLAN_LINES`: input plan
@@ -1477,9 +1489,10 @@ mod test {
14771489
let source2 = sorted_memory_exec(&schema, sort_exprs);
14781490
// output has multiple partitions, and is sorted
14791491
let union = UnionExec::new(vec![source1, source2]);
1480-
let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
1481-
.unwrap()
1482-
.with_preserve_order(true);
1492+
let exec =
1493+
RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
1494+
.unwrap()
1495+
.with_preserve_order();
14831496

14841497
// Repartition should preserve order
14851498
let expected_plan = [
@@ -1498,9 +1511,9 @@ mod test {
14981511
let sort_exprs = sort_exprs(&schema);
14991512
let source = sorted_memory_exec(&schema, sort_exprs);
15001513
// output is sorted, but has only a single partition, so no need to sort
1501-
let exec = RepartitionExec::try_new(source, Partitioning::RoundRobinBatch(10))
1514+
let exec = RepartitionExec::try_new(source, Partitioning::RoundRobinBatch(10))
15021515
.unwrap()
1503-
.with_preserve_order(true);
1516+
.with_preserve_order();
15041517

15051518
// Repartition should not preserve order
15061519
let expected_plan = [
@@ -1518,9 +1531,10 @@ mod test {
15181531
let source2 = memory_exec(&schema);
15191532
// output has multiple partitions, but is not sorted
15201533
let union = UnionExec::new(vec![source1, source2]);
1521-
let exec = RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
1522-
.unwrap()
1523-
.with_preserve_order(true);
1534+
let exec =
1535+
RepartitionExec::try_new(Arc::new(union), Partitioning::RoundRobinBatch(10))
1536+
.unwrap()
1537+
.with_preserve_order();
15241538

15251539
// Repartition should not preserve order, as there is no order to preserve
15261540
let expected_plan = [
@@ -1545,18 +1559,18 @@ mod test {
15451559
}]
15461560
}
15471561

1548-
15491562
fn memory_exec(schema: &SchemaRef) -> Arc<dyn ExecutionPlan> {
15501563
Arc::new(MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap())
15511564
}
15521565

1553-
fn sorted_memory_exec(schema: &SchemaRef, sort_exprs: Vec<PhysicalSortExpr>) -> Arc<dyn ExecutionPlan> {
1566+
fn sorted_memory_exec(
1567+
schema: &SchemaRef,
1568+
sort_exprs: Vec<PhysicalSortExpr>,
1569+
) -> Arc<dyn ExecutionPlan> {
15541570
Arc::new(
1555-
MemoryExec::try_new(&[vec![]], schema.clone(), None).unwrap()
1556-
.with_sort_information(vec![sort_exprs])
1571+
MemoryExec::try_new(&[vec![]], schema.clone(), None)
1572+
.unwrap()
1573+
.with_sort_information(vec![sort_exprs]),
15571574
)
15581575
}
1559-
1560-
1561-
1562-
}
1576+
}

0 commit comments

Comments
 (0)