Skip to content

Commit 1922c7e

Browse files
committed
refactor compute_file_group_statistics
1 parent d4b9209 commit 1922c7e

3 files changed

Lines changed: 63 additions & 92 deletions

File tree

datafusion/core/src/datasource/listing/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,12 +1127,12 @@ impl ListingTable {
11271127
get_files_with_limit(files, limit, self.options.collect_stat).await?;
11281128

11291129
let file_groups = file_group.split_files(self.options.target_partitions);
1130-
compute_all_files_statistics(
1130+
Ok(compute_all_files_statistics(
11311131
file_groups,
11321132
self.schema(),
11331133
self.options.collect_stat,
11341134
inexact_stats,
1135-
)
1135+
))
11361136
}
11371137

11381138
/// Collects statistics for a given partitioned file.

datafusion/datasource/src/file_groups.rs

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
//! Logic for managing groups of [`PartitionedFile`]s in DataFusion
1919
20+
use crate::statistics::compute_file_group_statistics;
2021
use crate::{FileRange, PartitionedFile};
2122
use datafusion_common::Statistics;
2223
use itertools::Itertools;
@@ -199,11 +200,23 @@ impl FileGroupPartitioner {
199200
}
200201

201202
// special case when order must be preserved
202-
if self.preserve_order_within_groups {
203+
let repartitioned_groups = if self.preserve_order_within_groups {
203204
self.repartition_preserving_order(file_groups)
204205
} else {
205206
self.repartition_evenly_by_size(file_groups)
207+
};
208+
209+
if repartitioned_groups.is_none() {
210+
return None;
211+
}
212+
213+
let repartitioned_groups = repartitioned_groups.unwrap();
214+
// Recompute statistics for each file group
215+
let mut groups = Vec::with_capacity(repartitioned_groups.len());
216+
for file_group in repartitioned_groups {
217+
groups.push(compute_file_group_statistics(file_group, true));
206218
}
219+
Some(groups)
207220
}
208221

209222
/// Evenly repartition files across partitions by size, ignoring any
@@ -264,21 +277,7 @@ impl FileGroupPartitioner {
264277
.flatten()
265278
.chunk_by(|(partition_idx, _)| *partition_idx)
266279
.into_iter()
267-
.map(|(_, group)| {
268-
FileGroup::new(
269-
group
270-
.map(|(_, vals)| {
271-
if let Some(stat) = vals.statistics.clone() {
272-
vals.with_statistics(Arc::new(
273-
stat.as_ref().clone().to_inexact(),
274-
))
275-
} else {
276-
vals
277-
}
278-
})
279-
.collect_vec(),
280-
)
281-
})
280+
.map(|(_, group)| FileGroup::new(group.map(|(_, vals)| vals).collect_vec()))
282281
.collect_vec();
283282

284283
Some(repartitioned_files)
@@ -376,10 +375,6 @@ impl FileGroupPartitioner {
376375
} else {
377376
target_group.push(updated_file);
378377
}
379-
if let Some(statistics) = target_group.statistics.as_mut() {
380-
// Todo: maybe we can evaluate the statistics by range in the future
381-
*statistics = statistics.clone().to_inexact()
382-
}
383378
range_start = range_end;
384379
range_end += range_size;
385380
}
@@ -972,8 +967,7 @@ mod test {
972967
}
973968

974969
#[test]
975-
fn repartition_with_statistics_and_with_preserve_order_within_groups(
976-
) -> datafusion_common::Result<()> {
970+
fn repartition_file_groups_with_statistics() -> datafusion_common::Result<()> {
977971
// Create test files
978972
let mut file1 = pfile("a", 100);
979973
let mut file2 = pfile("b", 50);
@@ -1040,21 +1034,18 @@ mod test {
10401034
assert!(!stats.column_statistics[0].max_value.is_exact().unwrap());
10411035
}
10421036

1043-
for (idx, group) in repartitioned.into_iter().enumerate() {
1037+
for group in repartitioned.into_iter() {
10441038
// Check all files have inexact statistics regardless of group
10451039
for file in group.files.iter() {
10461040
let stats = file.statistics.as_ref().unwrap();
10471041
assert_stats_are_inexact(stats);
10481042
}
10491043

1050-
// Check group statistics based on index
1051-
if idx == 0 || idx == 1 {
1052-
let stats = group.statistics.unwrap();
1053-
assert_stats_are_inexact(&stats);
1054-
} else if idx == 2 {
1055-
assert!(group.statistics.is_none());
1056-
}
1044+
let stats = group.statistics.unwrap();
1045+
assert_stats_are_inexact(&stats);
10571046
}
1047+
1048+
// Check the specific statistics for each partitioned file and each group
10581049
Ok(())
10591050
}
10601051

datafusion/datasource/src/statistics.rs

Lines changed: 40 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -410,23 +410,24 @@ pub async fn get_statistics_with_limit(
410410
}
411411

412412
/// Generic function to compute statistics across multiple items that have statistics
413-
fn compute_summary_statistics<T, I>(
413+
/// If `items` is empty or all items don't have statistics, it returns `None`.
414+
pub fn compute_summary_statistics<T, I>(
414415
items: I,
415-
file_schema: &SchemaRef,
416416
stats_extractor: impl Fn(&T) -> Option<&Statistics>,
417-
) -> Statistics
417+
) -> Option<Statistics>
418418
where
419419
I: IntoIterator<Item = T>,
420420
{
421-
let size = file_schema.fields().len();
422-
let mut col_stats_set = vec![ColumnStatistics::default(); size];
421+
let mut col_stats_set = Vec::new();
423422
let mut num_rows = Precision::<usize>::Absent;
424423
let mut total_byte_size = Precision::<usize>::Absent;
425424

426-
for (idx, item) in items.into_iter().enumerate() {
425+
for item in items.into_iter() {
427426
if let Some(item_stats) = stats_extractor(&item) {
428-
if idx == 0 {
427+
if col_stats_set.is_empty() {
429428
// First item, set values directly
429+
col_stats_set =
430+
vec![ColumnStatistics::default(); item_stats.column_statistics.len()];
430431
num_rows = item_stats.num_rows;
431432
total_byte_size = item_stats.total_byte_size;
432433
for (index, column_stats) in
@@ -458,11 +459,15 @@ where
458459
}
459460
}
460461

461-
Statistics {
462+
if col_stats_set.is_empty() {
463+
// No statistics available
464+
return None;
465+
}
466+
Some(Statistics {
462467
num_rows,
463468
total_byte_size,
464469
column_statistics: col_stats_set,
465-
}
470+
})
466471
}
467472

468473
/// Computes the summary statistics for a group of files(`FileGroup` level's statistics).
@@ -479,22 +484,24 @@ where
479484
/// * `collect_stats` - Whether to collect statistics (if false, returns original file group)
480485
///
481486
/// # Returns
482-
/// A new file group with summary statistics attached
487+
/// A new file group with summary statistics attached if there is statistics
483488
pub fn compute_file_group_statistics(
484-
file_group: FileGroup,
485-
file_schema: SchemaRef,
489+
mut file_group: FileGroup,
486490
collect_stats: bool,
487-
) -> Result<FileGroup> {
491+
) -> FileGroup {
488492
if !collect_stats {
489-
return Ok(file_group);
493+
return file_group;
490494
}
491495

492-
let statistics =
493-
compute_summary_statistics(file_group.iter(), &file_schema, |file| {
494-
file.statistics.as_ref().map(|stats| stats.as_ref())
495-
});
496+
let statistics = compute_summary_statistics(file_group.iter(), |file| {
497+
file.statistics.as_ref().map(|stats| stats.as_ref())
498+
});
499+
500+
if let Some(stats) = statistics {
501+
file_group = file_group.with_statistics(stats);
502+
}
496503

497-
Ok(file_group.with_statistics(Arc::new(statistics)))
504+
file_group
498505
}
499506

500507
/// Computes statistics for all files across multiple file groups.
@@ -519,29 +526,30 @@ pub fn compute_all_files_statistics(
519526
file_schema: SchemaRef,
520527
collect_stats: bool,
521528
inexact_stats: bool,
522-
) -> Result<(Vec<FileGroup>, Statistics)> {
529+
) -> (Vec<FileGroup>, Statistics) {
530+
if !collect_stats {
531+
return (file_groups, Statistics::new_unknown(&file_schema));
532+
}
523533
let mut file_groups_with_stats = Vec::with_capacity(file_groups.len());
524534

525535
// First compute statistics for each file group
526536
for file_group in file_groups {
527-
file_groups_with_stats.push(compute_file_group_statistics(
528-
file_group,
529-
Arc::clone(&file_schema),
530-
collect_stats,
531-
)?);
537+
file_groups_with_stats
538+
.push(compute_file_group_statistics(file_group, collect_stats));
532539
}
533540

534541
// Then summary statistics across all file groups
535542
let mut statistics =
536-
compute_summary_statistics(&file_groups_with_stats, &file_schema, |file_group| {
543+
compute_summary_statistics(&file_groups_with_stats, |file_group| {
537544
file_group.statistics()
538-
});
545+
})
546+
.unwrap_or(Statistics::new_unknown(&file_schema));
539547

540548
if inexact_stats {
541549
statistics = statistics.to_inexact()
542550
}
543551

544-
Ok((file_groups_with_stats, statistics))
552+
(file_groups_with_stats, statistics)
545553
}
546554

547555
pub fn add_row_stats(
@@ -620,18 +628,11 @@ fn set_min_if_lesser(
620628
#[cfg(test)]
621629
mod tests {
622630
use super::*;
623-
use arrow::datatypes::{DataType, Field, Schema};
624631
use datafusion_common::ScalarValue;
625632
use std::sync::Arc;
626633

627634
#[test]
628635
fn test_compute_summary_statistics_basic() {
629-
// Create a schema with two columns
630-
let schema = Arc::new(Schema::new(vec![
631-
Field::new("col1", DataType::Int32, false),
632-
Field::new("col2", DataType::Int32, false),
633-
]));
634-
635636
// Create items with statistics
636637
let stats1 = Statistics {
637638
num_rows: Precision::Exact(10),
@@ -679,7 +680,7 @@ mod tests {
679680

680681
// Call compute_summary_statistics
681682
let summary_stats =
682-
compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
683+
compute_summary_statistics(items, |item| Some(item.as_ref())).unwrap();
683684

684685
// Verify the results
685686
assert_eq!(summary_stats.num_rows, Precision::Exact(25)); // 10 + 15
@@ -719,13 +720,6 @@ mod tests {
719720

720721
#[test]
721722
fn test_compute_summary_statistics_mixed_precision() {
722-
// Create a schema with one column
723-
let schema = Arc::new(Schema::new(vec![Field::new(
724-
"col1",
725-
DataType::Int32,
726-
false,
727-
)]));
728-
729723
// Create items with different precision levels
730724
let stats1 = Statistics {
731725
num_rows: Precision::Exact(10),
@@ -754,7 +748,7 @@ mod tests {
754748
let items = vec![Arc::new(stats1), Arc::new(stats2)];
755749

756750
let summary_stats =
757-
compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
751+
compute_summary_statistics(items, |item| Some(item.as_ref())).unwrap();
758752

759753
assert_eq!(summary_stats.num_rows, Precision::Inexact(25));
760754
assert_eq!(summary_stats.total_byte_size, Precision::Inexact(250));
@@ -774,25 +768,11 @@ mod tests {
774768

775769
#[test]
776770
fn test_compute_summary_statistics_empty() {
777-
let schema = Arc::new(Schema::new(vec![Field::new(
778-
"col1",
779-
DataType::Int32,
780-
false,
781-
)]));
782-
783771
// Empty collection
784772
let items: Vec<Arc<Statistics>> = vec![];
785773

786-
let summary_stats =
787-
compute_summary_statistics(items, &schema, |item| Some(item.as_ref()));
774+
let summary_stats = compute_summary_statistics(items, |item| Some(item.as_ref()));
788775

789-
// Verify default values for empty collection
790-
assert_eq!(summary_stats.num_rows, Precision::Absent);
791-
assert_eq!(summary_stats.total_byte_size, Precision::Absent);
792-
assert_eq!(summary_stats.column_statistics.len(), 1);
793-
assert_eq!(
794-
summary_stats.column_statistics[0].null_count,
795-
Precision::Absent
796-
);
776+
assert!(summary_stats.is_none());
797777
}
798778
}

0 commit comments

Comments
 (0)