Skip to content

Commit a8dd15a

Browse files
authored
GH-40592: [C++][Parquet] Implement SizeStatistics (#40594)
### Rationale for this change

Parquet format 2.10.0 has introduced SizeStatistics. parquet-mr has also implemented this: apache/parquet-java#1177. Now it is time for parquet-cpp to pick up the ball.

### What changes are included in this PR?

Implement reading and writing size statistics for parquet-cpp.

### Are these changes tested?

Yes, a bunch of test cases have been added.

### Are there any user-facing changes?

Yes, now parquet users are able to read and write size statistics.

* GitHub Issue: #40592

Authored-by: Gang Wu <ustcwg@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
1 parent f5787fe commit a8dd15a

19 files changed

Lines changed: 967 additions & 71 deletions

cpp/src/arrow/util/hashing.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,14 @@ class BinaryMemoTable : public MemoTable {
843843
}
844844
}
845845

846+
// Visit the stored value at a specific index in insertion order.
847+
// The visitor function should have the signature `void(std::string_view)`
848+
// or `void(const std::string_view&)`.
849+
template <typename VisitFunc>
850+
void VisitValue(int32_t idx, VisitFunc&& visit) const {
851+
visit(binary_builder_.GetView(idx));
852+
}
853+
846854
protected:
847855
struct Payload {
848856
int32_t memo_index;

cpp/src/parquet/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ set(PARQUET_SRCS
181181
printer.cc
182182
properties.cc
183183
schema.cc
184+
size_statistics.cc
184185
statistics.cc
185186
stream_reader.cc
186187
stream_writer.cc
@@ -373,6 +374,7 @@ add_parquet_test(internals-test
373374
metadata_test.cc
374375
page_index_test.cc
375376
public_api_test.cc
377+
size_statistics_test.cc
376378
types_test.cc)
377379

378380
set_source_files_properties(public_api_test.cc PROPERTIES SKIP_PRECOMPILE_HEADERS ON

cpp/src/parquet/column_page.h

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <optional>
2727
#include <string>
2828

29+
#include "parquet/size_statistics.h"
2930
#include "parquet/statistics.h"
3031
#include "parquet/types.h"
3132

@@ -69,27 +70,30 @@ class DataPage : public Page {
6970
/// Currently it is only present from data pages created by ColumnWriter in order
7071
/// to collect page index.
7172
std::optional<int64_t> first_row_index() const { return first_row_index_; }
73+
const SizeStatistics& size_statistics() const { return size_statistics_; }
7274

7375
virtual ~DataPage() = default;
7476

7577
protected:
7678
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
7779
Encoding::type encoding, int64_t uncompressed_size,
78-
EncodedStatistics statistics = EncodedStatistics(),
79-
std::optional<int64_t> first_row_index = std::nullopt)
80+
EncodedStatistics statistics, std::optional<int64_t> first_row_index,
81+
SizeStatistics size_statistics)
8082
: Page(buffer, type),
8183
num_values_(num_values),
8284
encoding_(encoding),
8385
uncompressed_size_(uncompressed_size),
8486
statistics_(std::move(statistics)),
85-
first_row_index_(std::move(first_row_index)) {}
87+
first_row_index_(std::move(first_row_index)),
88+
size_statistics_(std::move(size_statistics)) {}
8689

8790
int32_t num_values_;
8891
Encoding::type encoding_;
8992
int64_t uncompressed_size_;
9093
EncodedStatistics statistics_;
9194
/// Row ordinal within the row group to the first row in the data page.
9295
std::optional<int64_t> first_row_index_;
96+
SizeStatistics size_statistics_;
9397
};
9498

9599
class DataPageV1 : public DataPage {
@@ -98,9 +102,11 @@ class DataPageV1 : public DataPage {
98102
Encoding::type encoding, Encoding::type definition_level_encoding,
99103
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
100104
EncodedStatistics statistics = EncodedStatistics(),
101-
std::optional<int64_t> first_row_index = std::nullopt)
105+
std::optional<int64_t> first_row_index = std::nullopt,
106+
SizeStatistics size_statistics = SizeStatistics())
102107
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, uncompressed_size,
103-
std::move(statistics), std::move(first_row_index)),
108+
std::move(statistics), std::move(first_row_index),
109+
std::move(size_statistics)),
104110
definition_level_encoding_(definition_level_encoding),
105111
repetition_level_encoding_(repetition_level_encoding) {}
106112

@@ -120,9 +126,11 @@ class DataPageV2 : public DataPage {
120126
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
121127
int64_t uncompressed_size, bool is_compressed = false,
122128
EncodedStatistics statistics = EncodedStatistics(),
123-
std::optional<int64_t> first_row_index = std::nullopt)
129+
std::optional<int64_t> first_row_index = std::nullopt,
130+
SizeStatistics size_statistics = SizeStatistics())
124131
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, uncompressed_size,
125-
std::move(statistics), std::move(first_row_index)),
132+
std::move(statistics), std::move(first_row_index),
133+
std::move(size_statistics)),
126134
num_nulls_(num_nulls),
127135
num_rows_(num_rows),
128136
definition_levels_byte_length_(definition_levels_byte_length),

cpp/src/parquet/column_writer.cc

Lines changed: 103 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
#include "parquet/platform.h"
5656
#include "parquet/properties.h"
5757
#include "parquet/schema.h"
58+
#include "parquet/size_statistics.h"
5859
#include "parquet/statistics.h"
5960
#include "parquet/thrift_internal.h"
6061
#include "parquet/types.h"
@@ -437,7 +438,7 @@ class SerializedPageWriter : public PageWriter {
437438

438439
/// Collect page index
439440
if (column_index_builder_ != nullptr) {
440-
column_index_builder_->AddPage(page.statistics());
441+
column_index_builder_->AddPage(page.statistics(), page.size_statistics());
441442
}
442443
if (offset_index_builder_ != nullptr) {
443444
const int64_t compressed_size = output_data_len + header_size;
@@ -451,8 +452,9 @@ class SerializedPageWriter : public PageWriter {
451452
/// start_pos is a relative offset in the buffered mode. It should be
452453
/// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
453454
/// has flushed all data pages.
454-
offset_index_builder_->AddPage(start_pos, static_cast<int32_t>(compressed_size),
455-
*page.first_row_index());
455+
offset_index_builder_->AddPage(
456+
start_pos, static_cast<int32_t>(compressed_size), *page.first_row_index(),
457+
page.size_statistics().unencoded_byte_array_data_bytes);
456458
}
457459

458460
total_uncompressed_size_ += uncompressed_size + header_size;
@@ -774,11 +776,17 @@ class ColumnWriterImpl {
774776
// Serializes Dictionary Page if enabled
775777
virtual void WriteDictionaryPage() = 0;
776778

779+
// A convenience struct to combine the encoded statistics and size statistics
780+
struct StatisticsPair {
781+
EncodedStatistics encoded_stats;
782+
SizeStatistics size_stats;
783+
};
784+
777785
// Plain-encoded statistics of the current page
778-
virtual EncodedStatistics GetPageStatistics() = 0;
786+
virtual StatisticsPair GetPageStatistics() = 0;
779787

780788
// Plain-encoded statistics of the whole chunk
781-
virtual EncodedStatistics GetChunkStatistics() = 0;
789+
virtual StatisticsPair GetChunkStatistics() = 0;
782790

783791
// Merges page statistics into chunk statistics, then resets the values
784792
virtual void ResetPageStatistics() = 0;
@@ -981,8 +989,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
981989
PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
982990
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size, values,
983991
uncompressed_data_->mutable_data());
984-
985-
EncodedStatistics page_stats = GetPageStatistics();
992+
auto [page_stats, page_size_stats] = GetPageStatistics();
986993
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
987994
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
988995
ResetPageStatistics();
@@ -1006,13 +1013,15 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t definition_levels_rle_size,
10061013
compressed_data->CopySlice(0, compressed_data->size(), allocator_));
10071014
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
10081015
compressed_data_copy, num_values, encoding_, Encoding::RLE, Encoding::RLE,
1009-
uncompressed_size, std::move(page_stats), first_row_index);
1016+
uncompressed_size, std::move(page_stats), first_row_index,
1017+
std::move(page_size_stats));
10101018
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
10111019

10121020
data_pages_.push_back(std::move(page_ptr));
10131021
} else { // Eagerly write pages
10141022
DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, Encoding::RLE,
1015-
uncompressed_size, std::move(page_stats), first_row_index);
1023+
uncompressed_size, std::move(page_stats), first_row_index,
1024+
std::move(page_size_stats));
10161025
WriteDataPage(page);
10171026
}
10181027
}
@@ -1039,7 +1048,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
10391048
ConcatenateBuffers(definition_levels_rle_size, repetition_levels_rle_size,
10401049
compressed_values, combined->mutable_data());
10411050

1042-
EncodedStatistics page_stats = GetPageStatistics();
1051+
auto [page_stats, page_size_stats] = GetPageStatistics();
10431052
page_stats.ApplyStatSizeLimits(properties_->max_statistics_size(descr_->path()));
10441053
page_stats.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
10451054
ResetPageStatistics();
@@ -1062,14 +1071,15 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t definition_levels_rle_size,
10621071
combined->CopySlice(0, combined->size(), allocator_));
10631072
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
10641073
combined, num_values, null_count, num_rows, encoding_, def_levels_byte_length,
1065-
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), page_stats,
1066-
first_row_index);
1074+
rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
1075+
std::move(page_stats), first_row_index, std::move(page_size_stats));
10671076
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
10681077
data_pages_.push_back(std::move(page_ptr));
10691078
} else {
10701079
DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
10711080
def_levels_byte_length, rep_levels_byte_length, uncompressed_size,
1072-
pager_->has_compressor(), page_stats, first_row_index);
1081+
pager_->has_compressor(), std::move(page_stats), first_row_index,
1082+
std::move(page_size_stats));
10731083
WriteDataPage(page);
10741084
}
10751085
}
@@ -1083,7 +1093,7 @@ int64_t ColumnWriterImpl::Close() {
10831093

10841094
FlushBufferedDataPages();
10851095

1086-
EncodedStatistics chunk_statistics = GetChunkStatistics();
1096+
auto [chunk_statistics, chunk_size_statistics] = GetChunkStatistics();
10871097
chunk_statistics.ApplyStatSizeLimits(
10881098
properties_->max_statistics_size(descr_->path()));
10891099
chunk_statistics.set_is_signed(SortOrder::SIGNED == descr_->sort_order());
@@ -1092,6 +1102,9 @@ int64_t ColumnWriterImpl::Close() {
10921102
if (rows_written_ > 0 && chunk_statistics.is_set()) {
10931103
metadata_->SetStatistics(chunk_statistics);
10941104
}
1105+
if (rows_written_ > 0 && chunk_size_statistics.is_set()) {
1106+
metadata_->SetSizeStatistics(chunk_size_statistics);
1107+
}
10951108
metadata_->SetKeyValueMetadata(key_value_metadata_);
10961109
pager_->Close(has_dictionary_, fallback_);
10971110
}
@@ -1217,6 +1230,11 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
12171230
page_statistics_ = MakeStatistics<DType>(descr_, allocator_);
12181231
chunk_statistics_ = MakeStatistics<DType>(descr_, allocator_);
12191232
}
1233+
if (properties->size_statistics_level() == SizeStatisticsLevel::ColumnChunk ||
1234+
properties->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
1235+
page_size_statistics_ = SizeStatistics::Make(descr_);
1236+
chunk_size_statistics_ = SizeStatistics::Make(descr_);
1237+
}
12201238
pages_change_on_record_boundaries_ =
12211239
properties->data_page_version() == ParquetDataPageVersion::V2 ||
12221240
properties->page_index_enabled(descr_->path());
@@ -1355,15 +1373,26 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
13551373
total_bytes_written_ += pager_->WriteDictionaryPage(page);
13561374
}
13571375

1358-
EncodedStatistics GetPageStatistics() override {
1359-
EncodedStatistics result;
1360-
if (page_statistics_) result = page_statistics_->Encode();
1376+
StatisticsPair GetPageStatistics() override {
1377+
StatisticsPair result;
1378+
if (page_statistics_) {
1379+
result.encoded_stats = page_statistics_->Encode();
1380+
}
1381+
if (properties_->size_statistics_level() == SizeStatisticsLevel::PageAndColumnChunk) {
1382+
ARROW_DCHECK(page_size_statistics_ != nullptr);
1383+
result.size_stats = *page_size_statistics_;
1384+
}
13611385
return result;
13621386
}
13631387

1364-
EncodedStatistics GetChunkStatistics() override {
1365-
EncodedStatistics result;
1366-
if (chunk_statistics_) result = chunk_statistics_->Encode();
1388+
StatisticsPair GetChunkStatistics() override {
1389+
StatisticsPair result;
1390+
if (chunk_statistics_) {
1391+
result.encoded_stats = chunk_statistics_->Encode();
1392+
}
1393+
if (chunk_size_statistics_) {
1394+
result.size_stats = *chunk_size_statistics_;
1395+
}
13671396
return result;
13681397
}
13691398

@@ -1372,6 +1401,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
13721401
chunk_statistics_->Merge(*page_statistics_);
13731402
page_statistics_->Reset();
13741403
}
1404+
if (page_size_statistics_ != nullptr) {
1405+
chunk_size_statistics_->Merge(*page_size_statistics_);
1406+
page_size_statistics_->Reset();
1407+
}
13751408
}
13761409

13771410
Type::type type() const override { return descr_->physical_type(); }
@@ -1425,6 +1458,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
14251458
DictEncoder<DType>* current_dict_encoder_;
14261459
std::shared_ptr<TypedStats> page_statistics_;
14271460
std::shared_ptr<TypedStats> chunk_statistics_;
1461+
std::unique_ptr<SizeStatistics> page_size_statistics_;
1462+
std::shared_ptr<SizeStatistics> chunk_size_statistics_;
14281463
bool pages_change_on_record_boundaries_;
14291464

14301465
// If writing a sequence of ::arrow::DictionaryArray to the writer, we keep the
@@ -1467,6 +1502,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
14671502
rows_written_ += num_values;
14681503
num_buffered_rows_ += num_values;
14691504
}
1505+
1506+
UpdateLevelHistogram(num_values, def_levels, rep_levels);
14701507
return values_to_write;
14711508
}
14721509

@@ -1558,6 +1595,47 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
15581595
rows_written_ += num_levels;
15591596
num_buffered_rows_ += num_levels;
15601597
}
1598+
1599+
UpdateLevelHistogram(num_levels, def_levels, rep_levels);
1600+
}
1601+
1602+
void UpdateLevelHistogram(int64_t num_levels, const int16_t* def_levels,
1603+
const int16_t* rep_levels) const {
1604+
if (page_size_statistics_ == nullptr) {
1605+
return;
1606+
}
1607+
1608+
auto add_levels = [](std::vector<int64_t>& level_histogram,
1609+
::arrow::util::span<const int16_t> levels) {
1610+
for (int16_t level : levels) {
1611+
ARROW_DCHECK_LT(level, static_cast<int16_t>(level_histogram.size()));
1612+
++level_histogram[level];
1613+
}
1614+
};
1615+
1616+
if (descr_->max_definition_level() > 0) {
1617+
add_levels(page_size_statistics_->definition_level_histogram,
1618+
{def_levels, static_cast<size_t>(num_levels)});
1619+
} else {
1620+
page_size_statistics_->definition_level_histogram[0] += num_levels;
1621+
}
1622+
1623+
if (descr_->max_repetition_level() > 0) {
1624+
add_levels(page_size_statistics_->repetition_level_histogram,
1625+
{rep_levels, static_cast<size_t>(num_levels)});
1626+
} else {
1627+
page_size_statistics_->repetition_level_histogram[0] += num_levels;
1628+
}
1629+
}
1630+
1631+
// Update the unencoded data bytes for ByteArray only per the specification.
1632+
void UpdateUnencodedDataBytes() const {
1633+
if constexpr (std::is_same_v<T, ByteArray>) {
1634+
if (page_size_statistics_ != nullptr) {
1635+
page_size_statistics_->IncrementUnencodedByteArrayDataBytes(
1636+
current_encoder_->ReportUnencodedDataBytes());
1637+
}
1638+
}
15611639
}
15621640

15631641
void CommitWriteAndCheckPageLimit(int64_t num_levels, int64_t num_values,
@@ -1611,6 +1689,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
16111689
if (page_statistics_ != nullptr) {
16121690
page_statistics_->Update(values, num_values, num_nulls);
16131691
}
1692+
UpdateUnencodedDataBytes();
16141693
}
16151694

16161695
/// \brief Write values with spaces and update page statistics accordingly.
@@ -1639,6 +1718,7 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
16391718
page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset,
16401719
num_spaced_values, num_values, num_nulls);
16411720
}
1721+
UpdateUnencodedDataBytes();
16421722
}
16431723
};
16441724

@@ -1739,6 +1819,8 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
17391819
writeable_indices,
17401820
MaybeReplaceValidity(writeable_indices, null_count, ctx->memory_pool));
17411821
dict_encoder->PutIndices(*writeable_indices);
1822+
// Update unencoded byte array data size to size statistics
1823+
UpdateUnencodedDataBytes();
17421824
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, null_count, check_page);
17431825
value_offset += batch_num_spaced_values;
17441826
};
@@ -2219,6 +2301,7 @@ Status TypedColumnWriterImpl<ByteArrayType>::WriteArrowDense(
22192301
page_statistics_->IncrementNullCount(batch_size - non_null);
22202302
page_statistics_->IncrementNumValues(non_null);
22212303
}
2304+
UpdateUnencodedDataBytes();
22222305
CommitWriteAndCheckPageLimit(batch_size, batch_num_values, batch_size - non_null,
22232306
check_page);
22242307
CheckDictionarySizeLimit();

0 commit comments

Comments
 (0)