Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/python/pandas_convert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1102,7 +1102,7 @@ static inline PyObject* NewArray1DFromType(

set_numpy_metadata(type, arrow_type, descr);
return PyArray_NewFromDescr(&PyArray_Type, descr, 1, dims, nullptr, data,
NPY_ARRAY_OWNDATA | NPY_ARRAY_CARRAY, nullptr);
NPY_ARRAY_OWNDATA | NPY_ARRAY_CARRAY | NPY_ARRAY_WRITEABLE, nullptr);
}

class PandasBlock {
Expand Down
3 changes: 2 additions & 1 deletion python/pyarrow/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,5 @@ def deserialize_pandas(buf, nthreads=1):
"""
buffer_reader = pa.BufferReader(buf)
reader = pa.RecordBatchFileReader(buffer_reader)
return reader.read_all().to_pandas(nthreads=nthreads)
table = reader.read_all()
return table.to_pandas(nthreads=nthreads)
73 changes: 73 additions & 0 deletions python/pyarrow/pandas_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,76 @@ def construct_metadata(df, index_levels, preserve_index):
}
).encode('utf8')
}


def table_to_blockmanager(table, nthreads=1):
    """Convert a pyarrow Table into a pandas ``BlockManager``.

    If the table's schema metadata carries a ``b'pandas'`` entry (written by
    the pandas -> Arrow conversion), the index columns recorded there are
    pulled out of the table and rebuilt as the resulting frame's index;
    otherwise a ``RangeIndex`` over the row count is used.

    Parameters
    ----------
    table : pyarrow Table
        The table to convert.
    nthreads : int, default 1
        Number of threads passed through to ``lib.table_to_blocks``.

    Returns
    -------
    pandas.core.internals.BlockManager
        Suitable for constructing a ``pd.DataFrame``.
    """
    import pandas.core.internals as _int
    from pyarrow.compat import DatetimeTZDtype
    import pyarrow.lib as lib

    schema = table.schema
    row_count = table.num_rows
    metadata = schema.metadata

    # ``data_table`` loses the index columns as they are extracted below;
    # ``table`` itself is never mutated.
    data_table = table
    index_arrays = []
    index_names = []

    index_columns = []
    if metadata is not None and b'pandas' in metadata:
        pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
        index_columns = pandas_metadata['index_columns']

    for name in index_columns:
        field_idx = schema.get_field_index(name)
        if field_idx == -1:
            # Index column recorded in metadata but absent from the
            # schema: skip it rather than fail.
            continue
        values = table.column(field_idx).to_pandas().values
        if not values.flags.writeable:
            # ARROW-1054: in pandas 0.19.2, factorize will reject
            # non-writeable arrays when calling MultiIndex.from_arrays
            values = values.copy()
        index_arrays.append(values)
        index_names.append(None if is_unnamed_index_level(name) else name)
        data_table = data_table.remove_column(
            data_table.schema.get_field_index(name)
        )

    blocks = []
    for item in lib.table_to_blocks(data_table, nthreads):
        arr = item['block']
        placement = item['placement']
        if 'dictionary' in item:
            # Dictionary-encoded column -> pandas Categorical block.
            cat_values = pd.Categorical(arr,
                                        categories=item['dictionary'],
                                        ordered=False, fastpath=True)
            blk = _int.make_block(cat_values, placement=placement,
                                  klass=_int.CategoricalBlock,
                                  fastpath=True)
        elif 'timezone' in item:
            # Timestamp-with-timezone column -> tz-aware datetime block.
            tz_dtype = DatetimeTZDtype('ns', tz=item['timezone'])
            blk = _int.make_block(arr, placement=placement,
                                  klass=_int.DatetimeTZBlock,
                                  dtype=tz_dtype, fastpath=True)
        else:
            blk = _int.make_block(arr, placement=placement)
        blocks.append(blk)

    if len(index_arrays) > 1:
        index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
    elif index_arrays:
        index = pd.Index(index_arrays[0], name=index_names[0])
    else:
        index = pd.RangeIndex(row_count)

    axes = [
        [column.name for column in data_table.itercolumns()],
        index,
    ]

    return _int.BlockManager(blocks, axes)
1 change: 0 additions & 1 deletion python/pyarrow/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# specific language governing permissions and limitations
# under the License.

import itertools
import json

import six
Expand Down
76 changes: 5 additions & 71 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -559,86 +559,20 @@ cdef class RecordBatch:
return pyarrow_wrap_batch(batch)


cdef table_to_blockmanager(const shared_ptr[CTable]& ctable, int nthreads):
import pandas.core.internals as _int
from pandas import RangeIndex, Categorical
from pyarrow.compat import DatetimeTZDtype

cdef:
Table table = pyarrow_wrap_table(ctable)
Table block_table = pyarrow_wrap_table(ctable)
Schema schema = table.schema

size_t row_count = table.num_rows
size_t total_columns = table.num_columns

dict metadata = schema.metadata
dict pandas_metadata = None

list index_columns = []
list index_arrays = []

if metadata is not None and b'pandas' in metadata:
pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
index_columns = pandas_metadata['index_columns']

cdef:
Column col
int64_t i

for name in index_columns:
i = schema.get_field_index(name)
if i != -1:
col = table.column(i)
index_name = None if pdcompat.is_unnamed_index_level(name) else name
index_arrays.append(
pd.Index(col.to_pandas().values, name=index_name)
)
block_table = block_table.remove_column(
block_table.schema.get_field_index(name)
)

def table_to_blocks(Table table, int nthreads):
cdef:
PyObject* result_obj
shared_ptr[CTable] c_block_table = block_table.sp_table
shared_ptr[CTable] c_table = table.sp_table

with nogil:
check_status(
libarrow.ConvertTableToPandas(
c_block_table, nthreads, &result_obj
c_table, nthreads, &result_obj
)
)

result = PyObject_to_object(result_obj)

blocks = []
for item in result:
block_arr = item['block']
placement = item['placement']
if 'dictionary' in item:
cat = Categorical(block_arr,
categories=item['dictionary'],
ordered=False, fastpath=True)
block = _int.make_block(cat, placement=placement,
klass=_int.CategoricalBlock,
fastpath=True)
elif 'timezone' in item:
dtype = DatetimeTZDtype('ns', tz=item['timezone'])
block = _int.make_block(block_arr, placement=placement,
klass=_int.DatetimeTZBlock,
dtype=dtype, fastpath=True)
else:
block = _int.make_block(block_arr, placement=placement)
blocks.append(block)

cdef list axes = [
[column.name for column in block_table.itercolumns()],
pd.MultiIndex.from_arrays(
index_arrays
) if index_arrays else pd.RangeIndex(row_count),
]
return PyObject_to_object(result_obj)

return _int.BlockManager(blocks, axes)


cdef class Table:
Expand Down Expand Up @@ -829,7 +763,7 @@ cdef class Table:
if nthreads is None:
nthreads = cpu_count()

mgr = table_to_blockmanager(self.sp_table, nthreads)
mgr = pdcompat.table_to_blockmanager(self, nthreads)
return pd.DataFrame(mgr)

def to_pydict(self):
Expand Down