GH-32723: [C++][Parquet] Add option to use LARGE* variants of binary types #35825
Changes from all commits
The first group of hunks is in the Parquet-Arrow test suite:
@@ -438,11 +438,11 @@ void CheckConfiguredRoundtrip(
 void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
                        int64_t row_group_size, const std::vector<int>& column_subset,
                        std::shared_ptr<Table>* out,
-                       const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
-                           default_arrow_writer_properties()) {
+                       const std::shared_ptr<ArrowWriterProperties>&
+                           arrow_writer_properties = default_arrow_writer_properties()) {
   std::shared_ptr<Buffer> buffer;
   ASSERT_NO_FATAL_FAILURE(
-      WriteTableToBuffer(table, row_group_size, arrow_properties, &buffer));
+      WriteTableToBuffer(table, row_group_size, arrow_writer_properties, &buffer));

   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
@@ -610,9 +610,18 @@ class ParquetIOTestBase : public ::testing::Test {
   }

   void ReaderFromSink(std::unique_ptr<FileReader>* out) {
+    return ReaderFromSink(out, default_arrow_reader_properties());
+  }
+
+  void ReaderFromSink(std::unique_ptr<FileReader>* out,
+                      const ArrowReaderProperties& arrow_reader_properties) {
     ASSERT_OK_AND_ASSIGN(auto buffer, sink_->Finish());
-    ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
-                                ::arrow::default_memory_pool(), out));
+    FileReaderBuilder builder;
+    ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer)));
+    ASSERT_OK_NO_THROW(builder.properties(arrow_reader_properties)
+                           ->memory_pool(::arrow::default_memory_pool())
+                           ->Build(out));
   }

   void ReadSingleColumnFile(std::unique_ptr<FileReader> file_reader,
@@ -660,18 +669,20 @@ class ParquetIOTestBase : public ::testing::Test {

   void RoundTripSingleColumn(
       const std::shared_ptr<Array>& values, const std::shared_ptr<Array>& expected,
-      const std::shared_ptr<::parquet::ArrowWriterProperties>& arrow_properties,
+      const std::shared_ptr<::parquet::ArrowWriterProperties>& arrow_writer_properties,
+      const ArrowReaderProperties& arrow_reader_properties =
+          default_arrow_reader_properties(),
       bool nullable = true) {
     std::shared_ptr<Table> table = MakeSimpleTable(values, nullable);
     this->ResetSink();
     ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
                                   values->length(), default_writer_properties(),
-                                  arrow_properties));
+                                  arrow_writer_properties));

     std::shared_ptr<Table> out;
     std::unique_ptr<FileReader> reader;
-    ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader));
-    const bool expect_metadata = arrow_properties->store_schema();
+    ASSERT_NO_FATAL_FAILURE(this->ReaderFromSink(&reader, arrow_reader_properties));
+    const bool expect_metadata = arrow_writer_properties->store_schema();
     ASSERT_NO_FATAL_FAILURE(
         this->ReadTableFromFile(std::move(reader), expect_metadata, &out));
     ASSERT_EQ(1, out->num_columns());
@@ -1342,6 +1353,23 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compatibility) {

 using TestStringParquetIO = TestParquetIO<::arrow::StringType>;

+#if defined(_WIN64) || defined(__LP64__)

[Member] I don't understand this condition. Which platforms is it excluding and why? Large binary data is supposed to work on every platform, so there should be no reason to skip some platforms here.

+TEST_F(TestStringParquetIO, SmallStringWithLargeBinaryVariantSetting) {
+  auto values = ArrayFromJSON(::arrow::utf8(), R"(["foo", "", null, "bar"])");
+
+  this->RoundTripSingleColumn(values, values, default_arrow_writer_properties());
+
+  ArrowReaderProperties arrow_reader_properties;
+  arrow_reader_properties.set_use_large_binary_variants(true);
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<Array> casted,
+                       ::arrow::compute::Cast(*values, ::arrow::large_utf8()));
+
+  this->RoundTripSingleColumn(values, casted, default_arrow_writer_properties(),
+                              arrow_reader_properties);
+}
+#endif

 TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
   std::shared_ptr<Array> values;
   ::arrow::StringBuilder builder;
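As context for these tests, here is a minimal sketch of how a caller would opt in to the new reader option outside the test fixtures. This block is not part of the diff: the function name and the caller-supplied path are placeholders, but FileReaderBuilder, ArrowReaderProperties, and set_use_large_binary_variants are used as in the ReaderFromSink helper above.

#include "arrow/api.h"
#include "parquet/arrow/reader.h"

// Read a Parquet file, mapping BYTE_ARRAY columns to large_binary/large_utf8
// instead of binary/utf8.
::arrow::Status ReadAsLargeBinary(const std::string& path,
                                  std::shared_ptr<::arrow::Table>* out) {
  parquet::ArrowReaderProperties properties;
  properties.set_use_large_binary_variants(true);  // the option added by this PR

  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.OpenFile(path));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(builder.properties(properties)
                          ->memory_pool(::arrow::default_memory_pool())
                          ->Build(&reader));
  return reader->ReadTable(out);
}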
@@ -1369,6 +1397,7 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {

 using TestLargeBinaryParquetIO = TestParquetIO<::arrow::LargeBinaryType>;

+#if defined(_WIN64) || defined(__LP64__)

[Member] Again, it does not seem right that you are restricting tests that used to work on every platform (and that have no obvious reason to fail on some platforms).

 TEST_F(TestLargeBinaryParquetIO, Basics) {
   const char* json = "[\"foo\", \"\", null, \"\xff\"]";
@@ -1388,6 +1417,13 @@ TEST_F(TestLargeBinaryParquetIO, Basics) {
   const auto arrow_properties =
       ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
   this->RoundTripSingleColumn(large_array, large_array, arrow_properties);
+
+  ArrowReaderProperties arrow_reader_properties;
+  arrow_reader_properties.set_use_large_binary_variants(true);
+  // Input is a narrow array, but the expected output is a large array, the opposite
+  // of the tests above. This validates that narrow arrays can be read as large arrays.
+  this->RoundTripSingleColumn(narrow_array, large_array,
+                              default_arrow_writer_properties(), arrow_reader_properties);
 }

 using TestLargeStringParquetIO = TestParquetIO<::arrow::LargeStringType>;
@@ -1412,6 +1448,7 @@ TEST_F(TestLargeStringParquetIO, Basics) {
       ::parquet::ArrowWriterProperties::Builder().store_schema()->build();
   this->RoundTripSingleColumn(large_array, large_array, arrow_properties);
 }
+#endif

 using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
@@ -3834,13 +3871,14 @@ TEST(TestImpalaConversion, ArrowTimestampToImpalaTimestamp) {
   ASSERT_EQ(expected, calculated);
 }

-void TryReadDataFile(const std::string& path,
-                     ::arrow::StatusCode expected_code = ::arrow::StatusCode::OK) {
+void TryReadDataFileWithProperties(
+    const std::string& path, const ArrowReaderProperties& properties,
+    ::arrow::StatusCode expected_code = ::arrow::StatusCode::OK) {
   auto pool = ::arrow::default_memory_pool();

   std::unique_ptr<FileReader> arrow_reader;
-  Status s =
-      FileReader::Make(pool, ParquetFileReader::OpenFile(path, false), &arrow_reader);
+  Status s = FileReader::Make(pool, ParquetFileReader::OpenFile(path, false), properties,
+                              &arrow_reader);
   if (s.ok()) {
     std::shared_ptr<::arrow::Table> table;
     s = arrow_reader->ReadTable(&table);
@@ -3851,6 +3889,11 @@ void TryReadDataFile(const std::string& path,
       << ", but got " << s.ToString();
 }

+void TryReadDataFile(const std::string& path,
+                     ::arrow::StatusCode expected_code = ::arrow::StatusCode::OK) {
+  TryReadDataFileWithProperties(path, default_arrow_reader_properties(), expected_code);
+}
+
 TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
   // PARQUET-995
   TryReadDataFile(test::get_data_file("alltypes_plain.parquet"));
@@ -3862,6 +3905,19 @@ TEST(TestArrowReaderAdHoc, CorruptedSchema) {
   TryReadDataFile(path, ::arrow::StatusCode::IOError);
 }

+#if defined(ARROW_WITH_BROTLI) && defined(__LP64__)

[Member] I still don't understand what this condition is for. Why not instead:

    if (sizeof(void*) < 8) {
      GTEST_SKIP() << "Test only runs on 64-bit platforms as it allocates more than 2GB RAM";
    }

[Member] I also see that this test takes 18 seconds in debug mode. This seems a bit excessive :-/

+TEST(TestArrowParquet, LargeByteArray) {
+  auto path = test::get_data_file("large_string_map.brotli.parquet");
+  TryReadDataFile(path, ::arrow::StatusCode::NotImplemented);
+  ArrowReaderProperties reader_properties;
+  reader_properties.set_use_large_binary_variants(true);
+  reader_properties.set_read_dictionary(0, false);
+  TryReadDataFileWithProperties(path, reader_properties);
+  reader_properties.set_read_dictionary(0, true);
+  TryReadDataFileWithProperties(path, reader_properties);
+}
+#endif

 TEST(TestArrowReaderAdHoc, LARGE_MEMORY_TEST(LargeStringColumn)) {
   // ARROW-3762
   ::arrow::StringBuilder builder;
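If the reviewer's suggestion were adopted, the preprocessor guard above could become a runtime skip. A sketch of that variant (the 2GB figure comes from the review comment; the Brotli guard would still be needed for the codec):

TEST(TestArrowParquet, LargeByteArray) {
  if (sizeof(void*) < 8) {
    GTEST_SKIP() << "Test only runs on 64-bit platforms as it allocates more than 2GB RAM";
  }
  // ... body unchanged from the hunk above ...
}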
@@ -4548,16 +4604,22 @@ TEST(TestArrowWriteDictionaries, NestedSubfield) {
 class TestArrowReadDeltaEncoding : public ::testing::Test {
  public:
   void ReadTableFromParquetFile(const std::string& file_name,
+                                const ArrowReaderProperties& properties,
                                 std::shared_ptr<Table>* out) {
     auto file = test::get_data_file(file_name);
     auto pool = ::arrow::default_memory_pool();
     std::unique_ptr<FileReader> parquet_reader;
-    ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false),
+    ASSERT_OK(FileReader::Make(pool, ParquetFileReader::OpenFile(file, false), properties,
                                &parquet_reader));
     ASSERT_OK(parquet_reader->ReadTable(out));
     ASSERT_OK((*out)->ValidateFull());
   }

+  void ReadTableFromParquetFile(const std::string& file_name,
+                                std::shared_ptr<Table>* out) {
+    return ReadTableFromParquetFile(file_name, default_arrow_reader_properties(), out);
+  }
+
   void ReadTableFromCSVFile(const std::string& file_name,
                             const ::arrow::csv::ConvertOptions& convert_options,
                             std::shared_ptr<Table>* out) {
@@ -4605,6 +4667,27 @@ TEST_F(TestArrowReadDeltaEncoding, DeltaByteArray) {
   ::arrow::AssertTablesEqual(*actual_table, *expect_table, false);
 }

+TEST_F(TestArrowReadDeltaEncoding, DeltaByteArrayWithLargeBinaryVariant) {
+  std::shared_ptr<::arrow::Table> actual_table, expect_table;
+  ArrowReaderProperties properties;
+  properties.set_use_large_binary_variants(true);
+
+  ReadTableFromParquetFile("delta_byte_array.parquet", properties, &actual_table);
+
+  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
+  std::vector<std::string> column_names = {
+      "c_customer_id", "c_salutation", "c_first_name",
+      "c_last_name", "c_preferred_cust_flag", "c_birth_country",
+      "c_login", "c_email_address", "c_last_review_date"};
+  for (auto name : column_names) {
+    convert_options.column_types[name] = ::arrow::large_utf8();
+  }
+  convert_options.strings_can_be_null = true;
+  ReadTableFromCSVFile("delta_byte_array_expect.csv", convert_options, &expect_table);

[Member, on lines +4677 to +4686] Looks like you could factor this out in the test fixture.

+
+  ::arrow::AssertTablesEqual(*actual_table, *expect_table, false);
+}

 TEST_F(TestArrowReadDeltaEncoding, IncrementalDecodeDeltaByteArray) {
   auto file = test::get_data_file("delta_byte_array.parquet");
   auto pool = ::arrow::default_memory_pool();
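One possible shape for the factoring the reviewer suggests: a fixture helper, parameterized on the expected string type, that both DeltaByteArray tests could share. The helper name and signature are hypothetical:

::arrow::csv::ConvertOptions MakeConvertOptions(
    const std::shared_ptr<::arrow::DataType>& string_type) {
  auto convert_options = ::arrow::csv::ConvertOptions::Defaults();
  // Same nine column names as in both tests above.
  for (const char* name :
       {"c_customer_id", "c_salutation", "c_first_name", "c_last_name",
        "c_preferred_cust_flag", "c_birth_country", "c_login",
        "c_email_address", "c_last_review_date"}) {
    convert_options.column_types[name] = string_type;
  }
  convert_options.strings_can_be_null = true;
  return convert_options;
}

The existing DeltaByteArray test would then call MakeConvertOptions(::arrow::utf8()) and the new one MakeConvertOptions(::arrow::large_utf8()).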
The final hunk is in a second file, where the TransferBinary read path's comment is updated:
@@ -487,8 +487,9 @@ Status TransferBinary(RecordReader* reader, MemoryPool* pool,
   auto chunks = binary_reader->GetBuilderChunks();
   for (auto& chunk : chunks) {
     if (!chunk->type()->Equals(*logical_type_field->type())) {
-      // XXX: if a LargeBinary chunk is larger than 2GB, the MSBs of offsets
-      // will be lost because they are first created as int32 and then cast to int64.
+      // If a LargeBinary chunk is larger than 2GB and use_large_binary_variants
+      // is not set, the MSBs of offsets will be lost because they are first created
+      // as int32 and then cast to int64.

[Member] I would keep the XXX because it is a gotcha.

[Member] Good to know something new :)

       ARROW_ASSIGN_OR_RAISE(
           chunk,
           ::arrow::compute::Cast(*chunk, logical_type_field->type(), cast_options, &ctx));
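To make the gotcha in that comment concrete, a standalone illustration (not from the codebase): once an offset has been materialized as int32, casting back to int64 cannot recover the lost high bits.

#include <cstdint>
#include <iostream>

int main() {
  const int64_t true_offset = int64_t{3} << 30;           // 3 GiB, exceeds INT32_MAX
  const auto stored = static_cast<int32_t>(true_offset);  // MSBs silently dropped here
  const auto widened = static_cast<int64_t>(stored);      // the int32 -> int64 cast
  // Typically prints "3221225472 != -1073741824".
  std::cout << true_offset << " != " << widened << "\n";
  return 0;
}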