From 5d8dca6167f3aef9d76fe1bb22aa7fac4f6a1e3b Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 31 Jul 2025 00:23:57 +0800 Subject: [PATCH 1/5] fix: repartition for grouping set if target_partitions = 1 --- datafusion/core/src/physical_planner.rs | 24 +++++++++---- .../sqllogictest/test_files/aggregate.slt | 35 +++++++++++++++++++ 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 5a0ee327cb6aa..11a5630908afa 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -749,16 +749,30 @@ impl DefaultPhysicalPlanner { Arc::clone(&physical_input_schema), )?); - let can_repartition = !groups.is_empty() - && session_state.config().target_partitions() > 1 - && session_state.config().repartition_aggregations(); - // Some aggregators may be modified during initialization for // optimization purposes. For example, a FIRST_VALUE may turn // into a LAST_VALUE with the reverse ordering requirement. // To reflect such changes to subsequent stages, use the updated // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. let updated_aggregates = initial_aggr.aggr_expr().to_vec(); + let final_grouping_set = initial_aggr.group_expr().as_final(); + + // For aggregations with grouping sets, + // the group by expressions differ between the partial and final stages, + // requiring a shuffle. + let has_grouping_id = final_grouping_set + .expr() + .iter() + .any(|(_, name)| name == Aggregate::INTERNAL_GROUPING_ID); + + let partition_num = (Arc::clone(&initial_aggr) as Arc) + .output_partitioning() + .partition_count(); + + let can_repartition = !groups.is_empty() + && (session_state.config().target_partitions() > 1 + || (has_grouping_id && partition_num > 1)) + && session_state.config().repartition_aggregations(); let next_partition_mode = if can_repartition { // construct a second aggregation with 'AggregateMode::FinalPartitioned' @@ -769,8 +783,6 @@ impl DefaultPhysicalPlanner { AggregateMode::Final }; - let final_grouping_set = initial_aggr.group_expr().as_final(); - Arc::new(AggregateExec::try_new( next_partition_mode, final_grouping_set, diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index bdf327c98248a..371352d9650cf 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7210,3 +7210,38 @@ FROM (VALUES ('a'), ('d'), ('c'), ('a')) t(a_varchar); query error Error during planning: ORDER BY and WITHIN GROUP clauses cannot be used together in the same aggregate function SELECT array_agg(a_varchar order by a_varchar) WITHIN GROUP (ORDER BY a_varchar) FROM (VALUES ('a'), ('d'), ('c'), ('a')) t(a_varchar); + +statement ok +SET datafusion.execution.target_partitions = 1; + +query TT +EXPLAIN select * from (select 'id' as id union all select 'id' as id order by id) group by grouping sets ((id), ()); +---- +logical_plan +01)Projection: id +02)--Aggregate: groupBy=[[GROUPING SETS ((id), ())]], aggr=[[]] +03)----Union +04)------Projection: Utf8("id") AS id +05)--------EmptyRelation +06)------Projection: Utf8("id") AS id +07)--------EmptyRelation +physical_plan +01)ProjectionExec: expr=[id@0 as id] +02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[], ordering_mode=PartiallySorted([0]) +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 1), input_partitions=2 +05)--------AggregateExec: mode=Partial, gby=[(id@0 as id), (NULL as id)], aggr=[] +06)----------UnionExec +07)------------ProjectionExec: expr=[id] +08)--------------PlaceholderRowExec +09)------------ProjectionExec: expr=[id] +10)--------------PlaceholderRowExec + +query T rowsort +select * from (select 'id' as id union all select 'id' as id order by id) group by grouping sets ((id), ()); +---- +NULL +id + +statement ok +set datafusion.execution.target_partitions = 2; From 75bf986ad6f4b711710fb09514ee58068013d1c7 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 31 Jul 2025 08:35:10 +0800 Subject: [PATCH 2/5] update --- datafusion/core/src/physical_planner.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 11a5630908afa..28e292abcabb7 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -765,13 +765,9 @@ impl DefaultPhysicalPlanner { .iter() .any(|(_, name)| name == Aggregate::INTERNAL_GROUPING_ID); - let partition_num = (Arc::clone(&initial_aggr) as Arc) - .output_partitioning() - .partition_count(); - let can_repartition = !groups.is_empty() && (session_state.config().target_partitions() > 1 - || (has_grouping_id && partition_num > 1)) + || has_grouping_id) && session_state.config().repartition_aggregations(); let next_partition_mode = if can_repartition { From 2486334f20183c6453a2833e3e1779bfe1fffb38 Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Mon, 11 Aug 2025 09:02:35 +0800 Subject: [PATCH 3/5] update --- datafusion/core/src/physical_planner.rs | 7 +++-- .../sqllogictest/test_files/aggregate.slt | 26 ++++++++++--------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 28e292abcabb7..75ecee000ac9e 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -765,12 +765,11 @@ impl DefaultPhysicalPlanner { .iter() .any(|(_, name)| name == Aggregate::INTERNAL_GROUPING_ID); - let can_repartition = !groups.is_empty() - && (session_state.config().target_partitions() > 1 - || has_grouping_id) + let can_repartition = !groups.is_empty() + && session_state.config().target_partitions() > 1 && session_state.config().repartition_aggregations(); - let next_partition_mode = if can_repartition { + let next_partition_mode = if can_repartition || has_grouping_id { // construct a second aggregation with 'AggregateMode::FinalPartitioned' AggregateMode::FinalPartitioned } else { diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 371352d9650cf..1e34ec3466384 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5592,28 +5592,30 @@ SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c3, c2 order by c3, c2 l -90 4 query TT -EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3; +EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) ORDER BY c3 NULLS FIRST, c2 NULLS FIRST limit 3; ---- logical_plan -01)Projection: aggregate_test_100.c2, aggregate_test_100.c3 -02)--Limit: skip=0, fetch=3 +01)Sort: aggregate_test_100.c3 ASC NULLS FIRST, aggregate_test_100.c2 ASC NULLS FIRST, fetch=3 +02)--Projection: aggregate_test_100.c2, aggregate_test_100.c3 03)----Aggregate: groupBy=[[ROLLUP (aggregate_test_100.c2, aggregate_test_100.c3)]], aggr=[[]] 04)------TableScan: aggregate_test_100 projection=[c2, c3] physical_plan -01)ProjectionExec: expr=[c2@0 as c2, c3@1 as c3] -02)--GlobalLimitExec: skip=0, fetch=3 -03)----AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3, __grouping_id@2 as __grouping_id], aggr=[], lim=[3] -04)------CoalescePartitionsExec -05)--------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[] -06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true +01)SortPreservingMergeExec: [c3@1 ASC, c2@0 ASC], fetch=3 +02)--SortExec: TopK(fetch=3), expr=[c3@1 ASC, c2@0 ASC], preserve_partitioning=[true] +03)----ProjectionExec: expr=[c2@0 as c2, c3@1 as c3] +04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2, c3@1 as c3, __grouping_id@2 as __grouping_id], aggr=[] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([c2@0, c3@1, __grouping_id@2], 4), input_partitions=4 +07)------------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true query II -SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3; +SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) ORDER BY c3 NULLS FIRST, c2 NULLS FIRST limit 3; ---- NULL NULL +1 NULL 2 NULL -5 NULL statement ok From 7930b35ecedf9bd9fb233014da034d9852a2960a Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Thu, 21 Aug 2025 13:19:41 +0800 Subject: [PATCH 4/5] fmt --- datafusion/core/src/physical_planner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 75ecee000ac9e..68baddaf56fa8 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -765,7 +765,7 @@ impl DefaultPhysicalPlanner { .iter() .any(|(_, name)| name == Aggregate::INTERNAL_GROUPING_ID); - let can_repartition = !groups.is_empty() + let can_repartition = !groups.is_empty() && session_state.config().target_partitions() > 1 && session_state.config().repartition_aggregations(); From ce7cfe77e77bf36e86eaa8854bb8a704a2d563cf Mon Sep 17 00:00:00 2001 From: Chongchen Chen Date: Mon, 1 Sep 2025 22:21:49 +0800 Subject: [PATCH 5/5] update --- datafusion/sqllogictest/test_files/aggregate.slt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index c680439c80796..400dd7ae4b701 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -7403,9 +7403,9 @@ logical_plan 02)--Aggregate: groupBy=[[GROUPING SETS ((id), ())]], aggr=[[]] 03)----Union 04)------Projection: Utf8("id") AS id -05)--------EmptyRelation +05)--------EmptyRelation: rows=1 06)------Projection: Utf8("id") AS id -07)--------EmptyRelation +07)--------EmptyRelation: rows=1 physical_plan 01)ProjectionExec: expr=[id@0 as id] 02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[], ordering_mode=PartiallySorted([0]) @@ -7426,6 +7426,7 @@ id statement ok set datafusion.execution.target_partitions = 2; + # distinct average statement ok create table distinct_avg (a int, b double) as values