18 changes: 18 additions & 0 deletions cpp/src/arrow/ipc/feather_test.cc
@@ -319,6 +319,24 @@ TEST_P(TestFeather, SliceBooleanRoundTrip) {
   CheckSlices(batch);
 }
 
+TEST_P(TestFeather, SliceListRoundTrip) {
+  if (GetParam().version == kFeatherV1Version) {
+    GTEST_SKIP() << "Feather V1 does not support list types";
+  }
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(ipc::test::MakeListRecordBatchSized(600, &batch));
+  CheckSlices(batch);
+}
+
+TEST_P(TestFeather, SliceListViewRoundTrip) {
+  if (GetParam().version == kFeatherV1Version) {
+    GTEST_SKIP() << "Feather V1 does not support list view types";
+  }
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(ipc::test::MakeListViewRecordBatchSized(600, &batch));
+  CheckSlices(batch);
+}
+
 INSTANTIATE_TEST_SUITE_P(
     FeatherTests, TestFeather,
     ::testing::Values(TestParam(kFeatherV1Version), TestParam(kFeatherV2Version),
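These tests rely on the existing CheckSlices helper, which is not shown in this diff. As a rough, hypothetical sketch of the kind of Feather round-trip such a helper performs (assuming the V2 writer/reader API from arrow/ipc/feather.h; the slice points and variable names here are illustrative only):

  // Build a 600-row list batch, slice it, and round-trip it through Feather V2.
  std::shared_ptr<RecordBatch> batch;
  ASSERT_OK(ipc::test::MakeListRecordBatchSized(600, &batch));
  auto sliced = batch->Slice(/*offset=*/100, /*length=*/200);

  ASSERT_OK_AND_ASSIGN(auto table, Table::FromRecordBatches({sliced}));
  ASSERT_OK_AND_ASSIGN(auto sink, io::BufferOutputStream::Create());
  ASSERT_OK(ipc::feather::WriteTable(*table, sink.get(),
                                     ipc::feather::WriteProperties::Defaults()));
  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());

  auto source = std::make_shared<io::BufferReader>(buffer);
  ASSERT_OK_AND_ASSIGN(auto reader, ipc::feather::Reader::Open(source));
  std::shared_ptr<Table> roundtripped;
  ASSERT_OK(reader->Read(&roundtripped));
  AssertTablesEqual(*table, *roundtripped, /*same_chunk_layout=*/false);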
23 changes: 23 additions & 0 deletions cpp/src/arrow/ipc/read_write_test.cc
@@ -579,6 +579,29 @@ TEST_F(TestIpcRoundTrip, SpecificMetadataVersion) {
   TestMetadataVersion(MetadataVersion::V5);
 }
 
+TEST_F(TestIpcRoundTrip, ListWithSlicedValues) {
+  // This tests serialization of a sliced ListArray that was sliced "the Rust
+  // way": by slicing the value_offsets buffer while keeping the top-level
+  // offset at 0.
+  auto child_data = ArrayFromJSON(int32(), "[1, 2, 3, 4, 5]")->data();
+
+  // Offsets buffer [2, 5]
+  TypedBufferBuilder<int32_t> offsets_builder;
+  ASSERT_OK(offsets_builder.Reserve(2));
+  ASSERT_OK(offsets_builder.Append(2));
+  ASSERT_OK(offsets_builder.Append(5));
+  ASSERT_OK_AND_ASSIGN(auto offsets_buffer, offsets_builder.Finish());
+
+  auto list_data = ArrayData::Make(list(int32()),
+                                   /*num_rows=*/1,
+                                   /*buffers=*/{nullptr, offsets_buffer});
+  list_data->child_data = {child_data};
+  std::shared_ptr<Array> list_array = MakeArray(list_data);
+  ASSERT_OK(list_array->ValidateFull());
+
+  CheckRoundtrip(list_array);
+}
+
 TEST(TestReadMessage, CorruptedSmallInput) {
   std::string data = "abc";
   auto reader = io::BufferReader::FromString(data);
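For contrast, a purely illustrative sketch of the usual C++ slicing path, which keeps the original offsets buffer intact and records a non-zero top-level offset instead (the values mirror the hand-built array in the test above):

  // Slicing via Array::Slice: offset() becomes 1, value_offsets stay [0, 2, 5].
  auto full = ArrayFromJSON(list(int32()), "[[1, 2], [3, 4, 5]]");
  auto sliced = full->Slice(/*offset=*/1, /*length=*/1);
  // Both `sliced` and the test's hand-built `list_array` represent [[3, 4, 5]];
  // the C++ slice records offset() == 1 over the intact offsets buffer, while
  // the Rust-style array keeps offset() == 0 and trims the buffer instead.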
14 changes: 10 additions & 4 deletions cpp/src/arrow/ipc/test_common.cc
@@ -421,7 +421,7 @@ Status MakeNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
   return Status::OK();
 }
 
-Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
+Status MakeListRecordBatchSized(const int length, std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = field("f0", list(int32()));
   auto f1 = field("f1", list(list(int32())));
@@ -431,7 +431,6 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Example data
 
   MemoryPool* pool = default_memory_pool();
-  const int length = 200;
   std::shared_ptr<Array> leaf_values, list_array, list_list_array, large_list_array;
   const bool include_nulls = true;
   RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values));
@@ -446,7 +445,11 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
   return Status::OK();
 }
 
-Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out) {
+Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
+  return MakeListRecordBatchSized(200, out);
+}
+
+Status MakeListViewRecordBatchSized(const int length, std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = field("f0", list_view(int32()));
   auto f1 = field("f1", list_view(list_view(int32())));
@@ -456,7 +459,6 @@ Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Example data
 
   MemoryPool* pool = default_memory_pool();
-  const int length = 200;
   std::shared_ptr<Array> leaf_values, list_array, list_list_array, large_list_array;
   const bool include_nulls = true;
   RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values));
@@ -471,6 +473,10 @@ Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out) {
   return Status::OK();
 }
 
+Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out) {
+  return MakeListViewRecordBatchSized(200, out);
+}
+
 Status MakeFixedSizeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
   // Make the schema
   auto f0 = field("f0", fixed_size_list(int32(), 1));
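A short usage note on the refactor above (all names taken directly from this diff): the existing parameterless makers keep their 200-row default, while tests that need a specific size call the new Sized variants.

  std::shared_ptr<RecordBatch> batch;
  ASSERT_OK(ipc::test::MakeListRecordBatch(&batch));                // 200 rows, as before
  ASSERT_OK(ipc::test::MakeListViewRecordBatchSized(600, &batch));  // caller-chosen size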
6 changes: 6 additions & 0 deletions cpp/src/arrow/ipc/test_common.h
@@ -104,9 +104,15 @@ Status MakeStringTypesRecordBatchWithNulls(std::shared_ptr<RecordBatch>* out);
 ARROW_TESTING_EXPORT
 Status MakeNullRecordBatch(std::shared_ptr<RecordBatch>* out);
 
+ARROW_TESTING_EXPORT
+Status MakeListRecordBatchSized(int length, std::shared_ptr<RecordBatch>* out);
+
 ARROW_TESTING_EXPORT
 Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out);
 
+ARROW_TESTING_EXPORT
+Status MakeListViewRecordBatchSized(int length, std::shared_ptr<RecordBatch>* out);
+
 ARROW_TESTING_EXPORT
 Status MakeListViewRecordBatch(std::shared_ptr<RecordBatch>* out);
 
36 changes: 19 additions & 17 deletions cpp/src/arrow/ipc/writer.cc
@@ -324,34 +324,36 @@ class RecordBatchSerializer {
     // Share slicing logic between ListArray, BinaryArray and LargeBinaryArray
     using offset_type = typename ArrayType::offset_type;
 
-    auto offsets = array.value_offsets();
+    if (array.length() == 0) {
+      *value_offsets = array.value_offsets();
+      return Status::OK();
+    }
 
     int64_t required_bytes = sizeof(offset_type) * (array.length() + 1);
-    if (array.offset() != 0) {
-      // If we have a non-zero offset, then the value offsets do not start at
-      // zero. We must a) create a new offsets array with shifted offsets and
-      // b) slice the values array accordingly
-
+    if (array.value_offset(0) > 0) {
+      // If the offset of the first value is non-zero, then we must create a new
+      // offsets buffer with shifted offsets.
       ARROW_ASSIGN_OR_RAISE(auto shifted_offsets,
                             AllocateBuffer(required_bytes, options_.memory_pool));
 
       auto dest_offsets = shifted_offsets->mutable_span_as<offset_type>();
-      const offset_type start_offset = array.value_offset(0);
+      const offset_type* source_offsets = array.raw_value_offsets();
+      const offset_type start_offset = source_offsets[0];
 
-      for (int i = 0; i < array.length(); ++i) {
-        dest_offsets[i] = array.value_offset(i) - start_offset;
+      for (int i = 0; i <= array.length(); ++i) {
+        dest_offsets[i] = source_offsets[i] - start_offset;
       }
-      // Final offset
-      dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset;
-      offsets = std::move(shifted_offsets);
+      *value_offsets = std::move(shifted_offsets);
     } else {
-      // ARROW-6046: Slice offsets to used extent, in case we have a truncated
-      // slice
-      if (offsets != nullptr && offsets->size() > required_bytes) {
-        offsets = SliceBuffer(offsets, 0, required_bytes);
+      // ARROW-6046: if we have a truncated slice with unused leading or
+      // trailing data, then we slice it.
+      if (array.offset() > 0 || array.value_offsets()->size() > required_bytes) {
+        *value_offsets = SliceBuffer(
+            array.value_offsets(), array.offset() * sizeof(offset_type), required_bytes);
+      } else {
+        *value_offsets = array.value_offsets();
      }
     }
-    *value_offsets = std::move(offsets);
     return Status::OK();
   }

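To make the rebasing concrete, here is a standalone worked sketch (plain C++, not Arrow code; the function name is invented for illustration). A list array sliced "the Rust way" might carry values [1, 2, 3, 4, 5] with offsets [2, 5], describing the single list [3, 4, 5]; the writer rebases those offsets to start at zero and slices the values buffer by the same amount.

  #include <cstdint>
  #include <cstdio>
  #include <vector>

  // Subtract the first offset from every entry, mirroring the shifted-offsets
  // loop in the writer change above.
  std::vector<int32_t> RebaseOffsets(const std::vector<int32_t>& offsets) {
    std::vector<int32_t> rebased(offsets.size());
    const int32_t start = offsets.front();
    for (size_t i = 0; i < offsets.size(); ++i) {
      rebased[i] = offsets[i] - start;
    }
    return rebased;
  }

  int main() {
    const std::vector<int32_t> offsets = {2, 5};  // first value offset > 0
    for (int32_t off : RebaseOffsets(offsets)) {
      std::printf("%d ", off);  // prints: 0 3
    }
    // The values buffer would then be sliced to start at element 2, so the
    // serialized child data holds exactly [3, 4, 5].
    return 0;
  }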