Skip to content

Commit eb9694a

Browse files
committed
implement timeouts
1 parent 3518c71 commit eb9694a

1 file changed

Lines changed: 15 additions & 2 deletions

File tree

python/pyarrow/plasma.pyx

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,13 @@ cdef class ObjectID:
160160
return self.data.binary()
161161

162162

163+
cdef class ObjectNotAvailable:
164+
"""
165+
Placeholder for an object that was not available within the given timeout.
166+
"""
167+
pass
168+
169+
163170
cdef class PlasmaBuffer(Buffer):
164171
"""
165172
This is the type returned by calls to get with a PlasmaClient.
@@ -598,11 +605,17 @@ def put(PlasmaClient client, value, object_id=None):
598605
stream = pyarrow.FixedSizeBufferOutputStream(buffer)
599606
stream.set_memcopy_threads(4)
600607
serialized.write_to(stream)
608+
client.seal(id)
601609
return id
602610

603611
def get(PlasmaClient client, object_ids, timeout_ms=-1):
604612
results = []
605613
buffers = client.get(object_ids, timeout_ms)
606-
for buffer in buffers:
607-
results.append(pyarrow.deserialize(buffer))
614+
for i in range(len(object_ids)):
615+
# buffers[i] is None if this object was not available within the
616+
# timeout
617+
if buffers[i]:
618+
results.append(pyarrow.deserialize(buffers[i]))
619+
else:
620+
results.append(ObjectNotAvailable)
608621
return results

0 commit comments

Comments
 (0)