Skip to content

Commit b50f235

Browse files
pcmoritzwesm
authored andcommitted
ARROW-759: [Python] Serializing large class of Python objects in Apache Arrow
This PR adds the capability to serialize a large class of (nested) Python objects in Apache Arrow. The eventual goal is to evolve this into a more modern version of pickle that will make it possible to read the data from other languages supported by Apache Arrow (and might also be faster). Currently we support lists, tuples, dicts, strings, numpy objects, Python classes and namedtuples. A fallback to (cloud-)pickle can be provided for objects that cannot be natively represented in Arrow (for example lambdas). Numpy data within objects is efficiently represented using Arrow's Tensor facilities and for the nested Python sequences we use Arrow's UnionArray. There are many loose ends that will need to be addressed in follow up PRs. Author: Philipp Moritz <pcmoritz@gmail.com> Author: Wes McKinney <wes.mckinney@twosigma.com> Closes #965 from pcmoritz/python-serialization and squashes the following commits: 31486ed [Wes McKinney] Fix typo 2164db7 [Wes McKinney] Add SerializedPyObject to public API b70235c [Wes McKinney] Add pyarrow.deserialize convenience method a6a402e [Wes McKinney] Memory map fixture robustness on Windows 114a5fb [Wes McKinney] Add a Python container for the SerializedPyObject data, total_bytes method 8e59617 [Wes McKinney] Use pytest tmpdir for large memory map fixture so works on Windows 8a42f30 [Wes McKinney] Add doxygen comment to set_serialization_callbacks a9522c5 [Wes McKinney] Refactoring, address code review comments. fix flake8 issues ce5784d [Wes McKinney] Do not use ARROW_CHECK in production code. Consolidate python_to_arrow code c8efef9 [Wes McKinney] Fix various Clang compiler warnings due to integer conversions. clang-format 831e2f2 [Philipp Moritz] remove sequence.h 54af39b [Philipp Moritz] more fixes a6fdb76 [Philipp Moritz] make tests work fe56c73 [Philipp Moritz] fixes 84d62f6 [Philipp Moritz] more fixes 49aba8a [Philipp Moritz] make it compile on windows aa1f300 [Philipp Moritz] linting 95cb9da [Philipp Moritz] fix GIL adcc8f7 [Philipp Moritz] shuffle stuff around bcebdfe [Philipp Moritz] fix longlong vs int64 and unsigned variant 4cc45cd [Philipp Moritz] cleanup f25f3f3 [Philipp Moritz] cleanups a88d410 [Philipp Moritz] convert DESERIALIZE_SEQUENCE back to a macro c425978 [Philipp Moritz] prevent possible memory leaks aeafd82 [Philipp Moritz] fix callbacks 389bfc6 [Philipp Moritz] documentation 2f0760c [Philipp Moritz] fix api faf9a3e [Philipp Moritz] make exported API more consistent e1fc0c5 [Philipp Moritz] restructure c1f377b [Philipp Moritz] more fixes 3e94e6d [Philipp Moritz] clang-format 99e2d1a [Philipp Moritz] cleanups 3298329 [Philipp Moritz] mutable refs and small fixes e73c1ea [Philipp Moritz] make DictBuilder private 3929273 [Philipp Moritz] increase Py_True refcount and hide helper methods aaf6f09 [Philipp Moritz] remove code duplication c38c58d [Philipp Moritz] get rid of leaks and clarify reference counting for dicts 74b9e46 [Philipp Moritz] convert DESERIALIZE_SEQUENCE to a template 080db03 [Philipp Moritz] fix first few comments a6105d2 [Philipp Moritz] lint fix 802e739 [Philipp Moritz] clang-format 2e08de4 [Philipp Moritz] fix namespaces 91b57d5 [Philipp Moritz] fix linting c4782ac [Philipp Moritz] fix 7069e20 [Philipp Moritz] fix imports 2171761 [Philipp Moritz] fix python unicode string 30bb960 [Philipp Moritz] rebase f229d8d [Philipp Moritz] serialization of custom objects 8b2ffe6 [Philipp Moritz] working version bd36c83 [Philipp Moritz] handle very long longs with custom serialization callback 49a4acb [Philipp Moritz] roundtrip working for the first time 44fb98b [Philipp Moritz] work in progress 3af1c67 [Philipp Moritz] deserialization path (need to figure out if base object and refcounting is handled correctly) deb3b46 [Philipp Moritz] rename serialization entry point 5766b8c [Philipp Moritz] python to arrow serialization
1 parent 10f7158 commit b50f235

16 files changed

Lines changed: 1619 additions & 9 deletions

File tree

cpp/src/arrow/builder.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
244244
using PrimitiveBuilder<T>::Reserve;
245245

246246
/// Append a single scalar and increase the size if necessary.
247-
Status Append(value_type val) {
247+
Status Append(const value_type val) {
248248
RETURN_NOT_OK(ArrayBuilder::Reserve(1));
249249
UnsafeAppend(val);
250250
return Status::OK();
@@ -255,7 +255,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
255255
///
256256
/// This method does not capacity-check; make sure to call Reserve
257257
/// beforehand.
258-
void UnsafeAppend(value_type val) {
258+
void UnsafeAppend(const value_type val) {
259259
BitUtil::SetBit(null_bitmap_data_, length_);
260260
raw_data_[length_++] = val;
261261
}
@@ -371,7 +371,7 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase
371371
using ArrayBuilder::Advance;
372372

373373
/// Scalar append
374-
Status Append(uint64_t val) {
374+
Status Append(const uint64_t val) {
375375
RETURN_NOT_OK(Reserve(1));
376376
BitUtil::SetBit(null_bitmap_data_, length_);
377377

@@ -430,7 +430,7 @@ class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase
430430
using ArrayBuilder::Advance;
431431

432432
/// Scalar append
433-
Status Append(int64_t val) {
433+
Status Append(const int64_t val) {
434434
RETURN_NOT_OK(Reserve(1));
435435
BitUtil::SetBit(null_bitmap_data_, length_);
436436

@@ -511,7 +511,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
511511
std::shared_ptr<Buffer> data() const { return data_; }
512512

513513
/// Scalar append
514-
Status Append(bool val) {
514+
Status Append(const bool val) {
515515
RETURN_NOT_OK(Reserve(1));
516516
BitUtil::SetBit(null_bitmap_data_, length_);
517517
if (val) {
@@ -523,7 +523,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
523523
return Status::OK();
524524
}
525525

526-
Status Append(uint8_t val) { return Append(val != 0); }
526+
Status Append(const uint8_t val) { return Append(val != 0); }
527527

528528
/// Vector append
529529
///

cpp/src/arrow/python/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ set(ARROW_PYTHON_TEST_LINK_LIBS ${ARROW_PYTHON_MIN_TEST_LIBS})
4343

4444
set(ARROW_PYTHON_SRCS
4545
arrow_to_pandas.cc
46+
arrow_to_python.cc
4647
builtin_convert.cc
4748
common.cc
4849
config.cc
@@ -51,6 +52,7 @@ set(ARROW_PYTHON_SRCS
5152
io.cc
5253
numpy_convert.cc
5354
pandas_to_arrow.cc
55+
python_to_arrow.cc
5456
pyarrow.cc
5557
)
5658

@@ -83,6 +85,7 @@ endif()
8385
install(FILES
8486
api.h
8587
arrow_to_pandas.h
88+
arrow_to_python.h
8689
builtin_convert.h
8790
common.h
8891
config.h
@@ -92,6 +95,7 @@ install(FILES
9295
numpy_convert.h
9396
numpy_interop.h
9497
pandas_to_arrow.h
98+
python_to_arrow.h
9599
platform.h
96100
pyarrow.h
97101
type_traits.h

cpp/src/arrow/python/api.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
#define ARROW_PYTHON_API_H
2020

2121
#include "arrow/python/arrow_to_pandas.h"
22+
#include "arrow/python/arrow_to_python.h"
2223
#include "arrow/python/builtin_convert.h"
2324
#include "arrow/python/common.h"
2425
#include "arrow/python/helpers.h"
2526
#include "arrow/python/io.h"
2627
#include "arrow/python/numpy_convert.h"
2728
#include "arrow/python/pandas_to_arrow.h"
29+
#include "arrow/python/python_to_arrow.h"
2830

2931
#endif // ARROW_PYTHON_API_H
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#include "arrow/python/arrow_to_python.h"
19+
20+
#include <cstdint>
21+
#include <memory>
22+
#include <vector>
23+
24+
#include "arrow/array.h"
25+
#include "arrow/io/interfaces.h"
26+
#include "arrow/ipc/reader.h"
27+
#include "arrow/python/common.h"
28+
#include "arrow/python/helpers.h"
29+
#include "arrow/python/numpy_convert.h"
30+
#include "arrow/table.h"
31+
#include "arrow/util/logging.h"
32+
33+
extern "C" {
34+
extern PyObject* pyarrow_serialize_callback;
35+
extern PyObject* pyarrow_deserialize_callback;
36+
}
37+
38+
namespace arrow {
39+
namespace py {
40+
41+
Status CallCustomCallback(PyObject* callback, PyObject* elem, PyObject** result);
42+
43+
Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
44+
PyObject* base,
45+
const std::vector<std::shared_ptr<Tensor>>& tensors,
46+
PyObject** out);
47+
48+
Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
49+
PyObject* base,
50+
const std::vector<std::shared_ptr<Tensor>>& tensors,
51+
PyObject** out);
52+
53+
Status DeserializeDict(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
54+
PyObject* base,
55+
const std::vector<std::shared_ptr<Tensor>>& tensors,
56+
PyObject** out) {
57+
auto data = std::dynamic_pointer_cast<StructArray>(array);
58+
ScopedRef keys, vals;
59+
ScopedRef result(PyDict_New());
60+
RETURN_NOT_OK(
61+
DeserializeList(data->field(0), start_idx, stop_idx, base, tensors, keys.ref()));
62+
RETURN_NOT_OK(
63+
DeserializeList(data->field(1), start_idx, stop_idx, base, tensors, vals.ref()));
64+
for (int64_t i = start_idx; i < stop_idx; ++i) {
65+
// PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem.
66+
// The latter two steal references whereas PyDict_SetItem does not. So we need
67+
// to make sure the reference count is decremented by letting the ScopedRef
68+
// go out of scope at the end.
69+
PyDict_SetItem(result.get(), PyList_GET_ITEM(keys.get(), i - start_idx),
70+
PyList_GET_ITEM(vals.get(), i - start_idx));
71+
}
72+
static PyObject* py_type = PyUnicode_FromString("_pytype_");
73+
if (PyDict_Contains(result.get(), py_type)) {
74+
RETURN_NOT_OK(CallCustomCallback(pyarrow_deserialize_callback, result.get(), out));
75+
} else {
76+
*out = result.release();
77+
}
78+
return Status::OK();
79+
}
80+
81+
Status DeserializeArray(std::shared_ptr<Array> array, int64_t offset, PyObject* base,
82+
const std::vector<std::shared_ptr<arrow::Tensor>>& tensors,
83+
PyObject** out) {
84+
DCHECK(array);
85+
int32_t index = std::static_pointer_cast<Int32Array>(array)->Value(offset);
86+
RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out));
87+
// Mark the array as immutable
88+
ScopedRef flags(PyObject_GetAttrString(*out, "flags"));
89+
DCHECK(flags.get() != NULL) << "Could not mark Numpy array immutable";
90+
Py_INCREF(Py_False);
91+
int flag_set = PyObject_SetAttrString(flags.get(), "writeable", Py_False);
92+
DCHECK(flag_set == 0) << "Could not mark Numpy array immutable";
93+
return Status::OK();
94+
}
95+
96+
Status GetValue(std::shared_ptr<Array> arr, int64_t index, int32_t type, PyObject* base,
97+
const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** result) {
98+
switch (arr->type()->id()) {
99+
case Type::BOOL:
100+
*result =
101+
PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
102+
return Status::OK();
103+
case Type::INT64:
104+
*result =
105+
PyLong_FromSsize_t(std::static_pointer_cast<Int64Array>(arr)->Value(index));
106+
return Status::OK();
107+
case Type::BINARY: {
108+
int32_t nchars;
109+
const uint8_t* str =
110+
std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars);
111+
*result = PyBytes_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
112+
return CheckPyError();
113+
}
114+
case Type::STRING: {
115+
int32_t nchars;
116+
const uint8_t* str =
117+
std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars);
118+
*result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
119+
return CheckPyError();
120+
}
121+
case Type::FLOAT:
122+
*result =
123+
PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
124+
return Status::OK();
125+
case Type::DOUBLE:
126+
*result =
127+
PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
128+
return Status::OK();
129+
case Type::STRUCT: {
130+
auto s = std::static_pointer_cast<StructArray>(arr);
131+
auto l = std::static_pointer_cast<ListArray>(s->field(0));
132+
if (s->type()->child(0)->name() == "list") {
133+
return DeserializeList(l->values(), l->value_offset(index),
134+
l->value_offset(index + 1), base, tensors, result);
135+
} else if (s->type()->child(0)->name() == "tuple") {
136+
return DeserializeTuple(l->values(), l->value_offset(index),
137+
l->value_offset(index + 1), base, tensors, result);
138+
} else if (s->type()->child(0)->name() == "dict") {
139+
return DeserializeDict(l->values(), l->value_offset(index),
140+
l->value_offset(index + 1), base, tensors, result);
141+
} else {
142+
DCHECK(false) << "unexpected StructArray type " << s->type()->child(0)->name();
143+
}
144+
}
145+
// We use an Int32Builder here to distinguish the tensor indices from
146+
// the Type::INT64 above (see tensor_indices_ in SequenceBuilder).
147+
case Type::INT32: {
148+
return DeserializeArray(arr, index, base, tensors, result);
149+
}
150+
default:
151+
DCHECK(false) << "union tag " << type << " not recognized";
152+
}
153+
return Status::OK();
154+
}
155+
156+
#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN) \
157+
auto data = std::dynamic_pointer_cast<UnionArray>(array); \
158+
int64_t size = array->length(); \
159+
ScopedRef result(CREATE_FN(stop_idx - start_idx)); \
160+
auto types = std::make_shared<Int8Array>(size, data->type_ids()); \
161+
auto offsets = std::make_shared<Int32Array>(size, data->value_offsets()); \
162+
for (int64_t i = start_idx; i < stop_idx; ++i) { \
163+
if (data->IsNull(i)) { \
164+
Py_INCREF(Py_None); \
165+
SET_ITEM_FN(result.get(), i - start_idx, Py_None); \
166+
} else { \
167+
int64_t offset = offsets->Value(i); \
168+
int8_t type = types->Value(i); \
169+
std::shared_ptr<Array> arr = data->child(type); \
170+
PyObject* value; \
171+
RETURN_NOT_OK(GetValue(arr, offset, type, base, tensors, &value)); \
172+
SET_ITEM_FN(result.get(), i - start_idx, value); \
173+
} \
174+
} \
175+
*out = result.release(); \
176+
return Status::OK();
177+
178+
Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
179+
PyObject* base,
180+
const std::vector<std::shared_ptr<Tensor>>& tensors,
181+
PyObject** out) {
182+
DESERIALIZE_SEQUENCE(PyList_New, PyList_SET_ITEM)
183+
}
184+
185+
Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
186+
PyObject* base,
187+
const std::vector<std::shared_ptr<Tensor>>& tensors,
188+
PyObject** out) {
189+
DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM)
190+
}
191+
192+
Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
193+
SerializedPyObject* out) {
194+
std::shared_ptr<ipc::RecordBatchStreamReader> reader;
195+
int64_t offset;
196+
int64_t bytes_read;
197+
int32_t num_tensors;
198+
// Read number of tensors
199+
RETURN_NOT_OK(
200+
src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
201+
RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
202+
RETURN_NOT_OK(reader->ReadNextRecordBatch(&out->batch));
203+
RETURN_NOT_OK(src->Tell(&offset));
204+
offset += 4; // Skip the end-of-stream message
205+
for (int i = 0; i < num_tensors; ++i) {
206+
std::shared_ptr<Tensor> tensor;
207+
RETURN_NOT_OK(ipc::ReadTensor(offset, src.get(), &tensor));
208+
out->tensors.push_back(tensor);
209+
RETURN_NOT_OK(src->Tell(&offset));
210+
}
211+
return Status::OK();
212+
}
213+
214+
Status DeserializeObject(const SerializedPyObject& obj, PyObject* base, PyObject** out) {
215+
PyAcquireGIL lock;
216+
return DeserializeList(obj.batch->column(0), 0, obj.batch->num_rows(), base,
217+
obj.tensors, out);
218+
}
219+
220+
} // namespace py
221+
} // namespace arrow
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#ifndef ARROW_PYTHON_ARROW_TO_PYTHON_H
19+
#define ARROW_PYTHON_ARROW_TO_PYTHON_H
20+
21+
#include "arrow/python/platform.h"
22+
23+
#include <cstdint>
24+
#include <memory>
25+
#include <vector>
26+
27+
#include "arrow/python/python_to_arrow.h"
28+
#include "arrow/status.h"
29+
#include "arrow/util/visibility.h"
30+
31+
namespace arrow {
32+
33+
class RecordBatch;
34+
class Tensor;
35+
36+
namespace io {
37+
38+
class RandomAccessFile;
39+
40+
} // namespace io
41+
42+
namespace py {
43+
44+
/// \brief Read serialized Python sequence from file interface using Arrow IPC
45+
/// \param[in] src a RandomAccessFile
46+
/// \param[out] out the reconstructed data
47+
/// \return Status
48+
ARROW_EXPORT
49+
Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
50+
SerializedPyObject* out);
51+
52+
/// \brief Reconstruct Python object from Arrow-serialized representation
53+
/// \param[in] object
54+
/// \param[in] base a Python object holding the underlying data that any NumPy
55+
/// arrays will reference, to avoid premature deallocation
56+
/// \param[out] out the returned object
57+
/// \return Status
58+
/// This acquires the GIL
59+
ARROW_EXPORT
60+
Status DeserializeObject(const SerializedPyObject& object, PyObject* base,
61+
PyObject** out);
62+
63+
} // namespace py
64+
} // namespace arrow
65+
66+
#endif // ARROW_PYTHON_ARROW_TO_PYTHON_H

0 commit comments

Comments
 (0)