Skip to content

Commit df63590

Browse files
Dandandan and claude
authored
[Minor] Use per-predicate projection masks in arrow_reader_clickbench benchmark (#9413)
# Which issue does this PR close? - Closes #NNN. # Rationale for this change As suggested by Claude - currently it uses a projection mask for all columns, significantly slowing down queries that have multiple predicates. This makes it more in line with consumer side (e.g. DataFusion) (so we can more accurately benchmark improvements). It shows the perf difference in a number of (multi-filter) queries: ``` group clickbench-optimizations main arrow_reader_clickbench/async_object_store/Q22 1.00 151.8±6.46ms ? ?/sec 1.52 230.5±1.68ms ? ?/sec arrow_reader_clickbench/async_object_store/Q36 1.00 26.3±0.24ms ? ?/sec 4.30 113.1±0.67ms ? ?/sec arrow_reader_clickbench/async_object_store/Q37 1.00 9.3±0.06ms ? ?/sec 9.64 89.7±1.20ms ? ?/sec arrow_reader_clickbench/async_object_store/Q38 1.00 22.4±0.26ms ? ?/sec 1.44 32.3±0.29ms ? ?/sec arrow_reader_clickbench/async_object_store/Q39 1.00 38.1±0.66ms ? ?/sec 1.09 41.5±0.35ms ? ?/sec arrow_reader_clickbench/async_object_store/Q40 1.00 13.0±0.15ms ? ?/sec 2.96 38.6±0.45ms ? ?/sec arrow_reader_clickbench/async_object_store/Q41 1.00 10.1±0.11ms ? ?/sec 2.83 28.5±0.73ms ? ?/sec arrow_reader_clickbench/async_object_store/Q42 1.00 5.6±0.05ms ? ?/sec 1.87 10.5±0.12ms ? ?/sec ``` # What changes are included in this PR? # Are these changes tested? # Are there any user-facing changes? --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 39a2b71 commit df63590

1 file changed

Lines changed: 16 additions & 81 deletions

File tree

parquet/benches/arrow_reader_clickbench.rs

Lines changed: 16 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -638,66 +638,6 @@ fn find_file_if_exists(mut current_dir: PathBuf, file_name: &str) -> Option<Path
638638
None
639639
}
640640

641-
/// Represents a mapping from each column selected in the `ProjectionMask`
642-
/// created from `filter_columns`, to the corresponding index in the list of
643-
/// `filter_columns`.
644-
///
645-
/// # Example
646-
///
647-
/// If:
648-
/// * the file schema has columns `[A, B, C]`
649-
/// * `filter_columns` is `[C, A]`
650-
/// * ==> `ProjectionMask` will be `[true, false, true]` = `[A, C]`
651-
///
652-
/// `FilterIndices` will be `[1, 0]`, because column `C` (index 0 in
653-
/// filter_columns) is selected at index 1 of the `ProjectionMask` and column
654-
/// `A` (index 1 in `filter_columns`) is selected at index 0 of the
655-
/// `ProjectionMask`.
656-
struct FilterIndices {
657-
/// * index is offset in Query::filter_columns
658-
/// * value is offset in column selected by filter ProjectionMask
659-
inner: Vec<usize>,
660-
}
661-
662-
impl FilterIndices {
663-
/// Create a new `FilterIndices` from a list of column indices
664-
///
665-
/// Parameters:
666-
/// * `schema_descriptor`: The schema of the file
667-
/// * `filter_schema_indices`: a list of column indices in the schema
668-
fn new(schema_descriptor: &SchemaDescriptor, filter_schema_indices: Vec<usize>) -> Self {
669-
for &filter_index in &filter_schema_indices {
670-
assert!(filter_index < schema_descriptor.num_columns());
671-
}
672-
// When the columns are selected using a ProjectionMask, they are
673-
// returned in the order of the schema (not the order they were specified)
674-
//
675-
// So if the original schema indices are 5, 1, 3 (select the sixth and
676-
// second and fourth column), the RecordBatch returned will select them
677-
// in order 1, 3, 5,
678-
//
679-
// Thus we need a map to convert back to the original selection order
680-
// `[1, 2, 0]`
681-
let mut reordered: Vec<_> = filter_schema_indices.iter().enumerate().collect();
682-
reordered.sort_by_key(|(_projection_idx, original_schema_idx)| **original_schema_idx);
683-
let mut inner = vec![0; reordered.len()];
684-
for (output_idx, (projection_idx, _original_schema_idx)) in
685-
reordered.into_iter().enumerate()
686-
{
687-
inner[projection_idx] = output_idx;
688-
}
689-
Self { inner }
690-
}
691-
692-
/// Given the index of a column in `filter_columns`, return the index of the
693-
/// column in the columns selected from `ProjectionMask`
694-
fn map_column(&self, filter_columns_index: usize) -> usize {
695-
// The selection index is the index in the filter mask
696-
// The inner index is the index in the filter columns
697-
self.inner[filter_columns_index]
698-
}
699-
}
700-
701641
/// Encapsulates the test parameters for a single benchmark
702642
struct ReadTest {
703643
/// Human identifiable name
@@ -706,10 +646,8 @@ struct ReadTest {
706646
arrow_reader_metadata: ArrowReaderMetadata,
707647
/// Which columns in the file should be projected (decoded after filter)?
708648
projection_mask: ProjectionMask,
709-
/// Which columns in the file should be passed to the filter?
710-
filter_mask: ProjectionMask,
711-
/// Mapping from column selected in filter mask to `Query::filter_columns`
712-
filter_indices: FilterIndices,
649+
/// Schema indices for each filter column (in filter_columns order)
650+
filter_schema_indices: Vec<usize>,
713651
/// Predicates to apply
714652
predicates: Vec<ClickBenchPredicate>,
715653
/// How many rows are expected to pass the predicate?
@@ -744,16 +682,12 @@ impl ReadTest {
744682
};
745683

746684
let filter_schema_indices = column_indices(schema_descr, &filter_columns);
747-
let filter_mask =
748-
ProjectionMask::leaves(schema_descr, filter_schema_indices.iter().cloned());
749-
let filter_indices = FilterIndices::new(schema_descr, filter_schema_indices);
750685

751686
Self {
752687
name,
753688
arrow_reader_metadata,
754689
projection_mask,
755-
filter_mask,
756-
filter_indices,
690+
filter_schema_indices,
757691
predicates,
758692
expected_row_count,
759693
}
@@ -851,25 +785,26 @@ impl ReadTest {
851785

852786
/// Return a `RowFilter` to apply to the reader.
853787
///
854-
/// Note that since `RowFilter` does not implement Clone, we need to create
855-
/// the filter for each row
788+
/// Each predicate gets a ProjectionMask containing only the single column
789+
/// it needs, rather than all filter columns. This avoids decoding expensive
790+
/// columns (e.g. strings) when evaluating cheap predicates (e.g. integer equality).
856791
fn row_filter(&self) -> RowFilter {
857-
// Note: The predicates are in terms of columns in the filter mask
858-
// but the record batch passed back has columns in the order of the file
859-
// schema
792+
let schema_descr = self
793+
.arrow_reader_metadata
794+
.metadata()
795+
.file_metadata()
796+
.schema_descr();
860797

861-
// Convert the predicates to ArrowPredicateFn to conform to the RowFilter API
862798
let arrow_predicates: Vec<_> = self
863799
.predicates
864800
.iter()
865801
.map(|pred| {
866-
let orig_column_index = pred.column_index();
867-
let column_index = self.filter_indices.map_column(orig_column_index);
802+
let schema_index = self.filter_schema_indices[pred.column_index()];
803+
let predicate_mask = ProjectionMask::leaves(schema_descr, [schema_index]);
868804
let mut predicate_fn = pred.predicate_fn();
869-
Box::new(ArrowPredicateFn::new(
870-
self.filter_mask.clone(),
871-
move |batch| (predicate_fn)(batch.column(column_index)),
872-
)) as Box<dyn ArrowPredicate>
805+
Box::new(ArrowPredicateFn::new(predicate_mask, move |batch| {
806+
(predicate_fn)(batch.column(0))
807+
})) as Box<dyn ArrowPredicate>
873808
})
874809
.collect();
875810

0 commit comments

Comments (0)