Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2917a62
PARQUET-1095: [C++] Read and write Arrow decimal values
cpcloud Sep 22, 2017
613255e
Do not use std::copy when reinterpret_cast will suffice
cpcloud Oct 25, 2017
46dff15
Clean up uint32 test
cpcloud Oct 31, 2017
028fb03
Remove garbage values
cpcloud Nov 3, 2017
3d243d5
Checkpoint [ci skip]
cpcloud Nov 3, 2017
1782da0
Use arrow
cpcloud Nov 7, 2017
5c9292b
Proper dcheck call
cpcloud Nov 9, 2017
e162ca1
Allocate scratch space to hold the byteswapped values
cpcloud Nov 9, 2017
659fbc1
Fix deprecated API call
cpcloud Nov 9, 2017
8808e4c
Bump arrow version
cpcloud Nov 9, 2017
1eee6a9
Remove specific randint call
cpcloud Nov 10, 2017
9ff7eb4
Remove specific template parameters
cpcloud Nov 10, 2017
30655d6
Use arrow random_decimals
cpcloud Nov 11, 2017
7ab2e5c
Parameterize on precision
cpcloud Nov 11, 2017
6c9e2a7
Reduce the number of decimal test cases
cpcloud Nov 11, 2017
64748a8
Copy from arrow for now
cpcloud Nov 11, 2017
b2e0290
IWYU
cpcloud Nov 11, 2017
9f97c1d
Update for ARROW-1794: rename DecimalArray to Decimal128Array
cpcloud Nov 12, 2017
920832a
Update arrow version
cpcloud Nov 14, 2017
32a4abe
Cleanup iteration a bit
cpcloud Nov 14, 2017
c5c4294
Fix issues
cpcloud Nov 14, 2017
6036ca5
ARROW-1811
cpcloud Nov 14, 2017
16935de
Reverse operand order and explicit cast
cpcloud Nov 15, 2017
da0a7eb
Update for ARROW-1811
cpcloud Nov 15, 2017
e25c59b
Fix reader writer test for unique kernel addition
cpcloud Nov 18, 2017
51965cd
Min commit that contains the unique kernel in arrow
cpcloud Nov 18, 2017
83948ec
Add last_value_ init
cpcloud Nov 18, 2017
e4b02d3
Refactor types.h
cpcloud Nov 18, 2017
63018bc
Suppress C4996 due to arrow/util/variant.h
wesm Nov 19, 2017
8c3d222
Remove loop from BytesToInteger
cpcloud Nov 19, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ if ("${COMPILER_FAMILY}" STREQUAL "clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CLANG_OPTIONS}")
endif()

if ("${COMPILER_FAMILY}" STREQUAL "msvc")
# MSVC version of -Wno-deprecated
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4996")
endif()

############################################################
# "make lint" target
############################################################
Expand Down
2 changes: 1 addition & 1 deletion cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ if (NOT ARROW_FOUND)
-DARROW_BUILD_TESTS=OFF)

if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "")
set(ARROW_VERSION "0e21f84c2fc26dba949a03ee7d7ebfade0a65b81") # Arrow 0.7.1
set(ARROW_VERSION "f2806fa518583907a129b2ecb0b7ec8758b69e17")
else()
set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}")
endif()
Expand Down
Binary file added data/fixed_length_decimal.parquet
Binary file not shown.
Binary file added data/fixed_length_decimal_legacy.parquet
Binary file not shown.
Binary file added data/int32_decimal.parquet
Binary file not shown.
Binary file added data/int64_decimal.parquet
Binary file not shown.
175 changes: 141 additions & 34 deletions src/parquet/arrow/arrow-reader-writer-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "gtest/gtest.h"

#include <sstream>
#include <arrow/compute/api.h>

#include "parquet/api/reader.h"
#include "parquet/api/writer.h"
Expand All @@ -37,20 +38,23 @@

#include "arrow/api.h"
#include "arrow/test-util.h"
#include "arrow/util/decimal.h"

using arrow::Array;
using arrow::ArrayVisitor;
using arrow::Buffer;
using arrow::ChunkedArray;
using arrow::Column;
using arrow::EncodeArrayToDictionary;
using arrow::ListArray;
using arrow::PoolBuffer;
using arrow::PrimitiveArray;
using arrow::Status;
using arrow::Table;
using arrow::TimeUnit;
using arrow::default_memory_pool;
using arrow::compute::DictionaryEncode;
using arrow::compute::FunctionContext;
using arrow::compute::Datum;
using arrow::io::BufferReader;

using arrow::test::randint;
Expand All @@ -68,10 +72,10 @@ using ColumnVector = std::vector<std::shared_ptr<arrow::Column>>;
namespace parquet {
namespace arrow {

const int SMALL_SIZE = 100;
const int LARGE_SIZE = 10000;
static constexpr int SMALL_SIZE = 100;
static constexpr int LARGE_SIZE = 10000;

constexpr uint32_t kDefaultSeed = 0;
static constexpr uint32_t kDefaultSeed = 0;

LogicalType::type get_logical_type(const ::arrow::DataType& type) {
switch (type.id()) {
Expand Down Expand Up @@ -118,6 +122,8 @@ LogicalType::type get_logical_type(const ::arrow::DataType& type) {
static_cast<const ::arrow::DictionaryType&>(type);
return get_logical_type(*dict_type.dictionary()->type());
}
case ArrowId::DECIMAL:
return LogicalType::DECIMAL;
default:
break;
}
Expand Down Expand Up @@ -147,6 +153,7 @@ ParquetType::type get_physical_type(const ::arrow::DataType& type) {
case ArrowId::STRING:
return ParquetType::BYTE_ARRAY;
case ArrowId::FIXED_SIZE_BINARY:
case ArrowId::DECIMAL:
return ParquetType::FIXED_LEN_BYTE_ARRAY;
case ArrowId::DATE32:
return ParquetType::INT32;
Expand Down Expand Up @@ -299,6 +306,7 @@ struct test_traits<::arrow::FixedSizeBinaryType> {
const std::string test_traits<::arrow::StringType>::value("Test"); // NOLINT
const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); // NOLINT
const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed"); // NOLINT

template <typename T>
using ParquetDataType = DataType<test_traits<T>::parquet_enum>;

Expand Down Expand Up @@ -342,36 +350,52 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,

static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType& type,
Repetition::type repetition) {
int byte_width;
// Decimal is not implemented yet.
int32_t byte_width = -1;
int32_t precision = -1;
int32_t scale = -1;

switch (type.id()) {
case ::arrow::Type::DICTIONARY: {
const ::arrow::DictionaryType& dict_type =
static_cast<const ::arrow::DictionaryType&>(type);
const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
const ::arrow::DataType& values_type = *dict_type.dictionary()->type();
if (values_type.id() == ::arrow::Type::FIXED_SIZE_BINARY) {
byte_width =
static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
} else {
byte_width = -1;
switch (values_type.id()) {
case ::arrow::Type::FIXED_SIZE_BINARY:
byte_width =
static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
break;
case ::arrow::Type::DECIMAL: {
const auto& decimal_type =
static_cast<const ::arrow::Decimal128Type&>(values_type);
precision = decimal_type.precision();
scale = decimal_type.scale();
byte_width = DecimalSize(precision);
} break;
default:
break;
}
} break;
case ::arrow::Type::FIXED_SIZE_BINARY:
byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
break;
case ::arrow::Type::DECIMAL: {
const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
precision = decimal_type.precision();
scale = decimal_type.scale();
byte_width = DecimalSize(precision);
} break;
default:
byte_width = -1;
break;
}
auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
get_logical_type(type), byte_width);
get_logical_type(type), byte_width, precision, scale);
NodePtr node_ =
GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
return std::static_pointer_cast<GroupNode>(node_);
}

namespace internal {

void AssertArraysEqual(const Array &expected, const Array &actual) {
void AssertArraysEqual(const Array& expected, const Array& actual) {
if (!actual.Equals(expected)) {
std::stringstream pp_result;
std::stringstream pp_expected;
Expand Down Expand Up @@ -526,11 +550,19 @@ class TestParquetIO : public ::testing::Test {
// There we write an UInt32 Array but receive an Int64 Array as result for
// Parquet version 1.0.

typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type,
::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type,
::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
::arrow::BinaryType, ::arrow::FixedSizeBinaryType>
typedef ::testing::Types<
::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type,
::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type,
::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, ::arrow::StringType,
::arrow::BinaryType, ::arrow::FixedSizeBinaryType, DecimalWithPrecisionAndScale<1>,
DecimalWithPrecisionAndScale<3>, DecimalWithPrecisionAndScale<5>,
DecimalWithPrecisionAndScale<7>, DecimalWithPrecisionAndScale<10>,
DecimalWithPrecisionAndScale<12>, DecimalWithPrecisionAndScale<15>,
DecimalWithPrecisionAndScale<17>, DecimalWithPrecisionAndScale<19>,
DecimalWithPrecisionAndScale<22>, DecimalWithPrecisionAndScale<23>,
DecimalWithPrecisionAndScale<24>, DecimalWithPrecisionAndScale<27>,
DecimalWithPrecisionAndScale<29>, DecimalWithPrecisionAndScale<32>,
DecimalWithPrecisionAndScale<34>, DecimalWithPrecisionAndScale<38>>
TestTypes;

TYPED_TEST_CASE(TestParquetIO, TestTypes);
Expand Down Expand Up @@ -590,8 +622,10 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {

ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));

std::shared_ptr<Array> dict_values;
ASSERT_OK(EncodeArrayToDictionary(*values, default_memory_pool(), &dict_values));
Datum out;
FunctionContext ctx(default_memory_pool());
ASSERT_OK(DictionaryEncode(&ctx, Datum(values), &out));
std::shared_ptr<Array> dict_values = MakeArray(out.array());
std::shared_ptr<GroupNode> schema =
MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
this->WriteColumn(schema, dict_values);
Expand Down Expand Up @@ -856,25 +890,43 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
ASSERT_OK_NO_THROW(
WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties));

std::shared_ptr<Array> expected_values;
std::shared_ptr<PoolBuffer> int64_data =
std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
{
ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
const uint32_t* uint32_data_ptr =
reinterpret_cast<const uint32_t*>(values->values()->data());
// std::copy might be faster but this is explicit on the casts)
for (int64_t i = 0; i < values->length(); i++) {
int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]);
}
auto int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
auto uint32_data_ptr = reinterpret_cast<const uint32_t*>(values->values()->data());
const auto cast_uint32_to_int64 = [](uint32_t value) {
return static_cast<int64_t>(value);
};
std::transform(uint32_data_ptr, uint32_data_ptr + values->length(), int64_data_ptr,
cast_uint32_to_int64);
}

std::vector<std::shared_ptr<Buffer>> buffers{values->null_bitmap(), int64_data};
auto arr_data = std::make_shared<::arrow::ArrayData>(::arrow::int64(), values->length(),
buffers, values->null_count());
ASSERT_OK(MakeArray(arr_data, &expected_values));
this->ReadAndCheckSingleColumnTable(expected_values);
std::shared_ptr<Array> expected_values = MakeArray(arr_data);
ASSERT_NE(expected_values, NULLPTR);

const auto& expected = static_cast<const ::arrow::Int64Array&>(*expected_values);
ASSERT_GT(values->length(), 0);
ASSERT_EQ(values->length(), expected.length());

// TODO(phillipc): Is there a better way to compare these two arrays?
// AssertArraysEqual requires the same type, but we only care about values in this case
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should create a helper for this, e.g. one that compares only the array data.

for (int i = 0; i < expected.length(); ++i) {
const bool value_is_valid = values->IsValid(i);
const bool expected_value_is_valid = expected.IsValid(i);

ASSERT_EQ(expected_value_is_valid, value_is_valid);

if (value_is_valid) {
uint32_t value = values->Value(i);
int64_t expected_value = expected.Value(i);
ASSERT_EQ(expected_value, static_cast<int64_t>(value));
}
}
}

using TestStringParquetIO = TestParquetIO<::arrow::StringType>;
Expand Down Expand Up @@ -1432,7 +1484,7 @@ void MakeListTable(int num_rows, std::shared_ptr<Table>* out) {
offset_values.push_back(total_elements);

std::vector<int8_t> value_draws;
randint<int8_t>(total_elements, 0, 100, &value_draws);
randint(total_elements, 0, 100, &value_draws);

std::vector<bool> is_valid;
random_is_valid(total_elements, 0.1, &is_valid);
Expand Down Expand Up @@ -1889,6 +1941,61 @@ TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
}

// Value-parameterized fixture for the decimal read tests. Each parameter is a
// (file name, Arrow data type) tuple: the tests built on this fixture unpack
// it into the Parquet file to read and the decimal type used to build the
// expected array.
// NOTE(review): the "Spark" suffix suggests the data files were written by
// Spark -- confirm against the origin of the files under data/.
class TestArrowReaderAdHocSpark
: public ::testing::TestWithParam<
std::tuple<std::string, std::shared_ptr<::arrow::DataType>>> {};

// Reads a single-column decimal Parquet file located in the directory named by
// the PARQUET_TEST_DATA environment variable and checks that the column's only
// chunk equals the Decimal128 values 100, 200, ..., 2400 built with the
// decimal type supplied by the test parameter.
TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
  // std::getenv returns a null pointer when the variable is unset, and
  // constructing a std::string from a null pointer is undefined behavior, so
  // fail with a readable message instead of crashing.
  const char* data_dir = std::getenv("PARQUET_TEST_DATA");
  ASSERT_TRUE(data_dir != nullptr)
      << "PARQUET_TEST_DATA environment variable is not set";

  std::string path(data_dir);
  // Validate the directory before appending the file name; after the append
  // the path can never be empty and the check would be meaningless.
  ASSERT_FALSE(path.empty()) << "PARQUET_TEST_DATA environment variable is empty";

  std::string filename;
  std::shared_ptr<::arrow::DataType> decimal_type;
  std::tie(filename, decimal_type) = GetParam();

  path += "/" + filename;

  auto pool = ::arrow::default_memory_pool();

  std::unique_ptr<FileReader> arrow_reader;
  ASSERT_NO_THROW(
      arrow_reader.reset(new FileReader(pool, ParquetFileReader::OpenFile(path, false))));
  std::shared_ptr<::arrow::Table> table;
  ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));

  ASSERT_EQ(1, table->num_columns());

  constexpr int32_t expected_length = 24;

  auto value_column = table->column(0);
  ASSERT_EQ(expected_length, value_column->length());

  // The comparison below only looks at chunk 0, so require a single chunk.
  auto raw_array = value_column->data();
  ASSERT_EQ(1, raw_array->num_chunks());

  auto chunk = raw_array->chunk(0);

  // Build the expected array: element i holds the unscaled value (i + 1) * 100.
  std::shared_ptr<Array> expected_array;

  ::arrow::Decimal128Builder builder(decimal_type, pool);

  for (int32_t i = 0; i < expected_length; ++i) {
    ::arrow::Decimal128 value((i + 1) * 100);
    ASSERT_OK(builder.Append(value));
  }
  ASSERT_OK(builder.Finish(&expected_array));

  internal::AssertArraysEqual(*expected_array, *chunk);
}

// Instantiates the TestArrowReaderAdHocSpark tests once per tuple below. Each
// tuple pairs a Parquet file name (resolved by the test relative to the
// PARQUET_TEST_DATA directory) with the Arrow decimal type used to build the
// expected values for that file.
// NOTE(review): the file names suggest the decimals are stored as INT32, INT64
// and fixed-length byte arrays respectively -- confirm against the data files.
INSTANTIATE_TEST_CASE_P(
ReadDecimals, TestArrowReaderAdHocSpark,
::testing::Values(
std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)),
std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)),
std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)),
std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2))));

} // namespace arrow

} // namespace parquet
6 changes: 3 additions & 3 deletions src/parquet/arrow/arrow-schema-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(TimeUnit::MILLI);
const auto TIMESTAMP_US = ::arrow::timestamp(TimeUnit::MICRO);
const auto TIMESTAMP_NS = ::arrow::timestamp(TimeUnit::NANO);
const auto BINARY = ::arrow::binary();
const auto DECIMAL_8_4 = std::make_shared<::arrow::DecimalType>(8, 4);
const auto DECIMAL_8_4 = std::make_shared<::arrow::Decimal128Type>(8, 4);

class TestConvertParquetSchema : public ::testing::Test {
public:
Expand All @@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test {
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
<< " != " << rhs->ToString();
EXPECT_TRUE(lhs->Equals(rhs))
<< i << " " << lhs->ToString() << " != " << rhs->ToString();
}
}

Expand Down
Loading