Skip to content

Commit f8dbbb2

Browse files
goldmedal authored and wiedld committed
Move MAKE_MAP to ExprPlanner (apache#11452)
* move make_map to ExprPlanner
* add benchmark for make_map
* remove todo comment
* update lock
* refactor plan_make_map
* implement make_array_strict for type checking strictly
* fix planner provider
* roll back to `make_array`
* update lock
1 parent a3191f2 commit f8dbbb2

10 files changed

Lines changed: 131 additions & 184 deletions

File tree

datafusion-cli/Cargo.lock

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/expr/src/planner.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -173,6 +173,13 @@ pub trait ExprPlanner: Send + Sync {
173173
fn plan_overlay(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
174174
Ok(PlannerResult::Original(args))
175175
}
176+
177+
/// Plan a make_map expression, e.g., `make_map(key1, value1, key2, value2, ...)`
178+
///
179+
/// Returns origin expression arguments if not possible
180+
fn plan_make_map(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
181+
Ok(PlannerResult::Original(args))
182+
}
176183
}
177184

178185
/// An operator with two arguments to plan

datafusion/functions-array/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -53,10 +53,15 @@ datafusion-functions-aggregate = { workspace = true }
5353
itertools = { version = "0.12", features = ["use_std"] }
5454
log = { workspace = true }
5555
paste = "1.0.14"
56+
rand = "0.8.5"
5657

5758
[dev-dependencies]
5859
criterion = { version = "0.5", features = ["async_tokio"] }
5960

6061
[[bench]]
6162
harness = false
6263
name = "array_expression"
64+
65+
[[bench]]
66+
harness = false
67+
name = "map"
datafusion/functions-array/benches/map.rs

Lines changed: 69 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,69 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
extern crate criterion;
19+
20+
use criterion::{black_box, criterion_group, criterion_main, Criterion};
21+
use rand::prelude::ThreadRng;
22+
use rand::Rng;
23+
24+
use datafusion_common::ScalarValue;
25+
use datafusion_expr::planner::ExprPlanner;
26+
use datafusion_expr::Expr;
27+
use datafusion_functions_array::planner::ArrayFunctionPlanner;
28+
29+
fn keys(rng: &mut ThreadRng) -> Vec<String> {
30+
let mut keys = vec![];
31+
for _ in 0..1000 {
32+
keys.push(rng.gen_range(0..9999).to_string());
33+
}
34+
keys
35+
}
36+
37+
fn values(rng: &mut ThreadRng) -> Vec<i32> {
38+
let mut values = vec![];
39+
for _ in 0..1000 {
40+
values.push(rng.gen_range(0..9999));
41+
}
42+
values
43+
}
44+
45+
fn criterion_benchmark(c: &mut Criterion) {
46+
c.bench_function("make_map_1000", |b| {
47+
let mut rng = rand::thread_rng();
48+
let keys = keys(&mut rng);
49+
let values = values(&mut rng);
50+
let mut buffer = Vec::new();
51+
for i in 0..1000 {
52+
buffer.push(Expr::Literal(ScalarValue::Utf8(Some(keys[i].clone()))));
53+
buffer.push(Expr::Literal(ScalarValue::Int32(Some(values[i]))));
54+
}
55+
56+
let planner = ArrayFunctionPlanner {};
57+
58+
b.iter(|| {
59+
black_box(
60+
planner
61+
.plan_make_map(buffer.clone())
62+
.expect("map should work on valid values"),
63+
);
64+
});
65+
});
66+
}
67+
68+
criterion_group!(benches, criterion_benchmark);
69+
criterion_main!(benches);

datafusion/functions-array/src/planner.rs

Lines changed: 20 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,8 @@
1717

1818
//! SQL planning extensions like [`ArrayFunctionPlanner`] and [`FieldAccessPlanner`]
1919
20-
use datafusion_common::{utils::list_ndims, DFSchema, Result};
20+
use datafusion_common::{exec_err, utils::list_ndims, DFSchema, Result};
21+
use datafusion_expr::expr::ScalarFunction;
2122
use datafusion_expr::{
2223
expr::AggregateFunctionDefinition,
2324
planner::{ExprPlanner, PlannerResult, RawBinaryExpr, RawFieldAccessExpr},
@@ -98,6 +99,24 @@ impl ExprPlanner for ArrayFunctionPlanner {
9899
) -> Result<PlannerResult<Vec<Expr>>> {
99100
Ok(PlannerResult::Planned(make_array(exprs)))
100101
}
102+
103+
fn plan_make_map(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
104+
if args.len() % 2 != 0 {
105+
return exec_err!("make_map requires an even number of arguments");
106+
}
107+
108+
let (keys, values): (Vec<_>, Vec<_>) =
109+
args.into_iter().enumerate().partition(|(i, _)| i % 2 == 0);
110+
let keys = make_array(keys.into_iter().map(|(_, e)| e).collect());
111+
let values = make_array(values.into_iter().map(|(_, e)| e).collect());
112+
113+
Ok(PlannerResult::Planned(Expr::ScalarFunction(
114+
ScalarFunction::new_udf(
115+
datafusion_functions::core::map(),
116+
vec![keys, values],
117+
),
118+
)))
119+
}
101120
}
102121

103122
pub struct FieldAccessPlanner;

datafusion/functions/benches/map.rs

Lines changed: 1 addition & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ use arrow_buffer::{OffsetBuffer, ScalarBuffer};
2323
use criterion::{black_box, criterion_group, criterion_main, Criterion};
2424
use datafusion_common::ScalarValue;
2525
use datafusion_expr::ColumnarValue;
26-
use datafusion_functions::core::{make_map, map};
26+
use datafusion_functions::core::map;
2727
use rand::prelude::ThreadRng;
2828
use rand::Rng;
2929
use std::sync::Arc;
@@ -45,27 +45,6 @@ fn values(rng: &mut ThreadRng) -> Vec<i32> {
4545
}
4646

4747
fn criterion_benchmark(c: &mut Criterion) {
48-
c.bench_function("make_map_1000", |b| {
49-
let mut rng = rand::thread_rng();
50-
let keys = keys(&mut rng);
51-
let values = values(&mut rng);
52-
let mut buffer = Vec::new();
53-
for i in 0..1000 {
54-
buffer.push(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
55-
keys[i].clone(),
56-
))));
57-
buffer.push(ColumnarValue::Scalar(ScalarValue::Int32(Some(values[i]))));
58-
}
59-
60-
b.iter(|| {
61-
black_box(
62-
make_map()
63-
.invoke(&buffer)
64-
.expect("map should work on valid values"),
65-
);
66-
});
67-
});
68-
6948
c.bench_function("map_1000", |b| {
7049
let mut rng = rand::thread_rng();
7150
let field = Arc::new(Field::new("item", DataType::Utf8, true));

datafusion/functions/src/core/map.rs

Lines changed: 2 additions & 147 deletions
Original file line number | Diff line number | Diff line change
@@ -20,12 +20,11 @@ use std::collections::VecDeque;
2020
use std::sync::Arc;
2121

2222
use arrow::array::{Array, ArrayData, ArrayRef, MapArray, StructArray};
23-
use arrow::compute::concat;
2423
use arrow::datatypes::{DataType, Field, SchemaBuilder};
2524
use arrow_buffer::{Buffer, ToByteSlice};
2625

27-
use datafusion_common::{exec_err, internal_err, ScalarValue};
28-
use datafusion_common::{not_impl_err, Result};
26+
use datafusion_common::Result;
27+
use datafusion_common::{exec_err, ScalarValue};
2928
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
3029

3130
/// Check if we can evaluate the expr to constant directly.
@@ -40,41 +39,6 @@ fn can_evaluate_to_const(args: &[ColumnarValue]) -> bool {
4039
.all(|arg| matches!(arg, ColumnarValue::Scalar(_)))
4140
}
4241

43-
fn make_map(args: &[ColumnarValue]) -> Result<ColumnarValue> {
44-
let can_evaluate_to_const = can_evaluate_to_const(args);
45-
46-
let (key, value): (Vec<_>, Vec<_>) = args
47-
.chunks_exact(2)
48-
.map(|chunk| {
49-
if let ColumnarValue::Array(_) = chunk[0] {
50-
return not_impl_err!("make_map does not support array keys");
51-
}
52-
if let ColumnarValue::Array(_) = chunk[1] {
53-
return not_impl_err!("make_map does not support array values");
54-
}
55-
Ok((chunk[0].clone(), chunk[1].clone()))
56-
})
57-
.collect::<Result<Vec<_>>>()?
58-
.into_iter()
59-
.unzip();
60-
61-
let keys = ColumnarValue::values_to_arrays(&key)?;
62-
let values = ColumnarValue::values_to_arrays(&value)?;
63-
64-
let keys: Vec<_> = keys.iter().map(|k| k.as_ref()).collect();
65-
let values: Vec<_> = values.iter().map(|v| v.as_ref()).collect();
66-
67-
let key = match concat(&keys) {
68-
Ok(key) => key,
69-
Err(e) => return internal_err!("Error concatenating keys: {}", e),
70-
};
71-
let value = match concat(&values) {
72-
Ok(value) => value,
73-
Err(e) => return internal_err!("Error concatenating values: {}", e),
74-
};
75-
make_map_batch_internal(key, value, can_evaluate_to_const)
76-
}
77-
7842
fn make_map_batch(args: &[ColumnarValue]) -> Result<ColumnarValue> {
7943
if args.len() != 2 {
8044
return exec_err!(
@@ -154,115 +118,6 @@ fn make_map_batch_internal(
154118
})
155119
}
156120

157-
#[derive(Debug)]
158-
pub struct MakeMap {
159-
signature: Signature,
160-
}
161-
162-
impl Default for MakeMap {
163-
fn default() -> Self {
164-
Self::new()
165-
}
166-
}
167-
168-
impl MakeMap {
169-
pub fn new() -> Self {
170-
Self {
171-
signature: Signature::user_defined(Volatility::Immutable),
172-
}
173-
}
174-
}
175-
176-
impl ScalarUDFImpl for MakeMap {
177-
fn as_any(&self) -> &dyn Any {
178-
self
179-
}
180-
181-
fn name(&self) -> &str {
182-
"make_map"
183-
}
184-
185-
fn signature(&self) -> &Signature {
186-
&self.signature
187-
}
188-
189-
fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
190-
if arg_types.is_empty() {
191-
return exec_err!(
192-
"make_map requires at least one pair of arguments, got 0 instead"
193-
);
194-
}
195-
if arg_types.len() % 2 != 0 {
196-
return exec_err!(
197-
"make_map requires an even number of arguments, got {} instead",
198-
arg_types.len()
199-
);
200-
}
201-
202-
let key_type = &arg_types[0];
203-
let mut value_type = &arg_types[1];
204-
205-
for (i, chunk) in arg_types.chunks_exact(2).enumerate() {
206-
if chunk[0].is_null() {
207-
return exec_err!("make_map key cannot be null at position {}", i);
208-
}
209-
if &chunk[0] != key_type {
210-
return exec_err!(
211-
"make_map requires all keys to have the same type {}, got {} instead at position {}",
212-
key_type,
213-
chunk[0],
214-
i
215-
);
216-
}
217-
218-
if !chunk[1].is_null() {
219-
if value_type.is_null() {
220-
value_type = &chunk[1];
221-
} else if &chunk[1] != value_type {
222-
return exec_err!(
223-
"map requires all values to have the same type {}, got {} instead at position {}",
224-
value_type,
225-
&chunk[1],
226-
i
227-
);
228-
}
229-
}
230-
}
231-
232-
let mut result = Vec::new();
233-
for _ in 0..arg_types.len() / 2 {
234-
result.push(key_type.clone());
235-
result.push(value_type.clone());
236-
}
237-
238-
Ok(result)
239-
}
240-
241-
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
242-
let key_type = &arg_types[0];
243-
let mut value_type = &arg_types[1];
244-
245-
for chunk in arg_types.chunks_exact(2) {
246-
if !chunk[1].is_null() && value_type.is_null() {
247-
value_type = &chunk[1];
248-
}
249-
}
250-
251-
let mut builder = SchemaBuilder::new();
252-
builder.push(Field::new("key", key_type.clone(), false));
253-
builder.push(Field::new("value", value_type.clone(), true));
254-
let fields = builder.finish().fields;
255-
Ok(DataType::Map(
256-
Arc::new(Field::new("entries", DataType::Struct(fields), false)),
257-
false,
258-
))
259-
}
260-
261-
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
262-
make_map(args)
263-
}
264-
}
265-
266121
#[derive(Debug)]
267122
pub struct MapFunc {
268123
signature: Signature,

0 commit comments

Comments
 (0)