3 changes: 3 additions & 0 deletions python/pyarrow/io.pxd
@@ -42,3 +42,6 @@ cdef class NativeFile:
# suite of Arrow C++ libraries
cdef read_handle(self, shared_ptr[ReadableFileInterface]* file)
cdef write_handle(self, shared_ptr[OutputStream]* file)

cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader)
cdef get_writer(object source, shared_ptr[OutputStream]* writer)
40 changes: 40 additions & 0 deletions python/pyarrow/io.pyx
@@ -256,6 +256,46 @@ def buffer_from_bytes(object obj):
result.init(buf)
return result

cdef get_reader(object source, shared_ptr[ReadableFileInterface]* reader):
cdef NativeFile nf

if isinstance(source, bytes):
source = BytesReader(source)
elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
# Optimistically hope this is file-like
source = PythonFileInterface(source, mode='r')

if isinstance(source, NativeFile):
nf = source

# TODO: what about read-write sources (e.g. memory maps)
if not nf.is_readonly:
raise IOError('Native file is not readable')

nf.read_handle(reader)
else:
raise TypeError('Unable to read from object of type: {0}'
.format(type(source)))


cdef get_writer(object source, shared_ptr[OutputStream]* writer):
cdef NativeFile nf

if not isinstance(source, NativeFile) and hasattr(source, 'write'):
# Optimistically hope this is file-like
source = PythonFileInterface(source, mode='w')

if isinstance(source, NativeFile):
nf = source

if nf.is_readonly:
raise IOError('Native file is not writeable')

nf.write_handle(writer)
else:
raise TypeError('Unable to write to object of type: {0}'
.format(type(source)))

# ----------------------------------------------------------------------
# HDFS IO implementation

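For context, the two helpers added above implement a simple source-coercion cascade: raw bytes become a BytesReader, anything with a read()/write() method is wrapped in a PythonFileInterface, and a NativeFile hands over its underlying C++ handle. A minimal pure-Python sketch of the same dispatch, for illustration only (not part of this diff; the coerce_reader name is hypothetical):

import pyarrow.io as paio

def coerce_reader(source):
    # bytes -> in-memory reader
    if isinstance(source, bytes):
        return paio.BytesReader(source)
    # file-like -> wrap so Arrow can call read() through it
    if not isinstance(source, paio.NativeFile) and hasattr(source, 'read'):
        return paio.PythonFileInterface(source, mode='r')
    # NativeFile -> the Cython code above extracts its C++ read handle
    if isinstance(source, paio.NativeFile):
        return source
    raise TypeError('Unable to read from object of type: {0}'
                    .format(type(source)))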
43 changes: 1 addition & 42 deletions python/pyarrow/ipc.pyx
@@ -27,7 +27,7 @@ from pyarrow.includes.libarrow_ipc cimport *
cimport pyarrow.includes.pyarrow as pyarrow

from pyarrow.error cimport check_status
from pyarrow.io cimport NativeFile
from pyarrow.io cimport NativeFile, get_reader, get_writer
from pyarrow.schema cimport Schema
from pyarrow.table cimport RecordBatch

@@ -37,47 +37,6 @@ import pyarrow.io as io
cimport cpython as cp


cdef get_reader(source, shared_ptr[ReadableFileInterface]* reader):
cdef NativeFile nf

if isinstance(source, bytes):
source = io.BytesReader(source)
elif not isinstance(source, io.NativeFile) and hasattr(source, 'read'):
# Optimistically hope this is file-like
source = io.PythonFileInterface(source, mode='r')

if isinstance(source, NativeFile):
nf = source

# TODO: what about read-write sources (e.g. memory maps)
if not nf.is_readonly:
raise IOError('Native file is not readable')

nf.read_handle(reader)
else:
raise TypeError('Unable to read from object of type: {0}'
.format(type(source)))


cdef get_writer(source, shared_ptr[OutputStream]* writer):
cdef NativeFile nf

if not isinstance(source, io.NativeFile) and hasattr(source, 'write'):
# Optimistically hope this is file-like
source = io.PythonFileInterface(source, mode='w')

if isinstance(source, io.NativeFile):
nf = source

if nf.is_readonly:
raise IOError('Native file is not writeable')

nf.write_handle(writer)
else:
raise TypeError('Unable to read from object of type: {0}'
.format(type(source)))


cdef class ArrowFileWriter:
cdef:
shared_ptr[CFileWriter] writer
49 changes: 27 additions & 22 deletions python/pyarrow/parquet.pyx
@@ -31,7 +31,7 @@ from pyarrow.error cimport check_status
from pyarrow.io import NativeFile
from pyarrow.table cimport Table

from pyarrow.io cimport NativeFile
from pyarrow.io cimport NativeFile, get_reader, get_writer

import six

@@ -49,22 +49,27 @@ cdef class ParquetReader:
def __cinit__(self):
self.allocator.set_pool(default_memory_pool())

cdef open_local_file(self, file_path):
cdef c_string path = tobytes(file_path)
def open(self, source):
self._open(source)

# Must be in one expression to avoid calling std::move which is not
# possible in Cython (due to missing rvalue support)
cdef _open(self, object source):
cdef:
shared_ptr[ReadableFileInterface] rd_handle
c_string path

# TODO(wesm): ParquetFileReader::OpenFile can throw?
self.reader = unique_ptr[FileReader](
new FileReader(default_memory_pool(),
ParquetFileReader.OpenFile(path)))
if isinstance(source, six.string_types):
path = tobytes(source)

cdef open_native_file(self, NativeFile file):
cdef shared_ptr[ReadableFileInterface] cpp_handle
file.read_handle(&cpp_handle)
# Must be in one expression to avoid calling std::move which is not
# possible in Cython (due to missing rvalue support)

check_status(OpenFile(cpp_handle, &self.allocator, &self.reader))
# TODO(wesm): ParquetFileReader::OpenFile can throw?
self.reader = unique_ptr[FileReader](
new FileReader(default_memory_pool(),
ParquetFileReader.OpenFile(path)))
else:
get_reader(source, &rd_handle)
check_status(OpenFile(rd_handle, &self.allocator, &self.reader))

def read_all(self):
cdef:
@@ -137,11 +142,7 @@ def read_table(source, columns=None):
Content of the file as a table (of columns)
"""
cdef ParquetReader reader = ParquetReader()

if isinstance(source, six.string_types):
reader.open_local_file(source)
elif isinstance(source, NativeFile):
reader.open_native_file(source)
reader._open(source)

if columns is None:
return reader.read_all()
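With the dispatch centralized in _open, read_table now accepts a file path, a NativeFile, or any object exposing read(). A usage sketch under those assumptions (the file name is hypothetical), mirroring the tests added below:

import io
import pyarrow.parquet as pq

# Path and file-like sources both go through ParquetReader._open,
# which hands non-path sources to get_reader.
t1 = pq.read_table('example.parquet')
with open('example.parquet', 'rb') as f:
    t2 = pq.read_table(io.BytesIO(f.read()))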
@@ -174,7 +175,10 @@ def write_table(table, sink, chunk_size=None, version=None,
cdef Table table_ = table
cdef CTable* ctable_ = table_.table
cdef shared_ptr[ParquetWriteSink] sink_

cdef shared_ptr[FileOutputStream] filesink_
cdef shared_ptr[OutputStream] general_sink

cdef WriterProperties.Builder properties_builder
cdef int64_t chunk_size_ = 0
if chunk_size is None:
@@ -232,10 +236,11 @@
raise ArrowException("Unsupported compression codec")

if isinstance(sink, six.string_types):
check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
elif isinstance(sink, NativeFile):
sink_.reset(new ParquetWriteSink((<NativeFile>sink).wr_file))
check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
else:
get_writer(sink, &general_sink)
sink_.reset(new ParquetWriteSink(general_sink))

with nogil:
check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
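On the write side, any non-string sink now goes through get_writer, so a plain Python file object works as a Parquet sink. A sketch, assuming arrow_table is an Arrow table built elsewhere (file name hypothetical):

import pyarrow.parquet as pq

# Objects with a write() method are wrapped in a PythonFileInterface
# and handed to the writer as a ParquetWriteSink.
with open('example.parquet', 'wb') as f:
    pq.write_table(arrow_table, f, version='1.0')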
36 changes: 33 additions & 3 deletions python/pyarrow/tests/test_parquet.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import io
import pytest

import pyarrow as A
@@ -132,9 +133,8 @@ def test_pandas_column_selection(tmpdir):

pdt.assert_frame_equal(df[['uint8']], df_read)

@parquet
def test_pandas_parquet_native_file_roundtrip(tmpdir):
size = 10000

def _test_dataframe(size=10000):
np.random.seed(0)
df = pd.DataFrame({
'uint8': np.arange(size, dtype=np.uint8),
@@ -149,6 +149,12 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0
})
return df


@parquet
def test_pandas_parquet_native_file_roundtrip(tmpdir):
df = _test_dataframe(10000)
arrow_table = A.from_pandas_dataframe(df)
imos = paio.InMemoryOutputStream()
pq.write_table(arrow_table, imos, version="2.0")
@@ -158,6 +164,30 @@ def test_pandas_parquet_native_file_roundtrip(tmpdir):
pdt.assert_frame_equal(df, df_read)


@parquet
def test_pandas_parquet_pyfile_roundtrip(tmpdir):
filename = tmpdir.join('pandas_pyfile_roundtrip.parquet').strpath
size = 5
df = pd.DataFrame({
'int64': np.arange(size, dtype=np.int64),
'float32': np.arange(size, dtype=np.float32),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0,
'strings': ['foo', 'bar', None, 'baz', 'qux']
})

arrow_table = A.from_pandas_dataframe(df)

with open(filename, 'wb') as f:
A.parquet.write_table(arrow_table, f, version="1.0")

with open(filename, 'rb') as f:
data = io.BytesIO(f.read())

table_read = pq.read_table(data)
df_read = table_read.to_pandas()
pdt.assert_frame_equal(df, df_read)


@parquet
def test_pandas_parquet_configuration_options(tmpdir):
size = 10000