Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
461 changes: 305 additions & 156 deletions cpp/src/arrow/parquet/parquet-io-test.cc

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion cpp/src/arrow/parquet/parquet-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ class TestConvertArrowSchema : public ::testing::Test {

Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
arrow_schema_ = std::make_shared<Schema>(fields);
return ToParquetSchema(arrow_schema_.get(), &result_schema_);
std::shared_ptr<::parquet::WriterProperties> properties =
::parquet::default_writer_properties();
return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_);
}

protected:
Expand Down
160 changes: 145 additions & 15 deletions cpp/src/arrow/parquet/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "arrow/parquet/reader.h"

#include <algorithm>
#include <queue>
#include <string>
#include <vector>
Expand All @@ -27,6 +28,7 @@
#include "arrow/schema.h"
#include "arrow/table.h"
#include "arrow/types/primitive.h"
#include "arrow/types/string.h"
#include "arrow/util/status.h"

using parquet::ColumnReader;
Expand All @@ -36,6 +38,19 @@ using parquet::TypedColumnReader;
namespace arrow {
namespace parquet {

// Maps an Arrow type to the builder class used to assemble its arrays.
// NumericBuilder covers the primitive numeric types; BooleanType needs the
// dedicated BooleanBuilder, hence the explicit specialization.
template <typename ArrowType>
struct ArrowTypeTraits {
  using builder_type = NumericBuilder<ArrowType>;
};

template <>
struct ArrowTypeTraits<BooleanType> {
  using builder_type = BooleanBuilder;
};

// Convenience alias: BuilderType<T> is the builder class for Arrow type T.
template <typename ArrowType>
using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type;

class FileReader::Impl {
public:
Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);
Expand All @@ -61,9 +76,45 @@ class FlatColumnReader::Impl {
template <typename ArrowType, typename ParquetType>
Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);

template <typename ArrowType, typename ParquetType>
Status ReadNullableFlatBatch(const int16_t* def_levels,
typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
BuilderType<ArrowType>* builder);
template <typename ArrowType, typename ParquetType>
Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read,
BuilderType<ArrowType>* builder);

private:
void NextRowGroup();

template <typename InType, typename OutType>
struct can_copy_ptr {
static constexpr bool value =
std::is_same<InType, OutType>::value ||
(std::is_integral<InType>{} && std::is_integral<OutType>{} &&
(sizeof(InType) == sizeof(OutType)));
};

template <typename InType, typename OutType,
typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
Status ConvertPhysicalType(
const InType* in_ptr, int64_t length, const OutType** out_ptr) {
*out_ptr = reinterpret_cast<const OutType*>(in_ptr);
return Status::OK();
}

template <typename InType, typename OutType,
typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
Status ConvertPhysicalType(
const InType* in_ptr, int64_t length, const OutType** out_ptr) {
RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
OutType* mutable_out_ptr =
reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
*out_ptr = mutable_out_ptr;
return Status::OK();
}

MemoryPool* pool_;
const ::parquet::ColumnDescriptor* descr_;
::parquet::ParquetFileReader* reader_;
Expand Down Expand Up @@ -155,45 +206,116 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor
NextRowGroup();
}

// Appends a batch with no null slots (max_definition_level == 0) to the
// builder. The Parquet physical values are first converted to the Arrow
// value type — either a zero-copy pointer reinterpretation when the two
// layouts match, or an element-wise copy into the scratch buffer.
template <typename ArrowType, typename ParquetType>
Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values,
    int64_t values_read, BuilderType<ArrowType>* builder) {
  using InCType = typename ParquetType::c_type;
  using OutCType = typename ArrowType::c_type;

  DCHECK(builder);
  // ConvertPhysicalType either reinterprets `values` in place (identical
  // layout) or widens into values_builder_buffer_ and points at that copy.
  const OutCType* converted = nullptr;
  RETURN_NOT_OK(
      (ConvertPhysicalType<InCType, OutCType>(values, values_read, &converted)));
  return builder->Append(converted, values_read);
}

// Appends a batch of a nullable flat column (max_definition_level == 1).
// `values` holds the densely-packed non-null values (values_read entries),
// while `def_levels` has one entry per logical slot (levels_read entries);
// a slot whose definition level is below the column's maximum is null.
//
// The packed values are scattered into a levels_read-sized scratch buffer
// alongside a validity bytemap, then appended with a single builder call.
template <typename ArrowType, typename ParquetType>
Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
    typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
    BuilderType<ArrowType>* builder) {
  using ArrowCType = typename ArrowType::c_type;

  DCHECK(builder);
  RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType)));
  RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
  auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
  uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
  // int64_t (not int): an int index would overflow once more than INT_MAX
  // values have been consumed from a single batch.
  int64_t values_idx = 0;
  for (int64_t i = 0; i < levels_read; i++) {
    if (def_levels[i] < descr_->max_definition_level()) {
      // Null slot: values_ptr[i] is left unset; the builder skips it via
      // the validity bytemap.
      valid_bytes[i] = 0;
    } else {
      valid_bytes[i] = 1;
      values_ptr[i] = values[values_idx++];
    }
  }
  RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
  return Status::OK();
}

template <typename ArrowType, typename ParquetType>
Status FlatColumnReader::Impl::TypedReadBatch(
int batch_size, std::shared_ptr<Array>* out) {
using ParquetCType = typename ParquetType::c_type;

int values_to_read = batch_size;
NumericBuilder<ArrowType> builder(pool_, field_->type);
BuilderType<ArrowType> builder(pool_, field_->type);
while ((values_to_read > 0) && column_reader_) {
values_buffer_.Resize(values_to_read * sizeof(typename ParquetType::c_type));
values_buffer_.Resize(values_to_read * sizeof(ParquetCType));
if (descr_->max_definition_level() > 0) {
def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
}
auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
int64_t values_read;
int64_t levels_read;
int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
auto values =
reinterpret_cast<typename ParquetType::c_type*>(values_buffer_.mutable_data());
auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
values_to_read, def_levels, nullptr, values, &values_read));
values_to_read -= levels_read;
if (descr_->max_definition_level() == 0) {
RETURN_NOT_OK(builder.Append(values, values_read));
RETURN_NOT_OK(
(ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder)));
} else {
// As per the definition and checks for flat columns:
// descr_->max_definition_level() == 1
RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
def_levels, values, values_read, levels_read, &builder)));
}
if (!column_reader_->HasNext()) { NextRowGroup(); }
}
*out = builder.Finish();
return Status::OK();
}

template <>
Status FlatColumnReader::Impl::TypedReadBatch<StringType, ::parquet::ByteArrayType>(
int batch_size, std::shared_ptr<Array>* out) {
int values_to_read = batch_size;
StringBuilder builder(pool_, field_->type);
while ((values_to_read > 0) && column_reader_) {
values_buffer_.Resize(values_to_read * sizeof(::parquet::ByteArray));
if (descr_->max_definition_level() > 0) {
def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
}
auto reader =
dynamic_cast<TypedColumnReader<::parquet::ByteArrayType>*>(column_reader_.get());
int64_t values_read;
int64_t levels_read;
int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
auto values = reinterpret_cast<::parquet::ByteArray*>(values_buffer_.mutable_data());
PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
values_to_read, def_levels, nullptr, values, &values_read));
values_to_read -= levels_read;
if (descr_->max_definition_level() == 0) {
for (int64_t i = 0; i < levels_read; i++) {
RETURN_NOT_OK(
builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
}
} else {
// descr_->max_definition_level() == 1
RETURN_NOT_OK(values_builder_buffer_.Resize(
levels_read * sizeof(typename ParquetType::c_type)));
RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
auto values_ptr = reinterpret_cast<typename ParquetType::c_type*>(
values_builder_buffer_.mutable_data());
uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
int values_idx = 0;
for (int64_t i = 0; i < levels_read; i++) {
if (def_levels[i] < descr_->max_definition_level()) {
valid_bytes[i] = 0;
RETURN_NOT_OK(builder.AppendNull());
} else {
valid_bytes[i] = 1;
values_ptr[i] = values[values_idx++];
RETURN_NOT_OK(
builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
values[values_idx].len));
values_idx++;
}
}
builder.Append(values_ptr, levels_read, valid_bytes);
}
if (!column_reader_->HasNext()) { NextRowGroup(); }
}
Expand All @@ -214,10 +336,18 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
}

switch (field_->type->type) {
TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType)
TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(UINT32, UInt32Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type)
TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
TYPED_BATCH_CASE(STRING, StringType, ::parquet::ByteArrayType)
default:
return Status::NotImplemented(field_->type->ToString());
}
Expand Down
47 changes: 38 additions & 9 deletions cpp/src/arrow/parquet/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ namespace parquet {

const auto BOOL = std::make_shared<BooleanType>();
const auto UINT8 = std::make_shared<UInt8Type>();
const auto INT8 = std::make_shared<Int8Type>();
const auto UINT16 = std::make_shared<UInt16Type>();
const auto INT16 = std::make_shared<Int16Type>();
const auto UINT32 = std::make_shared<UInt32Type>();
const auto INT32 = std::make_shared<Int32Type>();
const auto UINT64 = std::make_shared<UInt64Type>();
const auto INT64 = std::make_shared<Int64Type>();
const auto FLOAT = std::make_shared<FloatType>();
const auto DOUBLE = std::make_shared<DoubleType>();
Expand Down Expand Up @@ -92,6 +97,21 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
case LogicalType::NONE:
*out = INT32;
break;
case LogicalType::UINT_8:
*out = UINT8;
break;
case LogicalType::INT_8:
*out = INT8;
break;
case LogicalType::UINT_16:
*out = UINT16;
break;
case LogicalType::INT_16:
*out = INT16;
break;
case LogicalType::UINT_32:
*out = UINT32;
break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
Expand All @@ -107,6 +127,9 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
case LogicalType::NONE:
*out = INT64;
break;
case LogicalType::UINT_64:
*out = UINT64;
break;
case LogicalType::DECIMAL:
*out = MakeDecimalType(node);
break;
Expand Down Expand Up @@ -187,20 +210,21 @@ Status FromParquetSchema(
}

Status StructToNode(const std::shared_ptr<StructType>& type, const std::string& name,
bool nullable, NodePtr* out) {
bool nullable, const ::parquet::WriterProperties& properties, NodePtr* out) {
Repetition::type repetition = Repetition::REQUIRED;
if (nullable) { repetition = Repetition::OPTIONAL; }

std::vector<NodePtr> children(type->num_children());
for (int i = 0; i < type->num_children(); i++) {
RETURN_NOT_OK(FieldToNode(type->child(i), &children[i]));
RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i]));
}

*out = GroupNode::Make(name, repetition, children);
return Status::OK();
}

Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
Status FieldToNode(const std::shared_ptr<Field>& field,
const ::parquet::WriterProperties& properties, NodePtr* out) {
LogicalType::type logical_type = LogicalType::NONE;
ParquetType::type type;
Repetition::type repetition = Repetition::REQUIRED;
Expand Down Expand Up @@ -231,8 +255,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
logical_type = LogicalType::INT_16;
break;
case Type::UINT32:
type = ParquetType::INT32;
logical_type = LogicalType::UINT_32;
if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
type = ParquetType::INT64;
} else {
type = ParquetType::INT32;
logical_type = LogicalType::UINT_32;
}
break;
case Type::INT32:
type = ParquetType::INT32;
Expand Down Expand Up @@ -277,7 +305,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
break;
case Type::STRUCT: {
auto struct_type = std::static_pointer_cast<StructType>(field->type);
return StructToNode(struct_type, field->name, field->nullable, out);
return StructToNode(struct_type, field->name, field->nullable, properties, out);
} break;
default:
// TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
Expand All @@ -287,11 +315,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
return Status::OK();
}

Status ToParquetSchema(
const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out) {
Status ToParquetSchema(const Schema* arrow_schema,
const ::parquet::WriterProperties& properties,
std::shared_ptr<::parquet::SchemaDescriptor>* out) {
std::vector<NodePtr> nodes(arrow_schema->num_fields());
for (int i = 0; i < arrow_schema->num_fields(); i++) {
RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), &nodes[i]));
RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
}

NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/arrow/parquet/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>

#include "parquet/api/schema.h"
#include "parquet/api/writer.h"

#include "arrow/schema.h"
#include "arrow/type.h"
Expand All @@ -36,10 +37,12 @@ Status NodeToField(const ::parquet::schema::NodePtr& node, std::shared_ptr<Field
Status FromParquetSchema(
const ::parquet::SchemaDescriptor* parquet_schema, std::shared_ptr<Schema>* out);

Status FieldToNode(const std::shared_ptr<Field>& field, ::parquet::schema::NodePtr* out);
Status FieldToNode(const std::shared_ptr<Field>& field,
const ::parquet::WriterProperties& properties, ::parquet::schema::NodePtr* out);

Status ToParquetSchema(
const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out);
Status ToParquetSchema(const Schema* arrow_schema,
const ::parquet::WriterProperties& properties,
std::shared_ptr<::parquet::SchemaDescriptor>* out);

} // namespace parquet

Expand Down
Loading