Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions cpp/src/parquet/stream_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,20 +129,26 @@ StreamWriter& StreamWriter::operator<<(FixedStringView v) {
}

StreamWriter& StreamWriter::operator<<(const char* v) {
return WriteVariableLength(v, std::strlen(v));
return WriteVariableLength(v, std::strlen(v), ConvertedType::UTF8);
}

StreamWriter& StreamWriter::operator<<(const std::string& v) {
return WriteVariableLength(v.data(), v.size());
return WriteVariableLength(v.data(), v.size(), ConvertedType::UTF8);
}

StreamWriter& StreamWriter::operator<<(::std::string_view v) {
return WriteVariableLength(v.data(), v.size());
return WriteVariableLength(v.data(), v.size(), ConvertedType::UTF8);
}

StreamWriter& StreamWriter::operator<<(RawDataView v) {
return WriteVariableLength(reinterpret_cast<const char*>(v.data()), v.size(),
ConvertedType::NONE);
}

StreamWriter& StreamWriter::WriteVariableLength(const char* data_ptr,
std::size_t data_len) {
CheckColumn(Type::BYTE_ARRAY, ConvertedType::UTF8);
std::size_t data_len,
ConvertedType::type type) {
CheckColumn(Type::BYTE_ARRAY, type);

auto writer = static_cast<ByteArrayWriter*>(row_group_writer_->column(column_index_++));

Expand Down
11 changes: 10 additions & 1 deletion cpp/src/parquet/stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include <string_view>
#include <vector>

#include "arrow/util/span.h"

#include "parquet/column_writer.h"
#include "parquet/file_writer.h"

Expand Down Expand Up @@ -151,6 +153,12 @@ class PARQUET_EXPORT StreamWriter {
StreamWriter& operator<<(const std::string& v);
StreamWriter& operator<<(::std::string_view v);

/// \brief Helper class to write variable length raw data.
using RawDataView = ::arrow::util::span<const uint8_t>;

/// \brief Output operators for variable length raw data.
StreamWriter& operator<<(RawDataView v);

/// \brief Output operator for optional fields.
template <typename T>
StreamWriter& operator<<(const optional<T>& v) {
Expand Down Expand Up @@ -190,7 +198,8 @@ class PARQUET_EXPORT StreamWriter {
return *this;
}

StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len);
StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len,
ConvertedType::type converted_type);

StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len);

Expand Down
32 changes: 29 additions & 3 deletions cpp/src/parquet/stream_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ class TestStreamWriter : public ::testing::Test {
fields.push_back(schema::PrimitiveNode::Make("double_field", Repetition::REQUIRED,
Type::DOUBLE, ConvertedType::NONE));

fields.push_back(schema::PrimitiveNode::Make("bytes_field", Repetition::REQUIRED,
Type::BYTE_ARRAY, ConvertedType::NONE));

return std::static_pointer_cast<schema::GroupNode>(
schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
}
Expand All @@ -99,7 +102,7 @@ TEST_F(TestStreamWriter, DefaultConstructed) {
EXPECT_EQ(0, os.current_column());
EXPECT_EQ(0, os.current_row());
EXPECT_EQ(0, os.num_columns());
EXPECT_EQ(0, os.SkipColumns(10));
EXPECT_EQ(0, os.SkipColumns(11));
}

TEST_F(TestStreamWriter, TypeChecking) {
Expand Down Expand Up @@ -162,6 +165,17 @@ TEST_F(TestStreamWriter, TypeChecking) {
EXPECT_THROW(writer_ << 5.4f, ParquetException);
EXPECT_NO_THROW(writer_ << 5.4);

// Required type: Variable length byte array.
// Strings and naked char* are rejected because they should use UTF8 instead of None
// type.
EXPECT_EQ(10, writer_.current_column());
EXPECT_THROW(writer_ << 5, ParquetException);
EXPECT_THROW(writer_ << char3_array, ParquetException);
EXPECT_THROW(writer_ << char4_array, ParquetException);
EXPECT_THROW(writer_ << char5_array, ParquetException);
EXPECT_THROW(writer_ << std::string("not ok"), ParquetException);
EXPECT_NO_THROW(writer_ << StreamWriter::RawDataView((uint8_t*)"\xff\0ok", 4));

EXPECT_EQ(0, writer_.current_row());
EXPECT_NO_THROW(writer_ << EndRow);
EXPECT_EQ(1, writer_.current_row());
Expand Down Expand Up @@ -210,6 +224,11 @@ TEST_F(TestStreamWriter, RequiredFieldChecking) {
EXPECT_THROW(writer_ << optional<double>(), ParquetException);
EXPECT_NO_THROW(writer_ << optional<double>(5.4));

// Required field of type: Variable length byte array.
EXPECT_THROW(writer_ << optional<StreamWriter::RawDataView>(), ParquetException);
EXPECT_NO_THROW(
writer_ << std::make_optional<StreamWriter::RawDataView>((uint8_t*)"ok", 2));

EXPECT_NO_THROW(writer_ << EndRow);
}

Expand All @@ -234,6 +253,7 @@ TEST_F(TestStreamWriter, EndRow) {
EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) + 123));
EXPECT_NO_THROW(writer_ << 25.4f);
EXPECT_NO_THROW(writer_ << 3.3424);
EXPECT_NO_THROW(writer_ << StreamWriter::RawDataView((uint8_t*)"ok", 2));
// Correct use of end row after all fields have been output.
EXPECT_NO_THROW(writer_ << EndRow);
EXPECT_EQ(1, writer_.current_row());
Expand Down Expand Up @@ -272,6 +292,10 @@ TEST_F(TestStreamWriter, EndRowGroup) {
EXPECT_NO_THROW(writer_ << uint64_t((1ull << 60) - i * i)) << "index: " << i;
EXPECT_NO_THROW(writer_ << 42325.4f / float(i + 1)) << "index: " << i;
EXPECT_NO_THROW(writer_ << 3.2342e5 / double(i + 1)) << "index: " << i;
std::string tmpString = std::to_string(i);
EXPECT_NO_THROW(writer_ << StreamWriter::RawDataView((uint8_t*)tmpString.c_str(),
tmpString.length()))
<< "index: " << i;
EXPECT_NO_THROW(writer_ << EndRow) << "index: " << i;

if (i % 1000 == 0) {
Expand All @@ -293,7 +317,8 @@ TEST_F(TestStreamWriter, SkipColumns) {
writer_ << true << std::string("Cannot skip mandatory columns");
EXPECT_THROW(writer_.SkipColumns(1), ParquetException);
writer_ << 'x' << std::array<char, 4>{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3)
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0;
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0
<< StreamWriter::RawDataView((uint8_t*)"ok", 2);
writer_ << EndRow;
}

Expand All @@ -304,7 +329,8 @@ TEST_F(TestStreamWriter, AppendNotImplemented) {
writer_ = StreamWriter{ParquetFileWriter::Open(outfile, GetSchema())};
writer_ << false << std::string("Just one row") << 'x'
<< std::array<char, 4>{'A', 'B', 'C', 'D'} << int8_t(2) << uint16_t(3)
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0;
<< int32_t(4) << uint64_t(5) << 6.0f << 7.0
<< StreamWriter::RawDataView((uint8_t*)"ok", 2);
writer_ << EndRow;
writer_ = StreamWriter{};

Expand Down
Loading