Skip to content

Commit 9df3222

Browse files
rdmellowesm
authored andcommitted
PARQUET-1482: [C++] Add branch to TypedRecordReader::ReadNewPage for PageType::DATA_PAGE_V2 to address incompatibility with parquetjs.
Tests This commit doesn't include tests; I am working on them now. I may need to use an actual file generated by parquetjs to test this issue, so I wonder if adding feeds1kMicros.parquet from the JIRA task to the parquet-testing repository is an option. Description parquetjs seems to be writing Parquet V2 files with DataPageV2 pages, while parquet-cpp writes Parquet V2 files with DataPage pages. Since TypedRecordReader::ReadNewPage() only had a branch for PageType::DATA_PAGE, the reader would return without reading any data for records that have DATA_PAGE_V2 pages. This explains the behavior observed in PARQUET-1482. This commit adds a new if-else branch for the DataPageV2 case in TypedRecordReader::ReadNewPage(). Since the DataPageV2 branch needed to reuse the code from the DataPage case, I refactored the repetition/definition level decoder initialization and the data decoder initialization to two new methods in the TypedRecordReader class. These new methods are now called by the DataPage and DataPageV2 initialization branches in TypedRecordReader::ReadNewPage(). There is an alternate implementation possible (with a smaller diff) by sharing the same else-if branch between DataPage and DataPageV2 using a pointer-to-derived shared_ptr<Page>. However, since the Page superclass doesn't have the necessary encoding() or num_values() methods, I would need to add a common superclass to both DataPage and DataPageV2 that defined these methods. I didn't do this because I was hesitant to modify the Page class hierarchy for this commit.
1 parent b77b662 commit 9df3222

1 file changed

Lines changed: 109 additions & 76 deletions

File tree

cpp/src/parquet/arrow/record_reader.cc

Lines changed: 109 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,16 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
580580

581581
DecoderType* current_decoder_;
582582

583+
// Initialize repetition and definition level decoders on the next data page.
584+
template <typename PageType>
585+
int64_t InitializeLevelDecoders(const std::shared_ptr<PageType> page,
586+
const Encoding::type repetition_level_encoding,
587+
const Encoding::type definition_level_encoding);
588+
589+
template <typename PageType>
590+
void InitializeDataDecoder(const std::shared_ptr<PageType> page,
591+
const int64_t levels_bytes);
592+
583593
// Advance to the next data page
584594
bool ReadNewPage() override;
585595

@@ -717,11 +727,96 @@ inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage*
717727
DCHECK(current_decoder_);
718728
}
719729

730+
// If the data page includes repetition and definition levels, we
731+
// initialize the level decoders and return the number of encoded level bytes.
732+
// The return value helps determine the number of bytes in the encoded data.
733+
template <typename DType>
734+
template <typename PageType>
735+
int64_t TypedRecordReader<DType>::InitializeLevelDecoders(
736+
const std::shared_ptr<PageType> page, const Encoding::type repetition_level_encoding,
737+
const Encoding::type definition_level_encoding) {
738+
// Read a data page.
739+
num_buffered_values_ = page->num_values();
740+
741+
// Have not decoded any values from the data page yet
742+
num_decoded_values_ = 0;
743+
744+
const uint8_t* buffer = page->data();
745+
int64_t levels_byte_size = 0;
746+
747+
// Data page Layout: Repetition Levels - Definition Levels - encoded values.
748+
// Levels are encoded as rle or bit-packed.
749+
// Init repetition levels
750+
if (descr_->max_repetition_level() > 0) {
751+
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
752+
repetition_level_encoding, descr_->max_repetition_level(),
753+
static_cast<int>(num_buffered_values_), buffer);
754+
buffer += rep_levels_bytes;
755+
levels_byte_size += rep_levels_bytes;
756+
}
757+
// TODO figure a way to set max_definition_level_ to 0
758+
// if the initial value is invalid
759+
760+
// Init definition levels
761+
if (descr_->max_definition_level() > 0) {
762+
int64_t def_levels_bytes = definition_level_decoder_.SetData(
763+
definition_level_encoding, descr_->max_definition_level(),
764+
static_cast<int>(num_buffered_values_), buffer);
765+
levels_byte_size += def_levels_bytes;
766+
}
767+
768+
return levels_byte_size;
769+
}
770+
771+
// Get a decoder object for this page or create a new decoder if this is the
772+
// first page with this encoding.
773+
template <typename DType>
774+
template <typename PageType>
775+
void TypedRecordReader<DType>::InitializeDataDecoder(const std::shared_ptr<PageType> page,
776+
const int64_t levels_byte_size) {
777+
const uint8_t* buffer = page->data() + levels_byte_size;
778+
const int64_t data_size = page->size() - levels_byte_size;
779+
780+
Encoding::type encoding = page->encoding();
781+
782+
if (IsDictionaryIndexEncoding(encoding)) {
783+
encoding = Encoding::RLE_DICTIONARY;
784+
}
785+
786+
auto it = decoders_.find(static_cast<int>(encoding));
787+
if (it != decoders_.end()) {
788+
DCHECK(it->second.get() != nullptr);
789+
if (encoding == Encoding::RLE_DICTIONARY) {
790+
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
791+
}
792+
current_decoder_ = it->second.get();
793+
} else {
794+
switch (encoding) {
795+
case Encoding::PLAIN: {
796+
auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
797+
current_decoder_ = decoder.get();
798+
decoders_[static_cast<int>(encoding)] = std::move(decoder);
799+
break;
800+
}
801+
case Encoding::RLE_DICTIONARY:
802+
throw ParquetException("Dictionary page must be before data page.");
803+
804+
case Encoding::DELTA_BINARY_PACKED:
805+
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
806+
case Encoding::DELTA_BYTE_ARRAY:
807+
ParquetException::NYI("Unsupported encoding");
808+
809+
default:
810+
throw ParquetException("Unknown encoding type.");
811+
}
812+
}
813+
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
814+
static_cast<int>(data_size));
815+
}
816+
720817
template <typename DType>
721818
bool TypedRecordReader<DType>::ReadNewPage() {
722819
// Loop until we find the next data page.
723-
const uint8_t* buffer;
724-
725820
while (true) {
726821
current_page_ = pager_->NextPage();
727822
if (!current_page_) {
@@ -733,80 +828,18 @@ bool TypedRecordReader<DType>::ReadNewPage() {
733828
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
734829
continue;
735830
} else if (current_page_->type() == PageType::DATA_PAGE) {
736-
const DataPage* page = static_cast<const DataPage*>(current_page_.get());
737-
738-
// Read a data page.
739-
num_buffered_values_ = page->num_values();
740-
741-
// Have not decoded any values from the data page yet
742-
num_decoded_values_ = 0;
743-
744-
buffer = page->data();
745-
746-
// If the data page includes repetition and definition levels, we
747-
// initialize the level decoder and subtract the encoded level bytes from
748-
// the page size to determine the number of bytes in the encoded data.
749-
int64_t data_size = page->size();
750-
751-
// Data page Layout: Repetition Levels - Definition Levels - encoded values.
752-
// Levels are encoded as rle or bit-packed.
753-
// Init repetition levels
754-
if (descr_->max_repetition_level() > 0) {
755-
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
756-
page->repetition_level_encoding(), descr_->max_repetition_level(),
757-
static_cast<int>(num_buffered_values_), buffer);
758-
buffer += rep_levels_bytes;
759-
data_size -= rep_levels_bytes;
760-
}
761-
// TODO figure a way to set max_definition_level_ to 0
762-
// if the initial value is invalid
763-
764-
// Init definition levels
765-
if (descr_->max_definition_level() > 0) {
766-
int64_t def_levels_bytes = definition_level_decoder_.SetData(
767-
page->definition_level_encoding(), descr_->max_definition_level(),
768-
static_cast<int>(num_buffered_values_), buffer);
769-
buffer += def_levels_bytes;
770-
data_size -= def_levels_bytes;
771-
}
772-
773-
// Get a decoder object for this page or create a new decoder if this is the
774-
// first page with this encoding.
775-
Encoding::type encoding = page->encoding();
776-
777-
if (IsDictionaryIndexEncoding(encoding)) {
778-
encoding = Encoding::RLE_DICTIONARY;
779-
}
780-
781-
auto it = decoders_.find(static_cast<int>(encoding));
782-
if (it != decoders_.end()) {
783-
DCHECK(it->second.get() != nullptr);
784-
if (encoding == Encoding::RLE_DICTIONARY) {
785-
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
786-
}
787-
current_decoder_ = it->second.get();
788-
} else {
789-
switch (encoding) {
790-
case Encoding::PLAIN: {
791-
auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
792-
current_decoder_ = decoder.get();
793-
decoders_[static_cast<int>(encoding)] = std::move(decoder);
794-
break;
795-
}
796-
case Encoding::RLE_DICTIONARY:
797-
throw ParquetException("Dictionary page must be before data page.");
798-
799-
case Encoding::DELTA_BINARY_PACKED:
800-
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
801-
case Encoding::DELTA_BYTE_ARRAY:
802-
ParquetException::NYI("Unsupported encoding");
803-
804-
default:
805-
throw ParquetException("Unknown encoding type.");
806-
}
807-
}
808-
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
809-
static_cast<int>(data_size));
831+
const auto page = std::static_pointer_cast<DataPage>(current_page_);
832+
const int64_t levels_byte_size = InitializeLevelDecoders(
833+
page, page->repetition_level_encoding(), page->definition_level_encoding());
834+
InitializeDataDecoder(page, levels_byte_size);
835+
return true;
836+
} else if (current_page_->type() == PageType::DATA_PAGE_V2) {
837+
const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
838+
// Repetition and definition levels are always encoded using RLE encoding
839+
// in the DataPageV2 format.
840+
const int64_t levels_byte_size =
841+
InitializeLevelDecoders(page, Encoding::RLE, Encoding::RLE);
842+
InitializeDataDecoder(page, levels_byte_size);
810843
return true;
811844
} else {
812845
// We don't know what this page type is. We're allowed to skip non-data

0 commit comments

Comments
 (0)