Skip to content

Commit 8711ca9

Browse files
Dandandan and jorgecarleitao
authored and committed
ARROW-10722: [Rust][DataFusion] Reduce overhead of some data types in aggregations / joins, improve benchmarks
This PR reduces the size of `GroupByScalar` from 32 bytes to 16 bytes by using `Box<String>`. This will reduce the size of a `Vec<GroupByScalar>` and thus the key of hashmaps used for aggregates / joins. Also, it changes the type of the key to `Box<[GroupByScalar]>` to reduce memory usage further by 8 bytes per key needed to hold the capacity of the vec. Finally we can remove a `Box` around the `Vec` holding the indices. Difference in speed seems to be minimal, at least in the current state. I think in the future, it could be nice to see if the data could be packed efficiently in one `Box<[T]>` (where T is a primitive value) when having no dynamically sized types by using the schema instead of creating "dynamic" values. That should also make the hashing faster. Currently, when grouping on multiple i32 values, we need 32 bytes per value (next to 24 bytes for the Vec holding the values) instead of just 4! Also using const generics https://rust-lang.github.io/rfcs/2000-const-generics.html#:~:text=Rust%20currently%20has%20one%20type,implement%20traits%20for%20all%20arrays could provide a further improvement (by not having to store the length of the slice). This PR also tries to improve reproducibility in the benchmarks a bit by using the seed in the random number generator (still quite noisy on my machine though). Closes #8765 from Dandandan/reduce_key_size Lead-authored-by: Heres, Daniel <danielheres@gmail.com> Co-authored-by: Daniël Heres <danielheres@gmail.com> Signed-off-by: Jorge C. Leitao <jorgecarleitao@gmail.com>
1 parent 49f23a1 commit 8711ca9

4 files changed

Lines changed: 56 additions & 22 deletions

File tree

rust/datafusion/benches/aggregate_query_sql.rs

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
extern crate criterion;
2020
use criterion::Criterion;
2121

22-
use rand::seq::SliceRandom;
23-
use rand::Rng;
22+
use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
2423
use std::sync::{Arc, Mutex};
2524
use tokio::runtime::Runtime;
2625

@@ -40,6 +39,10 @@ use datafusion::datasource::MemTable;
4039
use datafusion::error::Result;
4140
use datafusion::execution::context::ExecutionContext;
4241

42+
pub fn seedable_rng() -> StdRng {
43+
StdRng::seed_from_u64(42)
44+
}
45+
4346
fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
4447
let mut rt = Runtime::new().unwrap();
4548

@@ -50,7 +53,7 @@ fn query(ctx: Arc<Mutex<ExecutionContext>>, sql: &str) {
5053

5154
fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
5255
// use random numbers to avoid spurious compiler optimizations wrt to branching
53-
let mut rng = rand::thread_rng();
56+
let mut rng = seedable_rng();
5457

5558
(0..size)
5659
.map(|_| {
@@ -65,7 +68,7 @@ fn create_data(size: usize, null_density: f64) -> Vec<Option<f64>> {
6568

6669
fn create_integer_data(size: usize, value_density: f64) -> Vec<Option<u64>> {
6770
// use random numbers to avoid spurious compiler optimizations wrt to branching
68-
let mut rng = rand::thread_rng();
71+
let mut rng = seedable_rng();
6972

7073
(0..size)
7174
.map(|_| {
@@ -98,6 +101,8 @@ fn create_context(
98101
Field::new("u64_narrow", DataType::UInt64, false),
99102
]));
100103

104+
let mut rng = seedable_rng();
105+
101106
// define data.
102107
let partitions = (0..partitions_len)
103108
.map(|_| {
@@ -109,7 +114,7 @@ fn create_context(
109114
let keys: Vec<String> = (0..batch_size)
110115
.map(
111116
// use random numbers to avoid spurious compiler optimizations wrt to branching
112-
|_| format!("hi{:?}", vs.choose(&mut rand::thread_rng())),
117+
|_| format!("hi{:?}", vs.choose(&mut rng)),
113118
)
114119
.collect();
115120
let keys: Vec<&str> = keys.iter().map(|e| &**e).collect();
@@ -122,11 +127,7 @@ fn create_context(
122127
// Integer values between [0, 9].
123128
let integer_values_narrow_choices = (0..10).collect::<Vec<u64>>();
124129
let integer_values_narrow = (0..batch_size)
125-
.map(|_| {
126-
*integer_values_narrow_choices
127-
.choose(&mut rand::thread_rng())
128-
.unwrap()
129-
})
130+
.map(|_| *integer_values_narrow_choices.choose(&mut rng).unwrap())
130131
.collect::<Vec<u64>>();
131132

132133
RecordBatch::try_new(
@@ -216,6 +217,27 @@ fn criterion_benchmark(c: &mut Criterion) {
216217
)
217218
})
218219
});
220+
221+
c.bench_function("aggregate_query_group_by_u64 15 12", |b| {
222+
b.iter(|| {
223+
query(
224+
ctx.clone(),
225+
"SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
226+
FROM t GROUP BY u64_narrow",
227+
)
228+
})
229+
});
230+
231+
c.bench_function("aggregate_query_group_by_with_filter_u64 15 12", |b| {
232+
b.iter(|| {
233+
query(
234+
ctx.clone(),
235+
"SELECT u64_narrow, MIN(f64), AVG(f64), COUNT(f64) \
236+
FROM t \
237+
WHERE f32 > 10 AND f32 < 20 GROUP BY u64_narrow",
238+
)
239+
})
240+
});
219241
}
220242

221243
criterion_group!(benches, criterion_benchmark);

rust/datafusion/src/physical_plan/group_scalar.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub(crate) enum GroupByScalar {
3434
Int16(i16),
3535
Int32(i32),
3636
Int64(i64),
37-
Utf8(String),
37+
Utf8(Box<String>),
3838
}
3939

4040
impl TryFrom<&ScalarValue> for GroupByScalar {
@@ -50,7 +50,7 @@ impl TryFrom<&ScalarValue> for GroupByScalar {
5050
ScalarValue::UInt16(Some(v)) => GroupByScalar::UInt16(*v),
5151
ScalarValue::UInt32(Some(v)) => GroupByScalar::UInt32(*v),
5252
ScalarValue::UInt64(Some(v)) => GroupByScalar::UInt64(*v),
53-
ScalarValue::Utf8(Some(v)) => GroupByScalar::Utf8(v.clone()),
53+
ScalarValue::Utf8(Some(v)) => GroupByScalar::Utf8(Box::new(v.clone())),
5454
ScalarValue::Int8(None)
5555
| ScalarValue::Int16(None)
5656
| ScalarValue::Int32(None)
@@ -86,7 +86,7 @@ impl From<&GroupByScalar> for ScalarValue {
8686
GroupByScalar::UInt16(v) => ScalarValue::UInt16(Some(*v)),
8787
GroupByScalar::UInt32(v) => ScalarValue::UInt32(Some(*v)),
8888
GroupByScalar::UInt64(v) => ScalarValue::UInt64(Some(*v)),
89-
GroupByScalar::Utf8(v) => ScalarValue::Utf8(Some(v.clone())),
89+
GroupByScalar::Utf8(v) => ScalarValue::Utf8(Some(v.to_string())),
9090
}
9191
}
9292
}
@@ -131,4 +131,9 @@ mod tests {
131131

132132
Ok(())
133133
}
134+
135+
#[test]
136+
fn size_of_group_by_scalar() {
137+
assert_eq!(std::mem::size_of::<GroupByScalar>(), 16);
138+
}
134139
}

rust/datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,8 @@ fn group_aggregate_batch(
250250
key.push(GroupByScalar::UInt32(0));
251251
}
252252

253+
let mut key = key.into_boxed_slice();
254+
253255
// 1.1 construct the key from the group values
254256
// 1.2 construct the mapping key if it does not exist
255257
// 1.3 add the row' index to `indices`
@@ -270,7 +272,7 @@ fn group_aggregate_batch(
270272
.or_insert_with(|| {
271273
// We can safely unwrap here as we checked we can create an accumulator before
272274
let accumulator_set = create_accumulators(aggr_expr).unwrap();
273-
(key.clone(), (accumulator_set, Box::new(vec![row as u32])))
275+
(key.clone(), (accumulator_set, vec![row as u32]))
274276
});
275277
}
276278

@@ -296,7 +298,7 @@ fn group_aggregate_batch(
296298
// 2.3
297299
compute::take(
298300
array,
299-
&UInt32Array::from(*indices.clone()),
301+
&UInt32Array::from(indices.clone()),
300302
None, // None: no index check
301303
)
302304
.unwrap()
@@ -389,7 +391,7 @@ impl GroupedHashAggregateStream {
389391

390392
type AccumulatorSet = Vec<Box<dyn Accumulator>>;
391393
type Accumulators =
392-
HashMap<Vec<GroupByScalar>, (AccumulatorSet, Box<Vec<u32>>), RandomState>;
394+
HashMap<Box<[GroupByScalar]>, (AccumulatorSet, Vec<u32>), RandomState>;
393395

394396
impl Stream for GroupedHashAggregateStream {
395397
type Item = ArrowResult<RecordBatch>;
@@ -658,7 +660,9 @@ fn create_batch_from_map(
658660
GroupByScalar::UInt16(n) => Arc::new(UInt16Array::from(vec![*n])),
659661
GroupByScalar::UInt32(n) => Arc::new(UInt32Array::from(vec![*n])),
660662
GroupByScalar::UInt64(n) => Arc::new(UInt64Array::from(vec![*n])),
661-
GroupByScalar::Utf8(str) => Arc::new(StringArray::from(vec![&**str])),
663+
GroupByScalar::Utf8(str) => {
664+
Arc::new(StringArray::from(vec![&***str]))
665+
}
662666
})
663667
.collect::<Vec<ArrayRef>>();
664668

@@ -726,7 +730,7 @@ fn finalize_aggregation(
726730
pub(crate) fn create_key(
727731
group_by_keys: &[ArrayRef],
728732
row: usize,
729-
vec: &mut Vec<GroupByScalar>,
733+
vec: &mut Box<[GroupByScalar]>,
730734
) -> Result<()> {
731735
for i in 0..group_by_keys.len() {
732736
let col = &group_by_keys[i];
@@ -765,7 +769,7 @@ pub(crate) fn create_key(
765769
}
766770
DataType::Utf8 => {
767771
let array = col.as_any().downcast_ref::<StringArray>().unwrap();
768-
vec[i] = GroupByScalar::Utf8(String::from(array.value(row)))
772+
vec[i] = GroupByScalar::Utf8(Box::new(array.value(row).into()))
769773
}
770774
_ => {
771775
// This is internal because we should have caught this before.

rust/datafusion/src/physical_plan/hash_join.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type JoinIndex = Option<(usize, usize)>;
5252
// Maps ["on" value] -> [list of indices with this key's value]
5353
// E.g. [1, 2] -> [(0, 3), (1, 6), (0, 8)] indicates that (column1, column2) = [1, 2] is true
5454
// for rows 3 and 8 from batch 0 and row 6 from batch 1.
55-
type JoinHashMap = HashMap<Vec<GroupByScalar>, Vec<Index>, RandomState>;
55+
type JoinHashMap = HashMap<Box<[GroupByScalar]>, Vec<Index>, RandomState>;
5656
type JoinLeftData = (JoinHashMap, Vec<RecordBatch>);
5757

5858
/// join execution plan executes partitions in parallel and combines them into a set of
@@ -209,6 +209,8 @@ fn update_hash(
209209
key.push(GroupByScalar::UInt32(0));
210210
}
211211

212+
let mut key = key.into_boxed_slice();
213+
212214
// update the hash map
213215
for row in 0..batch.num_rows() {
214216
create_key(&keys_values, row, &mut key)?;
@@ -368,8 +370,9 @@ fn build_join_indexes(
368370
JoinType::Inner => {
369371
// inner => key intersection
370372
// unfortunately rust does not support intersection of map keys :(
371-
let left_set: HashSet<Vec<GroupByScalar>> = left.keys().cloned().collect();
372-
let left_right: HashSet<Vec<GroupByScalar>> = right.keys().cloned().collect();
373+
let left_set: HashSet<Box<[GroupByScalar]>> = left.keys().cloned().collect();
374+
let left_right: HashSet<Box<[GroupByScalar]>> =
375+
right.keys().cloned().collect();
373376
let inner = left_set.intersection(&left_right);
374377

375378
let mut indexes = Vec::new(); // unknown a prior size

0 commit comments

Comments
 (0)