.travis.yml (1 addition, 0 deletions)

@@ -24,6 +24,7 @@ matrix:
  - compiler: gcc
    language: cpp
    os: linux
+   group: deprecated
    before_script:
    - export CC="gcc-4.9"
    - export CXX="g++-4.9"

python/pyarrow/includes/parquet.pxd (5 additions, 2 deletions)

@@ -19,7 +19,7 @@

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport CArray, CSchema, CStatus, CTable, MemoryPool
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream


cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
@@ -131,6 +131,9 @@ cdef extern from "parquet/arrow/io.h" namespace "parquet::arrow" nogil:
        ParquetReadSource(ParquetAllocator* allocator)
        Open(const shared_ptr[ReadableFileInterface]& file)

+    cdef cppclass ParquetWriteSink:
+        ParquetWriteSink(const shared_ptr[OutputStream]& file)


cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
CStatus OpenFile(const shared_ptr[ReadableFileInterface]& file,
Expand All @@ -154,6 +157,6 @@ cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
cdef CStatus WriteFlatTable(
const CTable* table, MemoryPool* pool,
const shared_ptr[ParquetOutputStream]& sink,
const shared_ptr[ParquetWriteSink]& sink,
int64_t chunk_size,
const shared_ptr[WriterProperties]& properties)
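
Taken together, these declarations route writes through ParquetWriteSink, which wraps a generic Arrow OutputStream rather than a concrete local-file stream, and WriteFlatTable now accepts that sink type. A minimal Cython sketch of the intended construction (names from this diff; `stream` is a placeholder for any shared_ptr[OutputStream]):

    cdef shared_ptr[OutputStream] stream   # placeholder: any Arrow output stream
    cdef shared_ptr[ParquetWriteSink] sink_
    sink_.reset(new ParquetWriteSink(stream))
    # sink_ is then suitable as the sink argument to WriteFlatTable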

python/pyarrow/parquet.pyx (12 additions, 6 deletions)

@@ -21,7 +21,7 @@

from pyarrow.includes.libarrow cimport *
from pyarrow.includes.parquet cimport *
-from pyarrow.includes.libarrow_io cimport ReadableFileInterface
+from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream, FileOutputStream
cimport pyarrow.includes.pyarrow as pyarrow

from pyarrow.array cimport Array
@@ -151,15 +151,15 @@ def read_table(source, columns=None):
    return Table.from_arrays(columns, arrays)


-def write_table(table, filename, chunk_size=None, version=None,
+def write_table(table, sink, chunk_size=None, version=None,
                use_dictionary=True, compression=None):
    """
    Write a Table to Parquet format

    Parameters
    ----------
    table : pyarrow.Table
-   filename : string
+   sink : string or pyarrow.io.NativeFile
    chunk_size : int
        The maximum number of rows in each Parquet RowGroup. As a default,
        we will write a single RowGroup per file.
@@ -173,7 +173,8 @@ def write_table(table, filename, chunk_size=None, version=None,
    """
    cdef Table table_ = table
    cdef CTable* ctable_ = table_.table
-   cdef shared_ptr[ParquetOutputStream] sink
+   cdef shared_ptr[ParquetWriteSink] sink_
+   cdef shared_ptr[FileOutputStream] filesink_
    cdef WriterProperties.Builder properties_builder
    cdef int64_t chunk_size_ = 0
    if chunk_size is None:
@@ -230,7 +231,12 @@ def write_table(table, filename, chunk_size=None, version=None,
    else:
        raise ArrowException("Unsupported compression codec")

-   sink.reset(new LocalFileOutputStream(tobytes(filename)))
+   if isinstance(sink, six.string_types):
+       check_status(FileOutputStream.Open(tobytes(sink), &filesink_))
+       sink_.reset(new ParquetWriteSink(<shared_ptr[OutputStream]>filesink_))
+   elif isinstance(sink, NativeFile):
+       sink_.reset(new ParquetWriteSink((<NativeFile>sink).wr_file))

    with nogil:
-       check_status(WriteFlatTable(ctable_, default_memory_pool(), sink,
+       check_status(WriteFlatTable(ctable_, default_memory_pool(), sink_,
                                    chunk_size_, properties_builder.build()))
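
A minimal usage sketch of the new dual-sink behavior (the file path is hypothetical; InMemoryOutputStream and get_result come from the test added below):

    import pandas as pd
    import pyarrow as A
    import pyarrow.io as paio
    import pyarrow.parquet as pq

    table = A.from_pandas_dataframe(pd.DataFrame({'x': [1, 2, 3]}))

    # A string sink still opens a file on disk, as before.
    pq.write_table(table, '/tmp/example.parquet')

    # A NativeFile sink now works too, e.g. an in-memory stream.
    imos = paio.InMemoryOutputStream()
    pq.write_table(table, imos)
    buf = imos.get_result()  # buffer holding the serialized Parquet file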

python/pyarrow/tests/test_parquet.py (27 additions, 0 deletions)

@@ -18,6 +18,7 @@
import pytest

import pyarrow as A
+import pyarrow.io as paio

import numpy as np
import pandas as pd
@@ -131,6 +132,32 @@ def test_pandas_column_selection(tmpdir):

    pdt.assert_frame_equal(df[['uint8']], df_read)

+@parquet
+def test_pandas_parquet_native_file_roundtrip(tmpdir):
+    size = 10000
+    np.random.seed(0)
+    df = pd.DataFrame({
+        'uint8': np.arange(size, dtype=np.uint8),
+        'uint16': np.arange(size, dtype=np.uint16),
+        'uint32': np.arange(size, dtype=np.uint32),
+        'uint64': np.arange(size, dtype=np.uint64),
+        'int8': np.arange(size, dtype=np.int8),
+        'int16': np.arange(size, dtype=np.int16),
+        'int32': np.arange(size, dtype=np.int32),
+        'int64': np.arange(size, dtype=np.int64),
+        'float32': np.arange(size, dtype=np.float32),
+        'float64': np.arange(size, dtype=np.float64),
+        'bool': np.random.randn(size) > 0
+    })
+    arrow_table = A.from_pandas_dataframe(df)
+    imos = paio.InMemoryOutputStream()
+    pq.write_table(arrow_table, imos, version="2.0")
+    buf = imos.get_result()
+    reader = paio.BufferReader(buf)
+    df_read = pq.read_table(reader).to_pandas()
+    pdt.assert_frame_equal(df, df_read)
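
Because the buffer is just bytes, the same pattern supports column-selective reads via read_table's columns argument (signature shown in the parquet.pyx hunk above); a hypothetical extension of this test:

    reader = paio.BufferReader(buf)
    df_uint8 = pq.read_table(reader, columns=['uint8']).to_pandas()
    pdt.assert_frame_equal(df[['uint8']], df_uint8)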


@parquet
def test_pandas_parquet_configuration_options(tmpdir):
    size = 10000