Skip to content

Commit eb9c1ff

Browse files
EmilyMatt and martin-g
authored and committed
feat: Add evaluate_to_arrays function (apache#18446)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#18330 . ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> Reduce code duplication. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> A util function replacing many calls which are using the same code. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> No logic should change whatsoever, so each area which now uses this code should have its own tests and benchmarks unmodified. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> Yes, there is now a new pub function. No other changes to API. --------- Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> (cherry picked from commit 76b4156) (cherry picked from commit 0ff5c27)
1 parent d719463 commit eb9c1ff

13 files changed

Lines changed: 153 additions & 99 deletions

File tree

datafusion/expr-common/src/columnar_value.rs

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,12 @@ impl ColumnarValue {
113113
}
114114
}
115115

116-
/// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
117-
/// number of rows. [`Self::Scalar`] is converted by repeating the same
118-
/// scalar multiple times which is not as efficient as handling the scalar
119-
/// directly.
116+
/// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
117+
/// number of rows by repeating the same scalar multiple times,
118+
/// which is not as efficient as handling the scalar directly.
119+
/// [`Self::Array`] will just be returned as is.
120+
///
121+
/// See [`Self::into_array_of_size`] if you need to validate the length of the output array.
120122
///
121123
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
122124
/// arrays of the same length.
@@ -135,6 +137,38 @@ impl ColumnarValue {
135137
/// number of rows. [`Self::Scalar`] is converted by repeating the same
136138
/// scalar multiple times which is not as efficient as handling the scalar
137139
/// directly.
140+
/// This validates that if this is [`Self::Array`], it has the expected length.
141+
///
142+
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
143+
/// arrays of the same length.
144+
///
145+
/// # Errors
146+
///
147+
/// Errors if `self` is a Scalar that fails to be converted into an array of size or
148+
/// if the array length does not match the expected length
149+
pub fn into_array_of_size(self, num_rows: usize) -> Result<ArrayRef> {
150+
match self {
151+
ColumnarValue::Array(array) => {
152+
if array.len() == num_rows {
153+
Ok(array)
154+
} else {
155+
internal_err!(
156+
"Array length {} does not match expected length {}",
157+
array.len(),
158+
num_rows
159+
)
160+
}
161+
}
162+
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
163+
}
164+
}
165+
166+
/// Convert any [`Self::Scalar`] into an Arrow [`ArrayRef`] with the specified
167+
/// number of rows by repeating the same scalar multiple times,
168+
/// which is not as efficient as handling the scalar directly.
169+
/// [`Self::Array`] will just be returned as is.
170+
///
171+
/// See [`Self::to_array_of_size`] if you need to validate the length of the output array.
138172
///
139173
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
140174
/// arrays of the same length.
@@ -149,6 +183,36 @@ impl ColumnarValue {
149183
})
150184
}
151185

186+
/// Convert a columnar value into an Arrow [`ArrayRef`] with the specified
187+
/// number of rows. [`Self::Scalar`] is converted by repeating the same
188+
/// scalar multiple times which is not as efficient as handling the scalar
189+
/// directly.
190+
/// This validates that if this is [`Self::Array`], it has the expected length.
191+
///
192+
/// See [`Self::values_to_arrays`] to convert multiple columnar values into
193+
/// arrays of the same length.
194+
///
195+
/// # Errors
196+
///
197+
/// Errors if `self` is a Scalar that fails to be converted into an array of size or
198+
/// if the array length does not match the expected length
199+
pub fn to_array_of_size(&self, num_rows: usize) -> Result<ArrayRef> {
200+
match self {
201+
ColumnarValue::Array(array) => {
202+
if array.len() == num_rows {
203+
Ok(Arc::clone(array))
204+
} else {
205+
internal_err!(
206+
"Array length {} does not match expected length {}",
207+
array.len(),
208+
num_rows
209+
)
210+
}
211+
}
212+
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
213+
}
214+
}
215+
152216
/// Null columnar values are implemented as a null array in order to pass batch
153217
/// num_rows
154218
pub fn create_null_array(num_rows: usize) -> Self {
@@ -249,6 +313,34 @@ mod tests {
249313
use super::*;
250314
use arrow::array::Int32Array;
251315

316+
#[test]
fn into_array_of_size() {
    // An array whose length matches `num_rows` is returned unchanged.
    let input = make_array(1, 3);
    let value = ColumnarValue::Array(Arc::clone(&input));
    assert_eq!(&value.into_array_of_size(3).unwrap(), &input);

    // A scalar is expanded to the requested number of rows.
    let value = ColumnarValue::Scalar(ScalarValue::Int32(Some(42)));
    let expected = make_array(42, 100);
    assert_eq!(&value.into_array_of_size(100).unwrap(), &expected);

    // An array whose length differs from `num_rows` is an internal error.
    let input = make_array(1, 3);
    let value = ColumnarValue::Array(Arc::clone(&input));
    let err = value.into_array_of_size(5).unwrap_err();
    assert!(
        err.to_string().starts_with(
            "Internal error: Array length 3 does not match expected length 5"
        ),
        "Found: {err}"
    );
}
343+
252344
#[test]
253345
fn values_to_arrays() {
254346
// (input, expected)

datafusion/physical-expr-common/src/utils.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::tree_node::ExprContext;
2222

2323
use arrow::array::{make_array, Array, ArrayRef, BooleanArray, MutableArrayData};
2424
use arrow::compute::{and_kleene, is_not_null, SlicesIterator};
25+
use arrow::record_batch::RecordBatch;
2526
use datafusion_common::Result;
2627
use datafusion_expr_common::sort_properties::ExprProperties;
2728

@@ -91,6 +92,26 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
9192
Ok(make_array(data))
9293
}
9394

95+
/// Evaluates expressions against a record batch.
96+
/// This will convert the resulting ColumnarValues to ArrayRefs,
97+
/// duplicating any ScalarValues that may have been returned,
98+
/// and validating that the returned arrays all have the same
99+
/// number of rows as the input batch.
100+
#[inline]
101+
pub fn evaluate_expressions_to_arrays<'a>(
102+
exprs: impl IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
103+
batch: &RecordBatch,
104+
) -> Result<Vec<ArrayRef>> {
105+
let num_rows = batch.num_rows();
106+
exprs
107+
.into_iter()
108+
.map(|e| {
109+
e.evaluate(batch)
110+
.and_then(|col| col.into_array_of_size(num_rows))
111+
})
112+
.collect::<Result<Vec<ArrayRef>>>()
113+
}
114+
94115
#[cfg(test)]
95116
mod tests {
96117
use std::sync::Arc;

datafusion/physical-expr/src/window/standard_window_function_expr.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use arrow::record_batch::RecordBatch;
2323
use datafusion_common::Result;
2424
use datafusion_expr::PartitionEvaluator;
2525

26+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
2627
use std::any::Any;
2728
use std::sync::Arc;
2829

@@ -57,13 +58,7 @@ pub trait StandardWindowFunctionExpr: Send + Sync + std::fmt::Debug {
5758
///
5859
/// Typically, the resulting vector is a single element vector.
5960
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
60-
self.expressions()
61-
.iter()
62-
.map(|e| {
63-
e.evaluate(batch)
64-
.and_then(|v| v.into_array(batch.num_rows()))
65-
})
66-
.collect()
61+
evaluate_expressions_to_arrays(&self.expressions(), batch)
6762
}
6863

6964
/// Create a [`PartitionEvaluator`] for evaluating the function on

datafusion/physical-expr/src/window/window_expr.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use datafusion_expr::window_state::{
4040
use datafusion_expr::{Accumulator, PartitionEvaluator, WindowFrame, WindowFrameBound};
4141
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
4242

43+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
4344
use indexmap::IndexMap;
4445

4546
/// Common trait for [window function] implementations
@@ -89,13 +90,7 @@ pub trait WindowExpr: Send + Sync + Debug {
8990
/// Evaluate the window function arguments against the batch and return
9091
/// array ref, normally the resulting `Vec` is a single element one.
9192
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
92-
self.expressions()
93-
.iter()
94-
.map(|e| {
95-
e.evaluate(batch)
96-
.and_then(|v| v.into_array(batch.num_rows()))
97-
})
98-
.collect()
93+
evaluate_expressions_to_arrays(&self.expressions(), batch)
9994
}
10095

10196
/// Evaluate the window function values against the batch

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use datafusion_physical_expr_common::sort_expr::{
5959
};
6060

6161
use datafusion_expr::utils::AggregateOrderSensitivity;
62+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
6263
use itertools::Itertools;
6364

6465
pub mod group_values;
@@ -1434,25 +1435,14 @@ pub fn finalize_aggregation(
14341435
}
14351436
}
14361437

1437-
/// Evaluates expressions against a record batch.
1438-
fn evaluate(
1439-
expr: &[Arc<dyn PhysicalExpr>],
1440-
batch: &RecordBatch,
1441-
) -> Result<Vec<ArrayRef>> {
1442-
expr.iter()
1443-
.map(|expr| {
1444-
expr.evaluate(batch)
1445-
.and_then(|v| v.into_array(batch.num_rows()))
1446-
})
1447-
.collect()
1448-
}
1449-
1450-
/// Evaluates expressions against a record batch.
1438+
/// Evaluates groups of expressions against a record batch.
14511439
pub fn evaluate_many(
14521440
expr: &[Vec<Arc<dyn PhysicalExpr>>],
14531441
batch: &RecordBatch,
14541442
) -> Result<Vec<Vec<ArrayRef>>> {
1455-
expr.iter().map(|expr| evaluate(expr, batch)).collect()
1443+
expr.iter()
1444+
.map(|expr| evaluate_expressions_to_arrays(expr, batch))
1445+
.collect()
14561446
}
14571447

14581448
fn evaluate_optional(
@@ -1506,23 +1496,14 @@ pub fn evaluate_group_by(
15061496
group_by: &PhysicalGroupBy,
15071497
batch: &RecordBatch,
15081498
) -> Result<Vec<Vec<ArrayRef>>> {
1509-
let exprs: Vec<ArrayRef> = group_by
1510-
.expr
1511-
.iter()
1512-
.map(|(expr, _)| {
1513-
let value = expr.evaluate(batch)?;
1514-
value.into_array(batch.num_rows())
1515-
})
1516-
.collect::<Result<Vec<_>>>()?;
1517-
1518-
let null_exprs: Vec<ArrayRef> = group_by
1519-
.null_expr
1520-
.iter()
1521-
.map(|(expr, _)| {
1522-
let value = expr.evaluate(batch)?;
1523-
value.into_array(batch.num_rows())
1524-
})
1525-
.collect::<Result<Vec<_>>>()?;
1499+
let exprs = evaluate_expressions_to_arrays(
1500+
group_by.expr.iter().map(|(expr, _)| expr),
1501+
batch,
1502+
)?;
1503+
let null_exprs = evaluate_expressions_to_arrays(
1504+
group_by.null_expr.iter().map(|(expr, _)| expr),
1505+
batch,
1506+
)?;
15261507

15271508
group_by
15281509
.groups

datafusion/physical-plan/src/aggregates/no_grouping.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@ use std::borrow::Cow;
3333
use std::sync::Arc;
3434
use std::task::{Context, Poll};
3535

36+
use super::AggregateExec;
3637
use crate::filter::batch_filter;
3738
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
39+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
3840
use futures::stream::{Stream, StreamExt};
3941

40-
use super::AggregateExec;
41-
4242
/// stream struct for aggregation without grouping columns
4343
pub(crate) struct AggregateStream {
4444
stream: BoxStream<'static, Result<RecordBatch>>,
@@ -219,13 +219,8 @@ fn aggregate_batch(
219219
None => Cow::Borrowed(&batch),
220220
};
221221

222-
let n_rows = batch.num_rows();
223-
224222
// 1.3
225-
let values = expr
226-
.iter()
227-
.map(|e| e.evaluate(&batch).and_then(|v| v.into_array(n_rows)))
228-
.collect::<Result<Vec<_>>>()?;
223+
let values = evaluate_expressions_to_arrays(expr, batch.as_ref())?;
229224

230225
// 1.4
231226
let size_pre = accum.size();

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
7777

7878
use ahash::RandomState;
7979
use datafusion_physical_expr_common::physical_expr::fmt_sql;
80+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
8081
use futures::TryStreamExt;
8182
use parking_lot::Mutex;
8283

@@ -1449,13 +1450,7 @@ async fn collect_left_input(
14491450
BooleanBufferBuilder::new(0)
14501451
};
14511452

1452-
let left_values = on_left
1453-
.iter()
1454-
.map(|c| {
1455-
c.evaluate(&single_batch)?
1456-
.into_array(single_batch.num_rows())
1457-
})
1458-
.collect::<Result<Vec<_>>>()?;
1453+
let left_values = evaluate_expressions_to_arrays(&on_left, &single_batch)?;
14591454

14601455
// Compute bounds for dynamic filter if enabled
14611456
let bounds = match bounds_accumulators {

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use datafusion_common::{
5050
use datafusion_physical_expr::PhysicalExprRef;
5151

5252
use ahash::RandomState;
53+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
5354
use futures::{ready, Stream, StreamExt};
5455

5556
/// Represents build-side of hash join.
@@ -436,11 +437,7 @@ impl HashJoinStream {
436437
}
437438
Some(Ok(batch)) => {
438439
// Precalculate hash values for fetched batch
439-
let keys_values = self
440-
.on_right
441-
.iter()
442-
.map(|c| c.evaluate(&batch)?.into_array(batch.num_rows()))
443-
.collect::<Result<Vec<_>>>()?;
440+
let keys_values = evaluate_expressions_to_arrays(&self.on_right, &batch)?;
444441

445442
self.hashes_buffer.clear();
446443
self.hashes_buffer.resize(batch.num_rows(), 0);

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExprRef};
7878
use datafusion_physical_expr_common::sort_expr::{LexOrdering, OrderingRequirements};
7979

8080
use ahash::RandomState;
81+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
8182
use futures::{ready, Stream, StreamExt};
8283
use parking_lot::Mutex;
8384

@@ -1065,14 +1066,8 @@ fn lookup_join_hashmap(
10651066
hashes_buffer: &mut Vec<u64>,
10661067
deleted_offset: Option<usize>,
10671068
) -> Result<(UInt64Array, UInt32Array)> {
1068-
let keys_values = probe_on
1069-
.iter()
1070-
.map(|c| c.evaluate(probe_batch)?.into_array(probe_batch.num_rows()))
1071-
.collect::<Result<Vec<_>>>()?;
1072-
let build_join_values = build_on
1073-
.iter()
1074-
.map(|c| c.evaluate(build_batch)?.into_array(build_batch.num_rows()))
1075-
.collect::<Result<Vec<_>>>()?;
1069+
let keys_values = evaluate_expressions_to_arrays(probe_on, probe_batch)?;
1070+
let build_join_values = evaluate_expressions_to_arrays(build_on, build_batch)?;
10761071

10771072
hashes_buffer.clear();
10781073
hashes_buffer.resize(probe_batch.num_rows(), 0);

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ use datafusion_physical_expr::{
6868
};
6969

7070
use datafusion_physical_expr_common::datum::compare_op_for_nested;
71+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
7172
use futures::future::{BoxFuture, Shared};
7273
use futures::{ready, FutureExt};
7374
use parking_lot::Mutex;
@@ -1665,10 +1666,7 @@ pub fn update_hash(
16651666
fifo_hashmap: bool,
16661667
) -> Result<()> {
16671668
// evaluate the keys
1668-
let keys_values = on
1669-
.iter()
1670-
.map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
1671-
.collect::<Result<Vec<_>>>()?;
1669+
let keys_values = evaluate_expressions_to_arrays(on, batch)?;
16721670

16731671
// calculate the hash values
16741672
let hash_values = create_hashes(&keys_values, random_state, hashes_buffer)?;

0 commit comments

Comments (0)