
Commit 7ad929a

Find the correct fields when using page filter on struct fields in parquet (#8848)
* Don't consider struct fields for filtering in parquet
* Use parquet_column instead of find_column_index
* Remove unused struct
* Fix formatting issues
* Simplify struct field resolution
* fix formatting
* fmt

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent bc0ba6a commit 7ad929a

2 files changed

Lines changed: 90 additions & 29 deletions
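Why this fix was needed: the page-index pruner resolved a predicate's column by scanning the row group metadata for a leaf column whose bare name matched. Parquet flattens nested fields into leaf columns, so a struct child named `name` and a top-level column named `name` produce two leaves with the same bare name, and the scan can land on the wrong one. The following standalone sketch is illustration only (not part of this commit); it uses just the `parquet` crate's schema parser on a schema shaped like the one in the new test:

```rust
use std::sync::Arc;

use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() {
    // Same shape as the schema in the new test below: the struct's children
    // shadow the names of the top-level `id` and `name` columns.
    let message = "
        message schema {
            required group struct {
                required int64 id;
                required binary name (UTF8);
            }
            optional int64 id;
            required binary name (UTF8);
        }";
    let descr = SchemaDescriptor::new(Arc::new(parse_message_type(message).unwrap()));
    for (i, col) in descr.columns().iter().enumerate() {
        // Leaves 0 and 1 are struct.id and struct.name, but their bare
        // name() is just "id" / "name" -- identical to leaves 2 and 3.
        println!("leaf {i}: path={} name={}", col.path().string(), col.name());
    }
}
```

A predicate on the top-level `name` column therefore matched leaf 1 (`struct.name`) first, and the page filter pruned against the wrong column's page index.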


datafusion/core/src/datasource/physical_plan/parquet/mod.rs

Lines changed: 75 additions & 5 deletions
@@ -549,8 +549,13 @@ impl FileOpener for ParquetOpener {
             // with that range can be skipped as well
             if enable_page_index && !row_groups.is_empty() {
                 if let Some(p) = page_pruning_predicate {
-                    let pruned =
-                        p.prune(&row_groups, file_metadata.as_ref(), &file_metrics)?;
+                    let pruned = p.prune(
+                        &file_schema,
+                        builder.parquet_schema(),
+                        &row_groups,
+                        file_metadata.as_ref(),
+                        &file_metrics,
+                    )?;
                     if let Some(row_selection) = pruned {
                         builder = builder.with_row_selection(row_selection);
                     }
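`prune` now receives both schema views, which describe the same file at different granularities: the Arrow schema has one field per top-level column, while the Parquet `SchemaDescriptor` has one entry per leaf, with nested children flattened. A small sketch of the difference, assuming a `test.parquet` produced by the `write_file` helper added in the test further down:

```rust
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("test.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    // Arrow view: three top-level fields ("struct", "id", "name").
    for (i, field) in builder.schema().fields().iter().enumerate() {
        println!("arrow field {i}: {}", field.name());
    }
    // Parquet view: four leaf columns, with the struct's children flattened
    // ("struct.id", "struct.name", "id", "name").
    for (i, col) in builder.parquet_schema().columns().iter().enumerate() {
        println!("parquet leaf {i}: {}", col.path().string());
    }
    Ok(())
}
```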
@@ -782,7 +787,8 @@ mod tests {
         array::{Int64Array, Int8Array, StringArray},
         datatypes::{DataType, Field, SchemaBuilder},
     };
-    use arrow_array::Date64Array;
+    use arrow_array::{Date64Array, StructArray};
+    use arrow_schema::Fields;
     use chrono::{TimeZone, Utc};
     use datafusion_common::{assert_contains, ToDFSchema};
     use datafusion_common::{FileType, GetExt, ScalarValue};
@@ -793,6 +799,7 @@ mod tests {
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
     use object_store::ObjectMeta;
+    use parquet::arrow::ArrowWriter;
     use std::fs::{self, File};
     use std::io::Write;
     use tempfile::TempDir;
@@ -1765,12 +1772,14 @@ mod tests {
 
         // assert the batches and some metrics
         #[rustfmt::skip]
-        let expected = ["+-----+",
+        let expected = [
+            "+-----+",
             "| int |",
             "+-----+",
             "| 4   |",
             "| 5   |",
-            "+-----+"];
+            "+-----+"
+        ];
         assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
         assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 4);
         assert!(
@@ -2136,4 +2145,65 @@ mod tests {
         let execution_props = ExecutionProps::new();
         create_physical_expr(expr, &df_schema, &execution_props).unwrap()
     }
+
+    #[tokio::test]
+    async fn test_struct_filter_parquet() -> Result<()> {
+        let tmp_dir = TempDir::new()?;
+        let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
+        write_file(&path);
+        let ctx = SessionContext::new();
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        ctx.register_listing_table("base_table", path, opt, None, None)
+            .await
+            .unwrap();
+        let sql = "select * from base_table where name='test02'";
+        let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
+        assert_eq!(batch.len(), 1);
+        let expected = [
+            "+---------------------+----+--------+",
+            "| struct              | id | name   |",
+            "+---------------------+----+--------+",
+            "| {id: 4, name: aaa2} | 2  | test02 |",
+            "+---------------------+----+--------+",
+        ];
+        crate::assert_batches_eq!(expected, &batch);
+        Ok(())
+    }
+
+    fn write_file(file: &String) {
+        let struct_fields = Fields::from(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let schema = Schema::new(vec![
+            Field::new("struct", DataType::Struct(struct_fields.clone()), false),
+            Field::new("id", DataType::Int64, true),
+            Field::new("name", DataType::Utf8, false),
+        ]);
+        let id_array = Int64Array::from(vec![Some(1), Some(2)]);
+        let columns = vec![
+            Arc::new(Int64Array::from(vec![3, 4])) as _,
+            Arc::new(StringArray::from(vec!["aaa1", "aaa2"])) as _,
+        ];
+        let struct_array = StructArray::new(struct_fields, columns, None);
+
+        let name_array = StringArray::from(vec![Some("test01"), Some("test02")]);
+        let schema = Arc::new(schema);
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(struct_array),
+                Arc::new(id_array),
+                Arc::new(name_array),
+            ],
+        )
+        .unwrap();
+        let file = File::create(file).unwrap();
+        let w_opt = WriterProperties::builder().build();
+        let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.flush().unwrap();
+        writer.close().unwrap();
+    }
 }

datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs

Lines changed: 15 additions & 24 deletions
@@ -23,11 +23,12 @@ use arrow::array::{
 };
 use arrow::datatypes::DataType;
 use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
+use arrow_schema::Schema;
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
 use log::{debug, trace};
-use parquet::schema::types::ColumnDescriptor;
+use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
 use parquet::{
     arrow::arrow_reader::{RowSelection, RowSelector},
     errors::ParquetError,
@@ -41,7 +42,9 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use crate::datasource::physical_plan::parquet::parquet_to_arrow_decimal_type;
-use crate::datasource::physical_plan::parquet::statistics::from_bytes_to_i128;
+use crate::datasource::physical_plan::parquet::statistics::{
+    from_bytes_to_i128, parquet_column,
+};
 use crate::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 
 use super::metrics::ParquetFileMetrics;
@@ -128,6 +131,8 @@ impl PagePruningPredicate {
     /// Returns a [`RowSelection`] for the given file
     pub fn prune(
         &self,
+        arrow_schema: &Schema,
+        parquet_schema: &SchemaDescriptor,
         row_groups: &[usize],
         file_metadata: &ParquetMetaData,
         file_metrics: &ParquetFileMetrics,
@@ -163,9 +168,8 @@ impl PagePruningPredicate {
 
         let mut row_selections = Vec::with_capacity(page_index_predicates.len());
         for predicate in page_index_predicates {
-            // find column index by looking in the row group metadata.
-            let col_idx = find_column_index(predicate, &groups[0]);
-
+            // find column index in the parquet schema
+            let col_idx = find_column_index(predicate, arrow_schema, parquet_schema);
             let mut selectors = Vec::with_capacity(row_groups.len());
             for r in row_groups.iter() {
                 let row_group_metadata = &groups[*r];
@@ -231,7 +235,7 @@ impl PagePruningPredicate {
     }
 }
 
-/// Returns the column index in the row group metadata for the single
+/// Returns the column index in the parquet schema for the single
 /// column of a single column pruning predicate.
 ///
 /// For example, given the predicate `y > 5`
@@ -246,12 +250,12 @@ impl PagePruningPredicate {
 /// Panics:
 ///
 /// If the predicate contains more than one column reference (assumes
-/// that `extract_page_index_push_down_predicates` only return
+/// that `extract_page_index_push_down_predicates` only returns
 /// predicate with one col)
-///
 fn find_column_index(
     predicate: &PruningPredicate,
-    row_group_metadata: &RowGroupMetaData,
+    arrow_schema: &Schema,
+    parquet_schema: &SchemaDescriptor,
 ) -> Option<usize> {
     let mut found_required_column: Option<&Column> = None;
 
@@ -269,25 +273,12 @@ fn find_column_index(
         }
     }
 
-    let column = if let Some(found_required_column) = found_required_column.as_ref() {
-        found_required_column
-    } else {
+    let Some(column) = found_required_column.as_ref() else {
         trace!("No column references in pruning predicate");
         return None;
     };
 
-    let col_idx = row_group_metadata
-        .columns()
-        .iter()
-        .enumerate()
-        .find(|(_idx, c)| c.column_descr().name() == column.name())
-        .map(|(idx, _c)| idx);
-
-    if col_idx.is_none() {
-        trace!("Can not find column {} in row group meta", column.name());
-    }
-
-    col_idx
+    parquet_column(parquet_schema, arrow_schema, column.name()).map(|x| x.0)
 }
 
 /// Intersects the [`RowSelector`]s
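For context, `parquet_column` (from the statistics module) returns both the matched leaf index and the corresponding Arrow field, which is why the call site takes `.0`. A rough re-implementation for illustration only, with behavior inferred from this diff rather than copied from the DataFusion source:

```rust
use arrow_schema::Schema;
use parquet::schema::types::SchemaDescriptor;

/// Hypothetical sketch of the lookup; the real `parquet_column` also
/// returns the matched Arrow field and performs additional checks.
fn parquet_column_sketch(
    parquet_schema: &SchemaDescriptor,
    arrow_schema: &Schema,
    name: &str,
) -> Option<usize> {
    // The predicate column must be a root field of the Arrow schema.
    let root_idx = arrow_schema.index_of(name).ok()?;
    // Match the Parquet leaf by its root column index rather than by its
    // bare leaf name, so `name` can no longer collide with `struct.name`.
    (0..parquet_schema.num_columns())
        .find(|&leaf| parquet_schema.get_column_root_idx(leaf) == root_idx)
}
```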
