Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cpp/src/arrow/parquet/parquet-io-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ class TestParquetIO : public ::testing::Test {
size_t chunk_size = values.size() / num_chunks;
for (int i = 0; i < num_chunks; i++) {
auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
auto column_writer = static_cast<ParquetWriter<TestType>*>(
row_group_writer->NextColumn());
auto column_writer =
static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
T* data = values.data() + i * chunk_size;
column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
column_writer->Close();
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/parquet/parquet-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
using ParquetType = parquet::Type;
using parquet::LogicalType;
using parquet::Repetition;
using parquet::WriterProperties;
using parquet::schema::NodePtr;
using parquet::schema::GroupNode;
using parquet::schema::PrimitiveNode;
Expand Down Expand Up @@ -183,7 +184,8 @@ class TestConvertArrowSchema : public ::testing::Test {

Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) {
arrow_schema_ = std::make_shared<Schema>(fields);
return ToParquetSchema(arrow_schema_.get(), &result_schema_);
std::shared_ptr<WriterProperties> properties = ::parquet::default_writer_properties();
return ToParquetSchema(arrow_schema_.get(), properties.get(), &result_schema_);
}

protected:
Expand Down
41 changes: 28 additions & 13 deletions cpp/src/arrow/parquet/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
#include "arrow/types/string.h"
#include "arrow/util/status.h"

using parquet::ParquetVersion;
using parquet::Repetition;
using parquet::WriterProperties;
using parquet::schema::Node;
using parquet::schema::NodePtr;
using parquet::schema::GroupNode;
Expand Down Expand Up @@ -187,20 +189,21 @@ Status FromParquetSchema(
}

Status StructToNode(const std::shared_ptr<StructType>& type, const std::string& name,
bool nullable, NodePtr* out) {
bool nullable, WriterProperties* properties, NodePtr* out) {
Repetition::type repetition = Repetition::REQUIRED;
if (nullable) { repetition = Repetition::OPTIONAL; }

std::vector<NodePtr> children(type->num_children());
for (int i = 0; i < type->num_children(); i++) {
RETURN_NOT_OK(FieldToNode(type->child(i), &children[i]));
RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i]));
}

*out = GroupNode::Make(name, repetition, children);
return Status::OK();
}

Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
Status FieldToNode(
const std::shared_ptr<Field>& field, WriterProperties* properties, NodePtr* out) {
LogicalType::type logical_type = LogicalType::NONE;
ParquetType::type type;
Repetition::type repetition = Repetition::REQUIRED;
Expand All @@ -216,30 +219,42 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
break;
case Type::UINT8:
type = ParquetType::INT32;
logical_type = LogicalType::UINT_8;
if (properties->version() == ParquetVersion::PARQUET_2_0) {
logical_type = LogicalType::UINT_8;
}
break;
case Type::INT8:
type = ParquetType::INT32;
logical_type = LogicalType::INT_8;
if (properties->version() == ParquetVersion::PARQUET_2_0) {
logical_type = LogicalType::INT_8;
}
break;
case Type::UINT16:
type = ParquetType::INT32;
logical_type = LogicalType::UINT_16;
if (properties->version() == ParquetVersion::PARQUET_2_0) {
logical_type = LogicalType::UINT_16;
}
break;
case Type::INT16:
type = ParquetType::INT32;
logical_type = LogicalType::INT_16;
if (properties->version() == ParquetVersion::PARQUET_2_0) {
logical_type = LogicalType::INT_16;
}
break;
case Type::UINT32:
type = ParquetType::INT32;
logical_type = LogicalType::UINT_32;
if (properties->version() == ParquetVersion::PARQUET_2_0) {
logical_type = LogicalType::UINT_32;
}
break;
case Type::INT32:
type = ParquetType::INT32;
break;
case Type::UINT64:
type = ParquetType::INT64;
logical_type = LogicalType::UINT_64;
if (properties->version() == ParquetVersion::PARQUET_2_0) {
logical_type = LogicalType::UINT_64;
}
break;
case Type::INT64:
type = ParquetType::INT64;
Expand Down Expand Up @@ -277,7 +292,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
break;
case Type::STRUCT: {
auto struct_type = std::static_pointer_cast<StructType>(field->type);
return StructToNode(struct_type, field->name, field->nullable, out);
return StructToNode(struct_type, field->name, field->nullable, properties, out);
} break;
default:
// TODO: LIST, DENSE_UNION, SPARSE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
Expand All @@ -287,11 +302,11 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) {
return Status::OK();
}

Status ToParquetSchema(
const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out) {
Status ToParquetSchema(const Schema* arrow_schema, WriterProperties* properties,
std::shared_ptr<::parquet::SchemaDescriptor>* out) {
std::vector<NodePtr> nodes(arrow_schema->num_fields());
for (int i = 0; i < arrow_schema->num_fields(); i++) {
RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), &nodes[i]));
RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
}

NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
Expand Down
9 changes: 6 additions & 3 deletions cpp/src/arrow/parquet/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <memory>

#include "parquet/api/reader.h"
#include "parquet/api/schema.h"

#include "arrow/schema.h"
Expand All @@ -36,10 +37,12 @@ Status NodeToField(const ::parquet::schema::NodePtr& node, std::shared_ptr<Field
Status FromParquetSchema(
const ::parquet::SchemaDescriptor* parquet_schema, std::shared_ptr<Schema>* out);

Status FieldToNode(const std::shared_ptr<Field>& field, ::parquet::schema::NodePtr* out);
Status FieldToNode(const std::shared_ptr<Field>& field,
::parquet::WriterProperties* properties, ::parquet::schema::NodePtr* out);

Status ToParquetSchema(
const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out);
Status ToParquetSchema(const Schema* arrow_schema,
::parquet::WriterProperties* properties,
std::shared_ptr<::parquet::SchemaDescriptor>* out);

} // namespace parquet

Expand Down
15 changes: 9 additions & 6 deletions cpp/src/arrow/parquet/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@

#include "arrow/array.h"
#include "arrow/column.h"
#include "arrow/parquet/schema.h"
#include "arrow/parquet/utils.h"
#include "arrow/table.h"
#include "arrow/types/construct.h"
#include "arrow/types/primitive.h"
#include "arrow/parquet/schema.h"
#include "arrow/parquet/utils.h"
#include "arrow/util/status.h"

using parquet::ParquetFileWriter;
using parquet::WriterProperties;
using parquet::schema::GroupNode;

namespace arrow {
Expand Down Expand Up @@ -165,12 +166,14 @@ MemoryPool* FileWriter::memory_pool() const {
FileWriter::~FileWriter() {}

Status WriteFlatTable(const Table* table, MemoryPool* pool,
std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size) {
std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size,
const std::shared_ptr<WriterProperties>& properties) {
std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema;
RETURN_NOT_OK(ToParquetSchema(table->schema().get(), &parquet_schema));
RETURN_NOT_OK(
ToParquetSchema(table->schema().get(), properties.get(), &parquet_schema));
auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema());
std::unique_ptr<ParquetFileWriter> parquet_writer =
ParquetFileWriter::Open(sink, schema_node);
std::unique_ptr<ParquetFileWriter> parquet_writer = ParquetFileWriter::Open(
sink, schema_node, ::parquet::default_allocator(), properties);
FileWriter writer(pool, std::move(parquet_writer));

// TODO: Support writing chunked arrays.
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/parquet/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ class FileWriter {
* The table shall only consist of nullable, non-repeated columns of primitive type.
*/
Status WriteFlatTable(const Table* table, MemoryPool* pool,
std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size);
std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size,
const std::shared_ptr<::parquet::WriterProperties>& properties =
::parquet::default_writer_properties());

} // namespace parquet

Expand Down