Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions python/pyarrow/includes/pyarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ cdef extern from "pyarrow/api.h" namespace "pyarrow" nogil:
PyStatus PandasMaskedToArrow(MemoryPool* pool, object ao, object mo,
shared_ptr[CArray]* out)

PyStatus ArrowToPandas(const shared_ptr[CColumn]& arr, object py_ref,
PyObject** out)
PyStatus ConvertArrayToPandas(const shared_ptr[CArray]& arr,
object py_ref, PyObject** out)

PyStatus ConvertColumnToPandas(const shared_ptr[CColumn]& arr,
object py_ref, PyObject** out)

MemoryPool* get_memory_pool()

Expand Down
12 changes: 12 additions & 0 deletions python/pyarrow/io.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,18 @@ cdef class InMemoryOutputStream(NativeFile):
return result


cdef class BufferReader(NativeFile):
cdef:
Buffer buffer

def __cinit__(self, Buffer buffer):
self.buffer = buffer
self.rd_file.reset(new CBufferReader(buffer.buffer.get().data(),
buffer.buffer.get().size()))
self.is_readonly = 1
self.is_open = True


def buffer_from_bytes(object obj):
"""
Construct an Arrow buffer from a Python bytes object
Expand Down
25 changes: 23 additions & 2 deletions python/pyarrow/table.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ cdef class Column:

import pandas as pd

check_status(pyarrow.ArrowToPandas(self.sp_column, self, &arr))
check_status(pyarrow.ConvertColumnToPandas(self.sp_column, self, &arr))
return pd.Series(<object>arr, name=self.name)

cdef _check_nullptr(self):
Expand Down Expand Up @@ -233,6 +233,27 @@ cdef class RecordBatch:

return self.batch.Equals(deref(other.batch))

def to_pandas(self):
"""
Convert the arrow::RecordBatch to a pandas DataFrame
"""
cdef:
PyObject* np_arr
shared_ptr[CArray] arr
Column column

import pandas as pd

names = []
data = []
for i in range(self.batch.num_columns()):
arr = self.batch.column(i)
check_status(pyarrow.ConvertArrayToPandas(arr, self, &np_arr))
names.append(frombytes(self.batch.column_name(i)))
data.append(<object> np_arr)

return pd.DataFrame(dict(zip(names, data)), columns=names)

@classmethod
def from_pandas(cls, df):
"""
Expand Down Expand Up @@ -354,7 +375,7 @@ cdef class Table:
for i in range(self.table.num_columns()):
col = self.table.column(i)
column = self.column(i)
check_status(pyarrow.ArrowToPandas(col, column, &arr))
check_status(pyarrow.ConvertColumnToPandas(col, column, &arr))
names.append(frombytes(col.get().name()))
data.append(<object> arr)

Expand Down
40 changes: 37 additions & 3 deletions python/pyarrow/tests/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io

import numpy as np

from pandas.util.testing import assert_frame_equal
import pandas as pd

import pyarrow as A
Expand Down Expand Up @@ -85,17 +87,40 @@ def test_ipc_file_simple_roundtrip():
helper.run()


def test_ipc_zero_copy_numpy():
df = pd.DataFrame({'foo': [1.5]})

batch = A.RecordBatch.from_pandas(df)
sink = arrow_io.InMemoryOutputStream()
write_file(batch, sink)
buffer = sink.get_result()
reader = arrow_io.BufferReader(buffer)

batches = read_file(reader)

data = batches[0].to_pandas()
rdf = pd.DataFrame(data)
assert_frame_equal(df, rdf)


# XXX: For benchmarking

def big_batch():
K = 2**4
N = 2**20
df = pd.DataFrame(
np.random.randn(2**4, 2**20).T,
columns=[str(i) for i in range(2**4)]
np.random.randn(K, N).T,
columns=[str(i) for i in range(K)]
)

df = pd.concat([df] * 2 ** 3, ignore_index=True)
return df


return A.RecordBatch.from_pandas(df)
def write_to_memory2(batch):
sink = arrow_io.InMemoryOutputStream()
write_file(batch, sink)
return sink.get_result()


def write_to_memory(batch):
Expand All @@ -114,3 +139,12 @@ def read_file(source):
reader = ipc.ArrowFileReader(source)
return [reader.get_record_batch(i)
for i in range(reader.num_record_batches)]

# df = big_batch()
# batch = A.RecordBatch.from_pandas(df)
# mem = write_to_memory(batch)
# batches = read_file(mem)
# data = batches[0].to_pandas()
# rdf = pd.DataFrame(data)

# [x.to_pandas() for x in batches]
41 changes: 30 additions & 11 deletions python/pyarrow/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,47 @@
# specific language governing permissions and limitations
# under the License.

import pyarrow as A
import numpy as np

from pandas.util.testing import assert_frame_equal
import pandas as pd

import pyarrow as pa
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: We should settle on a single abbreviation for pyarrow



def test_recordbatch_basics():
data = [
A.from_pylist(range(5)),
A.from_pylist([-10, -5, 0, 5, 10])
pa.from_pylist(range(5)),
pa.from_pylist([-10, -5, 0, 5, 10])
]

batch = A.RecordBatch.from_arrays(['c0', 'c1'], data)
batch = pa.RecordBatch.from_arrays(['c0', 'c1'], data)

assert len(batch) == 5
assert batch.num_rows == 5
assert batch.num_columns == len(data)


def test_recordbatch_from_to_pandas():
data = pd.DataFrame({
'c1': np.array([1, 2, 3, 4, 5], dtype='int64'),
'c2': np.array([1, 2, 3, 4, 5], dtype='uint32'),
'c2': np.random.randn(5),
'c3': ['foo', 'bar', None, 'baz', 'qux'],
'c4': [False, True, False, True, False]
})

batch = pa.RecordBatch.from_pandas(data)
result = batch.to_pandas()
assert_frame_equal(data, result)


def test_table_basics():
data = [
A.from_pylist(range(5)),
A.from_pylist([-10, -5, 0, 5, 10])
pa.from_pylist(range(5)),
pa.from_pylist([-10, -5, 0, 5, 10])
]
table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')
assert table.name == 'table_name'
assert len(table) == 5
assert table.num_rows == 5
Expand All @@ -50,15 +69,15 @@ def test_table_basics():

def test_table_pandas():
data = [
A.from_pylist(range(5)),
A.from_pylist([-10, -5, 0, 5, 10])
pa.from_pylist(range(5)),
pa.from_pylist([-10, -5, 0, 5, 10])
]
table = A.Table.from_arrays(('a', 'b'), data, 'table_name')
table = pa.Table.from_arrays(('a', 'b'), data, 'table_name')

# TODO: Use this part once from_pandas is implemented
# data = {'a': range(5), 'b': [-10, -5, 0, 5, 10]}
# df = pd.DataFrame(data)
# A.Table.from_pandas(df)
# pa.Table.from_pandas(df)

df = table.to_pandas()
assert set(df.columns) == set(('a', 'b'))
Expand Down
19 changes: 17 additions & 2 deletions python/src/pyarrow/adapters/pandas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include "pyarrow/numpy_interop.h"

#include "pyarrow/adapters/pandas.h"

#include <cmath>
#include <cstdint>
#include <memory>
Expand All @@ -38,6 +40,7 @@ namespace pyarrow {

using arrow::Array;
using arrow::Column;
using arrow::Field;
using arrow::DataType;
namespace util = arrow::util;

Expand Down Expand Up @@ -106,7 +109,7 @@ struct npy_traits<NPY_FLOAT64> {

template <>
struct npy_traits<NPY_DATETIME> {
typedef double value_type;
typedef int64_t value_type;
using TypeClass = arrow::TimestampType;

static constexpr bool supports_nulls = true;
Expand Down Expand Up @@ -163,6 +166,8 @@ class ArrowSerializer {
Status ConvertData();

Status ConvertObjectStrings(std::shared_ptr<Array>* out) {
PyAcquireGIL lock;

PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));
arrow::TypePtr string_type(new arrow::StringType());
arrow::StringBuilder string_builder(pool_, string_type);
Expand Down Expand Up @@ -197,6 +202,8 @@ class ArrowSerializer {
}

Status ConvertBooleans(std::shared_ptr<Array>* out) {
PyAcquireGIL lock;

PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_));

int nbytes = util::bytes_for_bits(length_);
Expand Down Expand Up @@ -798,7 +805,15 @@ class ArrowDeserializer {
} \
break;

Status ArrowToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
Status ConvertArrayToPandas(const std::shared_ptr<Array>& arr, PyObject* py_ref,
PyObject** out) {
static std::string dummy_name = "dummy";
auto field = std::make_shared<Field>(dummy_name, arr->type());
auto col = std::make_shared<Column>(field, arr);
return ConvertColumnToPandas(col, py_ref, out);
}

Status ConvertColumnToPandas(const std::shared_ptr<Column>& col, PyObject* py_ref,
PyObject** out) {
switch(col->type()->type) {
FROM_ARROW_CASE(BOOL);
Expand Down
7 changes: 6 additions & 1 deletion python/src/pyarrow/adapters/pandas.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace arrow {

class Array;
class Column;
class MemoryPool;

} // namespace arrow

Expand All @@ -39,7 +40,11 @@ namespace pyarrow {
class Status;

PYARROW_EXPORT
Status ArrowToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
Status ConvertArrayToPandas(const std::shared_ptr<arrow::Array>& arr, PyObject* py_ref,
PyObject** out);

PYARROW_EXPORT
Status ConvertColumnToPandas(const std::shared_ptr<arrow::Column>& col, PyObject* py_ref,
PyObject** out);

PYARROW_EXPORT
Expand Down
4 changes: 2 additions & 2 deletions python/src/pyarrow/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ class PYARROW_EXPORT NumPyBuffer : public arrow::Buffer {
Py_INCREF(arr);

data_ = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_));
size_ = PyArray_SIZE(arr_);
capacity_ = size_ * PyArray_DESCR(arr_)->elsize;
size_ = PyArray_SIZE(arr_) * PyArray_DESCR(arr_)->elsize;
capacity_ = size_;
}

virtual ~NumPyBuffer() {
Expand Down
2 changes: 1 addition & 1 deletion python/src/pyarrow/io.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ arrow::Status PythonFile::Write(const uint8_t* data, int64_t nbytes) {
ARROW_RETURN_NOT_OK(CheckPyError());

PyObject* result = PyObject_CallMethod(file_, "write", "(O)", py_data);
Py_DECREF(py_data);
Py_XDECREF(py_data);
Py_XDECREF(result);
ARROW_RETURN_NOT_OK(CheckPyError());
return arrow::Status::OK();
Expand Down