Commit 0da51b7

GH-15042: [C++][Parquet] Update stats on subsequent batches of dictionaries (#15179)
* Closes: #15042

Authored-by: Will Jones <willjones127@gmail.com>
Signed-off-by: Will Jones <willjones127@gmail.com>

1 parent a06a5d6 · commit 0da51b7

2 files changed: 112 additions & 25 deletions
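
Before this change, the dictionary write path updated page statistics only when it saw a dictionary for the first time; subsequent batches that reused the same dictionary contributed nothing to the null count, value count, or min/max. The sketch below shows the kind of write that exercises the fixed path. It is a minimal illustration assuming an Arrow/Parquet C++ build; WriteSharedDictionary is a hypothetical helper, not part of this patch:

    // `table` holds a dictionary-encoded column whose chunks share one
    // dictionary, as in the test added below. Small batch and row-group
    // limits force the writer to see the same dictionary repeatedly.
    #include <arrow/api.h>
    #include <arrow/io/api.h>
    #include <parquet/arrow/writer.h>

    arrow::Status WriteSharedDictionary(
        const std::shared_ptr<arrow::Table>& table,
        const std::shared_ptr<arrow::io::OutputStream>& sink) {
      std::shared_ptr<parquet::WriterProperties> props =
          parquet::WriterProperties::Builder()
              .max_row_group_length(9)
              ->write_batch_size(3)
              ->build();
      // After this commit, each row group's statistics reflect every batch
      // written into it, not just the first one.
      return parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
                                        sink, /*chunk_size=*/9, props);
    }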

cpp/src/parquet/arrow/arrow_reader_writer_test.cc (69 additions & 0 deletions)
@@ -80,6 +80,7 @@ using arrow::DataType;
 using arrow::Datum;
 using arrow::DecimalType;
 using arrow::default_memory_pool;
+using arrow::DictionaryArray;
 using arrow::ListArray;
 using arrow::PrimitiveArray;
 using arrow::ResizableBuffer;
@@ -4138,6 +4139,74 @@ TEST_P(TestArrowWriteDictionary, Statistics) {
 INSTANTIATE_TEST_SUITE_P(WriteDictionary, TestArrowWriteDictionary,
                          ::testing::Values(ParquetDataPageVersion::V1,
                                            ParquetDataPageVersion::V2));
+
+TEST_P(TestArrowWriteDictionary, StatisticsUnifiedDictionary) {
+  // Two chunks, with a shared dictionary
+  std::shared_ptr<::arrow::Table> table;
+  std::shared_ptr<::arrow::DataType> dict_type =
+      ::arrow::dictionary(::arrow::int32(), ::arrow::utf8());
+  std::shared_ptr<::arrow::Schema> schema =
+      ::arrow::schema({::arrow::field("values", dict_type)});
+  {
+    // It's important that there are no duplicate values in the dictionary;
+    // otherwise we trigger the WriteDense() code path, which side-steps
+    // dictionary encoding.
+    std::shared_ptr<::arrow::Array> test_dictionary =
+        ArrayFromJSON(::arrow::utf8(), R"(["b", "c", "d", "a"])");
+    std::vector<std::shared_ptr<::arrow::Array>> test_indices = {
+        ArrayFromJSON(::arrow::int32(),
+                      R"([3, null, 3, 3, null, 3])"),  // ["a", null, "a", "a", null, "a"]
+        ArrayFromJSON(
+            ::arrow::int32(),
+            R"([0, 3, null, 0, null, 1])")};  // ["b", "a", null, "b", null, "c"]
+
+    ::arrow::ArrayVector chunks = {
+        std::make_shared<DictionaryArray>(dict_type, test_indices[0], test_dictionary),
+        std::make_shared<DictionaryArray>(dict_type, test_indices[1], test_dictionary),
+    };
+    std::shared_ptr<ChunkedArray> arr = std::make_shared<ChunkedArray>(chunks, dict_type);
+    table = ::arrow::Table::Make(schema, {arr});
+  }
+
+  std::shared_ptr<::arrow::ResizableBuffer> serialized_data = AllocateBuffer();
+  auto out_stream = std::make_shared<::arrow::io::BufferOutputStream>(serialized_data);
+  {
+    // Will write data as two row groups, one with 9 rows and one with 3.
+    std::shared_ptr<WriterProperties> writer_properties =
+        WriterProperties::Builder()
+            .max_row_group_length(9)
+            ->data_page_version(this->GetParquetDataPageVersion())
+            ->write_batch_size(3)
+            ->data_pagesize(3)
+            ->build();
+    std::unique_ptr<FileWriter> writer;
+    ASSERT_OK_AND_ASSIGN(
+        writer, FileWriter::Open(*schema, ::arrow::default_memory_pool(), out_stream,
+                                 writer_properties, default_arrow_writer_properties()));
+    ASSERT_OK(writer->WriteTable(*table, std::numeric_limits<int64_t>::max()));
+    ASSERT_OK(writer->Close());
+    ASSERT_OK(out_stream->Close());
+  }
+
+  auto buffer_reader = std::make_shared<::arrow::io::BufferReader>(serialized_data);
+  std::unique_ptr<ParquetFileReader> parquet_reader =
+      ParquetFileReader::Open(std::move(buffer_reader));
+  // Check row group statistics
+  std::shared_ptr<FileMetaData> metadata = parquet_reader->metadata();
+  ASSERT_EQ(metadata->num_row_groups(), 2);
+  ASSERT_EQ(metadata->RowGroup(0)->num_rows(), 9);
+  ASSERT_EQ(metadata->RowGroup(1)->num_rows(), 3);
+  auto stats0 = metadata->RowGroup(0)->ColumnChunk(0)->statistics();
+  auto stats1 = metadata->RowGroup(1)->ColumnChunk(0)->statistics();
+  ASSERT_EQ(stats0->num_values(), 6);
+  ASSERT_EQ(stats1->num_values(), 2);
+  ASSERT_EQ(stats0->null_count(), 3);
+  ASSERT_EQ(stats1->null_count(), 1);
+  ASSERT_EQ(stats0->EncodeMin(), "a");
+  ASSERT_EQ(stats1->EncodeMin(), "b");
+  ASSERT_EQ(stats0->EncodeMax(), "b");
+  ASSERT_EQ(stats1->EncodeMax(), "c");
+}
+
 // ----------------------------------------------------------------------
 // Tests for directly reading DictionaryArray

cpp/src/parquet/column_writer.cc (43 additions & 25 deletions)
@@ -1479,6 +1479,43 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     value_offset += batch_num_spaced_values;
   };
 
+  auto update_stats = [&]() {
+    // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
+    // indices array to a (hopefully smaller) referenced indices array. Second, a copy
+    // of the values array to a (probably not smaller) referenced values array.
+    //
+    // Once the MinMax kernel supports all data types, we should use that kernel
+    // instead, as it does not make any copies.
+    ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
+    exec_ctx.set_use_threads(false);
+
+    std::shared_ptr<::arrow::Array> referenced_dictionary;
+    // If the dictionary is the same one we already have, just use that.
+    if (preserved_dictionary_ && preserved_dictionary_ == dictionary) {
+      referenced_dictionary = preserved_dictionary_;
+    } else {
+      PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
+                              ::arrow::compute::Unique(*indices, &exec_ctx));
+
+      // On the first run, we might be able to re-use the existing dictionary.
+      if (referenced_indices.length() == dictionary->length()) {
+        referenced_dictionary = dictionary;
+      } else {
+        PARQUET_ASSIGN_OR_THROW(
+            ::arrow::Datum referenced_dictionary_datum,
+            ::arrow::compute::Take(dictionary, referenced_indices,
+                                   ::arrow::compute::TakeOptions(/*boundscheck=*/false),
+                                   &exec_ctx));
+        referenced_dictionary = referenced_dictionary_datum.make_array();
+      }
+    }
+
+    int64_t non_null_count = indices->length() - indices->null_count();
+    page_statistics_->IncrementNullCount(num_levels - non_null_count);
+    page_statistics_->IncrementNumValues(non_null_count);
+    page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+  };
+
   // Handle seeing dictionary for the first time
   if (!preserved_dictionary_) {
     // It's a new dictionary. Call PutDictionary and keep track of it
@@ -1493,37 +1530,18 @@ Status TypedColumnWriterImpl<DType>::WriteArrowDictionary(
     }
 
     if (page_statistics_ != nullptr) {
-      // TODO(PARQUET-2068) This approach may make two copies. First, a copy of the
-      // indices array to a (hopefully smaller) referenced indices array. Second, a copy
-      // of the values array to a (probably not smaller) referenced values array.
-      //
-      // Once the MinMax kernel supports all data types we should use that kernel instead
-      // as it does not make any copies.
-      ::arrow::compute::ExecContext exec_ctx(ctx->memory_pool);
-      exec_ctx.set_use_threads(false);
-      PARQUET_ASSIGN_OR_THROW(::arrow::Datum referenced_indices,
-                              ::arrow::compute::Unique(*indices, &exec_ctx));
-      std::shared_ptr<::arrow::Array> referenced_dictionary;
-      if (referenced_indices.length() == dictionary->length()) {
-        referenced_dictionary = dictionary;
-      } else {
-        PARQUET_ASSIGN_OR_THROW(
-            ::arrow::Datum referenced_dictionary_datum,
-            ::arrow::compute::Take(dictionary, referenced_indices,
-                                   ::arrow::compute::TakeOptions(/*boundscheck=*/false),
-                                   &exec_ctx));
-        referenced_dictionary = referenced_dictionary_datum.make_array();
-      }
-      int64_t non_null_count = indices->length() - indices->null_count();
-      page_statistics_->IncrementNullCount(num_levels - non_null_count);
-      page_statistics_->IncrementNumValues(non_null_count);
-      page_statistics_->Update(*referenced_dictionary, /*update_counts=*/false);
+      update_stats();
     }
     preserved_dictionary_ = dictionary;
   } else if (!dictionary->Equals(*preserved_dictionary_)) {
     // Dictionary has changed
     PARQUET_CATCH_NOT_OK(FallbackToPlainEncoding());
     return WriteDense();
+  } else {
+    // Dictionary is the same, but we still need to update the statistics.
+    if (page_statistics_ != nullptr) {
+      update_stats();
+    }
   }
 
   PARQUET_CATCH_NOT_OK(
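
The Unique + Take combination that update_stats() relies on can also be read in isolation: it shrinks the dictionary to just the values a batch actually references, so unreferenced entries (like "d" in the test above) never leak into the min/max bounds. A standalone sketch, assuming Arrow's C++ compute API; ReferencedValues is an illustrative name and error handling is reduced to ValueOrDie for brevity:

    #include <memory>

    #include <arrow/api.h>
    #include <arrow/compute/api.h>

    std::shared_ptr<arrow::Array> ReferencedValues(
        const std::shared_ptr<arrow::Array>& indices,
        const std::shared_ptr<arrow::Array>& dictionary) {
      // Distinct index values occurring in this batch; may include a null,
      // which the statistics update skips.
      std::shared_ptr<arrow::Array> unique_indices =
          arrow::compute::Unique(*indices).ValueOrDie();
      if (unique_indices->length() == dictionary->length()) {
        return dictionary;  // Every entry is referenced; nothing to gather.
      }
      // Gather only the referenced entries. The indices were validated when
      // the batch was written, hence boundscheck=false.
      arrow::Datum taken =
          arrow::compute::Take(dictionary, unique_indices,
                               arrow::compute::TakeOptions(/*boundscheck=*/false))
              .ValueOrDie();
      return taken.make_array();
    }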
