diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 304058650164f..b165070c60605 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -942,7 +942,7 @@ version = "3.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro-error", "proc-macro2", "quote", @@ -976,7 +976,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7" dependencies = [ "strum 0.26.2", - "strum_macros 0.26.2", + "strum_macros 0.26.4", "unicode-width", ] @@ -1156,6 +1156,7 @@ dependencies = [ "object_store", "parking_lot", "parquet", + "paste", "pin-project-lite", "rand", "sqlparser", @@ -1255,7 +1256,7 @@ dependencies = [ "serde_json", "sqlparser", "strum 0.26.2", - "strum_macros 0.26.2", + "strum_macros 0.26.4", ] [[package]] @@ -1809,6 +1810,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1881,9 +1888,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.28" +version = "0.14.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf96e135eb83a2a8ddf766e426a841d8ddd7449d5f00d34ea02b41d2f19eef80" +checksum = "f361cde2f109281a220d4307746cdfd5ee3f410da58a70377762396775634b33" dependencies = [ "bytes", "futures-channel", @@ -2683,9 +2690,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.84" +version = "1.0.85" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec96c6a92621310b51366f1e28d05ef11489516e93be030060e5fc12024a49d6" +checksum = "22244ce15aa966053a896d1accb3a6e68469b97c7f33f284b99f0d576879fc23" dependencies = [ "unicode-ident", ] @@ -3218,7 +3225,7 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "990079665f075b699031e9c08fd3ab99be5029b96f3b78dc0709e8f77e4efebf" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "syn 1.0.109", @@ -3303,7 +3310,7 @@ version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" dependencies = [ - "strum_macros 0.26.2", + "strum_macros 0.26.4", ] [[package]] @@ -3312,7 +3319,7 @@ version = "0.25.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" dependencies = [ - "heck", + "heck 0.4.1", "proc-macro2", "quote", "rustversion", @@ -3321,11 +3328,11 @@ dependencies = [ [[package]] name = "strum_macros" -version = "0.26.2" +version = "0.26.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6cf59daf282c0a494ba14fd21610a0325f9f90ec9d1231dea26bcb1d696c946" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", "rustversion", @@ -3711,9 +3718,9 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" [[package]] name = "unicode-width" -version = "0.1.12" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68f5e5f3158ecfd4b8ff6fe086db7c8467a2dfdac97fe420f2b7c4aa97af66d6" +checksum = "0336d538f7abc86d282a4189614dfaa90810dfc2c6f6427eaf88e16311dd225d" [[package]] name = "untrusted" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 9f1f7484357b2..3946758ff9370 
100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -121,6 +121,7 @@ num_cpus = { workspace = true } object_store = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true, optional = true, default-features = true } +paste = "1.0.15" pin-project-lite = "^0.2.7" rand = { workspace = true } sqlparser = { workspace = true } diff --git a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs index 8d0d30bf41fc6..a73538d02a7fa 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/statistics.rs @@ -19,17 +19,26 @@ // TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328 -use arrow::{array::ArrayRef, datatypes::i256, datatypes::DataType, datatypes::TimeUnit}; -use arrow_array::{new_empty_array, new_null_array, UInt64Array}; -use arrow_schema::{Field, FieldRef, Schema}; -use datafusion_common::{ - internal_datafusion_err, internal_err, plan_err, Result, ScalarValue, +use arrow::datatypes::i256; +use arrow::{array::ArrayRef, datatypes::DataType}; +use arrow_array::{ + new_null_array, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, + Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, LargeStringArray, + StringArray, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, + Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, + TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, + UInt64Array, UInt8Array, }; +use arrow_schema::{Field, FieldRef, Schema, TimeUnit}; +use datafusion_common::{internal_datafusion_err, internal_err, plan_err, Result}; use half::f16; use parquet::file::metadata::ParquetMetaData; use parquet::file::statistics::Statistics as 
ParquetStatistics; use parquet::schema::types::SchemaDescriptor; +use paste::paste; use std::sync::Arc; + // Convert the bytes array to i128. // The endian of the input bytes array must be big-endian. pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 { @@ -66,201 +75,446 @@ pub fn sign_extend_be(b: &[u8]) -> [u8; N] { result } -/// Extract a single min/max statistics from a [`ParquetStatistics`] object +/// Define an adapter iterator for extracting statistics from an iterator of +/// `ParquetStatistics` +/// +/// +/// Handles checking if the statistics are present and valid with the correct type. /// -/// * `$column_statistics` is the `ParquetStatistics` object -/// * `$func is the function` (`min`/`max`) to call to get the value -/// * `$bytes_func` is the function (`min_bytes`/`max_bytes`) to call to get the value as bytes -/// * `$target_arrow_type` is the [`DataType`] of the target statistics -macro_rules! get_statistic { - ($column_statistics:expr, $func:ident, $bytes_func:ident, $target_arrow_type:expr) => {{ - if !$column_statistics.has_min_max_set() { - return None; +/// Parameters: +/// * `$iterator_type` is the name of the iterator type (e.g. `MinBooleanStatsIterator`) +/// * `$func` is the function to call to get the value (e.g. `min` or `max`) +/// * `$parquet_statistics_type` is the type of the statistics (e.g. `ParquetStatistics::Boolean`) +/// * `$stat_value_type` is the type of the statistics value (e.g. `bool`) +macro_rules! 
make_stats_iterator { + ($iterator_type:ident, $func:ident, $parquet_statistics_type:path, $stat_value_type:ty) => { + /// Maps an iterator of `ParquetStatistics` into an iterator of + /// `&$stat_value_type` + /// + /// Yielded elements: + /// * Some(stats) if valid + /// * None if the statistics are not present, not valid, or not $stat_value_type + struct $iterator_type<'a, I> + where + I: Iterator>, + { + iter: I, } - match $column_statistics { - ParquetStatistics::Boolean(s) => Some(ScalarValue::Boolean(Some(*s.$func()))), - ParquetStatistics::Int32(s) => { - match $target_arrow_type { - // int32 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(*s.$func() as i128), - *precision, - *scale, - )) - } - Some(DataType::Decimal256(precision, scale)) => { - Some(ScalarValue::Decimal256( - Some(i256::from(*s.$func())), - *precision, - *scale, - )) - } - Some(DataType::Int8) => { - Some(ScalarValue::Int8(Some((*s.$func()).try_into().unwrap()))) - } - Some(DataType::Int16) => { - Some(ScalarValue::Int16(Some((*s.$func()).try_into().unwrap()))) - } - Some(DataType::UInt8) => { - Some(ScalarValue::UInt8(Some((*s.$func()).try_into().unwrap()))) - } - Some(DataType::UInt16) => { - Some(ScalarValue::UInt16(Some((*s.$func()).try_into().unwrap()))) - } - Some(DataType::UInt32) => { - Some(ScalarValue::UInt32(Some((*s.$func()) as u32))) - } - Some(DataType::Date32) => { - Some(ScalarValue::Date32(Some(*s.$func()))) - } - Some(DataType::Date64) => { - Some(ScalarValue::Date64(Some(i64::from(*s.$func()) * 24 * 60 * 60 * 1000))) - } - Some(DataType::Time32(TimeUnit::Second)) => { - Some(ScalarValue::Time32Second(Some((*s.$func())))) - } - Some(DataType::Time32(TimeUnit::Millisecond)) => { - Some(ScalarValue::Time32Millisecond(Some((*s.$func())))) - } - _ => Some(ScalarValue::Int32(Some(*s.$func()))), - } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator>, + { + /// Create a new iterator to
extract the statistics + fn new(iter: I) -> Self { + Self { iter } } - ParquetStatistics::Int64(s) => { - match $target_arrow_type { - // int64 to decimal with the precision and scale - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(*s.$func() as i128), - *precision, - *scale, - )) - } - Some(DataType::Decimal256(precision, scale)) => { - Some(ScalarValue::Decimal256( - Some(i256::from(*s.$func())), - *precision, - *scale, - )) - } - Some(DataType::UInt64) => { - Some(ScalarValue::UInt64(Some((*s.$func()) as u64))) - } - Some(DataType::Time64(TimeUnit::Microsecond)) => { - Some(ScalarValue::Time64Microsecond(Some((*s.$func() as i64)))) - } - Some(DataType::Time64(TimeUnit::Nanosecond)) => { - Some(ScalarValue::Time64Nanosecond(Some((*s.$func() as i64)))) + } + + /// Implement the Iterator trait for the iterator + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator>, + { + type Item = Option<&'a $stat_value_type>; + + /// return the next statistics value + fn next(&mut self) -> Option { + let next = self.iter.next(); + next.map(|x| { + x.and_then(|stats| match stats { + $parquet_statistics_type(s) if stats.has_min_max_set() => { + Some(s.$func()) + } + _ => None, + }) + }) + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } + } + }; +} + +make_stats_iterator!( + MinBooleanStatsIterator, + min, + ParquetStatistics::Boolean, + bool +); +make_stats_iterator!( + MaxBooleanStatsIterator, + max, + ParquetStatistics::Boolean, + bool +); +make_stats_iterator!(MinInt32StatsIterator, min, ParquetStatistics::Int32, i32); +make_stats_iterator!(MaxInt32StatsIterator, max, ParquetStatistics::Int32, i32); +make_stats_iterator!(MinInt64StatsIterator, min, ParquetStatistics::Int64, i64); +make_stats_iterator!(MaxInt64StatsIterator, max, ParquetStatistics::Int64, i64); +make_stats_iterator!(MinFloatStatsIterator, min, ParquetStatistics::Float, f32); +make_stats_iterator!(MaxFloatStatsIterator, max, 
ParquetStatistics::Float, f32); +make_stats_iterator!(MinDoubleStatsIterator, min, ParquetStatistics::Double, f64); +make_stats_iterator!(MaxDoubleStatsIterator, max, ParquetStatistics::Double, f64); +make_stats_iterator!( + MinByteArrayStatsIterator, + min_bytes, + ParquetStatistics::ByteArray, + [u8] +); +make_stats_iterator!( + MaxByteArrayStatsIterator, + max_bytes, + ParquetStatistics::ByteArray, + [u8] +); +make_stats_iterator!( + MinFixedLenByteArrayStatsIterator, + min_bytes, + ParquetStatistics::FixedLenByteArray, + [u8] +); +make_stats_iterator!( + MaxFixedLenByteArrayStatsIterator, + max_bytes, + ParquetStatistics::FixedLenByteArray, + [u8] +); + +/// Special iterator adapter for extracting decimal (`i128` or `i256`) values from an iterator of +/// `ParquetStatistics` +/// +/// Handles checking if the statistics are present and valid with the correct type. +/// +/// Depending on the parquet file, the statistics for a decimal column can be stored as +/// `Int32`, `Int64`, `ByteArray` or `FixedLenByteArray` :mindblown: +/// +/// This iterator handles all cases, extracting the values +/// and converting them to `$stat_value_type`. +/// +/// Parameters: +/// * `$iterator_type` is the name of the iterator type (e.g. `MinDecimal128StatsIterator`) +/// * `$func` is the function to call to get the value (e.g. `min` or `max`) +/// * `$bytes_func` is the function to call to get the value as bytes (e.g. `min_bytes` or `max_bytes`) +/// * `$stat_value_type` is the type of the statistics value (e.g. `i128`) +/// * `$convert_func` is the function to convert the bytes to stats value (e.g. `from_bytes_to_i128`) +macro_rules! 
make_decimal_stats_iterator { + ($iterator_type:ident, $func:ident, $bytes_func:ident, $stat_value_type:ident, $convert_func: ident) => { + struct $iterator_type<'a, I> + where + I: Iterator>, + { + iter: I, + } + + impl<'a, I> $iterator_type<'a, I> + where + I: Iterator>, + { + fn new(iter: I) -> Self { + Self { iter } + } + } + + impl<'a, I> Iterator for $iterator_type<'a, I> + where + I: Iterator>, + { + type Item = Option<$stat_value_type>; + + fn next(&mut self) -> Option { + let next = self.iter.next(); + next.map(|x| { + x.and_then(|stats| { + if !stats.has_min_max_set() { + return None; + } + match stats { + ParquetStatistics::Int32(s) => { + Some($stat_value_type::from(*s.$func())) + } + ParquetStatistics::Int64(s) => { + Some($stat_value_type::from(*s.$func())) + } + ParquetStatistics::ByteArray(s) => { + Some($convert_func(s.$bytes_func())) + } + ParquetStatistics::FixedLenByteArray(s) => { + Some($convert_func(s.$bytes_func())) + } + _ => None, + } + }) + }) + } + + fn size_hint(&self) -> (usize, Option) { + self.iter.size_hint() + } + } + }; +} + +make_decimal_stats_iterator!( + MinDecimal128StatsIterator, + min, + min_bytes, + i128, + from_bytes_to_i128 +); +make_decimal_stats_iterator!( + MaxDecimal128StatsIterator, + max, + max_bytes, + i128, + from_bytes_to_i128 +); +make_decimal_stats_iterator!( + MinDecimal256StatsIterator, + min, + min_bytes, + i256, + from_bytes_to_i256 +); +make_decimal_stats_iterator!( + MaxDecimal256StatsIterator, + max, + max_bytes, + i256, + from_bytes_to_i256 +); + +/// Special macro to combine the statistics iterators for min and max using the [`mod@paste`] macro. +/// This is used to avoid repeating the same code for min and max statistics extractions +/// +/// Parameters: +/// stat_type_prefix: The prefix of the statistics iterator type (e.g. `Min` or `Max`) +/// data_type: The data type of the statistics (e.g. `DataType::Int32`) +/// iterator: The iterator of [`ParquetStatistics`] to extract the statistics from. 
+macro_rules! get_statistics { + ($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => { + paste! { + match $data_type { + DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter( + [<$stat_type_prefix BooleanStatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Int8 => Ok(Arc::new(Int8Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| { + if let Ok(v) = i8::try_from(*x) { + Some(v) + } else { + None + } + }) + }), + ))), + DataType::Int16 => Ok(Arc::new(Int16Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| { + if let Ok(v) = i16::try_from(*x) { + Some(v) + } else { + None + } + }) + }), + ))), + DataType::Int32 => Ok(Arc::new(Int32Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Int64 => Ok(Arc::new(Int64Array::from_iter( + [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::UInt8 => Ok(Arc::new(UInt8Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| { + if let Ok(v) = u8::try_from(*x) { + Some(v) + } else { + None + } + }) + }), + ))), + DataType::UInt16 => Ok(Arc::new(UInt16Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| { + if let Ok(v) = u16::try_from(*x) { + Some(v) + } else { + None + } + }) + }), + ))), + DataType::UInt32 => Ok(Arc::new(UInt32Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u32)), + ))), + DataType::UInt64 => Ok(Arc::new(UInt64Array::from_iter( + [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.map(|x| *x as u64)), + ))), + DataType::Float16 => Ok(Arc::new(Float16Array::from_iter( + [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| x.and_then(|x| { + from_bytes_to_f16(x) + })), + ))), + 
DataType::Float32 => Ok(Arc::new(Float32Array::from_iter( + [<$stat_type_prefix FloatStatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Float64 => Ok(Arc::new(Float64Array::from_iter( + [<$stat_type_prefix DoubleStatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Date32 => Ok(Arc::new(Date32Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), + ))), + DataType::Date64 => Ok(Arc::new(Date64Array::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator) + .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)), + ))), + DataType::Timestamp(unit, timezone) =>{ + let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()); + + Ok(match unit { + TimeUnit::Second => { + Arc::new(match timezone { + Some(tz) => TimestampSecondArray::from_iter(iter).with_timezone(tz.clone()), + None => TimestampSecondArray::from_iter(iter), + }) } - Some(DataType::Timestamp(unit, timezone)) => { - Some(match unit { - TimeUnit::Second => ScalarValue::TimestampSecond( - Some(*s.$func()), - timezone.clone(), - ), - TimeUnit::Millisecond => ScalarValue::TimestampMillisecond( - Some(*s.$func()), - timezone.clone(), - ), - TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond( - Some(*s.$func()), - timezone.clone(), - ), - TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond( - Some(*s.$func()), - timezone.clone(), - ), + TimeUnit::Millisecond => { + Arc::new(match timezone { + Some(tz) => TimestampMillisecondArray::from_iter(iter).with_timezone(tz.clone()), + None => TimestampMillisecondArray::from_iter(iter), }) } - _ => Some(ScalarValue::Int64(Some(*s.$func()))), - } - } - // 96 bit ints not supported - ParquetStatistics::Int96(_) => None, - ParquetStatistics::Float(s) => Some(ScalarValue::Float32(Some(*s.$func()))), - ParquetStatistics::Double(s) => Some(ScalarValue::Float64(Some(*s.$func()))), - ParquetStatistics::ByteArray(s) => { - match 
$target_arrow_type { - // decimal data type - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(from_bytes_to_i128(s.$bytes_func())), - *precision, - *scale, - )) + TimeUnit::Microsecond => { + Arc::new(match timezone { + Some(tz) => TimestampMicrosecondArray::from_iter(iter).with_timezone(tz.clone()), + None => TimestampMicrosecondArray::from_iter(iter), + }) } - Some(DataType::Decimal256(precision, scale)) => { - Some(ScalarValue::Decimal256( - Some(from_bytes_to_i256(s.$bytes_func())), - *precision, - *scale, - )) + TimeUnit::Nanosecond => { + Arc::new(match timezone { + Some(tz) => TimestampNanosecondArray::from_iter(iter).with_timezone(tz.clone()), + None => TimestampNanosecondArray::from_iter(iter), + }) } - Some(DataType::Binary) => { - Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec()))) + }) + }, + DataType::Time32(unit) => { + Ok(match unit { + TimeUnit::Second => Arc::new(Time32SecondArray::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), + )), + TimeUnit::Millisecond => Arc::new(Time32MillisecondArray::from_iter( + [<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()), + )), + _ => { + let len = $iterator.count(); + // don't know how to extract statistics, so return a null array + new_null_array($data_type, len) } - Some(DataType::LargeBinary) => { - Some(ScalarValue::LargeBinary(Some(s.$bytes_func().to_vec()))) + }) + }, + DataType::Time64(unit) => { + Ok(match unit { + TimeUnit::Microsecond => Arc::new(Time64MicrosecondArray::from_iter( + [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()), + )), + TimeUnit::Nanosecond => Arc::new(Time64NanosecondArray::from_iter( + [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()), + )), + _ => { + let len = $iterator.count(); + // don't know how to extract statistics, so return a null array + new_null_array($data_type, len) } - Some(DataType::LargeUtf8) | _ 
=> { - let utf8_value = std::str::from_utf8(s.$bytes_func()) - .map(|s| s.to_string()) - .ok(); - if utf8_value.is_none() { + }) + }, + DataType::Binary => Ok(Arc::new(BinaryArray::from_iter( + [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x| x.to_vec())), + ))), + DataType::LargeBinary => Ok(Arc::new(LargeBinaryArray::from_iter( + [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| x.map(|x|x.to_vec())), + ))), + DataType::Utf8 => Ok(Arc::new(StringArray::from_iter( + [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| { + let res = std::str::from_utf8(x).map(|s| s.to_string()).ok(); + if res.is_none() { log::debug!("Utf8 statistics is a non-UTF8 value, ignoring it."); } - - match $target_arrow_type { - Some(DataType::LargeUtf8) => Some(ScalarValue::LargeUtf8(utf8_value)), - _ => Some(ScalarValue::Utf8(utf8_value)), - } - } - } + res + }) + }), + ))), + DataType::LargeUtf8 => { + Ok(Arc::new(LargeStringArray::from_iter( + [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| { + let res = std::str::from_utf8(x).map(|s| s.to_string()).ok(); + if res.is_none() { + log::debug!("LargeUtf8 statistics is a non-UTF8 value, ignoring it."); + } + res + }) + }), + ))) } - // type not fully supported yet - ParquetStatistics::FixedLenByteArray(s) => { - match $target_arrow_type { - // just support specific logical data types, there are others each - // with their own ordering - Some(DataType::Decimal128(precision, scale)) => { - Some(ScalarValue::Decimal128( - Some(from_bytes_to_i128(s.$bytes_func())), - *precision, - *scale, - )) - } - Some(DataType::Decimal256(precision, scale)) => { - Some(ScalarValue::Decimal256( - Some(from_bytes_to_i256(s.$bytes_func())), - *precision, - *scale, - )) - } - Some(DataType::FixedSizeBinary(size)) => { - let value = s.$bytes_func().to_vec(); - let value = if value.len().try_into() == Ok(*size) { - Some(value) + 
DataType::FixedSizeBinary(size) => Ok(Arc::new(FixedSizeBinaryArray::from( + [<$stat_type_prefix FixedLenByteArrayStatsIterator>]::new($iterator).map(|x| { + x.and_then(|x| { + if x.len().try_into() == Ok(*size) { + Some(x) } else { log::debug!( "FixedSizeBinary({}) statistics is a binary of size {}, ignoring it.", size, - value.len(), + x.len(), ); None - }; - Some(ScalarValue::FixedSizeBinary( - *size, - value, - )) - } - Some(DataType::Float16) => { - Some(ScalarValue::Float16(from_bytes_to_f16(s.$bytes_func()))) - } - _ => None, - } + } + }) + }).collect::>(), + ))), + DataType::Decimal128(precision, scale) => { + let arr = Decimal128Array::from_iter( + [<$stat_type_prefix Decimal128StatsIterator>]::new($iterator) + ).with_precision_and_scale(*precision, *scale)?; + Ok(Arc::new(arr)) + }, + DataType::Decimal256(precision, scale) => { + let arr = Decimal256Array::from_iter( + [<$stat_type_prefix Decimal256StatsIterator>]::new($iterator) + ).with_precision_and_scale(*precision, *scale)?; + Ok(Arc::new(arr)) + }, + DataType::Dictionary(_, value_type) => { + [<$stat_type_prefix:lower _ statistics>](value_type, $iterator) } - } - }}; + + DataType::Map(_,_) | + DataType::Duration(_) | + DataType::Interval(_) | + DataType::Null | + DataType::BinaryView | + DataType::Utf8View | + DataType::List(_) | + DataType::ListView(_) | + DataType::FixedSizeList(_, _) | + DataType::LargeList(_) | + DataType::LargeListView(_) | + DataType::Struct(_) | + DataType::Union(_, _) | + DataType::RunEndEncoded(_, _) => { + let len = $iterator.count(); + // don't know how to extract statistics, so return a null array + Ok(new_null_array($data_type, len)) + } + }}} } /// Lookups up the parquet column by name @@ -293,9 +547,7 @@ pub(crate) fn min_statistics<'a, I: Iterator Result { - let scalars = iterator - .map(|x| x.and_then(|s| get_statistic!(s, min, min_bytes, Some(data_type)))); - collect_scalars(data_type, scalars) + get_statistics!(Min, data_type, iterator) } /// Extracts the max 
statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`] @@ -303,24 +555,7 @@ pub(crate) fn max_statistics<'a, I: Iterator Result { - let scalars = iterator - .map(|x| x.and_then(|s| get_statistic!(s, max, max_bytes, Some(data_type)))); - collect_scalars(data_type, scalars) -} - -/// Builds an array from an iterator of ScalarValue -fn collect_scalars>>( - data_type: &DataType, - iterator: I, -) -> Result { - let mut scalars = iterator.peekable(); - match scalars.peek().is_none() { - true => Ok(new_empty_array(data_type)), - false => { - let null = ScalarValue::try_from(data_type)?; - ScalarValue::iter_to_array(scalars.map(|x| x.unwrap_or_else(|| null.clone()))) - } - } + get_statistics!(Max, data_type, iterator) } /// What type of statistics should be extracted? @@ -474,11 +709,10 @@ mod test { use arrow::compute::kernels::cast_utils::Parser; use arrow::datatypes::{i256, Date32Type, Date64Type}; use arrow_array::{ - new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array, - Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array, - Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray, - StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, + new_empty_array, new_null_array, Array, BinaryArray, BooleanArray, Date32Array, + Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, + StringArray, StructArray, TimestampNanosecondArray, }; use arrow_schema::{Field, SchemaRef}; use bytes::Bytes; diff --git a/datafusion/core/tests/parquet/arrow_statistics.rs b/datafusion/core/tests/parquet/arrow_statistics.rs index 19cc4db4d20e7..2f8fbab647775 100644 --- a/datafusion/core/tests/parquet/arrow_statistics.rs +++ b/datafusion/core/tests/parquet/arrow_statistics.rs @@ -495,7 +495,6 @@ async fn test_timestamp() { Test { reader: &reader, - // mins 
are [1577840461000000000, 1577840471000000000, 1577841061000000000, 1578704461000000000,] expected_min: Arc::new(TimestampNanosecondArray::from(vec![ TimestampNanosecondType::parse("2020-01-01T01:01:01"), TimestampNanosecondType::parse("2020-01-01T01:01:11"),