diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index a107db7f5c288..876f3ba457a81 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -749,18 +749,27 @@ impl DefaultPhysicalPlanner { Arc::clone(&physical_input_schema), )?); - let can_repartition = !groups.is_empty() - && session_state.config().target_partitions() > 1 - && session_state.config().repartition_aggregations(); - // Some aggregators may be modified during initialization for // optimization purposes. For example, a FIRST_VALUE may turn // into a LAST_VALUE with the reverse ordering requirement. // To reflect such changes to subsequent stages, use the updated // `AggregateFunctionExpr`/`PhysicalSortExpr` objects. let updated_aggregates = initial_aggr.aggr_expr().to_vec(); + let final_grouping_set = initial_aggr.group_expr().as_final(); + + // For aggregations with grouping sets, + // the group by expressions differ between the partial and final stages, + // requiring a shuffle. + let has_grouping_id = final_grouping_set + .expr() + .iter() + .any(|(_, name)| name == Aggregate::INTERNAL_GROUPING_ID); - let next_partition_mode = if can_repartition { + let can_repartition = !groups.is_empty() + && session_state.config().target_partitions() > 1 + && session_state.config().repartition_aggregations(); + + let next_partition_mode = if can_repartition || has_grouping_id { // construct a second aggregation with 'AggregateMode::FinalPartitioned' AggregateMode::FinalPartitioned } else { @@ -769,8 +778,6 @@ impl DefaultPhysicalPlanner { AggregateMode::Final }; - let final_grouping_set = initial_aggr.group_expr().as_final(); - Arc::new(AggregateExec::try_new( next_partition_mode, final_grouping_set, diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 35b2a6c03b399..400dd7ae4b701 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5693,28 +5693,30 @@ SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c3, c2 order by c3, c2 l -90 4 query TT -EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3; +EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) ORDER BY c3 NULLS FIRST, c2 NULLS FIRST limit 3; ---- logical_plan -01)Projection: aggregate_test_100.c2, aggregate_test_100.c3 -02)--Limit: skip=0, fetch=3 +01)Sort: aggregate_test_100.c3 ASC NULLS FIRST, aggregate_test_100.c2 ASC NULLS FIRST, fetch=3 +02)--Projection: aggregate_test_100.c2, aggregate_test_100.c3 03)----Aggregate: groupBy=[[ROLLUP (aggregate_test_100.c2, aggregate_test_100.c3)]], aggr=[[]] 04)------TableScan: aggregate_test_100 projection=[c2, c3] physical_plan -01)ProjectionExec: expr=[c2@0 as c2, c3@1 as c3] -02)--GlobalLimitExec: skip=0, fetch=3 -03)----AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3, __grouping_id@2 as __grouping_id], aggr=[], lim=[3] -04)------CoalescePartitionsExec -05)--------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[] -06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true +01)SortPreservingMergeExec: [c3@1 ASC, c2@0 ASC], fetch=3 +02)--SortExec: TopK(fetch=3), expr=[c3@1 ASC, c2@0 ASC], preserve_partitioning=[true] +03)----ProjectionExec: expr=[c2@0 as c2, c3@1 as c3] +04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2, c3@1 as c3, __grouping_id@2 as __grouping_id], aggr=[] +05)--------CoalesceBatchesExec: target_batch_size=8192 +06)----------RepartitionExec: partitioning=Hash([c2@0, c3@1, __grouping_id@2], 4), input_partitions=4 +07)------------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[] +08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true query II -SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3; +SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) ORDER BY c3 NULLS FIRST, c2 NULLS FIRST limit 3; ---- NULL NULL +1 NULL 2 NULL -5 NULL statement ok @@ -7390,6 +7392,41 @@ query error Error during planning: ORDER BY and WITHIN GROUP clauses cannot be u SELECT array_agg(a_varchar order by a_varchar) WITHIN GROUP (ORDER BY a_varchar) FROM (VALUES ('a'), ('d'), ('c'), ('a')) t(a_varchar); +statement ok +SET datafusion.execution.target_partitions = 1; + +query TT +EXPLAIN select * from (select 'id' as id union all select 'id' as id order by id) group by grouping sets ((id), ()); +---- +logical_plan +01)Projection: id +02)--Aggregate: groupBy=[[GROUPING SETS ((id), ())]], aggr=[[]] +03)----Union +04)------Projection: Utf8("id") AS id +05)--------EmptyRelation: rows=1 +06)------Projection: Utf8("id") AS id +07)--------EmptyRelation: rows=1 +physical_plan +01)ProjectionExec: expr=[id@0 as id] +02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[], ordering_mode=PartiallySorted([0]) +03)----CoalesceBatchesExec: target_batch_size=8192 +04)------RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 1), input_partitions=2 +05)--------AggregateExec: mode=Partial, gby=[(id@0 as id), (NULL as id)], aggr=[] +06)----------UnionExec +07)------------ProjectionExec: expr=[id] +08)--------------PlaceholderRowExec +09)------------ProjectionExec: expr=[id] +10)--------------PlaceholderRowExec + +query T rowsort +select * from (select 'id' as id union all select 'id' as id order by id) group by grouping sets ((id), ()); +---- +NULL +id + +statement ok +set datafusion.execution.target_partitions = 2; + # distinct average statement ok create table distinct_avg (a int, b double) as values