From eb9694a0163b0da832e4be6f485219db79d8f0e8 Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 26 Aug 2017 17:08:50 -0700 Subject: [PATCH] implement timeouts --- python/pyarrow/plasma.pyx | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx index 51300039a4436..47dd790d80eaf 100644 --- a/python/pyarrow/plasma.pyx +++ b/python/pyarrow/plasma.pyx @@ -160,6 +160,13 @@ cdef class ObjectID: return self.data.binary() +cdef class ObjectNotAvailable: + """ + Placeholder for an object that was not available within the given timeout. + """ + pass + + cdef class PlasmaBuffer(Buffer): """ This is the type returned by calls to get with a PlasmaClient. @@ -598,11 +605,17 @@ def put(PlasmaClient client, value, object_id=None): stream = pyarrow.FixedSizeBufferOutputStream(buffer) stream.set_memcopy_threads(4) serialized.write_to(stream) + client.seal(id) return id def get(PlasmaClient client, object_ids, timeout_ms=-1): results = [] buffers = client.get(object_ids, timeout_ms) - for buffer in buffers: - results.append(pyarrow.deserialize(buffer)) + for i in range(len(object_ids)): + # buffers[i] is None if this object was not available within the + # timeout + if buffers[i]: + results.append(pyarrow.deserialize(buffers[i])) + else: + results.append(ObjectNotAvailable) return results