65 changes: 51 additions & 14 deletions python/doc/source/plasma.rst
Expand Up @@ -98,9 +98,46 @@ follows:
def random_object_id():
return plasma.ObjectID(np.random.bytes(20))

Putting and Getting Python Objects
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Creating an Object
^^^^^^^^^^^^^^^^^^
Plasma supports two APIs for creating and accessing objects: a high-level
API for storing and retrieving Python objects, and a low-level API for
creating, writing, and sealing buffers and operating on the binary data
directly. This section describes the high-level API.

This is how you can put and get a Python object:

.. code-block:: python

# Create a python object.
object_id = client.put("hello, world")

# Get the object.
client.get(object_id)

This works with all Python objects supported by the Arrow Python object
serialization.

You can also get multiple objects at the same time (which can be more
efficient since it avoids IPC round trips):

.. code-block:: python

# Create multiple python objects.
object_id1 = client.put(1)
object_id2 = client.put(2)
object_id3 = client.put(3)

# Get the objects.
client.get([object_id1, object_id2, object_id3])

Furthermore, you can pass a timeout to the get call through the
``timeout_ms`` argument. If an object is not available within the timeout,
the special object ``pyarrow.plasma.ObjectNotAvailable`` is returned in its
place.
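
When several objects are fetched with a timeout, the result list can mix real
values with the placeholder. The following is a minimal sketch of how a caller
might separate the two. It is not part of the Plasma API: the
``ObjectNotAvailable`` class below is a local stand-in for the real placeholder
so that the snippet runs without a Plasma store, and ``split_results`` is a
hypothetical helper name.

```python
class ObjectNotAvailable:
    """Local stand-in for the placeholder class (assumption: used here only
    so the example runs without a Plasma store)."""


def split_results(object_ids, results, sentinel=ObjectNotAvailable):
    """Separate fetched values from the IDs whose objects timed out.

    `split_results` is a hypothetical helper, not a Plasma function.
    """
    available = {}
    missing = []
    for object_id, value in zip(object_ids, results):
        # The placeholder is the class itself rather than an instance,
        # so an identity check distinguishes it from any stored value.
        if value is sentinel:
            missing.append(object_id)
        else:
            available[object_id] = value
    return available, missing


# Simulated result of a batched get call with a timeout.
ids = ["id1", "id2", "id3"]
results = [1, ObjectNotAvailable, 3]

available, missing = split_results(ids, results)
print(available)  # {'id1': 1, 'id3': 3}
print(missing)    # ['id2']
```

Checking for the placeholder instead of ``None`` matters because ``None`` is
itself a value that can be stored and retrieved.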

Creating an Object Buffer
^^^^^^^^^^^^^^^^^^^^^^^^^

Objects are created in Plasma in two stages. First, they are **created**, which
allocates a buffer for the object. At this point, the client can write to the
Expand All @@ -111,7 +148,7 @@ give the object's maximum size in bytes.

.. code-block:: python

# Create an object.
# Create an object buffer.
object_id = plasma.ObjectID(20 * b"a")
object_size = 1000
buffer = memoryview(client.create(object_id, object_size))
Expand All @@ -129,11 +166,11 @@ immutable, and making it available to other Plasma clients.
client.seal(object_id)


Getting an Object
^^^^^^^^^^^^^^^^^
Getting an Object Buffer
^^^^^^^^^^^^^^^^^^^^^^^^

After an object has been sealed, any client who knows the object ID can get
the object.
the object buffer.

.. code-block:: python

Expand All @@ -143,11 +180,11 @@ the object.

# Get the object in the second client. This blocks until the object has been sealed.
object_id2 = plasma.ObjectID(20 * b"a")
[buffer2] = client2.get([object_id])
[buffer2] = client2.get_buffers([object_id])

If the object has not been sealed yet, then the call to client.get will block
until the object has been sealed by the client constructing the object. Using
the ``timeout_ms`` argument to get, you can specify a timeout for this (in
If the object has not been sealed yet, the call to ``client.get_buffers``
blocks until the client constructing the object seals it. Using the
``timeout_ms`` argument to ``get_buffers``, you can specify a timeout for this (in
milliseconds). After the timeout, the interpreter will yield control back.

.. code-block:: shell
Expand Down Expand Up @@ -223,7 +260,7 @@ To read the object, first retrieve it as a ``PlasmaBuffer`` using its object ID.
.. code-block:: python

# Get the arrow object by ObjectID.
[buf2] = client.get([object_id])
[buf2] = client.get_buffers([object_id])

To convert the ``PlasmaBuffer`` back into an Arrow ``Tensor``, first create a
pyarrow ``BufferReader`` object from it. You can then pass the ``BufferReader``
Expand Down Expand Up @@ -310,13 +347,13 @@ Since we store the Pandas DataFrame as a PyArrow ``RecordBatch`` object,
to get the object back from the Plasma store, we follow similar steps
to those specified in `Getting Arrow Objects from Plasma`_.

We first have to convert the ``PlasmaBuffer`` returned from ``client.get``
into an Arrow ``BufferReader`` object.
We first have to convert the ``PlasmaBuffer`` returned from
``client.get_buffers`` into an Arrow ``BufferReader`` object.

.. code-block:: python

# Fetch the Plasma object
[data] = client.get([object_id]) # Get PlasmaBuffer from ObjectID
[data] = client.get_buffers([object_id]) # Get PlasmaBuffer from ObjectID
buffer = pa.BufferReader(data)

From the ``BufferReader``, we can create a specific ``RecordBatchStreamReader``
2 changes: 1 addition & 1 deletion python/examples/plasma/sorting/sort_df.py
Expand Up @@ -81,7 +81,7 @@ def put_df(df):

def get_dfs(object_ids):
"""Retrieve dataframes from the object store given their object IDs."""
buffers = client.get(object_ids)
buffers = client.get_buffers(object_ids)
return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas()
for buf in buffers]

6 changes: 4 additions & 2 deletions python/pyarrow/__init__.py
Expand Up @@ -85,12 +85,14 @@
ArrowIOError,
ArrowMemoryError,
ArrowNotImplementedError,
ArrowTypeError)
ArrowTypeError,
PlasmaObjectExists)

# Serialization
from pyarrow.lib import (deserialize_from, deserialize,
serialize, serialize_to, read_serialized,
SerializedPyObject)
SerializedPyObject,
SerializationException, DeserializationException)

from pyarrow.filesystem import FileSystem, LocalFileSystem

86 changes: 83 additions & 3 deletions python/pyarrow/plasma.pyx
Expand Up @@ -26,6 +26,9 @@ from libcpp.vector cimport vector as c_vector
from libc.stdint cimport int64_t, uint8_t, uintptr_t
from cpython.pycapsule cimport *

import collections
import pyarrow

from pyarrow.lib cimport Buffer, NativeFile, check_status
from pyarrow.includes.libarrow cimport (CMutableBuffer, CBuffer,
CFixedSizeBufferWriter, CStatus)
Expand All @@ -41,6 +44,9 @@ cdef extern from "plasma/common.h" nogil:
@staticmethod
CUniqueID from_binary(const c_string& binary)

@staticmethod
CUniqueID from_random()

c_bool operator==(const CUniqueID& rhs) const

c_string hex() const
Expand Down Expand Up @@ -157,6 +163,18 @@ cdef class ObjectID:
"""
return self.data.binary()

@staticmethod
def from_random():
cdef CUniqueID data = CUniqueID.from_random()
return ObjectID(data.binary())


cdef class ObjectNotAvailable:
"""
Placeholder for an object that was not available within the given timeout.
"""
pass


cdef class PlasmaBuffer(Buffer):
"""
Expand Down Expand Up @@ -285,7 +303,7 @@ cdef class PlasmaClient:
metadata.size(), &data))
return self._make_mutable_plasma_buffer(object_id, data, data_size)

def get(self, object_ids, timeout_ms=-1):
def get_buffers(self, object_ids, timeout_ms=-1):
"""
Returns data buffer from the PlasmaStore based on object ID.

Expand All @@ -296,7 +314,7 @@ cdef class PlasmaClient:
----------
object_ids : list
A list of ObjectIDs used to identify some objects.
timeout_ms :int
timeout_ms : int
The number of milliseconds that the get call should block before
timing out and returning. Pass -1 if the call should block and 0
if the call should return immediately.
Expand Down Expand Up @@ -352,6 +370,68 @@ cdef class PlasmaClient:
object_buffers[i].metadata_size))
return result

def put(self, object value, ObjectID object_id=None):
"""
Store a Python value into the object store.

Parameters
----------
value : object
A Python object to store.
object_id : ObjectID, default None
If this is provided, the specified object ID will be used to refer
to the object.

Returns
-------
The object ID associated to the Python object.
"""
cdef ObjectID target_id = object_id if object_id else ObjectID.from_random()
# TODO(pcm): Make serialization code support non-sequences and
# get rid of packing the value into a list here (and unpacking in get)
serialized = pyarrow.serialize([value])
buffer = self.create(target_id, serialized.total_bytes)
stream = pyarrow.FixedSizeBufferOutputStream(buffer)
stream.set_memcopy_threads(4)
serialized.write_to(stream)
self.seal(target_id)
return target_id

def get(self, object_ids, int timeout_ms=-1):
"""
Get one or more Python values from the object store.

Parameters
----------
object_ids : list or ObjectID
Object ID or list of object IDs associated to the values we get from
the store.
timeout_ms : int, default -1
The number of milliseconds that the get call should block before
timing out and returning. Pass -1 if the call should block and 0
if the call should return immediately.

Returns
-------
list or object
Python value or list of Python values for the data associated with
the object_ids, with ObjectNotAvailable in place of any object that
was not available within the timeout.
"""
if isinstance(object_ids, collections.Sequence):
results = []
buffers = self.get_buffers(object_ids, timeout_ms)
for i in range(len(object_ids)):
# buffers[i] is None if this object was not available within the
# timeout
if buffers[i]:
value, = pyarrow.deserialize(buffers[i])
results.append(value)
else:
results.append(ObjectNotAvailable)
return results
else:
return self.get([object_ids], timeout_ms)[0]

def seal(self, ObjectID object_id):
"""
Seal the buffer in the PlasmaStore for a particular object ID.
Expand Down Expand Up @@ -576,7 +656,7 @@ def connect(store_socket_name, manager_socket_name, int release_delay,
The maximum number of objects that the client will keep and
delay releasing (for caching reasons).
num_retries : int, default -1
Number of times tor ty to connect to plasma store. Default value of -1
Number of times to try to connect to plasma store. Default value of -1
uses the default (50)
"""
cdef PlasmaClient result = PlasmaClient()
33 changes: 23 additions & 10 deletions python/pyarrow/tests/test_plasma.py
Expand Up @@ -82,8 +82,8 @@ def create_object(client, data_size, metadata_size, seal=True):
def assert_get_object_equal(unit_test, client1, client2, object_id,
memory_buffer=None, metadata=None):
import pyarrow.plasma as plasma
client1_buff = client1.get([object_id])[0]
client2_buff = client2.get([object_id])[0]
client1_buff = client1.get_buffers([object_id])[0]
client2_buff = client2.get_buffers([object_id])[0]
client1_metadata = client1.get_metadata([object_id])[0]
client2_metadata = client2.get_metadata([object_id])[0]
assert len(client1_buff) == len(client2_buff)
Expand Down Expand Up @@ -187,7 +187,7 @@ def test_create(self):
# Seal the object.
self.plasma_client.seal(object_id)
# Get the object.
memory_buffer = np.frombuffer(self.plasma_client.get([object_id])[0],
memory_buffer = np.frombuffer(self.plasma_client.get_buffers([object_id])[0],
dtype="uint8")
for i in range(length):
assert memory_buffer[i] == i % 256
Expand All @@ -209,7 +209,7 @@ def test_create_with_metadata(self):
self.plasma_client.seal(object_id)
# Get the object.
memory_buffer = np.frombuffer(
self.plasma_client.get([object_id])[0], dtype="uint8")
self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
for i in range(length):
assert memory_buffer[i] == i % 256
# Get the metadata.
Expand Down Expand Up @@ -241,7 +241,7 @@ def test_get(self):
# Test timing out of get with various timeouts.
for timeout in [0, 10, 100, 1000]:
object_ids = [random_object_id() for _ in range(num_object_ids)]
results = self.plasma_client.get(object_ids, timeout_ms=timeout)
results = self.plasma_client.get_buffers(object_ids, timeout_ms=timeout)
assert results == num_object_ids * [None]

data_buffers = []
Expand All @@ -256,8 +256,8 @@ def test_get(self):
# Test timing out from some but not all get calls with various
# timeouts.
for timeout in [0, 10, 100, 1000]:
data_results = self.plasma_client.get(object_ids,
timeout_ms=timeout)
data_results = self.plasma_client.get_buffers(object_ids,
timeout_ms=timeout)
# metadata_results = self.plasma_client.get_metadata(
# object_ids, timeout_ms=timeout)
for i in range(num_object_ids):
Expand All @@ -273,6 +273,19 @@ def test_get(self):
else:
assert results[i] is None

def test_put_and_get(self):
for value in [["hello", "world", 3, 1.0], None, "hello"]:
object_id = self.plasma_client.put(value)
[result] = self.plasma_client.get([object_id])
assert result == value

result = self.plasma_client.get(object_id)
assert result == value

object_id = pa.plasma.ObjectID.from_random()
[result] = self.plasma_client.get([object_id], timeout_ms=0)
assert result == pa.plasma.ObjectNotAvailable

def test_store_arrow_objects(self):
data = np.random.randn(10, 4)
# Write an arrow object.
Expand All @@ -284,7 +297,7 @@ def test_store_arrow_objects(self):
pa.write_tensor(tensor, stream)
self.plasma_client.seal(object_id)
# Read the arrow object.
[tensor] = self.plasma_client.get([object_id])
[tensor] = self.plasma_client.get_buffers([object_id])
reader = pa.BufferReader(tensor)
array = pa.read_tensor(reader).to_numpy()
# Assert that they are equal.
Expand Down Expand Up @@ -313,7 +326,7 @@ def test_store_pandas_dataframe(self):
self.plasma_client.seal(object_id)

# Read the DataFrame.
[data] = self.plasma_client.get([object_id])
[data] = self.plasma_client.get_buffers([object_id])
reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
result = reader.get_next_batch().to_pandas()

Expand Down Expand Up @@ -551,7 +564,7 @@ def test_illegal_functionality(self):
# with pytest.raises(Exception):
# illegal_assignment()
# Get the object.
memory_buffer = self.plasma_client.get([object_id])[0]
memory_buffer = self.plasma_client.get_buffers([object_id])[0]

# Make sure the object is read only.
def illegal_assignment():
1 change: 1 addition & 0 deletions python/pyarrow/tests/test_serialization.py
Expand Up @@ -88,6 +88,7 @@ def array_custom_serializer(obj):
def array_custom_deserializer(serialized_obj):
return np.array(serialized_obj[0], dtype=np.dtype(serialized_obj[1]))


pa.lib.register_type(np.ndarray, 20 * b"\x00", pickle=False,
custom_serializer=array_custom_serializer,
custom_deserializer=array_custom_deserializer)