Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/parquet/column/properties-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ TEST(TestReaderProperties, Basics) {
}

TEST(TestWriterProperties, Basics) {
WriterProperties props;
std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();

ASSERT_EQ(DEFAULT_PAGE_SIZE, props.data_pagesize());
ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props.dictionary_pagesize());
ASSERT_EQ(DEFAULT_IS_DICTIONARY_ENABLED, props.is_dictionary_enabled());
ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props->dictionary_pagesize());
ASSERT_EQ(DEFAULT_IS_DICTIONARY_ENABLED, props->dictionary_enabled());
ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
}

} // namespace test
Expand Down
94 changes: 75 additions & 19 deletions src/parquet/column/properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@

namespace parquet {

// Scoping struct for the Parquet format version selector; refer to the
// enumerators as ParquetVersion::PARQUET_1_0 / ParquetVersion::PARQUET_2_0
// and to the enum type itself as ParquetVersion::type.
struct ParquetVersion {
  enum type {
    PARQUET_1_0 = 0,
    PARQUET_2_0 = 1
  };
};

static int64_t DEFAULT_BUFFER_SIZE = 0;
static bool DEFAULT_USE_BUFFERED_STREAM = false;

Expand Down Expand Up @@ -72,42 +76,94 @@ ReaderProperties default_reader_properties();
static int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
static int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
static bool DEFAULT_IS_DICTIONARY_ENABLED = true;
static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
ParquetVersion::PARQUET_1_0;

class WriterProperties {
public:
explicit WriterProperties(MemoryAllocator* allocator = default_allocator())
: allocator_(allocator) {
pagesize_ = DEFAULT_PAGE_SIZE;
dictionary_pagesize_ = DEFAULT_DICTIONARY_PAGE_SIZE;
dictionary_enabled_ = DEFAULT_IS_DICTIONARY_ENABLED;
}
class Builder {
public:
Builder()
: allocator_(default_allocator()),
dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED),
dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
pagesize_(DEFAULT_PAGE_SIZE),
version_(DEFAULT_WRITER_VERSION) {}
virtual ~Builder() {}

// Stores the MemoryAllocator that build() will pass on to WriterProperties.
// Returns this Builder so further setters can be chained with `->`.
Builder* allocator(MemoryAllocator* allocator) {
allocator_ = allocator;
return this;
}

int64_t dictionary_pagesize() const { return dictionary_pagesize_; }
// Stores the dictionary page size (presumably in bytes -- TODO confirm).
// Returns this Builder so further setters can be chained with `->`.
Builder* dictionary_pagesize(int64_t dictionary_psize) {
dictionary_pagesize_ = dictionary_psize;
return this;
}

void set_dictionary_pagesize(int64_t dictionary_psize) {
dictionary_pagesize_ = dictionary_psize;
}
// Stores the data page size (presumably in bytes -- TODO confirm).
// Returns this Builder so further setters can be chained with `->`.
Builder* data_pagesize(int64_t pg_size) {
pagesize_ = pg_size;
return this;
}

int64_t data_pagesize() const { return pagesize_; }
// Sets the dictionary-enabled flag to true.
// Returns this Builder so further setters can be chained with `->`.
Builder* enable_dictionary() {
dictionary_enabled_ = true;
return this;
}

void set_data_pagesize(int64_t pg_size) { pagesize_ = pg_size; }
// Sets the dictionary-enabled flag to false.
// Returns this Builder so further setters can be chained with `->`.
Builder* disable_dictionary() {
dictionary_enabled_ = false;
return this;
}

void enable_dictionary() { dictionary_enabled_ = true; }
// Stores the ParquetVersion::type that build() will pass on to WriterProperties.
// Returns this Builder so further setters can be chained with `->`.
Builder* version(ParquetVersion::type version) {
version_ = version;
return this;
}

void disable_dictionary() { dictionary_enabled_ = false; }
// Creates a new WriterProperties from the values currently held by this
// Builder and returns it wrapped in a shared_ptr. The Builder itself is not
// modified, so build() can be called more than once.
// `new` is used directly because the WriterProperties constructor is declared
// private, which std::make_shared would not be able to call.
std::shared_ptr<WriterProperties> build() {
return std::shared_ptr<WriterProperties>(new WriterProperties(
allocator_, dictionary_enabled_, dictionary_pagesize_, pagesize_, version_));
}

bool is_dictionary_enabled() const { return dictionary_enabled_; }
private:
MemoryAllocator* allocator_;
bool dictionary_enabled_;
int64_t dictionary_pagesize_;
int64_t pagesize_;
ParquetVersion::type version_;
};

// Returns the MemoryAllocator these properties were built with.
// Declared const, like the other accessors of this class, so it can also be
// called through a const WriterProperties; the pointer itself is returned
// unchanged.
MemoryAllocator* allocator() const { return allocator_; }

// Returns the stored dictionary-enabled flag.
bool dictionary_enabled() const { return dictionary_enabled_; }

// Returns the stored dictionary page size.
int64_t dictionary_pagesize() const { return dictionary_pagesize_; }

// Returns the stored data page size.
int64_t data_pagesize() const { return pagesize_; }

// Returns the ParquetVersion::type these properties were built with.
// Declared const, like the other value accessors of this class, so it can
// also be called through a const WriterProperties.
ParquetVersion::type version() const { return parquet_version_; }

private:
int64_t pagesize_;
int64_t dictionary_pagesize_;
bool dictionary_enabled_;
// Private constructor, invoked by Builder::build().
// Every setting is taken verbatim from the corresponding argument. The body
// is intentionally empty: re-assigning pagesize_ and dictionary_enabled_ to
// DEFAULT_PAGE_SIZE / DEFAULT_IS_DICTIONARY_ENABLED here would silently
// discard the values configured through Builder::data_pagesize() and
// Builder::disable_dictionary().
explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled,
    int64_t dictionary_pagesize, int64_t pagesize, ParquetVersion::type version)
    : allocator_(allocator),
      dictionary_enabled_(dictionary_enabled),
      dictionary_pagesize_(dictionary_pagesize),
      pagesize_(pagesize),
      parquet_version_(version) {}

MemoryAllocator* allocator_;
bool dictionary_enabled_;
int64_t dictionary_pagesize_;
int64_t pagesize_;
ParquetVersion::type parquet_version_;
};

WriterProperties default_writer_properties();
std::shared_ptr<WriterProperties> default_writer_properties();

} // namespace parquet

Expand Down
5 changes: 3 additions & 2 deletions src/parquet/column/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ namespace parquet {
// ----------------------------------------------------------------------
// ColumnWriter

WriterProperties default_writer_properties() {
static WriterProperties default_writer_properties;
std::shared_ptr<WriterProperties> default_writer_properties() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return const std::shared_ptr<T>&, saves extra copy

static std::shared_ptr<WriterProperties> default_writer_properties =
WriterProperties::Builder().build();
return default_writer_properties;
}

Expand Down
14 changes: 10 additions & 4 deletions src/parquet/file/writer-internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,9 @@ void RowGroupSerializer::Close() {

std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
std::shared_ptr<OutputStream> sink, std::shared_ptr<GroupNode>& schema,
MemoryAllocator* allocator) {
MemoryAllocator* allocator, const std::shared_ptr<WriterProperties>& properties) {
std::unique_ptr<ParquetFileWriter::Contents> result(
new FileSerializer(sink, schema, allocator));
new FileSerializer(sink, schema, allocator, properties));

return result;
}
Expand Down Expand Up @@ -200,6 +200,10 @@ int64_t FileSerializer::num_rows() const {
return num_rows_;
}

// Returns the WriterProperties held in properties_. The result is a const
// reference to the member, so the call itself does not copy the shared_ptr;
// the reference is only valid while this FileSerializer is alive.
const std::shared_ptr<WriterProperties>& FileSerializer::properties() const {
return properties_;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return const std::shared_ptr<T>&


RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
if (row_group_writer_) { row_group_writer_->Close(); }
num_rows_ += num_rows;
Expand Down Expand Up @@ -243,12 +247,14 @@ void FileSerializer::WriteMetaData() {
}

FileSerializer::FileSerializer(std::shared_ptr<OutputStream> sink,
std::shared_ptr<GroupNode>& schema, MemoryAllocator* allocator = default_allocator())
std::shared_ptr<GroupNode>& schema, MemoryAllocator* allocator,
const std::shared_ptr<WriterProperties>& properties)
: sink_(sink),
allocator_(allocator),
num_row_groups_(0),
num_rows_(0),
is_open_(true) {
is_open_(true),
properties_(properties) {
schema_.Init(schema);
StartFile();
}
Expand Down
9 changes: 7 additions & 2 deletions src/parquet/file/writer-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,15 @@ class FileSerializer : public ParquetFileWriter::Contents {
public:
static std::unique_ptr<ParquetFileWriter::Contents> Open(
std::shared_ptr<OutputStream> sink, std::shared_ptr<schema::GroupNode>& schema,
MemoryAllocator* allocator = default_allocator());
MemoryAllocator* allocator = default_allocator(),
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());

void Close() override;

RowGroupWriter* AppendRowGroup(int64_t num_rows) override;

const std::shared_ptr<WriterProperties>& properties() const override;

int num_columns() const override;
int num_row_groups() const override;
int64_t num_rows() const override;
Expand All @@ -124,7 +127,8 @@ class FileSerializer : public ParquetFileWriter::Contents {

private:
explicit FileSerializer(std::shared_ptr<OutputStream> sink,
std::shared_ptr<schema::GroupNode>& schema, MemoryAllocator* allocator);
std::shared_ptr<schema::GroupNode>& schema, MemoryAllocator* allocator,
const std::shared_ptr<WriterProperties>& properties);

std::shared_ptr<OutputStream> sink_;
format::FileMetaData metadata_;
Expand All @@ -134,6 +138,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
int num_rows_;
bool is_open_;
std::unique_ptr<RowGroupWriter> row_group_writer_;
std::shared_ptr<WriterProperties> properties_;

void StartFile();
void WriteMetaData();
Expand Down
6 changes: 5 additions & 1 deletion src/parquet/file/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ParquetFileWriter::~ParquetFileWriter() {

std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
std::shared_ptr<OutputStream> sink, std::shared_ptr<GroupNode>& schema,
MemoryAllocator* allocator) {
MemoryAllocator* allocator, const std::shared_ptr<WriterProperties>& properties) {
auto contents = FileSerializer::Open(sink, schema, allocator);

std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
Expand All @@ -80,4 +80,8 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
return contents_->AppendRowGroup(num_rows);
}

// Returns the writer configuration by forwarding to the Contents
// implementation held in contents_.
const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
return contents_->properties();
}

} // namespace parquet
11 changes: 10 additions & 1 deletion src/parquet/file/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstdint>
#include <memory>

#include "parquet/column/properties.h"
#include "parquet/schema/descriptor.h"
#include "parquet/schema/types.h"
#include "parquet/util/mem-allocator.h"
Expand Down Expand Up @@ -93,6 +94,8 @@ class ParquetFileWriter {
virtual int num_columns() const = 0;
virtual int num_row_groups() const = 0;

virtual const std::shared_ptr<WriterProperties>& properties() const = 0;

// Return const-pointer to make it clear that this object is not to be copied
const SchemaDescriptor* schema() const { return &schema_; }
SchemaDescriptor schema_;
Expand All @@ -103,7 +106,8 @@ class ParquetFileWriter {

static std::unique_ptr<ParquetFileWriter> Open(std::shared_ptr<OutputStream> sink,
std::shared_ptr<schema::GroupNode>& schema,
MemoryAllocator* allocator = default_allocator());
MemoryAllocator* allocator = default_allocator(),
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());

void Open(std::unique_ptr<Contents> contents);
void Close();
Expand Down Expand Up @@ -138,6 +142,11 @@ class ParquetFileWriter {
*/
int num_row_groups() const;

/**
* Configuration passed to the writer, e.g. the used Parquet format version.
*/
const std::shared_ptr<WriterProperties>& properties() const;

/**
* Returns the file schema descriptor
*/
Expand Down