Skip to content

Commit

Permalink
implement timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
pcmoritz committed Aug 27, 2017
1 parent 3518c71 commit eb9694a
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions python/pyarrow/plasma.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ cdef class ObjectID:
return self.data.binary()


cdef class ObjectNotAvailable:
"""
Placeholder for an object that was not available within the given timeout.
"""
pass


cdef class PlasmaBuffer(Buffer):
"""
This is the type returned by calls to get with a PlasmaClient.
Expand Down Expand Up @@ -598,11 +605,17 @@ def put(PlasmaClient client, value, object_id=None):
stream = pyarrow.FixedSizeBufferOutputStream(buffer)
stream.set_memcopy_threads(4)
serialized.write_to(stream)
client.seal(id)
return id

def get(PlasmaClient client, object_ids, timeout_ms=-1):
results = []
buffers = client.get(object_ids, timeout_ms)
for buffer in buffers:
results.append(pyarrow.deserialize(buffer))
for i in range(len(object_ids)):
# buffers[i] is None if this object was not available within the
# timeout
if buffers[i]:
results.append(pyarrow.deserialize(buffers[i]))
else:
results.append(ObjectNotAvailable)
return results

0 comments on commit eb9694a

Please sign in to comment.