-
Notifications
You must be signed in to change notification settings - Fork 4k
ARROW-759: [Python] Serializing large class of Python objects in Apache Arrow #965
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
55 commits
Select commit
Hold shift + click to select a range
5766b8c
python to arrow serialization
pcmoritz deb3b46
rename serialization entry point
pcmoritz 3af1c67
deserialization path (need to figure out if base object and refcounti…
pcmoritz 44fb98b
work in progress
pcmoritz 49a4acb
roundtrip working for the first time
pcmoritz bd36c83
handle very long longs with custom serialization callback
pcmoritz 8b2ffe6
working version
pcmoritz f229d8d
serialization of custom objects
pcmoritz 30bb960
rebase
pcmoritz 2171761
fix python unicode string
pcmoritz 7069e20
fix imports
pcmoritz c4782ac
fix
pcmoritz 91b57d5
fix linting
pcmoritz 2e08de4
fix namespaces
pcmoritz 802e739
clang-format
pcmoritz a6105d2
lint fix
pcmoritz 080db03
fix first few comments
pcmoritz 74b9e46
convert DESERIALIZE_SEQUENCE to a template
pcmoritz c38c58d
get rid of leaks and clarify reference counting for dicts
pcmoritz aaf6f09
remove code duplication
pcmoritz 3929273
increase Py_True refcount and hide helper methods
pcmoritz e73c1ea
make DictBuilder private
pcmoritz 3298329
mutable refs and small fixes
pcmoritz 99e2d1a
cleanups
pcmoritz 3e94e6d
clang-format
pcmoritz c1f377b
more fixes
pcmoritz e1fc0c5
restructure
pcmoritz faf9a3e
make exported API more consistent
pcmoritz 2f0760c
fix api
pcmoritz 389bfc6
documentation
pcmoritz aeafd82
fix callbacks
pcmoritz c425978
prevent possible memory leaks
pcmoritz a88d410
convert DESERIALIZE_SEQUENCE back to a macro
pcmoritz f25f3f3
cleanups
pcmoritz 4cc45cd
cleanup
pcmoritz bcebdfe
fix longlong vs int64 and unsigned variant
pcmoritz adcc8f7
shuffle stuff around
pcmoritz 95cb9da
fix GIL
pcmoritz aa1f300
linting
pcmoritz 49aba8a
make it compile on windows
pcmoritz 84d62f6
more fixes
pcmoritz fe56c73
fixes
pcmoritz a6fdb76
make tests work
pcmoritz 54af39b
more fixes
pcmoritz 831e2f2
remove sequence.h
pcmoritz c8efef9
Fix various Clang compiler warnings due to integer conversions. clang…
wesm ce5784d
Do not use ARROW_CHECK in production code. Consolidate python_to_arro…
wesm a9522c5
Refactoring, address code review comments. fix flake8 issues
wesm 8a42f30
Add doxygen comment to set_serialization_callbacks
wesm 8e59617
Use pytest tmpdir for large memory map fixture so works on Windows
wesm 114a5fb
Add a Python container for the SerializedPyObject data, total_bytes m…
wesm a6a402e
Memory map fixture robustness on Windows
wesm b70235c
Add pyarrow.deserialize convenience method
wesm 2164db7
Add SerializedPyObject to public API
wesm 31486ed
Fix typo
wesm File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,221 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #include "arrow/python/arrow_to_python.h" | ||
|
|
||
| #include <cstdint> | ||
| #include <memory> | ||
| #include <vector> | ||
|
|
||
| #include "arrow/array.h" | ||
| #include "arrow/io/interfaces.h" | ||
| #include "arrow/ipc/reader.h" | ||
| #include "arrow/python/common.h" | ||
| #include "arrow/python/helpers.h" | ||
| #include "arrow/python/numpy_convert.h" | ||
| #include "arrow/table.h" | ||
| #include "arrow/util/logging.h" | ||
|
|
||
| extern "C" { | ||
| extern PyObject* pyarrow_serialize_callback; | ||
| extern PyObject* pyarrow_deserialize_callback; | ||
| } | ||
|
|
||
| namespace arrow { | ||
| namespace py { | ||
|
|
||
| Status CallCustomCallback(PyObject* callback, PyObject* elem, PyObject** result); | ||
|
|
||
| Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx, | ||
| PyObject* base, | ||
| const std::vector<std::shared_ptr<Tensor>>& tensors, | ||
| PyObject** out); | ||
|
|
||
| Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx, | ||
| PyObject* base, | ||
| const std::vector<std::shared_ptr<Tensor>>& tensors, | ||
| PyObject** out); | ||
|
|
||
| Status DeserializeDict(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx, | ||
| PyObject* base, | ||
| const std::vector<std::shared_ptr<Tensor>>& tensors, | ||
| PyObject** out) { | ||
| auto data = std::dynamic_pointer_cast<StructArray>(array); | ||
| ScopedRef keys, vals; | ||
| ScopedRef result(PyDict_New()); | ||
| RETURN_NOT_OK( | ||
| DeserializeList(data->field(0), start_idx, stop_idx, base, tensors, keys.ref())); | ||
| RETURN_NOT_OK( | ||
| DeserializeList(data->field(1), start_idx, stop_idx, base, tensors, vals.ref())); | ||
| for (int64_t i = start_idx; i < stop_idx; ++i) { | ||
| // PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem. | ||
| // The latter two steal references whereas PyDict_SetItem does not. So we need | ||
| // to make sure the reference count is decremented by letting the ScopedRef | ||
| // go out of scope at the end. | ||
| PyDict_SetItem(result.get(), PyList_GET_ITEM(keys.get(), i - start_idx), | ||
| PyList_GET_ITEM(vals.get(), i - start_idx)); | ||
| } | ||
| static PyObject* py_type = PyUnicode_FromString("_pytype_"); | ||
| if (PyDict_Contains(result.get(), py_type)) { | ||
| RETURN_NOT_OK(CallCustomCallback(pyarrow_deserialize_callback, result.get(), out)); | ||
| } else { | ||
| *out = result.release(); | ||
| } | ||
| return Status::OK(); | ||
| } | ||
|
|
||
| Status DeserializeArray(std::shared_ptr<Array> array, int64_t offset, PyObject* base, | ||
| const std::vector<std::shared_ptr<arrow::Tensor>>& tensors, | ||
| PyObject** out) { | ||
| DCHECK(array); | ||
| int32_t index = std::static_pointer_cast<Int32Array>(array)->Value(offset); | ||
| RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out)); | ||
| // Mark the array as immutable | ||
| ScopedRef flags(PyObject_GetAttrString(*out, "flags")); | ||
| DCHECK(flags.get() != NULL) << "Could not mark Numpy array immutable"; | ||
| Py_INCREF(Py_False); | ||
| int flag_set = PyObject_SetAttrString(flags.get(), "writeable", Py_False); | ||
| DCHECK(flag_set == 0) << "Could not mark Numpy array immutable"; | ||
| return Status::OK(); | ||
| } | ||
|
|
||
| Status GetValue(std::shared_ptr<Array> arr, int64_t index, int32_t type, PyObject* base, | ||
| const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** result) { | ||
| switch (arr->type()->id()) { | ||
| case Type::BOOL: | ||
| *result = | ||
| PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index)); | ||
| return Status::OK(); | ||
| case Type::INT64: | ||
| *result = | ||
| PyLong_FromSsize_t(std::static_pointer_cast<Int64Array>(arr)->Value(index)); | ||
| return Status::OK(); | ||
| case Type::BINARY: { | ||
| int32_t nchars; | ||
| const uint8_t* str = | ||
| std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars); | ||
| *result = PyBytes_FromStringAndSize(reinterpret_cast<const char*>(str), nchars); | ||
| return CheckPyError(); | ||
| } | ||
| case Type::STRING: { | ||
| int32_t nchars; | ||
| const uint8_t* str = | ||
| std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars); | ||
| *result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error checking? |
||
| return CheckPyError(); | ||
| } | ||
| case Type::FLOAT: | ||
| *result = | ||
| PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index)); | ||
| return Status::OK(); | ||
| case Type::DOUBLE: | ||
| *result = | ||
| PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index)); | ||
| return Status::OK(); | ||
| case Type::STRUCT: { | ||
| auto s = std::static_pointer_cast<StructArray>(arr); | ||
| auto l = std::static_pointer_cast<ListArray>(s->field(0)); | ||
| if (s->type()->child(0)->name() == "list") { | ||
| return DeserializeList(l->values(), l->value_offset(index), | ||
| l->value_offset(index + 1), base, tensors, result); | ||
| } else if (s->type()->child(0)->name() == "tuple") { | ||
| return DeserializeTuple(l->values(), l->value_offset(index), | ||
| l->value_offset(index + 1), base, tensors, result); | ||
| } else if (s->type()->child(0)->name() == "dict") { | ||
| return DeserializeDict(l->values(), l->value_offset(index), | ||
| l->value_offset(index + 1), base, tensors, result); | ||
| } else { | ||
| DCHECK(false) << "unexpected StructArray type " << s->type()->child(0)->name(); | ||
| } | ||
| } | ||
| // We use an Int32Builder here to distinguish the tensor indices from | ||
| // the Type::INT64 above (see tensor_indices_ in SequenceBuilder). | ||
| case Type::INT32: { | ||
| return DeserializeArray(arr, index, base, tensors, result); | ||
| } | ||
| default: | ||
| DCHECK(false) << "union tag " << type << " not recognized"; | ||
| } | ||
| return Status::OK(); | ||
| } | ||
|
|
||
| #define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN) \ | ||
| auto data = std::dynamic_pointer_cast<UnionArray>(array); \ | ||
| int64_t size = array->length(); \ | ||
| ScopedRef result(CREATE_FN(stop_idx - start_idx)); \ | ||
| auto types = std::make_shared<Int8Array>(size, data->type_ids()); \ | ||
| auto offsets = std::make_shared<Int32Array>(size, data->value_offsets()); \ | ||
| for (int64_t i = start_idx; i < stop_idx; ++i) { \ | ||
| if (data->IsNull(i)) { \ | ||
| Py_INCREF(Py_None); \ | ||
| SET_ITEM_FN(result.get(), i - start_idx, Py_None); \ | ||
| } else { \ | ||
| int64_t offset = offsets->Value(i); \ | ||
| int8_t type = types->Value(i); \ | ||
| std::shared_ptr<Array> arr = data->child(type); \ | ||
| PyObject* value; \ | ||
| RETURN_NOT_OK(GetValue(arr, offset, type, base, tensors, &value)); \ | ||
| SET_ITEM_FN(result.get(), i - start_idx, value); \ | ||
| } \ | ||
| } \ | ||
| *out = result.release(); \ | ||
| return Status::OK(); | ||
|
|
||
| Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx, | ||
| PyObject* base, | ||
| const std::vector<std::shared_ptr<Tensor>>& tensors, | ||
| PyObject** out) { | ||
| DESERIALIZE_SEQUENCE(PyList_New, PyList_SET_ITEM) | ||
| } | ||
|
|
||
| Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx, | ||
| PyObject* base, | ||
| const std::vector<std::shared_ptr<Tensor>>& tensors, | ||
| PyObject** out) { | ||
| DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM) | ||
| } | ||
|
|
||
| Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src, | ||
| SerializedPyObject* out) { | ||
| std::shared_ptr<ipc::RecordBatchStreamReader> reader; | ||
| int64_t offset; | ||
| int64_t bytes_read; | ||
| int32_t num_tensors; | ||
| // Read number of tensors | ||
| RETURN_NOT_OK( | ||
| src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors))); | ||
| RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader)); | ||
| RETURN_NOT_OK(reader->ReadNextRecordBatch(&out->batch)); | ||
| RETURN_NOT_OK(src->Tell(&offset)); | ||
| offset += 4; // Skip the end-of-stream message | ||
| for (int i = 0; i < num_tensors; ++i) { | ||
| std::shared_ptr<Tensor> tensor; | ||
| RETURN_NOT_OK(ipc::ReadTensor(offset, src.get(), &tensor)); | ||
| out->tensors.push_back(tensor); | ||
| RETURN_NOT_OK(src->Tell(&offset)); | ||
| } | ||
| return Status::OK(); | ||
| } | ||
|
|
||
| Status DeserializeObject(const SerializedPyObject& obj, PyObject* base, PyObject** out) { | ||
| PyAcquireGIL lock; | ||
| return DeserializeList(obj.batch->column(0), 0, obj.batch->num_rows(), base, | ||
| obj.tensors, out); | ||
| } | ||
|
|
||
| } // namespace py | ||
| } // namespace arrow | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,66 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| #ifndef ARROW_PYTHON_ARROW_TO_PYTHON_H | ||
| #define ARROW_PYTHON_ARROW_TO_PYTHON_H | ||
|
|
||
| #include "arrow/python/platform.h" | ||
|
|
||
| #include <cstdint> | ||
| #include <memory> | ||
| #include <vector> | ||
|
|
||
| #include "arrow/python/python_to_arrow.h" | ||
| #include "arrow/status.h" | ||
| #include "arrow/util/visibility.h" | ||
|
|
||
| namespace arrow { | ||
|
|
||
| class RecordBatch; | ||
| class Tensor; | ||
|
|
||
| namespace io { | ||
|
|
||
| class RandomAccessFile; | ||
|
|
||
| } // namespace io | ||
|
|
||
| namespace py { | ||
|
|
||
| /// \brief Read serialized Python sequence from file interface using Arrow IPC | ||
| /// \param[in] src a RandomAccessFile | ||
| /// \param[out] out the reconstructed data | ||
| /// \return Status | ||
| ARROW_EXPORT | ||
| Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src, | ||
| SerializedPyObject* out); | ||
|
|
||
| /// \brief Reconstruct Python object from Arrow-serialized representation | ||
| /// \param[in] object | ||
| /// \param[in] base a Python object holding the underlying data that any NumPy | ||
| /// arrays will reference, to avoid premature deallocation | ||
| /// \param[out] out the returned object | ||
| /// \return Status | ||
| /// This acquires the GIL | ||
| ARROW_EXPORT | ||
| Status DeserializeObject(const SerializedPyObject& object, PyObject* base, | ||
| PyObject** out); | ||
|
|
||
| } // namespace py | ||
| } // namespace arrow | ||
|
|
||
| #endif // ARROW_PYTHON_ARROW_TO_PYTHON_H |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error checking?