Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,18 +749,27 @@ impl DefaultPhysicalPlanner {
Arc::clone(&physical_input_schema),
)?);

let can_repartition = !groups.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_aggregations();

// Some aggregators may be modified during initialization for
// optimization purposes. For example, a FIRST_VALUE may turn
// into a LAST_VALUE with the reverse ordering requirement.
// To reflect such changes to subsequent stages, use the updated
// `AggregateFunctionExpr`/`PhysicalSortExpr` objects.
let updated_aggregates = initial_aggr.aggr_expr().to_vec();
let final_grouping_set = initial_aggr.group_expr().as_final();

// For aggregations with grouping sets,
// the group by expressions differ between the partial and final stages,
// requiring a shuffle.
let has_grouping_id = final_grouping_set
.expr()
.iter()
.any(|(_, name)| name == Aggregate::INTERNAL_GROUPING_ID);

let next_partition_mode = if can_repartition {
let can_repartition = !groups.is_empty()
&& session_state.config().target_partitions() > 1
&& session_state.config().repartition_aggregations();

let next_partition_mode = if can_repartition || has_grouping_id {
// construct a second aggregation with 'AggregateMode::FinalPartitioned'
AggregateMode::FinalPartitioned
} else {
Expand All @@ -769,8 +778,6 @@ impl DefaultPhysicalPlanner {
AggregateMode::Final
};

let final_grouping_set = initial_aggr.group_expr().as_final();

Arc::new(AggregateExec::try_new(
next_partition_mode,
final_grouping_set,
Expand Down
61 changes: 49 additions & 12 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5693,28 +5693,30 @@ SELECT DISTINCT c3, c2 FROM aggregate_test_100 group by c3, c2 order by c3, c2 l
-90 4

query TT
EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3;
EXPLAIN SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) ORDER BY c3 NULLS FIRST, c2 NULLS FIRST limit 3;
----
logical_plan
01)Projection: aggregate_test_100.c2, aggregate_test_100.c3
02)--Limit: skip=0, fetch=3
01)Sort: aggregate_test_100.c3 ASC NULLS FIRST, aggregate_test_100.c2 ASC NULLS FIRST, fetch=3
02)--Projection: aggregate_test_100.c2, aggregate_test_100.c3
03)----Aggregate: groupBy=[[ROLLUP (aggregate_test_100.c2, aggregate_test_100.c3)]], aggr=[[]]
04)------TableScan: aggregate_test_100 projection=[c2, c3]
physical_plan
01)ProjectionExec: expr=[c2@0 as c2, c3@1 as c3]
02)--GlobalLimitExec: skip=0, fetch=3
03)----AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3, __grouping_id@2 as __grouping_id], aggr=[], lim=[3]
04)------CoalescePartitionsExec
05)--------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[]
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true
01)SortPreservingMergeExec: [c3@1 ASC, c2@0 ASC], fetch=3
02)--SortExec: TopK(fetch=3), expr=[c3@1 ASC, c2@0 ASC], preserve_partitioning=[true]
03)----ProjectionExec: expr=[c2@0 as c2, c3@1 as c3]
04)------AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2, c3@1 as c3, __grouping_id@2 as __grouping_id], aggr=[]
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([c2@0, c3@1, __grouping_id@2], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[(NULL as c2, NULL as c3), (c2@0 as c2, NULL as c3), (c2@0 as c2, c3@1 as c3)], aggr=[]
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
09)----------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], file_type=csv, has_header=true

query II
SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) limit 3;
SELECT c2, c3 FROM aggregate_test_100 group by rollup(c2, c3) ORDER BY c3 NULLS FIRST, c2 NULLS FIRST limit 3;
----
NULL NULL
1 NULL
2 NULL
5 NULL


statement ok
Expand Down Expand Up @@ -7390,6 +7392,41 @@ query error Error during planning: ORDER BY and WITHIN GROUP clauses cannot be u
SELECT array_agg(a_varchar order by a_varchar) WITHIN GROUP (ORDER BY a_varchar)
FROM (VALUES ('a'), ('d'), ('c'), ('a')) t(a_varchar);

statement ok
SET datafusion.execution.target_partitions = 1;

query TT
EXPLAIN select * from (select 'id' as id union all select 'id' as id order by id) group by grouping sets ((id), ());
----
logical_plan
01)Projection: id
02)--Aggregate: groupBy=[[GROUPING SETS ((id), ())]], aggr=[[]]
03)----Union
04)------Projection: Utf8("id") AS id
05)--------EmptyRelation: rows=1
06)------Projection: Utf8("id") AS id
07)--------EmptyRelation: rows=1
physical_plan
01)ProjectionExec: expr=[id@0 as id]
02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id, __grouping_id@1 as __grouping_id], aggr=[], ordering_mode=PartiallySorted([0])
03)----CoalesceBatchesExec: target_batch_size=8192
04)------RepartitionExec: partitioning=Hash([id@0, __grouping_id@1], 1), input_partitions=2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm not understanding something but how does "repartitioning" to a single partition change anything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it has a single partition, but multiple record batches. The aggregation assumes that records in the same group are adjacent, but that's not true in this case; the repartition solves this problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I've been busy for the past few days, so I'm just getting back to this. I think I understand the underlying issue now: since id is a constant, we infer it as a singleton, which is why we get the issue.

Still, I'm concerned that we are solving this with a pretty blunt instrument. Adding a repartition to every aggregation with a grouping set can have a non-trivial cost, especially in a distributed query.

Looking into it a bit more, it seems like in this case we infer SortProperties::Singleton for the id expr in the final aggregation which I think is incorrect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying issue is that, in an aggregation with a grouping id, the partial aggregation and the final aggregation have different group columns. If the partition count is greater than 1, a repartition is always inserted, so this problem is covered up.

05)--------AggregateExec: mode=Partial, gby=[(id@0 as id), (NULL as id)], aggr=[]
06)----------UnionExec
07)------------ProjectionExec: expr=[id]
08)--------------PlaceholderRowExec
09)------------ProjectionExec: expr=[id]
10)--------------PlaceholderRowExec

query T rowsort
select * from (select 'id' as id union all select 'id' as id order by id) group by grouping sets ((id), ());
----
NULL
id

statement ok
set datafusion.execution.target_partitions = 2;

# distinct average
statement ok
create table distinct_avg (a int, b double) as values
Expand Down