perf: Optimize array_agg() using GroupsAccumulator #20504

martin-g merged 6 commits into apache:main
Benchmarks:

```
d 2.444444444444 0.541519476308
e 3 0.505440263521
```
> # FIXME: add bool_and(v3) column when issue fixed
Not related to this PR -- this test was redundant with another query in the same file. Happy to split into another PR if preferred.
```rust
    opt_filter: Option<&BooleanArray>,
    total_num_groups: usize,
) -> Result<()> {
    assert_eq!(values.len(), 1, "single argument to update_batch");
```
```diff
-    assert_eq!(values.len(), 1, "single argument to update_batch");
+    assert_eq_or_internal_err!(values.len(), 1, "single argument to update_batch");
```
I was using `assert_eq!` because that is what a bunch of the other aggregate functions use. I don't have a strong preference, but we should probably be consistent.
Let's see what others think.
My opinion is that a library should try to avoid panicking as hard as possible.
But this could be improved in a follow-up if others agree!
```rust
    _opt_filter: Option<&BooleanArray>,
    total_num_groups: usize,
) -> Result<()> {
    assert_eq!(values.len(), 1, "one argument to merge_batch");
```
```diff
-    assert_eq!(values.len(), 1, "one argument to merge_batch");
+    assert_eq_or_internal_err!(values.len(), 1, "one argument to merge_batch");
```
```rust
    values: &[ArrayRef],
    opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
    assert_eq!(values.len(), 1, "one argument to convert_to_state");
```
```diff
-    assert_eq!(values.len(), 1, "one argument to convert_to_state");
+    assert_eq_or_internal_err!(values.len(), 1, "one argument to convert_to_state");
```
Also related -- cc @duongcongtoai, perhaps you have some time to help review this PR?
```rust
    !args.is_distinct && args.order_bys.is_empty()
}

fn create_groups_accumulator(
```
I was trying to compare this implementation with my own and found a 2x difference (mine is faster 😆). I tried adding some profiling to check if there is room for improvement, but most of the runtime goes to the optimizer parts instead of the actual micro function. Maybe we should refactor the benchmark / add a micro benchmark there.
> I was trying to compare this implementation with #17915 and found a 2x difference (mine is faster 😆)
Interesting -- which workload did you use for this?
I was referring to `array_agg_query_group_by_many_groups`.
Comparing this implementation with #17915 on the end-to-end …
9bc1f4e to c5238ac
I think my initial approach of organizing the aggregate state by group wasn't ideal -- when there are many groups, this leads to a lot of small allocations and a more random memory access pattern.

I pushed a new version of this PR that uses a per-batch organization instead: that is, for each batch we keep a reference to the batch contents, plus a Vec of `(group_idx, row_idx)` pairs.

Updated benchmark numbers:

So this approach is >2x faster than my initial approach for the many-groups case, and somewhat faster for the mid-groups case as well. It is slightly slower for the few-groups case, but intuitively I'd guess that is tolerable: the regression is relatively modest compared to the speedups for other cases. But LMK if folks disagree about that.
Running the repro from #17446, main vs. this PR: (Directory names are a bit confusing: the …)
Thank you @neilconway, @duongcongtoai and @alamb!

This is so great -- thank you all
Which issue does this PR close?
- `array_agg` is very slow #20465.
- Slow aggregate query with `array_agg`, Polars is 4 times faster for equal query #17446.

Rationale for this change
This PR optimizes the performance of `array_agg()` by adding support for the `GroupsAccumulator` API.

The design tries to minimize the amount of per-batch work done in `update_batch()`: we store a reference to the batch, and a `(group_idx, row_idx)` pair for each row. In `evaluate()`, we assemble all the requested output with a single `interleave` call. This turns out to be significantly faster, because we copy much less data and assembling the results can be vectorized more effectively. For example, on a benchmark with 5000 groups and 5000 int64 values per group, this approach is roughly 190x faster than the previous approach.
Releasing memory after a partial emit is a little more involved than the previous approach, but with some determination it is still possible.
What changes are included in this PR?
- Implement the `GroupsAccumulator` API for `array_agg()`
- Add a benchmark for `array_agg` of a named struct over a dict, following the workload in Slow aggregate query with `array_agg`, Polars is 4 times faster for equal query #17446

Are these changes tested?
Yes, and benchmarked.
Are there any user-facing changes?
No.
AI usage
Iterated with the help of multiple AI tools; I've reviewed and understand the resulting code.