Skip to content

Commit 23f5003

Browse files
adriangb and claude
co-authored
Add struct pushdown query benchmark and projection pushdown tests (#19962)
## Summary Extract benchmarks and sqllogictest cases from #19538 for easier review. This PR includes: - **New Benchmark**: `parquet_struct_query.rs` - Benchmarks SQL queries on struct columns in Parquet files - 524,288 rows across 8 row groups - 20 benchmark queries covering struct access, filtering, joins, and aggregations - Struct schema: `id` (Int32) and `s` (Struct with `id`/Int32 and `value`/Utf8 fields) - **SQLLogicTest**: `projection_pushdown.slt` - Tests for projection pushdown optimization ## Changes - Added `datafusion/core/benches/parquet_struct_query.rs` - Updated `datafusion/core/Cargo.toml` with benchmark entry - Added `datafusion/sqllogictest/test_files/projection_pushdown.slt` ## Test Plan - Run benchmark: `cargo bench --profile dev --bench parquet_struct_query` - All 20 benchmark queries execute successfully - Parquet file generated with correct row count (524,288) and row groups (8) 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
1 parent 35e99b9 commit 23f5003

3 files changed

Lines changed: 1371 additions & 0 deletions

File tree

datafusion/core/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,11 @@ harness = false
241241
name = "parquet_query_sql"
242242
required-features = ["parquet"]
243243

244+
[[bench]]
245+
harness = false
246+
name = "parquet_struct_query"
247+
required-features = ["parquet"]
248+
244249
[[bench]]
245250
harness = false
246251
name = "range_and_generate_series"
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmarks of SQL queries on struct columns in parquet data
19+
20+
use arrow::array::{ArrayRef, Int32Array, StringArray, StructArray};
21+
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
22+
use arrow::record_batch::RecordBatch;
23+
use criterion::{Criterion, criterion_group, criterion_main};
24+
use datafusion::prelude::SessionContext;
25+
use datafusion_common::instant::Instant;
26+
use parquet::arrow::ArrowWriter;
27+
use parquet::file::properties::{WriterProperties, WriterVersion};
28+
use rand::distr::Alphanumeric;
29+
use rand::prelude::*;
30+
use rand::rng;
31+
use std::hint::black_box;
32+
use std::ops::Range;
33+
use std::path::Path;
34+
use std::sync::Arc;
35+
use tempfile::NamedTempFile;
36+
use tokio::runtime::Runtime;
37+
38+
// Dataset sizing: NUM_BATCHES * WRITE_RECORD_BATCH_SIZE = 128 * 4096
// = 524,288 total rows; at 65,536 rows per row group that yields the
// 8 row groups asserted by EXPECTED_ROW_GROUPS in generate_file().

/// The number of batches to write
const NUM_BATCHES: usize = 128;
/// The number of rows in each record batch to write
const WRITE_RECORD_BATCH_SIZE: usize = 4096;
/// The number of rows in a row group
const ROW_GROUP_SIZE: usize = 65536;
/// The number of row groups expected
const EXPECTED_ROW_GROUPS: usize = 8;
/// The range for random string lengths
const STRING_LENGTH_RANGE: Range<usize> = 50..200;
48+
49+
fn schema() -> SchemaRef {
50+
let struct_fields = Fields::from(vec![
51+
Field::new("id", DataType::Int32, false),
52+
Field::new("value", DataType::Utf8, false),
53+
]);
54+
let struct_type = DataType::Struct(struct_fields);
55+
56+
Arc::new(Schema::new(vec![
57+
Field::new("id", DataType::Int32, false),
58+
Field::new("s", struct_type, false),
59+
]))
60+
}
61+
62+
fn generate_strings(len: usize) -> ArrayRef {
63+
let mut rng = rng();
64+
Arc::new(StringArray::from_iter((0..len).map(|_| {
65+
let string_len = rng.random_range(STRING_LENGTH_RANGE.clone());
66+
Some(
67+
(0..string_len)
68+
.map(|_| char::from(rng.sample(Alphanumeric)))
69+
.collect::<String>(),
70+
)
71+
})))
72+
}
73+
74+
fn generate_batch(batch_id: usize) -> RecordBatch {
75+
let schema = schema();
76+
let len = WRITE_RECORD_BATCH_SIZE;
77+
78+
// Generate sequential IDs based on batch_id for uniqueness
79+
let base_id = (batch_id * len) as i32;
80+
let id_values: Vec<i32> = (0..len).map(|i| base_id + i as i32).collect();
81+
let id_array = Arc::new(Int32Array::from(id_values.clone()));
82+
83+
// Create struct id array (matching top-level id)
84+
let struct_id_array = Arc::new(Int32Array::from(id_values));
85+
86+
// Generate random strings for struct value field
87+
let value_array = generate_strings(len);
88+
89+
// Construct StructArray
90+
let struct_array = StructArray::from(vec![
91+
(
92+
Arc::new(Field::new("id", DataType::Int32, false)),
93+
struct_id_array as ArrayRef,
94+
),
95+
(
96+
Arc::new(Field::new("value", DataType::Utf8, false)),
97+
value_array,
98+
),
99+
]);
100+
101+
RecordBatch::try_new(schema, vec![id_array, Arc::new(struct_array)]).unwrap()
102+
}
103+
104+
fn generate_file() -> NamedTempFile {
105+
let now = Instant::now();
106+
let mut named_file = tempfile::Builder::new()
107+
.prefix("parquet_struct_query")
108+
.suffix(".parquet")
109+
.tempfile()
110+
.unwrap();
111+
112+
println!("Generating parquet file - {}", named_file.path().display());
113+
let schema = schema();
114+
115+
let properties = WriterProperties::builder()
116+
.set_writer_version(WriterVersion::PARQUET_2_0)
117+
.set_max_row_group_size(ROW_GROUP_SIZE)
118+
.build();
119+
120+
let mut writer =
121+
ArrowWriter::try_new(&mut named_file, schema, Some(properties)).unwrap();
122+
123+
for batch_id in 0..NUM_BATCHES {
124+
let batch = generate_batch(batch_id);
125+
writer.write(&batch).unwrap();
126+
}
127+
128+
let metadata = writer.close().unwrap();
129+
let file_metadata = metadata.file_metadata();
130+
let expected_rows = WRITE_RECORD_BATCH_SIZE * NUM_BATCHES;
131+
assert_eq!(
132+
file_metadata.num_rows() as usize,
133+
expected_rows,
134+
"Expected {} rows but got {}",
135+
expected_rows,
136+
file_metadata.num_rows()
137+
);
138+
assert_eq!(
139+
metadata.row_groups().len(),
140+
EXPECTED_ROW_GROUPS,
141+
"Expected {} row groups but got {}",
142+
EXPECTED_ROW_GROUPS,
143+
metadata.row_groups().len()
144+
);
145+
146+
println!(
147+
"Generated parquet file with {} rows and {} row groups in {} seconds",
148+
file_metadata.num_rows(),
149+
metadata.row_groups().len(),
150+
now.elapsed().as_secs_f32()
151+
);
152+
153+
named_file
154+
}
155+
156+
/// Creates a `SessionContext` with the parquet file at `file_path`
/// registered as table `t`. A short-lived runtime drives the async
/// registration call.
fn create_context(file_path: &str) -> SessionContext {
    let ctx = SessionContext::new();
    Runtime::new()
        .unwrap()
        .block_on(ctx.register_parquet("t", file_path, Default::default()))
        .unwrap();
    ctx
}
163+
164+
/// Runs `sql` to completion on `ctx`, materializing all result batches.
///
/// This is the body of every benchmark iteration, so it must not do
/// avoidable work: the previous version cloned the `SessionContext`
/// and copied the SQL `&str` into a fresh `String` on every call,
/// adding per-iteration allocations to the measurement. `SessionContext::sql`
/// takes `&self` and `&str`, so neither copy is needed.
fn query(ctx: &SessionContext, rt: &Runtime, sql: &str) {
    let df = rt.block_on(ctx.sql(sql)).unwrap();
    // black_box keeps the optimizer from discarding the collected results.
    black_box(rt.block_on(df.collect()).unwrap());
}
170+
171+
fn criterion_benchmark(c: &mut Criterion) {
172+
let (file_path, temp_file) = match std::env::var("PARQUET_FILE") {
173+
Ok(file) => (file, None),
174+
Err(_) => {
175+
let temp_file = generate_file();
176+
(temp_file.path().display().to_string(), Some(temp_file))
177+
}
178+
};
179+
180+
assert!(Path::new(&file_path).exists(), "path not found");
181+
println!("Using parquet file {file_path}");
182+
183+
let ctx = create_context(&file_path);
184+
let rt = Runtime::new().unwrap();
185+
186+
// Basic struct access
187+
c.bench_function("struct_access", |b| {
188+
b.iter(|| query(&ctx, &rt, "select id, s['id'] from t"))
189+
});
190+
191+
// Filter queries
192+
c.bench_function("filter_struct_field_eq", |b| {
193+
b.iter(|| query(&ctx, &rt, "select id from t where s['id'] = 5"))
194+
});
195+
196+
c.bench_function("filter_struct_field_with_select", |b| {
197+
b.iter(|| query(&ctx, &rt, "select id, s['id'] from t where s['id'] = 5"))
198+
});
199+
200+
c.bench_function("filter_top_level_with_struct_select", |b| {
201+
b.iter(|| query(&ctx, &rt, "select s['id'] from t where id = 5"))
202+
});
203+
204+
c.bench_function("filter_struct_string_length", |b| {
205+
b.iter(|| query(&ctx, &rt, "select id from t where length(s['value']) > 100"))
206+
});
207+
208+
c.bench_function("filter_struct_range", |b| {
209+
b.iter(|| {
210+
query(
211+
&ctx,
212+
&rt,
213+
"select id from t where s['id'] > 100 and s['id'] < 200",
214+
)
215+
})
216+
});
217+
218+
// Join queries (limited with WHERE id < 1000 for performance)
219+
c.bench_function("join_struct_to_struct", |b| {
220+
b.iter(|| query(
221+
&ctx,
222+
&rt,
223+
"select t1.id from t t1 join t t2 on t1.s['id'] = t2.s['id'] where t1.id < 1000"
224+
))
225+
});
226+
227+
c.bench_function("join_struct_to_toplevel", |b| {
228+
b.iter(|| query(
229+
&ctx,
230+
&rt,
231+
"select t1.id from t t1 join t t2 on t1.s['id'] = t2.id where t1.id < 1000"
232+
))
233+
});
234+
235+
c.bench_function("join_toplevel_to_struct", |b| {
236+
b.iter(|| query(
237+
&ctx,
238+
&rt,
239+
"select t1.id from t t1 join t t2 on t1.id = t2.s['id'] where t1.id < 1000"
240+
))
241+
});
242+
243+
c.bench_function("join_struct_to_struct_with_top_level", |b| {
244+
b.iter(|| query(
245+
&ctx,
246+
&rt,
247+
"select t1.id from t t1 join t t2 on t1.s['id'] = t2.s['id'] and t1.id = t2.id where t1.id < 1000"
248+
))
249+
});
250+
251+
c.bench_function("join_struct_and_struct_value", |b| {
252+
b.iter(|| query(
253+
&ctx,
254+
&rt,
255+
"select t1.s['id'], t2.s['value'] from t t1 join t t2 on t1.id = t2.id where t1.id < 1000"
256+
))
257+
});
258+
259+
// Group by queries
260+
c.bench_function("group_by_struct_field", |b| {
261+
b.iter(|| query(&ctx, &rt, "select s['id'] from t group by s['id']"))
262+
});
263+
264+
c.bench_function("group_by_struct_select_toplevel", |b| {
265+
b.iter(|| query(&ctx, &rt, "select max(id) from t group by s['id']"))
266+
});
267+
268+
c.bench_function("group_by_toplevel_select_struct", |b| {
269+
b.iter(|| query(&ctx, &rt, "select max(s['id']) from t group by id"))
270+
});
271+
272+
c.bench_function("group_by_struct_with_count", |b| {
273+
b.iter(|| {
274+
query(
275+
&ctx,
276+
&rt,
277+
"select s['id'], count(*) from t group by s['id']",
278+
)
279+
})
280+
});
281+
282+
c.bench_function("group_by_multiple_with_count", |b| {
283+
b.iter(|| {
284+
query(
285+
&ctx,
286+
&rt,
287+
"select id, s['id'], count(*) from t group by id, s['id']",
288+
)
289+
})
290+
});
291+
292+
// Additional queries
293+
c.bench_function("order_by_struct_limit", |b| {
294+
b.iter(|| {
295+
query(
296+
&ctx,
297+
&rt,
298+
"select id, s['id'] from t order by s['id'] limit 1000",
299+
)
300+
})
301+
});
302+
303+
c.bench_function("distinct_struct_field", |b| {
304+
b.iter(|| query(&ctx, &rt, "select distinct s['id'] from t"))
305+
});
306+
307+
// Temporary file must outlive the benchmarks, it is deleted when dropped
308+
drop(temp_file);
309+
}
310+
311+
criterion_group!(benches, criterion_benchmark);
312+
criterion_main!(benches);

0 commit comments

Comments
 (0)