@@ -1439,3 +1439,124 @@ mod tests {
14391439 . unwrap ( )
14401440 }
14411441}
1442+
1443+ #[ cfg( test) ]
1444+ mod test {
1445+ use arrow_schema:: { DataType , Field , Schema , SortOptions } ;
1446+ use datafusion_physical_expr:: expressions:: col;
1447+ use crate :: memory:: MemoryExec ;
1448+ use super :: * ;
1449+ use crate :: union:: UnionExec ;
1450+
1451+ /// Asserts that the plan is as expected
1452+ ///
1453+ /// `$EXPECTED_PLAN_LINES`: input plan
1454+ /// `$PLAN`: the plan to optimized
1455+ ///
1456+ macro_rules! assert_plan {
1457+ ( $EXPECTED_PLAN_LINES: expr, $PLAN: expr) => {
1458+ let physical_plan = $PLAN;
1459+ let formatted = crate :: displayable( & physical_plan) . indent( true ) . to_string( ) ;
1460+ let actual: Vec <& str > = formatted. trim( ) . lines( ) . collect( ) ;
1461+
1462+ let expected_plan_lines: Vec <& str > = $EXPECTED_PLAN_LINES
1463+ . iter( ) . map( |s| * s) . collect( ) ;
1464+
1465+ assert_eq!(
1466+ expected_plan_lines, actual,
1467+ "\n **Original Plan Mismatch\n \n expected:\n \n {expected_plan_lines:#?}\n actual:\n \n {actual:#?}\n \n "
1468+ ) ;
1469+ } ;
1470+ }
1471+
1472+ #[ tokio:: test]
1473+ async fn test_preserve_order ( ) -> Result < ( ) > {
1474+ let schema = test_schema ( ) ;
1475+ let sort_exprs = sort_exprs ( & schema) ;
1476+ let source1 = sorted_memory_exec ( & schema, sort_exprs. clone ( ) ) ;
1477+ let source2 = sorted_memory_exec ( & schema, sort_exprs) ;
1478+ // output has multiple partitions, and is sorted
1479+ let union = UnionExec :: new ( vec ! [ source1, source2] ) ;
1480+ let exec = RepartitionExec :: try_new ( Arc :: new ( union) , Partitioning :: RoundRobinBatch ( 10 ) )
1481+ . unwrap ( )
1482+ . with_preserve_order ( true ) ;
1483+
1484+ // Repartition should preserve order
1485+ let expected_plan = [
1486+ "SortPreservingRepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, sort_exprs=c0@0 ASC" ,
1487+ " UnionExec" ,
1488+ " MemoryExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC" ,
1489+ " MemoryExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC" ,
1490+ ] ;
1491+ assert_plan ! ( expected_plan, exec) ;
1492+ Ok ( ( ) )
1493+ }
1494+
1495+ #[ tokio:: test]
1496+ async fn test_preserve_order_one_partition ( ) -> Result < ( ) > {
1497+ let schema = test_schema ( ) ;
1498+ let sort_exprs = sort_exprs ( & schema) ;
1499+ let source = sorted_memory_exec ( & schema, sort_exprs) ;
1500+ // output is sorted, but has only a single partition, so no need to sort
1501+ let exec = RepartitionExec :: try_new ( source, Partitioning :: RoundRobinBatch ( 10 ) )
1502+ . unwrap ( )
1503+ . with_preserve_order ( true ) ;
1504+
1505+ // Repartition should not preserve order
1506+ let expected_plan = [
1507+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1" ,
1508+ " MemoryExec: partitions=1, partition_sizes=[0], output_ordering=c0@0 ASC" ,
1509+ ] ;
1510+ assert_plan ! ( expected_plan, exec) ;
1511+ Ok ( ( ) )
1512+ }
1513+
1514+ #[ tokio:: test]
1515+ async fn test_preserve_order_input_not_sorted ( ) -> Result < ( ) > {
1516+ let schema = test_schema ( ) ;
1517+ let source1 = memory_exec ( & schema) ;
1518+ let source2 = memory_exec ( & schema) ;
1519+ // output has multiple partitions, but is not sorted
1520+ let union = UnionExec :: new ( vec ! [ source1, source2] ) ;
1521+ let exec = RepartitionExec :: try_new ( Arc :: new ( union) , Partitioning :: RoundRobinBatch ( 10 ) )
1522+ . unwrap ( )
1523+ . with_preserve_order ( true ) ;
1524+
1525+ // Repartition should not preserve order, as there is no order to preserve
1526+ let expected_plan = [
1527+ "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2" ,
1528+ " UnionExec" ,
1529+ " MemoryExec: partitions=1, partition_sizes=[0]" ,
1530+ " MemoryExec: partitions=1, partition_sizes=[0]" ,
1531+ ] ;
1532+ assert_plan ! ( expected_plan, exec) ;
1533+ Ok ( ( ) )
1534+ }
1535+
1536+ fn test_schema ( ) -> Arc < Schema > {
1537+ Arc :: new ( Schema :: new ( vec ! [ Field :: new( "c0" , DataType :: UInt32 , false ) ] ) )
1538+ }
1539+
1540+ fn sort_exprs ( schema : & Schema ) -> Vec < PhysicalSortExpr > {
1541+ let options = SortOptions :: default ( ) ;
1542+ vec ! [ PhysicalSortExpr {
1543+ expr: col( "c0" , schema) . unwrap( ) ,
1544+ options,
1545+ } ]
1546+ }
1547+
1548+
1549+ fn memory_exec ( schema : & SchemaRef ) -> Arc < dyn ExecutionPlan > {
1550+ Arc :: new ( MemoryExec :: try_new ( & [ vec ! [ ] ] , schema. clone ( ) , None ) . unwrap ( ) )
1551+ }
1552+
1553+ fn sorted_memory_exec ( schema : & SchemaRef , sort_exprs : Vec < PhysicalSortExpr > ) -> Arc < dyn ExecutionPlan > {
1554+ Arc :: new (
1555+ MemoryExec :: try_new ( & [ vec ! [ ] ] , schema. clone ( ) , None ) . unwrap ( )
1556+ . with_sort_information ( vec ! [ sort_exprs] )
1557+ )
1558+ }
1559+
1560+
1561+
1562+ }
0 commit comments