Skip to content

Commit

Permalink
Adds executor-side Python caching (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw75 authored and vsreekanti committed Aug 21, 2019
1 parent cc90b9d commit 6aef23f
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 45 deletions.
66 changes: 40 additions & 26 deletions droplet/server/executor/call.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
serializer = Serializer()


def exec_function(exec_socket, kvs, user_library):
def exec_function(exec_socket, kvs, user_library, cache):
call = FunctionCall()
call.ParseFromString(exec_socket.recv())

Expand All @@ -49,11 +49,11 @@ def exec_function(exec_socket, kvs, user_library):
logging.info('Function %s not found! Returning an error.' %
(call.name))
sutils.error.error = FUNC_NOT_FOUND
result = serializer.dump(('ERROR', sutils.error.SerializeToString()))
result = ('ERROR', sutils.error.SerializeToString())
else:
try:
if call.consistency == NORMAL:
result = _exec_func_normal(kvs, f, fargs, user_library)
result = _exec_func_normal(kvs, f, fargs, user_library, cache)
else:
dependencies = {}
result = _exec_func_causal(kvs, f, fargs, user_library,
Expand All @@ -63,26 +63,25 @@ def exec_function(exec_socket, kvs, user_library):
logging.exception('Unexpected error %s while executing function.' %
(str(e)))
sutils.error.error = EXECUTION_ERROR
result = serializer.dump(('ERROR: ' + str(e),
sutils.error.SerializeToString()))
result = ('ERROR: ' + str(e), sutils.error.SerializeToString())

if call.consistency == NORMAL:
result = serializer.dump_lattice(result)
succeed = kvs.put(call.response_key, result)
else:
result = serializer.dump_lattice(result, MultiKeyCausalLattice)
result = serializer.dump_lattice(result, MultiKeyCausalLattice, causal_dependencies=dependencies)
succeed = kvs.causal_put(call.response_key, result)

if not succeed:
logging.info(f'Unsuccessful attempt to put key {call.response_key} '
+ 'into the KVS.')


def _exec_func_normal(kvs, func, args, user_lib, cache):
    """Run `func` under NORMAL consistency.

    Any DropletReference arguments are first resolved to concrete values,
    consulting the executor-local `cache` before the KVS.
    """
    references = [arg for arg in args if isinstance(arg, DropletReference)]

    resolved = _resolve_ref_normal(references, kvs, cache) if references \
        else references

    return _run_function(func, resolved, args, user_lib)

Expand Down Expand Up @@ -114,22 +113,37 @@ def _run_function(func, refs, args, user_lib):
return res


def _resolve_ref_normal(refs, kvs):
keys = [ref.key for ref in refs]
keys = list(set(keys))
kv_pairs = kvs.get(keys)

# When chaining function executions, we must wait, so we check to see if
# certain values have not been resolved yet.
while None in kv_pairs.values():
kv_pairs = kvs.get(keys)
def _resolve_ref_normal(refs, kvs, cache):
deserialize_map = {}
kv_pairs = {}
keys = set()

for ref in refs:
key = ref.key
# Because references might be repeated, we check to make sure that we
# haven't already deserialized this ref.
if ref.deserialize and isinstance(kv_pairs[key], Lattice):
kv_pairs[key] = serializer.load_lattice(kv_pairs[key])
deserialize_map[ref.key] = ref.deserialize
if ref.key in cache:
kv_pairs[ref.key] = cache[ref.key]
else:
keys.add(ref.key)

keys = list(keys)

if len(keys) != 0:
returned_kv_pairs = kvs.get(keys)

# When chaining function executions, we must wait, so we check to see if
# certain values have not been resolved yet.
while None in returned_kv_pairs.values():
returned_kv_pairs = kvs.get(keys)

for key in keys:
# Because references might be repeated, we check to make sure that we
# haven't already deserialized this ref.
if deserialize_map[key] and isinstance(returned_kv_pairs[key], Lattice):
kv_pairs[key] = serializer.load_lattice(returned_kv_pairs[key])
else:
kv_pairs[key] = returned_kv_pairs[key]
# Cache the deserialized payload for future use
cache[key] = kv_pairs[key]

return kv_pairs

Expand Down Expand Up @@ -184,11 +198,11 @@ def _resolve_ref_causal(refs, kvs, schedule, key_version_locations,


def exec_dag_function(pusher_cache, kvs, triggers, function, schedule,
user_library, dag_runtimes):
user_library, dag_runtimes, cache):
if schedule.consistency == NORMAL:
finished = _exec_dag_function_normal(pusher_cache, kvs,
triggers, function, schedule,
user_library)
user_library, cache)
else:
finished = _exec_dag_function_causal(pusher_cache, kvs,
triggers, function, schedule,
Expand Down Expand Up @@ -219,7 +233,7 @@ def _construct_trigger(sid, fname, result):


def _exec_dag_function_normal(pusher_cache, kvs, triggers, function, schedule,
user_lib):
user_lib, cache):
fname = schedule.target_function
fargs = list(schedule.arguments[fname].values)

Expand All @@ -228,7 +242,7 @@ def _exec_dag_function_normal(pusher_cache, kvs, triggers, function, schedule,
fargs += list(trigger.arguments.values)

fargs = [serializer.load(arg) for arg in fargs]
result = _exec_func_normal(kvs, function, fargs, user_lib)
result = _exec_func_normal(kvs, function, fargs, user_lib, cache)

is_sink = True
new_trigger = _construct_trigger(schedule.id, fname, result)
Expand Down
10 changes: 7 additions & 3 deletions droplet/server/executor/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ def executor(ip, mgmt_ip, schedulers, thread_id):
# sink function.
dag_runtimes = {}

# Executor cache that stores deserialized payload
# It is a map from key to deserialized payload
cache = {}

# Internal metadata to track thread utilization.
report_start = time.time()
event_occupancy = {'pin': 0.0,
Expand Down Expand Up @@ -155,7 +159,7 @@ def executor(ip, mgmt_ip, schedulers, thread_id):
work_start = time.time()
with DropletUserLibrary(context, pusher_cache, ip, thread_id,
client) as user_library:
exec_function(exec_socket, client, user_library)
exec_function(exec_socket, client, user_library, cache)

utils.push_status(schedulers, pusher_cache, status)

Expand Down Expand Up @@ -192,7 +196,7 @@ def executor(ip, mgmt_ip, schedulers, thread_id):
exec_dag_function(pusher_cache, client,
received_triggers[trkey],
pinned_functions[fname], schedule,
user_library, dag_runtimes)
user_library, dag_runtimes, cache)
del received_triggers[trkey]
del queue[fname][schedule.id]

Expand Down Expand Up @@ -230,7 +234,7 @@ def executor(ip, mgmt_ip, schedulers, thread_id):
exec_dag_function(pusher_cache, client,
received_triggers[trkey],
pinned_functions[fname], schedule,
user_library, dag_runtimes)
user_library, dag_runtimes, cache)
del received_triggers[key]
del queue[fname][trigger.id]

Expand Down
6 changes: 4 additions & 2 deletions droplet/shared/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def load_lattice(self, lattice):

return result

def dump_lattice(self, value, typ=None):
def dump_lattice(self, value, typ=None, causal_dependencies={}):
if not typ:
if isinstance(value, set):
return self.dump_lattice(value, SetLattice)
Expand Down Expand Up @@ -138,7 +138,9 @@ def dump_lattice(self, value, typ=None):
# We assume that we will use the default vector clock for causal
# metadata.
data = SetLattice({self.dump(value)})
result = MultiKeyCausalLattice(DEFAULT_VC, MapLattice({}), data)
result = MultiKeyCausalLattice(DEFAULT_VC,
MapLattice(causal_dependencies),
data)
else:
raise ValueError(f'Unexpected lattice type: {str(typ)}')

Expand Down
7 changes: 7 additions & 0 deletions scripts/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,11 @@
# Make the repository root importable so `tests` resolves as a package.
# Quoted so paths containing spaces do not word-split.
export PYTHONPATH="$PYTHONPATH:$(pwd)"

coverage run tests

# If the tests failed, do not generate a report, but report a failure instead.
EXIT=$?
if [[ $EXIT -ne 0 ]]; then
    exit $EXIT
fi

coverage report -m
6 changes: 5 additions & 1 deletion tests/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import unittest

from tests.server.executor import (
Expand Down Expand Up @@ -57,4 +58,7 @@ def droplet_test_suite():

if __name__ == '__main__':
    # Run the full Droplet test suite and propagate failure through the
    # process exit code so CI can detect it.
    outcome = unittest.TextTestRunner().run(droplet_test_suite())

    if not outcome.wasSuccessful():
        sys.exit(1)
26 changes: 13 additions & 13 deletions tests/server/executor/test_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def func(_, x): return x * x
self.socket.inbox.append(call.SerializeToString())

# Execute the function call.
exec_function(self.socket, self.kvs_client, self.user_library)
exec_function(self.socket, self.kvs_client, self.user_library, {})

# Assert that there have been 0 messages sent.
self.assertEqual(len(self.socket.outbox), 0)
Expand Down Expand Up @@ -115,7 +115,7 @@ def func(_, x): return x * x
self.socket.inbox.append(call.SerializeToString())

# Execute the function call.
exec_function(self.socket, self.kvs_client, self.user_library)
exec_function(self.socket, self.kvs_client, self.user_library, {})

# Assert that there have been 0 messages sent.
self.assertEqual(len(self.socket.outbox), 0)
Expand All @@ -141,7 +141,7 @@ def test_exec_function_nonexistent(self):
self.socket.inbox.append(call.SerializeToString())

# Attempt to execute the nonexistent function.
exec_function(self.socket, self.kvs_client, self.user_library)
exec_function(self.socket, self.kvs_client, self.user_library, {})

# Assert that there have been 0 messages sent.
self.assertEqual(len(self.socket.outbox), 0)
Expand Down Expand Up @@ -179,7 +179,7 @@ def func(_, x):
self.socket.inbox.append(call.SerializeToString())

# Execute the function call.
exec_function(self.socket, self.kvs_client, self.user_library)
exec_function(self.socket, self.kvs_client, self.user_library, {})

# Retrieve the result from the KVS and ensure that it is the correct
# lattice type.
Expand Down Expand Up @@ -218,7 +218,7 @@ def func(_, x): return x * x
self.socket.inbox.append(call.SerializeToString())

# Execute the function call.
exec_function(self.socket, self.kvs_client, self.user_library)
exec_function(self.socket, self.kvs_client, self.user_library, {})

# Assert that there have been 0 messages sent.
self.assertEqual(len(self.socket.outbox), 0)
Expand Down Expand Up @@ -255,7 +255,7 @@ def func(_, x): return x * x
self.socket.inbox.append(call.SerializeToString())

# Execute the function call.
exec_function(self.socket, self.kvs_client, self.user_library)
exec_function(self.socket, self.kvs_client, self.user_library, {})

# Assert that there have been 0 messages sent.
self.assertEqual(len(self.socket.outbox), 0)
Expand Down Expand Up @@ -292,7 +292,7 @@ def func(_, x): return sum(x)
self.socket.inbox.append(call.SerializeToString())

# Execute the function call.
exec_function(self.socket, self.kvs_client, self.user_library)
exec_function(self.socket, self.kvs_client, self.user_library, {})

# Assert that there have been 0 messages sent.
self.assertEqual(len(self.socket.outbox), 0)
Expand Down Expand Up @@ -325,7 +325,7 @@ def func(_, x): return len(x) >= 2 and x[0] < x[1]
self.socket.inbox.append(call.SerializeToString())

# Execute the function call.
exec_function(self.socket, self.kvs_client, self.user_library)
exec_function(self.socket, self.kvs_client, self.user_library, {})

# Assert that there have been 0 messages sent.
self.assertEqual(len(self.socket.outbox), 0)
Expand Down Expand Up @@ -353,7 +353,7 @@ def func(_, x): return x * x
schedule, triggers = self._create_fn_schedule(dag, arg, fname, [fname])

exec_dag_function(self.pusher_cache, self.kvs_client, triggers, func,
schedule, self.user_library, {})
schedule, self.user_library, {}, {})

# Assert that there have been 0 messages sent.
self.assertEqual(len(self.socket.outbox), 0)
Expand Down Expand Up @@ -391,7 +391,7 @@ def func(_, x): return x * x
DEFAULT_VC.serialize(kv.vector_clock)

exec_dag_function(self.pusher_cache, self.kvs_client, triggers, func,
schedule, self.user_library, {})
schedule, self.user_library, {}, {})

# Assert that there have been 0 messages sent.
self.assertEqual(len(self.socket.outbox), 0)
Expand Down Expand Up @@ -435,7 +435,7 @@ def square(_, x): return x * x
sname])

exec_dag_function(self.pusher_cache, self.kvs_client, triggers, incr,
schedule, self.user_library, {})
schedule, self.user_library, {}, {})

# Assert that there has been a message sent.
self.assertEqual(len(self.pusher_cache.socket.outbox), 1)
Expand Down Expand Up @@ -473,7 +473,7 @@ def square(_, x): return x * x
[iname, sname], MULTI)

exec_dag_function(self.pusher_cache, self.kvs_client, triggers, incr,
schedule, self.user_library, {})
schedule, self.user_library, {}, {})

# Assert that there has been a message sent.
self.assertEqual(len(self.pusher_cache.socket.outbox), 1)
Expand Down Expand Up @@ -518,7 +518,7 @@ def square(_, x): return x * x
MULTI)

exec_dag_function(self.pusher_cache, self.kvs_client, triggers, incr,
schedule, self.user_library, {})
schedule, self.user_library, {}, {})

# Assert that there has been a message sent.
self.assertEqual(len(self.pusher_cache.socket.outbox), 1)
Expand Down

0 comments on commit 6aef23f

Please sign in to comment.