Skip to content
Closed
6 changes: 6 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ This product includes software from the Numpy project (BSD-new)
https://github.com/numpy/numpy/blob/e1f191c46f2eebd6cb892a4bfe14d9dd43a06c4e/numpy/core/src/multiarray/multiarraymodule.c#L2910
* Copyright (c) 1995, 1996, 1997 Jim Hugunin, [email protected]
* Copyright (c) 2005 Travis E. Oliphant [email protected] Brigham Young University

This product includes software from the Feather project (Apache 2.0)
https://github.com/wesm/feather

This product includes software from the DyND project (BSD 2-clause)
https://github.com/libdynd
37 changes: 37 additions & 0 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,43 @@ Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
return Status::OK();
}

// ----------------------------------------------------------------------
// OutputStream that writes to resizable buffer

static constexpr int64_t kBufferMinimumSize = 256;

// Construct a stream that appends to `buffer`. The buffer's current size is
// taken as the initial capacity; writing always starts at offset 0 and the
// raw data pointer is cached so Write() can copy without an extra call.
BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer) {
  buffer_ = buffer;
  capacity_ = buffer->size();
  position_ = 0;
  mutable_data_ = buffer->mutable_data();
}

// Closing is a no-op: the stream owns no OS resources, and the backing
// buffer remains valid (and shared) after the stream is closed.
Status BufferOutputStream::Close() {
  return Status::OK();
}

// Report the current write offset (number of bytes written so far) into
// *position. Always succeeds.
Status BufferOutputStream::Tell(int64_t* position) {
  *position = position_;
  return Status::OK();
}

// Append nbytes from `data` to the stream, growing the backing buffer as
// needed. Returns a failed Status only if the underlying Resize fails.
//
// The copy is skipped for nbytes == 0: std::memcpy requires valid pointers
// even for a zero-length copy, so calling it with data == nullptr would be
// undefined behavior.
Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
  RETURN_NOT_OK(Reserve(nbytes));
  if (nbytes > 0) {
    std::memcpy(mutable_data_ + position_, data, nbytes);
    position_ += nbytes;
  }
  return Status::OK();
}

// Ensure the buffer has room for nbytes more bytes past the current write
// position, doubling capacity (with a floor of kBufferMinimumSize) until it
// fits.
//
// The target capacity is computed first and Resize() is called at most once;
// the original loop resized on every doubling step, which could trigger a
// chain of reallocations/copies for a single large write.
//
// mutable_data_ is unconditionally refreshed because Resize() may relocate
// the buffer's storage.
Status BufferOutputStream::Reserve(int64_t nbytes) {
  int64_t new_capacity = capacity_;
  while (position_ + nbytes > new_capacity) {
    new_capacity = std::max(kBufferMinimumSize, new_capacity * 2);
  }
  if (new_capacity != capacity_) {
    RETURN_NOT_OK(buffer_->Resize(new_capacity));
    capacity_ = new_capacity;
  }
  mutable_data_ = buffer_->mutable_data();
  return Status::OK();
}

// ----------------------------------------------------------------------
// In-memory buffer reader

Expand Down
18 changes: 8 additions & 10 deletions cpp/src/arrow/io/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,30 @@
namespace arrow {

class Buffer;
class MutableBuffer;
class ResizableBuffer;
class Status;

namespace io {

// An output stream that writes to a MutableBuffer, such as one obtained from a
// memory map
//
// TODO(wesm): Implement this class
class ARROW_EXPORT BufferOutputStream : public OutputStream {
public:
explicit BufferOutputStream(const std::shared_ptr<MutableBuffer>& buffer)
: buffer_(buffer) {}
explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);

// Implement the OutputStream interface
Status Close() override;
Status Tell(int64_t* position) override;
Status Write(const uint8_t* data, int64_t length) override;

// Returns the number of bytes remaining in the buffer
int64_t bytes_remaining() const;
Status Write(const uint8_t* data, int64_t nbytes) override;

private:
std::shared_ptr<MutableBuffer> buffer_;
// Ensures there is sufficient space available to write nbytes
Status Reserve(int64_t nbytes);

std::shared_ptr<ResizableBuffer> buffer_;
int64_t capacity_;
int64_t position_;
uint8_t* mutable_data_;
};

// A memory source that uses memory-mapped files for memory interactions
Expand Down
18 changes: 15 additions & 3 deletions cpp/src/arrow/ipc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ set(ARROW_IPC_TEST_LINK_LIBS

set(ARROW_IPC_SRCS
adapter.cc
file.cc
metadata.cc
metadata-internal.cc
)
Expand Down Expand Up @@ -60,6 +61,10 @@ ADD_ARROW_TEST(ipc-adapter-test)
ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
${ARROW_IPC_TEST_LINK_LIBS})

ADD_ARROW_TEST(ipc-file-test)
ARROW_TEST_LINK_LIBRARIES(ipc-file-test
${ARROW_IPC_TEST_LINK_LIBS})

ADD_ARROW_TEST(ipc-metadata-test)
ARROW_TEST_LINK_LIBRARIES(ipc-metadata-test
${ARROW_IPC_TEST_LINK_LIBS})
Expand All @@ -70,14 +75,20 @@ set_source_files_properties(Metadata_generated.h PROPERTIES GENERATED TRUE)
set(OUTPUT_DIR ${CMAKE_SOURCE_DIR}/src/arrow/ipc)
set(FBS_OUTPUT_FILES "${OUTPUT_DIR}/Message_generated.h")

set(FBS_SRC ${CMAKE_SOURCE_DIR}/../format/Message.fbs)
get_filename_component(ABS_FBS_SRC ${FBS_SRC} ABSOLUTE)
set(FBS_SRC
${CMAKE_SOURCE_DIR}/../format/Message.fbs
${CMAKE_SOURCE_DIR}/../format/File.fbs)

foreach(FIL ${FBS_SRC})
get_filename_component(ABS_FIL ${FIL} ABSOLUTE)
list(APPEND ABS_FBS_SRC ${ABS_FIL})
endforeach()

add_custom_command(
OUTPUT ${FBS_OUTPUT_FILES}
COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${ABS_FBS_SRC}
DEPENDS ${ABS_FBS_SRC}
COMMENT "Running flatc compiler on ${FBS_SRC}"
COMMENT "Running flatc compiler on ${ABS_FBS_SRC}"
VERBATIM
)

Expand All @@ -87,6 +98,7 @@ add_dependencies(arrow_objlib metadata_fbs)
# Headers: top level
install(FILES
adapter.h
file.h
metadata.h
DESTINATION include/arrow/ipc)

Expand Down
126 changes: 75 additions & 51 deletions cpp/src/arrow/ipc/adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ static bool IsListType(const DataType* type) {
}

// ----------------------------------------------------------------------
// Row batch write path
// Record batch write path

Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes,
std::vector<std::shared_ptr<Buffer>>* buffers, int max_recursion_depth) {
Expand Down Expand Up @@ -132,28 +132,32 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
return Status::OK();
}

class RowBatchWriter {
class RecordBatchWriter {
public:
RowBatchWriter(const RowBatch* batch, int max_recursion_depth)
: batch_(batch), max_recursion_depth_(max_recursion_depth) {}
RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows,
int max_recursion_depth)
: columns_(&columns),
num_rows_(num_rows),
max_recursion_depth_(max_recursion_depth) {}

Status AssemblePayload() {
// Perform depth-first traversal of the row-batch
for (int i = 0; i < batch_->num_columns(); ++i) {
const Array* arr = batch_->column(i).get();
for (size_t i = 0; i < columns_->size(); ++i) {
const Array* arr = (*columns_)[i].get();
RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_));
}
return Status::OK();
}

Status Write(io::OutputStream* dst, int64_t* data_header_offset) {
// Write out all the buffers contiguously and compute the total size of the
// memory payload
int64_t offset = 0;

Status Write(
io::OutputStream* dst, int64_t* body_end_offset, int64_t* header_end_offset) {
// Get the starting position
int64_t position;
RETURN_NOT_OK(dst->Tell(&position));
int64_t start_position;
RETURN_NOT_OK(dst->Tell(&start_position));

// Keep track of the current position so we can determine the size of the
// message body
int64_t position = start_position;

for (size_t i = 0; i < buffers_.size(); ++i) {
const Buffer* buffer = buffers_[i].get();
Expand All @@ -175,14 +179,16 @@ class RowBatchWriter {
// are using from any OS-level shared memory. The thought is that systems
// may (in the future) associate integer page id's with physical memory
// pages (according to whatever is the desired shared memory mechanism)
buffer_meta_.push_back(flatbuf::Buffer(0, position + offset, size));
buffer_meta_.push_back(flatbuf::Buffer(0, position, size));

if (size > 0) {
RETURN_NOT_OK(dst->Write(buffer->data(), size));
offset += size;
position += size;
}
}

*body_end_offset = position;

// Now that we have computed the locations of all of the buffers in shared
// memory, the data header can be converted to a flatbuffer and written out
//
Expand All @@ -192,65 +198,81 @@ class RowBatchWriter {
// construct the flatbuffer data accessor object (see arrow::ipc::Message)
std::shared_ptr<Buffer> data_header;
RETURN_NOT_OK(WriteDataHeader(
batch_->num_rows(), offset, field_nodes_, buffer_meta_, &data_header));
num_rows_, position - start_position, field_nodes_, buffer_meta_, &data_header));

// Write the data header at the end
RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));

*data_header_offset = position + offset;
position += data_header->size();
*header_end_offset = position;

return Align(dst, &position);
}

Status Align(io::OutputStream* dst, int64_t* position) {
// Write all buffers here on word boundaries
// TODO(wesm): Is there benefit to 64-byte padding in IPC?
int64_t remainder = PaddedLength(*position) - *position;
if (remainder > 0) {
RETURN_NOT_OK(dst->Write(kPaddingBytes, remainder));
*position += remainder;
}
return Status::OK();
}

// This must be called after invoking AssemblePayload
Status GetTotalSize(int64_t* size) {
// emulates the behavior of Write without actually writing
int64_t body_offset;
int64_t data_header_offset;
MockOutputStream dst;
RETURN_NOT_OK(Write(&dst, &data_header_offset));
RETURN_NOT_OK(Write(&dst, &body_offset, &data_header_offset));
*size = dst.GetExtentBytesWritten();
return Status::OK();
}

private:
const RowBatch* batch_;
// Do not copy this vector. Ownership must be retained elsewhere
const std::vector<std::shared_ptr<Array>>* columns_;
int32_t num_rows_;

std::vector<flatbuf::FieldNode> field_nodes_;
std::vector<flatbuf::Buffer> buffer_meta_;
std::vector<std::shared_ptr<Buffer>> buffers_;
int max_recursion_depth_;
};

Status WriteRowBatch(io::OutputStream* dst, const RowBatch* batch, int64_t* header_offset,
int max_recursion_depth) {
Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
int64_t* header_end_offset, int max_recursion_depth) {
DCHECK_GT(max_recursion_depth, 0);
RowBatchWriter serializer(batch, max_recursion_depth);
RecordBatchWriter serializer(columns, num_rows, max_recursion_depth);
RETURN_NOT_OK(serializer.AssemblePayload());
return serializer.Write(dst, header_offset);
return serializer.Write(dst, body_end_offset, header_end_offset);
}

Status GetRowBatchSize(const RowBatch* batch, int64_t* size) {
RowBatchWriter serializer(batch, kMaxIpcRecursionDepth);
Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
RecordBatchWriter serializer(
batch->columns(), batch->num_rows(), kMaxIpcRecursionDepth);
RETURN_NOT_OK(serializer.AssemblePayload());
RETURN_NOT_OK(serializer.GetTotalSize(size));
return Status::OK();
}

// ----------------------------------------------------------------------
// Row batch read path
// Record batch read path

static constexpr int64_t INIT_METADATA_SIZE = 4096;

class RowBatchReader::RowBatchReaderImpl {
class RecordBatchReader::RecordBatchReaderImpl {
public:
RowBatchReaderImpl(io::ReadableFileInterface* file,
RecordBatchReaderImpl(io::ReadableFileInterface* file,
const std::shared_ptr<RecordBatchMessage>& metadata, int max_recursion_depth)
: file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) {
num_buffers_ = metadata->num_buffers();
num_flattened_fields_ = metadata->num_fields();
}

Status AssembleBatch(
const std::shared_ptr<Schema>& schema, std::shared_ptr<RowBatch>* out) {
const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) {
std::vector<std::shared_ptr<Array>> arrays(schema->num_fields());

// The field_index and buffer_index are incremented in NextArray based on
Expand All @@ -263,7 +285,7 @@ class RowBatchReader::RowBatchReaderImpl {
RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i]));
}

*out = std::make_shared<RowBatch>(schema, metadata_->length(), arrays);
*out = std::make_shared<RecordBatch>(schema, metadata_->length(), arrays);
return Status::OK();
}

Expand Down Expand Up @@ -359,49 +381,51 @@ class RowBatchReader::RowBatchReaderImpl {
int num_flattened_fields_;
};

Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
std::shared_ptr<RowBatchReader>* out) {
return Open(file, position, kMaxIpcRecursionDepth, out);
Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
std::shared_ptr<RecordBatchReader>* out) {
return Open(file, offset, kMaxIpcRecursionDepth, out);
}

Status RowBatchReader::Open(io::ReadableFileInterface* file, int64_t position,
int max_recursion_depth, std::shared_ptr<RowBatchReader>* out) {
std::shared_ptr<Buffer> metadata;
RETURN_NOT_OK(file->ReadAt(position, INIT_METADATA_SIZE, &metadata));
Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out) {
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(file->ReadAt(offset - sizeof(int32_t), sizeof(int32_t), &buffer));

int32_t metadata_size = *reinterpret_cast<const int32_t*>(metadata->data());
int32_t metadata_size = *reinterpret_cast<const int32_t*>(buffer->data());

// We may not need to call ReadAt again
if (metadata_size > static_cast<int>(INIT_METADATA_SIZE - sizeof(int32_t))) {
// We don't have enough data, read the indicated metadata size.
RETURN_NOT_OK(file->ReadAt(position + sizeof(int32_t), metadata_size, &metadata));
if (metadata_size + static_cast<int>(sizeof(int32_t)) > offset) {
return Status::Invalid("metadata size invalid");
}

// Read the metadata
RETURN_NOT_OK(
file->ReadAt(offset - metadata_size - sizeof(int32_t), metadata_size, &buffer));

// TODO(wesm): buffer slicing here would be better in case ReadAt returns
// allocated memory

std::shared_ptr<Message> message;
RETURN_NOT_OK(Message::Open(metadata, &message));
RETURN_NOT_OK(Message::Open(buffer, &message));

if (message->type() != Message::RECORD_BATCH) {
return Status::Invalid("Metadata message is not a record batch");
}

std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();

std::shared_ptr<RowBatchReader> result(new RowBatchReader());
result->impl_.reset(new RowBatchReaderImpl(file, batch_meta, max_recursion_depth));
std::shared_ptr<RecordBatchReader> result(new RecordBatchReader());
result->impl_.reset(new RecordBatchReaderImpl(file, batch_meta, max_recursion_depth));
*out = result;

return Status::OK();
}

// Here the explicit destructor is required for compilers to be aware of
// the complete information of RowBatchReader::RowBatchReaderImpl class
RowBatchReader::~RowBatchReader() {}
// the complete information of RecordBatchReader::RecordBatchReaderImpl class
RecordBatchReader::~RecordBatchReader() {}

Status RowBatchReader::GetRowBatch(
const std::shared_ptr<Schema>& schema, std::shared_ptr<RowBatch>* out) {
Status RecordBatchReader::GetRecordBatch(
const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) {
return impl_->AssembleBatch(schema, out);
}

Expand Down
Loading