Skip to content

Commit 76a3720

Browse files
xinlifoobar and alamb authored
Fix extract parquet statistics from Decimal256 columns (#10777)
* Fix Extract parquet statistics from Decimal256 columns * Fix comment --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 729b356 commit 76a3720

3 files changed

Lines changed: 187 additions & 26 deletions

File tree

datafusion/core/src/datasource/physical_plan/parquet/statistics.rs

Lines changed: 83 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328
2121

22-
use arrow::{array::ArrayRef, datatypes::DataType, datatypes::TimeUnit};
22+
use arrow::{array::ArrayRef, datatypes::i256, datatypes::DataType, datatypes::TimeUnit};
2323
use arrow_array::{new_empty_array, new_null_array, UInt64Array};
2424
use arrow_schema::{Field, FieldRef, Schema};
2525
use datafusion_common::{
@@ -36,7 +36,13 @@ pub(crate) fn from_bytes_to_i128(b: &[u8]) -> i128 {
3636
// The bytes array are from parquet file and must be the big-endian.
3737
// The endian is defined by parquet format, and the reference document
3838
// https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66
39-
i128::from_be_bytes(sign_extend_be(b))
39+
i128::from_be_bytes(sign_extend_be::<16>(b))
40+
}
41+
42+
// Convert the bytes array to i256.
43+
// The endian of the input bytes array must be big-endian.
44+
pub(crate) fn from_bytes_to_i256(b: &[u8]) -> i256 {
45+
i256::from_be_bytes(sign_extend_be::<32>(b))
4046
}
4147

4248
// Convert the bytes array to f16
@@ -48,13 +54,13 @@ pub(crate) fn from_bytes_to_f16(b: &[u8]) -> Option<f16> {
4854
}
4955

5056
// Copy from arrow-rs
51-
// https://github.com/apache/arrow-rs/blob/733b7e7fd1e8c43a404c3ce40ecf741d493c21b4/parquet/src/arrow/buffer/bit_util.rs#L55
52-
// Convert the byte slice to fixed length byte array with the length of 16
53-
fn sign_extend_be(b: &[u8]) -> [u8; 16] {
54-
assert!(b.len() <= 16, "Array too large, expected less than 16");
57+
// https://github.com/apache/arrow-rs/blob/198af7a3f4aa20f9bd003209d9f04b0f37bb120e/parquet/src/arrow/buffer/bit_util.rs#L54
58+
// Convert the byte slice to fixed length byte array with the length of N.
59+
pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
60+
assert!(b.len() <= N, "Array too large, expected less than {N}");
5561
let is_negative = (b[0] & 128u8) == 128u8;
56-
let mut result = if is_negative { [255u8; 16] } else { [0u8; 16] };
57-
for (d, s) in result.iter_mut().skip(16 - b.len()).zip(b) {
62+
let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
63+
for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
5864
*d = *s;
5965
}
6066
result
@@ -83,6 +89,13 @@ macro_rules! get_statistic {
8389
*scale,
8490
))
8591
}
92+
Some(DataType::Decimal256(precision, scale)) => {
93+
Some(ScalarValue::Decimal256(
94+
Some(i256::from(*s.$func())),
95+
*precision,
96+
*scale,
97+
))
98+
}
8699
Some(DataType::Int8) => {
87100
Some(ScalarValue::Int8(Some((*s.$func()).try_into().unwrap())))
88101
}
@@ -123,6 +136,13 @@ macro_rules! get_statistic {
123136
*scale,
124137
))
125138
}
139+
Some(DataType::Decimal256(precision, scale)) => {
140+
Some(ScalarValue::Decimal256(
141+
Some(i256::from(*s.$func())),
142+
*precision,
143+
*scale,
144+
))
145+
}
126146
Some(DataType::UInt64) => {
127147
Some(ScalarValue::UInt64(Some((*s.$func()) as u64)))
128148
}
@@ -169,6 +189,13 @@ macro_rules! get_statistic {
169189
*scale,
170190
))
171191
}
192+
Some(DataType::Decimal256(precision, scale)) => {
193+
Some(ScalarValue::Decimal256(
194+
Some(from_bytes_to_i256(s.$bytes_func())),
195+
*precision,
196+
*scale,
197+
))
198+
}
172199
Some(DataType::Binary) => {
173200
Some(ScalarValue::Binary(Some(s.$bytes_func().to_vec())))
174201
}
@@ -202,6 +229,13 @@ macro_rules! get_statistic {
202229
*scale,
203230
))
204231
}
232+
Some(DataType::Decimal256(precision, scale)) => {
233+
Some(ScalarValue::Decimal256(
234+
Some(from_bytes_to_i256(s.$bytes_func())),
235+
*precision,
236+
*scale,
237+
))
238+
}
205239
Some(DataType::FixedSizeBinary(size)) => {
206240
let value = s.$bytes_func().to_vec();
207241
let value = if value.len().try_into() == Ok(*size) {
@@ -438,13 +472,13 @@ impl<'a> StatisticsConverter<'a> {
438472
mod test {
439473
use super::*;
440474
use arrow::compute::kernels::cast_utils::Parser;
441-
use arrow::datatypes::{Date32Type, Date64Type};
475+
use arrow::datatypes::{i256, Date32Type, Date64Type};
442476
use arrow_array::{
443477
new_null_array, Array, BinaryArray, BooleanArray, Date32Array, Date64Array,
444-
Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
445-
Int8Array, LargeBinaryArray, RecordBatch, StringArray, StructArray,
446-
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
447-
TimestampSecondArray,
478+
Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array,
479+
Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, StringArray,
480+
StructArray, TimestampMicrosecondArray, TimestampMillisecondArray,
481+
TimestampNanosecondArray, TimestampSecondArray,
448482
};
449483
use arrow_schema::{Field, SchemaRef};
450484
use bytes::Bytes;
@@ -824,6 +858,42 @@ mod test {
824858
.unwrap(),
825859
),
826860
}
861+
.run();
862+
863+
Test {
864+
input: Arc::new(
865+
Decimal256Array::from(vec![
866+
// row group 1
867+
Some(i256::from(100)),
868+
None,
869+
Some(i256::from(22000)),
870+
// row group 2
871+
Some(i256::MAX),
872+
Some(i256::MIN),
873+
None,
874+
// row group 3
875+
None,
876+
None,
877+
None,
878+
])
879+
.with_precision_and_scale(76, 76)
880+
.unwrap(),
881+
),
882+
expected_min: Arc::new(
883+
Decimal256Array::from(vec![Some(i256::from(100)), Some(i256::MIN), None])
884+
.with_precision_and_scale(76, 76)
885+
.unwrap(),
886+
),
887+
expected_max: Arc::new(
888+
Decimal256Array::from(vec![
889+
Some(i256::from(22000)),
890+
Some(i256::MAX),
891+
None,
892+
])
893+
.with_precision_and_scale(76, 76)
894+
.unwrap(),
895+
),
896+
}
827897
.run()
828898
}
829899

datafusion/core/tests/parquet/arrow_statistics.rs

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,17 @@ use std::sync::Arc;
2424
use crate::parquet::{struct_array, Scenario};
2525
use arrow::compute::kernels::cast_utils::Parser;
2626
use arrow::datatypes::{
27-
Date32Type, Date64Type, TimestampMicrosecondType, TimestampMillisecondType,
27+
i256, Date32Type, Date64Type, TimestampMicrosecondType, TimestampMillisecondType,
2828
TimestampNanosecondType, TimestampSecondType,
2929
};
3030
use arrow_array::{
3131
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
32-
Decimal128Array, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
33-
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, LargeStringArray,
34-
RecordBatch, StringArray, Time32MillisecondArray, Time32SecondArray,
35-
Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray,
36-
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray,
37-
UInt16Array, UInt32Array, UInt64Array, UInt8Array,
32+
Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array, Float32Array,
33+
Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
34+
LargeStringArray, RecordBatch, StringArray, Time32MillisecondArray,
35+
Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
36+
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
37+
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
3838
};
3939
use arrow_schema::{DataType, Field, Schema};
4040
use datafusion::datasource::physical_plan::parquet::{
@@ -1356,6 +1356,38 @@ async fn test_decimal() {
13561356
column_name: "decimal_col",
13571357
}
13581358
.run();
1359+
1360+
// This creates a parquet file of 1 column "decimal256_col" with decimal data type and precision 9, scale 2
1361+
// file has 3 record batches, each has 5 rows. They will be saved into 3 row groups
1362+
let decimal256_reader = TestReader {
1363+
scenario: Scenario::Decimal256,
1364+
row_per_group: 5,
1365+
};
1366+
Test {
1367+
reader: decimal256_reader.build().await,
1368+
expected_min: Arc::new(
1369+
Decimal256Array::from(vec![
1370+
i256::from(100),
1371+
i256::from(-500),
1372+
i256::from(2000),
1373+
])
1374+
.with_precision_and_scale(9, 2)
1375+
.unwrap(),
1376+
),
1377+
expected_max: Arc::new(
1378+
Decimal256Array::from(vec![
1379+
i256::from(600),
1380+
i256::from(600),
1381+
i256::from(6000),
1382+
])
1383+
.with_precision_and_scale(9, 2)
1384+
.unwrap(),
1385+
),
1386+
expected_null_counts: UInt64Array::from(vec![0, 0, 0]),
1387+
expected_row_counts: UInt64Array::from(vec![5, 5, 5]),
1388+
column_name: "decimal256_col",
1389+
}
1390+
.run();
13591391
}
13601392
#[tokio::test]
13611393
async fn test_dictionary() {

datafusion/core/tests/parquet/mod.rs

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717

1818
//! Parquet integration tests
1919
use arrow::array::Decimal128Array;
20+
use arrow::datatypes::i256;
2021
use arrow::{
2122
array::{
2223
make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array,
23-
DictionaryArray, FixedSizeBinaryArray, Float16Array, Float32Array, Float64Array,
24-
Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
25-
LargeStringArray, StringArray, StructArray, Time32MillisecondArray,
26-
Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray,
27-
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
28-
TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
24+
Decimal256Array, DictionaryArray, FixedSizeBinaryArray, Float16Array,
25+
Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
26+
LargeBinaryArray, LargeStringArray, StringArray, StructArray,
27+
Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
28+
Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray,
29+
TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array,
30+
UInt64Array, UInt8Array,
2931
},
3032
datatypes::{DataType, Field, Int32Type, Int8Type, Schema, TimeUnit},
3133
record_batch::RecordBatch,
@@ -84,6 +86,7 @@ enum Scenario {
8486
Float16,
8587
Float64,
8688
Decimal,
89+
Decimal256,
8790
DecimalBloomFilterInt32,
8891
DecimalBloomFilterInt64,
8992
DecimalLargePrecision,
@@ -618,6 +621,24 @@ fn make_decimal_batch(v: Vec<i128>, precision: u8, scale: i8) -> RecordBatch {
618621
RecordBatch::try_new(schema, vec![array.clone()]).unwrap()
619622
}
620623

624+
/// Return record batch with decimal256 vector
625+
///
626+
/// Columns are named
627+
/// "decimal256_col" -> Decimal256Array
628+
fn make_decimal256_batch(v: Vec<i256>, precision: u8, scale: i8) -> RecordBatch {
629+
let schema = Arc::new(Schema::new(vec![Field::new(
630+
"decimal256_col",
631+
DataType::Decimal256(precision, scale),
632+
true,
633+
)]));
634+
let array = Arc::new(
635+
Decimal256Array::from(v)
636+
.with_precision_and_scale(precision, scale)
637+
.unwrap(),
638+
) as ArrayRef;
639+
RecordBatch::try_new(schema, vec![array]).unwrap()
640+
}
641+
621642
/// Return record batch with a few rows of data for all of the supported date
622643
/// types with the specified offset (in days)
623644
///
@@ -1009,6 +1030,44 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
10091030
make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2),
10101031
]
10111032
}
1033+
Scenario::Decimal256 => {
1034+
// decimal256 record batch
1035+
vec![
1036+
make_decimal256_batch(
1037+
vec![
1038+
i256::from(100),
1039+
i256::from(200),
1040+
i256::from(300),
1041+
i256::from(400),
1042+
i256::from(600),
1043+
],
1044+
9,
1045+
2,
1046+
),
1047+
make_decimal256_batch(
1048+
vec![
1049+
i256::from(-500),
1050+
i256::from(100),
1051+
i256::from(300),
1052+
i256::from(400),
1053+
i256::from(600),
1054+
],
1055+
9,
1056+
2,
1057+
),
1058+
make_decimal256_batch(
1059+
vec![
1060+
i256::from(2000),
1061+
i256::from(3000),
1062+
i256::from(3000),
1063+
i256::from(4000),
1064+
i256::from(6000),
1065+
],
1066+
9,
1067+
2,
1068+
),
1069+
]
1070+
}
10121071
Scenario::DecimalBloomFilterInt32 => {
10131072
// decimal record batch
10141073
vec![

0 commit comments

Comments (0)