25 changes: 25 additions & 0 deletions cpp/src/arrow/io/io-memory-test.cc
@@ -121,5 +121,30 @@ TEST_F(TestMemoryMappedFile, InvalidFile) {
IOError, MemoryMappedFile::Open(non_existent_path, FileMode::READ, &result));
}

class TestBufferOutputStream : public ::testing::Test {
 public:
  void SetUp() {
    buffer_.reset(new PoolBuffer(default_memory_pool()));
    stream_.reset(new BufferOutputStream(buffer_));
  }

 protected:
  std::shared_ptr<PoolBuffer> buffer_;
  std::unique_ptr<OutputStream> stream_;
};

TEST_F(TestBufferOutputStream, CloseResizes) {
  std::string data = "data123456";

  const int64_t nbytes = static_cast<int64_t>(data.size());
  const int K = 100;
  for (int i = 0; i < K; ++i) {
    EXPECT_OK(stream_->Write(reinterpret_cast<const uint8_t*>(data.c_str()), nbytes));
  }

  ASSERT_OK(stream_->Close());
  ASSERT_EQ(K * nbytes, buffer_->size());
}

} // namespace io
} // namespace arrow
13 changes: 10 additions & 3 deletions cpp/src/arrow/io/memory.cc
@@ -212,7 +212,11 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b
      mutable_data_(buffer->mutable_data()) {}

Status BufferOutputStream::Close() {
-  return Status::OK();
+  if (position_ < capacity_) {
+    return buffer_->Resize(position_);
+  } else {
+    return Status::OK();
+  }
}

Status BufferOutputStream::Tell(int64_t* position) {
@@ -228,8 +232,11 @@ Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
}

Status BufferOutputStream::Reserve(int64_t nbytes) {
-  while (position_ + nbytes > capacity_) {
-    int64_t new_capacity = std::max(kBufferMinimumSize, capacity_ * 2);
+  int64_t new_capacity = capacity_;
+  while (position_ + nbytes > new_capacity) {
+    new_capacity = std::max(kBufferMinimumSize, new_capacity * 2);
+  }
+  if (new_capacity > capacity_) {
    RETURN_NOT_OK(buffer_->Resize(new_capacity));
    capacity_ = new_capacity;
  }
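For context on the two memory.cc changes above, here is a minimal standalone sketch (not part of the patch) of how the doubling Reserve() and the trimming Close() interact. GrowCapacity and the 256-byte kMinSize are illustrative stand-ins for the stream's internal bookkeeping and kBufferMinimumSize; the real names and values live in memory.cc.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Stand-in for kBufferMinimumSize; the actual constant may differ.
constexpr int64_t kMinSize = 256;

// Mirrors the patched Reserve(): double the capacity until the pending
// write fits, then resize once, instead of resizing on every loop pass.
int64_t GrowCapacity(int64_t position, int64_t nbytes, int64_t capacity) {
  int64_t new_capacity = capacity;
  while (position + nbytes > new_capacity) {
    new_capacity = std::max(kMinSize, new_capacity * 2);
  }
  return new_capacity;
}

int main() {
  int64_t capacity = 0;
  int64_t position = 0;
  // 100 writes of 10 bytes each, mirroring the CloseResizes test above.
  for (int i = 0; i < 100; ++i) {
    capacity = GrowCapacity(position, 10, capacity);
    position += 10;
  }
  std::cout << "capacity after writes: " << capacity << std::endl;  // 1024
  std::cout << "size after Close():    " << position << std::endl;  // 1000
  return 0;
}

In this sketch Close() would shrink the buffer from the last doubled capacity (1024) down to the bytes actually written (1000), which is exactly what the CloseResizes test asserts.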
7 changes: 7 additions & 0 deletions cpp/src/arrow/ipc/CMakeLists.txt
@@ -57,6 +57,13 @@ SET_TARGET_PROPERTIES(arrow_ipc PROPERTIES
  LINKER_LANGUAGE CXX
  LINK_FLAGS "${ARROW_IPC_LINK_FLAGS}")

if (APPLE)
  set_target_properties(arrow_ipc
    PROPERTIES
    BUILD_WITH_INSTALL_RPATH ON
    INSTALL_NAME_DIR "@rpath")
endif()

ADD_ARROW_TEST(ipc-adapter-test)
ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test
${ARROW_IPC_TEST_LINK_LIBS})
16 changes: 10 additions & 6 deletions cpp/src/arrow/ipc/adapter.cc
@@ -162,15 +162,14 @@ class RecordBatchWriter {
    for (size_t i = 0; i < buffers_.size(); ++i) {
      const Buffer* buffer = buffers_[i].get();
      int64_t size = 0;
+      int64_t padding = 0;

      // The buffer might be null if we are handling zero row lengths.
      if (buffer) {
-        // We use capacity here, because size might not reflect the padding
-        // requirements of buffers but capacity always should.
-        size = buffer->capacity();
-        // check that padding is appropriate
-        RETURN_NOT_OK(CheckMultipleOf64(size));
+        size = buffer->size();
+        padding = util::RoundUpToMultipleOf64(size) - size;
      }

      // TODO(wesm): We currently have no notion of shared memory page id's,
      // but we've included it in the metadata IDL for when we have it in the
      // future. Use page=0 for now
@@ -179,12 +178,17 @@
      // are using from any OS-level shared memory. The thought is that systems
      // may (in the future) associate integer page id's with physical memory
      // pages (according to whatever is the desired shared memory mechanism)
-      buffer_meta_.push_back(flatbuf::Buffer(0, position, size));
+      buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding));

      if (size > 0) {
        RETURN_NOT_OK(dst->Write(buffer->data(), size));
        position += size;
      }

+      if (padding > 0) {
+        RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
+        position += padding;
+      }
    }

    *body_end_offset = position;
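As a sanity check on the padding bookkeeping above, a small self-contained sketch; RoundUp64 is a local stand-in for util::RoundUpToMultipleOf64 (overflow guard omitted), and the buffer sizes are made up for illustration.

#include <cassert>
#include <cstdint>
#include <vector>

// Local stand-in for util::RoundUpToMultipleOf64 (overflow guard omitted).
int64_t RoundUp64(int64_t n) { return (n + 63) & ~int64_t{63}; }

int main() {
  // Hypothetical raw buffer sizes within one record batch.
  std::vector<int64_t> sizes = {10, 128, 1};
  int64_t position = 0;
  for (int64_t size : sizes) {
    int64_t padding = RoundUp64(size) - size;
    assert(position % 64 == 0);   // offset recorded in flatbuf::Buffer
    position += size + padding;   // data bytes, then zero padding bytes
  }
  // 10 -> 64, 128 -> 128, 1 -> 64; every buffer starts 64-byte aligned.
  assert(position == 64 + 128 + 64);
  return 0;
}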
6 changes: 5 additions & 1 deletion cpp/src/arrow/ipc/util.h
@@ -29,7 +29,11 @@ namespace ipc {

// Align on 8-byte boundaries
static constexpr int kArrowAlignment = 8;
-static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
+
+// Buffers are padded to 64-byte boundaries (for SIMD)
+static constexpr int kArrowBufferAlignment = 64;
+
+static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0};

static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
return ((nbytes + alignment - 1) / alignment) * alignment;
27 changes: 27 additions & 0 deletions cpp/src/arrow/table-test.cc
@@ -123,4 +123,31 @@ TEST_F(TestTable, InvalidColumns) {
  ASSERT_RAISES(Invalid, table_->ValidateColumns());
}

class TestRecordBatch : public TestBase {};

TEST_F(TestRecordBatch, Equals) {
  const int length = 10;

  auto f0 = std::make_shared<Field>("f0", INT32);
  auto f1 = std::make_shared<Field>("f1", UINT8);
  auto f2 = std::make_shared<Field>("f2", INT16);

  vector<shared_ptr<Field>> fields = {f0, f1, f2};
  auto schema = std::make_shared<Schema>(fields);

  auto a0 = MakePrimitive<Int32Array>(length);
  auto a1 = MakePrimitive<UInt8Array>(length);
  auto a2 = MakePrimitive<Int16Array>(length);

  RecordBatch b1(schema, length, {a0, a1, a2});
  RecordBatch b2(schema, 5, {a0, a1, a2});
  RecordBatch b3(schema, length, {a0, a1});
  RecordBatch b4(schema, length, {a0, a1, a1});

  ASSERT_TRUE(b1.Equals(b1));
  ASSERT_FALSE(b1.Equals(b2));
  ASSERT_FALSE(b1.Equals(b3));
  ASSERT_FALSE(b1.Equals(b4));
}

} // namespace arrow
16 changes: 16 additions & 0 deletions cpp/src/arrow/table.cc
@@ -21,6 +21,7 @@
#include <memory>
#include <sstream>

#include "arrow/array.h"
#include "arrow/column.h"
#include "arrow/schema.h"
#include "arrow/util/status.h"
@@ -35,6 +36,21 @@ const std::string& RecordBatch::column_name(int i) const {
  return schema_->field(i)->name;
}

bool RecordBatch::Equals(const RecordBatch& other) const {
  if (num_columns() != other.num_columns() || num_rows_ != other.num_rows()) {
    return false;
  }

  for (int i = 0; i < num_columns(); ++i) {
    if (!column(i)->Equals(other.column(i))) { return false; }
  }

  return true;
}

// ----------------------------------------------------------------------
// Table methods

Table::Table(const std::string& name, const std::shared_ptr<Schema>& schema,
const std::vector<std::shared_ptr<Column>>& columns)
: name_(name), schema_(schema), columns_(columns) {
2 changes: 2 additions & 0 deletions cpp/src/arrow/table.h
@@ -43,6 +43,8 @@ class ARROW_EXPORT RecordBatch {
  RecordBatch(const std::shared_ptr<Schema>& schema, int32_t num_rows,
      const std::vector<std::shared_ptr<Array>>& columns);

  bool Equals(const RecordBatch& other) const;

  // @returns: the table's schema
  const std::shared_ptr<Schema>& schema() const { return schema_; }

3 changes: 1 addition & 2 deletions cpp/src/arrow/types/primitive-test.cc
@@ -238,8 +238,7 @@ void TestPrimitiveBuilder<PBoolean>::Check(
}

typedef ::testing::Types<PBoolean, PUInt8, PUInt16, PUInt32, PUInt64, PInt8, PInt16,
-    PInt32, PInt64, PFloat, PDouble>
-    Primitives;
+    PInt32, PInt64, PFloat, PDouble> Primitives;

TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives);

13 changes: 13 additions & 0 deletions cpp/src/arrow/util/bit-util.h
@@ -19,6 +19,7 @@
#define ARROW_UTIL_BIT_UTIL_H

#include <cstdint>
#include <limits>
#include <memory>
#include <vector>

@@ -77,6 +78,18 @@ static inline bool is_multiple_of_64(int64_t n) {
  return (n & 63) == 0;
}

inline int64_t RoundUpToMultipleOf64(int64_t num) {
  // TODO(wesm): is this definitely needed?
  // DCHECK_GE(num, 0);
  constexpr int64_t round_to = 64;
  constexpr int64_t force_carry_addend = round_to - 1;
  constexpr int64_t truncate_bitmask = ~(round_to - 1);
  constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to;
  if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; }
  // handle overflow case. This should result in a malloc error upstream
  return num;
}

void bytes_to_bits(const std::vector<uint8_t>& bytes, uint8_t* bits);
ARROW_EXPORT Status bytes_to_bits(const std::vector<uint8_t>&, std::shared_ptr<Buffer>*);

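To make the boundary behavior of the new helper concrete, a short self-contained check; the function is copied inline purely for illustration (the real definition is the one added to bit-util.h above).

#include <cassert>
#include <cstdint>
#include <limits>

// Local copy of RoundUpToMultipleOf64, for illustration only.
inline int64_t RoundUpToMultipleOf64(int64_t num) {
  constexpr int64_t round_to = 64;
  constexpr int64_t max_roundable = std::numeric_limits<int64_t>::max() - round_to;
  if (num <= max_roundable) { return (num + round_to - 1) & ~(round_to - 1); }
  return num;  // near-overflow values pass through unchanged
}

int main() {
  assert(RoundUpToMultipleOf64(0) == 0);
  assert(RoundUpToMultipleOf64(1) == 64);
  assert(RoundUpToMultipleOf64(64) == 64);
  assert(RoundUpToMultipleOf64(65) == 128);
  // Values too close to INT64_MAX are returned unrounded; the oversized
  // allocation request is then expected to fail in the memory pool.
  const int64_t huge = std::numeric_limits<int64_t>::max() - 1;
  assert(RoundUpToMultipleOf64(huge) == huge);
  return 0;
}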
16 changes: 2 additions & 14 deletions cpp/src/arrow/util/buffer.cc
@@ -20,25 +20,13 @@
#include <cstdint>
#include <limits>

#include "arrow/util/bit-util.h"
#include "arrow/util/logging.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

namespace arrow {

-namespace {
-int64_t RoundUpToMultipleOf64(int64_t num) {
-  DCHECK_GE(num, 0);
-  constexpr int64_t round_to = 64;
-  constexpr int64_t force_carry_addend = round_to - 1;
-  constexpr int64_t truncate_bitmask = ~(round_to - 1);
-  constexpr int64_t max_roundable_num = std::numeric_limits<int64_t>::max() - round_to;
-  if (num <= max_roundable_num) { return (num + force_carry_addend) & truncate_bitmask; }
-  // handle overflow case. This should result in a malloc error upstream
-  return num;
-}
-} // namespace

Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) {
  data_ = parent->data() + offset;
  size_ = size;
@@ -64,7 +52,7 @@ PoolBuffer::~PoolBuffer() {
Status PoolBuffer::Reserve(int64_t new_capacity) {
  if (!mutable_data_ || new_capacity > capacity_) {
    uint8_t* new_data;
-    new_capacity = RoundUpToMultipleOf64(new_capacity);
+    new_capacity = util::RoundUpToMultipleOf64(new_capacity);
    if (mutable_data_) {
      RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
      memcpy(new_data, mutable_data_, size_);
1 change: 0 additions & 1 deletion cpp/src/arrow/util/buffer.h
@@ -23,7 +23,6 @@
#include <cstring>
#include <memory>

#include "arrow/util/bit-util.h"
#include "arrow/util/macros.h"
#include "arrow/util/status.h"
#include "arrow/util/visibility.h"
4 changes: 2 additions & 2 deletions cpp/src/arrow/util/logging.h
@@ -118,9 +118,9 @@ class CerrLog {
class FatalLog : public CerrLog {
public:
explicit FatalLog(int /* severity */) // NOLINT
-      : CerrLog(ARROW_FATAL){} // NOLINT
+      : CerrLog(ARROW_FATAL) {} // NOLINT

-  [[noreturn]] ~FatalLog() {
+  [[noreturn]] ~FatalLog() {
    if (has_logged_) { std::cerr << std::endl; }
    std::exit(1);
  }
8 changes: 6 additions & 2 deletions python/CMakeLists.txt
@@ -352,6 +352,8 @@ ADD_THIRDPARTY_LIB(arrow
  SHARED_LIB ${ARROW_SHARED_LIB})
ADD_THIRDPARTY_LIB(arrow_io
  SHARED_LIB ${ARROW_IO_SHARED_LIB})
ADD_THIRDPARTY_LIB(arrow_ipc
  SHARED_LIB ${ARROW_IPC_SHARED_LIB})

############################################################
# Linker setup
@@ -415,6 +417,8 @@ if (UNIX)
  set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
endif()

+SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)

add_subdirectory(src/pyarrow)
add_subdirectory(src/pyarrow/util)

@@ -423,6 +427,7 @@ set(CYTHON_EXTENSIONS
  config
  error
  io
  ipc
  scalar
  schema
  table
@@ -442,6 +447,7 @@ set(PYARROW_SRCS
set(LINK_LIBS
  arrow
  arrow_io
  arrow_ipc
)

if(PARQUET_FOUND AND PARQUET_ARROW_FOUND)
@@ -455,8 +461,6 @@ if(PARQUET_FOUND AND PARQUET_ARROW_FOUND)
parquet)
endif()

-SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE)

add_library(pyarrow SHARED
${PYARROW_SRCS})
target_link_libraries(pyarrow ${LINK_LIBS})
11 changes: 11 additions & 0 deletions python/cmake_modules/FindArrow.cmake
@@ -47,20 +47,31 @@ find_library(ARROW_IO_LIB_PATH NAMES arrow_io
  ${ARROW_SEARCH_LIB_PATH}
  NO_DEFAULT_PATH)

find_library(ARROW_IPC_LIB_PATH NAMES arrow_ipc
  PATHS
  ${ARROW_SEARCH_LIB_PATH}
  NO_DEFAULT_PATH)

if (ARROW_INCLUDE_DIR AND ARROW_LIB_PATH)
  set(ARROW_FOUND TRUE)
  set(ARROW_LIB_NAME libarrow)
  set(ARROW_IO_LIB_NAME libarrow_io)
  set(ARROW_IPC_LIB_NAME libarrow_ipc)

  set(ARROW_LIBS ${ARROW_SEARCH_LIB_PATH})
  set(ARROW_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_LIB_NAME}.a)
  set(ARROW_SHARED_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})

  set(ARROW_IO_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IO_LIB_NAME}.a)
  set(ARROW_IO_SHARED_LIB ${ARROW_LIBS}/${ARROW_IO_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})

  set(ARROW_IPC_STATIC_LIB ${ARROW_SEARCH_LIB_PATH}/${ARROW_IPC_LIB_NAME}.a)
  set(ARROW_IPC_SHARED_LIB ${ARROW_LIBS}/${ARROW_IPC_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})

  if (NOT Arrow_FIND_QUIETLY)
    message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
    message(STATUS "Found the Arrow IO library: ${ARROW_IO_LIB_PATH}")
    message(STATUS "Found the Arrow IPC library: ${ARROW_IPC_LIB_PATH}")
  endif ()
else ()
  if (NOT Arrow_FIND_QUIETLY)
3 changes: 1 addition & 2 deletions python/pyarrow/__init__.py
@@ -41,5 +41,4 @@
list_, struct, field,
DataType, Field, Schema, schema)

-from pyarrow.array import RowBatch
-from pyarrow.table import Column, Table, from_pandas_dataframe
+from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe