Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 107 additions & 79 deletions cpp/src/parquet/arrow/record_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace BitUtil = ::arrow::BitUtil;

// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
// encoding.
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
static bool IsDictionaryIndexEncoding(Encoding::type e) {
return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
}

Expand Down Expand Up @@ -86,7 +86,7 @@ class RecordReader::RecordReaderImpl {

virtual ~RecordReaderImpl() = default;

virtual int64_t ReadRecordData(const int64_t num_records) = 0;
virtual int64_t ReadRecordData(int64_t num_records) = 0;

// Returns true if there are still values in this column.
bool HasNext() {
Expand Down Expand Up @@ -494,7 +494,7 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
}

// Return number of logical records read
int64_t ReadRecordData(const int64_t num_records) override {
int64_t ReadRecordData(int64_t num_records) override {
// Conservative upper bound
const int64_t possible_num_values =
std::max(num_records, levels_written_ - levels_position_);
Expand Down Expand Up @@ -580,6 +580,13 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {

DecoderType* current_decoder_;

// Initialize repetition and definition level decoders on the next data page.
int64_t InitializeLevelDecoders(const DataPage& page,
Encoding::type repetition_level_encoding,
Encoding::type definition_level_encoding);

void InitializeDataDecoder(const DataPage& page, int64_t levels_bytes);

// Advance to the next data page
bool ReadNewPage() override;

Expand Down Expand Up @@ -717,11 +724,94 @@ inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage*
DCHECK(current_decoder_);
}

// If the data page includes repetition and definition levels, we
// initialize the level decoders and return the number of encoded level bytes.
// The return value helps determine the number of bytes in the encoded data.
template <typename DType>
int64_t TypedRecordReader<DType>::InitializeLevelDecoders(
const DataPage& page, Encoding::type repetition_level_encoding,
Encoding::type definition_level_encoding) {
// Read a data page.
num_buffered_values_ = page.num_values();

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

const uint8_t* buffer = page.data();
int64_t levels_byte_size = 0;

// Data page Layout: Repetition Levels - Definition Levels - encoded values.
// Levels are encoded as rle or bit-packed.
// Init repetition levels
if (descr_->max_repetition_level() > 0) {
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
repetition_level_encoding, descr_->max_repetition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += rep_levels_bytes;
levels_byte_size += rep_levels_bytes;
}
// TODO figure a way to set max_definition_level_ to 0
// if the initial value is invalid

// Init definition levels
if (descr_->max_definition_level() > 0) {
int64_t def_levels_bytes = definition_level_decoder_.SetData(
definition_level_encoding, descr_->max_definition_level(),
static_cast<int>(num_buffered_values_), buffer);
levels_byte_size += def_levels_bytes;
}

return levels_byte_size;
}

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
template <typename DType>
void TypedRecordReader<DType>::InitializeDataDecoder(const DataPage& page,
int64_t levels_byte_size) {
const uint8_t* buffer = page.data() + levels_byte_size;
const int64_t data_size = page.size() - levels_byte_size;

Encoding::type encoding = page.encoding();

if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
}

auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
DCHECK(it->second.get() != nullptr);
if (encoding == Encoding::RLE_DICTIONARY) {
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
}
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
throw ParquetException("Unknown encoding type.");
}
}
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
}

template <typename DType>
bool TypedRecordReader<DType>::ReadNewPage() {
// Loop until we find the next data page.
const uint8_t* buffer;

while (true) {
current_page_ = pager_->NextPage();
if (!current_page_) {
Expand All @@ -733,80 +823,18 @@ bool TypedRecordReader<DType>::ReadNewPage() {
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
} else if (current_page_->type() == PageType::DATA_PAGE) {
const DataPage* page = static_cast<const DataPage*>(current_page_.get());

// Read a data page.
num_buffered_values_ = page->num_values();

// Have not decoded any values from the data page yet
num_decoded_values_ = 0;

buffer = page->data();

// If the data page includes repetition and definition levels, we
// initialize the level decoder and subtract the encoded level bytes from
// the page size to determine the number of bytes in the encoded data.
int64_t data_size = page->size();

// Data page Layout: Repetition Levels - Definition Levels - encoded values.
// Levels are encoded as rle or bit-packed.
// Init repetition levels
if (descr_->max_repetition_level() > 0) {
int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
page->repetition_level_encoding(), descr_->max_repetition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += rep_levels_bytes;
data_size -= rep_levels_bytes;
}
// TODO figure a way to set max_definition_level_ to 0
// if the initial value is invalid

// Init definition levels
if (descr_->max_definition_level() > 0) {
int64_t def_levels_bytes = definition_level_decoder_.SetData(
page->definition_level_encoding(), descr_->max_definition_level(),
static_cast<int>(num_buffered_values_), buffer);
buffer += def_levels_bytes;
data_size -= def_levels_bytes;
}

// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
Encoding::type encoding = page->encoding();

if (IsDictionaryIndexEncoding(encoding)) {
encoding = Encoding::RLE_DICTIONARY;
}

auto it = decoders_.find(static_cast<int>(encoding));
if (it != decoders_.end()) {
DCHECK(it->second.get() != nullptr);
if (encoding == Encoding::RLE_DICTIONARY) {
DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
}
current_decoder_ = it->second.get();
} else {
switch (encoding) {
case Encoding::PLAIN: {
auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
current_decoder_ = decoder.get();
decoders_[static_cast<int>(encoding)] = std::move(decoder);
break;
}
case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");

case Encoding::DELTA_BINARY_PACKED:
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");

default:
throw ParquetException("Unknown encoding type.");
}
}
current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
static_cast<int>(data_size));
const auto page = std::static_pointer_cast<DataPageV1>(current_page_);
const int64_t levels_byte_size = InitializeLevelDecoders(
*page, page->repetition_level_encoding(), page->definition_level_encoding());
InitializeDataDecoder(*page, levels_byte_size);
return true;
} else if (current_page_->type() == PageType::DATA_PAGE_V2) {
const auto page = std::static_pointer_cast<DataPageV2>(current_page_);
// Repetition and definition levels are always encoded using RLE encoding
// in the DataPageV2 format.
const int64_t levels_byte_size =
InitializeLevelDecoders(*page, Encoding::RLE, Encoding::RLE);
InitializeDataDecoder(*page, levels_byte_size);
return true;
} else {
// We don't know what this page type is. We're allowed to skip non-data
Expand Down
53 changes: 27 additions & 26 deletions cpp/src/parquet/column_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,45 +59,54 @@ class Page {
PageType::type type_;
};

/// \brief Base type for DataPageV1 and DataPageV2 including common attributes
class DataPage : public Page {
public:
DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding,
int32_t num_values() const { return num_values_; }
Encoding::type encoding() const { return encoding_; }
const EncodedStatistics& statistics() const { return statistics_; }

protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding,
const EncodedStatistics& statistics = EncodedStatistics())
: Page(buffer, PageType::DATA_PAGE),
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding),
statistics_(statistics) {}

int32_t num_values() const { return num_values_; }
int32_t num_values_;
Encoding::type encoding_;
EncodedStatistics statistics_;
};

Encoding::type encoding() const { return encoding_; }
class DataPageV1 : public DataPage {
public:
DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding,
const EncodedStatistics& statistics = EncodedStatistics())
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, statistics),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}

Encoding::type repetition_level_encoding() const { return repetition_level_encoding_; }

Encoding::type definition_level_encoding() const { return definition_level_encoding_; }

const EncodedStatistics& statistics() const { return statistics_; }

private:
int32_t num_values_;
Encoding::type encoding_;
Encoding::type definition_level_encoding_;
Encoding::type repetition_level_encoding_;
EncodedStatistics statistics_;
};

class CompressedDataPage : public DataPage {
class CompressedDataPage : public DataPageV1 {
public:
CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t uncompressed_size,
const EncodedStatistics& statistics = EncodedStatistics())
: DataPage(buffer, num_values, encoding, definition_level_encoding,
repetition_level_encoding, statistics),
: DataPageV1(buffer, num_values, encoding, definition_level_encoding,
repetition_level_encoding, statistics),
uncompressed_size_(uncompressed_size) {}

int64_t uncompressed_size() const { return uncompressed_size_; }
Expand All @@ -106,40 +115,32 @@ class CompressedDataPage : public DataPage {
int64_t uncompressed_size_;
};

class DataPageV2 : public Page {
class DataPageV2 : public DataPage {
public:
DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
int32_t num_rows, Encoding::type encoding,
int32_t definition_levels_byte_length, int32_t repetition_levels_byte_length,
bool is_compressed = false)
: Page(buffer, PageType::DATA_PAGE_V2),
num_values_(num_values),
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding),
num_nulls_(num_nulls),
num_rows_(num_rows),
encoding_(encoding),
definition_levels_byte_length_(definition_levels_byte_length),
repetition_levels_byte_length_(repetition_levels_byte_length),
is_compressed_(is_compressed) {}

int32_t num_values() const { return num_values_; }

int32_t num_nulls() const { return num_nulls_; }

int32_t num_rows() const { return num_rows_; }

Encoding::type encoding() const { return encoding_; }

int32_t definition_levels_byte_length() const { return definition_levels_byte_length_; }

int32_t repetition_levels_byte_length() const { return repetition_levels_byte_length_; }

bool is_compressed() const { return is_compressed_; }

private:
int32_t num_values_;
int32_t num_nulls_;
int32_t num_rows_;
Encoding::type encoding_;
int32_t definition_levels_byte_length_;
int32_t repetition_levels_byte_length_;
bool is_compressed_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_reader-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {

shared_ptr<DictionaryPage> dict_page =
std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
shared_ptr<DataPageV1> data_page = MakeDataPage<Int32Type>(
&descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
pages_.push_back(dict_page);
pages_.push_back(data_page);
Expand Down
Loading