From 1862c9ee89ea2a2c34bd9dc4dc136ce947fc1c73 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Sun, 2 Dec 2018 11:58:41 -0800 Subject: [PATCH 01/17] Allowing named worker-setups; 1. Convert worker-setups to a dictionary. 2. Coroutines in scheduler takes a callbacks dict as input, where functions are serialized as strings/bytes. 3. When the name already exists, the newly registered setup function will immediately be run on all existing clients, but new clients will only see the updated version of the setup function. --- distributed/client.py | 19 +++++++++++++------ distributed/scheduler.py | 20 +++++++++++++------- distributed/worker.py | 23 ++++++++++++++--------- 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 04e0e4ccf73..d694f86cde0 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3546,8 +3546,12 @@ def _get_task_stream(self, start=None, stop=None, count=None, plot=False, raise gen.Return(msgs) @gen.coroutine - def _register_worker_callbacks(self, setup=None): - responses = yield self.scheduler.register_worker_callbacks(setup=dumps(setup)) + def _register_worker_callbacks(self, name, setup=None): + callbacks = {} + if setup is not None: + callbacks['setup'] = dumps(setup) + + responses = yield self.scheduler.register_worker_callbacks(callbacks=callbacks, name=name) results = {} for key, resp in responses.items(): if resp['status'] == 'OK': @@ -3556,25 +3560,28 @@ def _register_worker_callbacks(self, setup=None): six.reraise(*clean_exception(**resp)) raise gen.Return(results) - def register_worker_callbacks(self, setup=None): + def register_worker_callbacks(self, name, setup=None): """ Registers a setup callback function for all current and future workers. This registers a new setup function for workers in this cluster. The function will run immediately on all currently connected workers. It will also be run upon connection by any workers that are added in the - future. Multiple setup functions can be registered - these will be - called in the order they were added. + future. Multiple setup functions can be registered - the order they are + called are undefined. If the function takes an input argument named ``dask_worker`` then that variable will be populated with the worker itself. Parameters ---------- + name : string + A unique identifier of the callbacks. If a set of callbacks + of the same name are already registered, they will be replaced. 
setup : callable(dask_worker: Worker) -> None Function to register and run on all workers """ - return self.sync(self._register_worker_callbacks, setup=setup) + return self.sync(self._register_worker_callbacks, setup=setup, name=name) class Executor(Client): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index f6b71a455d4..13b3e4f33ce 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -904,7 +904,7 @@ def __init__( self.plugins = [] self.transition_log = deque(maxlen=dask.config.get('distributed.scheduler.transition-log-length')) self.log = deque(maxlen=dask.config.get('distributed.scheduler.transition-log-length')) - self.worker_setups = [] + self.worker_setups = {} worker_handlers = { 'task-finished': self.handle_task_finished, @@ -3090,14 +3090,20 @@ def get_task_stream(self, comm=None, start=None, stop=None, count=None): return ts.collect(start=start, stop=stop, count=count) @gen.coroutine - def register_worker_callbacks(self, comm, setup=None): - """ Registers a setup function, and call it on every worker """ - if setup is None: - raise gen.Return({}) + def register_worker_callbacks(self, comm, name, callbacks): + """ Registers a setup function, and call it on every worker; + """ + responses = {} + + if 'setup' in callbacks: + setup = callbacks['setup'] + + # add the setup function to the list to run them on new clients. + self.worker_setups[name] = setup - self.worker_setups.append(setup) + # trigger the setup function on the existing clients. + responses.update((yield self.broadcast(msg=dict(op='run', function=setup)))) - responses = yield self.broadcast(msg=dict(op='run', function=setup)) raise gen.Return(responses) ##################### diff --git a/distributed/worker.py b/distributed/worker.py index 4fb41a4aecb..4b8935793a5 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -513,6 +513,8 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, io_loop=self.io_loop) self.periodic_callbacks['profile-cycle'] = pc + self.worker_setups = {} + _global_workers.append(weakref.ref(self)) ################## @@ -610,23 +612,26 @@ def _register_with_scheduler(self): raise ValueError("Unexpected response from register: %r" % (response,)) else: - # Retrieve eventual init functions and run them - for function_bytes in response['worker-setups']: - setup_function = pickle.loads(function_bytes) - if has_arg(setup_function, 'dask_worker'): - result = setup_function(dask_worker=self) - else: - result = setup_function() - logger.info('Init function %s ran: output=%s' % (setup_function, result)) - logger.info(' Registered to: %26s', self.scheduler.address) logger.info('-' * 49) + # Retrieve eventual init functions (only worker-setups for now) + for name, function_bytes in response['worker-setups'].items(): + self.worker_setups[name] = pickle.loads(function_bytes) + self.batched_stream = BatchedSend(interval='2ms', loop=self.loop) self.batched_stream.start(comm) self.periodic_callbacks['heartbeat'].start() self.loop.add_callback(self.handle_scheduler, comm) + # run eventual init functions (only worker-setups for now) + for name, setup_function in self.worker_setups.items(): + if has_arg(setup_function, 'dask_worker'): + result = setup_function(dask_worker=self) + else: + result = setup_function() + logger.info('Init function %s : %s ran: output=%s' % (name, setup_function, result)) + @gen.coroutine def heartbeat(self): if not self.heartbeat_active: From 13322a81eb74f38abf6d0e6aad608c193816e16b Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Mon, 3 
Dec 2018 09:44:30 -0800 Subject: [PATCH 02/17] Add default names, draft for unregister. --- distributed/client.py | 50 +++++++++++++++++++++++++++++++--------- distributed/scheduler.py | 27 ++++++++++++++++++++-- 2 files changed, 64 insertions(+), 13 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index d694f86cde0..b147f686ff2 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3546,43 +3546,71 @@ def _get_task_stream(self, start=None, stop=None, count=None, plot=False, raise gen.Return(msgs) @gen.coroutine - def _register_worker_callbacks(self, name, setup=None): + def _register_worker_callbacks(self, setup=None, name=None): callbacks = {} - if setup is not None: - callbacks['setup'] = dumps(setup) + names = {} - responses = yield self.scheduler.register_worker_callbacks(callbacks=callbacks, name=name) + # prepare the callbacks and their names + if setup is not None: + serialized = dumps(setup) + callbacks['setup'] = setup_serialized + if name is None: + name = funcname(setup) + \ + '-' + uuid.uuid5('dask-worker-callbacks', + serialized) + names['setup'] = name + + responses = yield self.scheduler.register_worker_callbacks(callbacks=callbacks, names=names) results = {} for key, resp in responses.items(): if resp['status'] == 'OK': results[key] = resp['result'] elif resp['status'] == 'error': six.reraise(*clean_exception(**resp)) - raise gen.Return(results) + raise gen.Return(names) - def register_worker_callbacks(self, name, setup=None): + def register_worker_callbacks(self, setup=None, name=None): """ Registers a setup callback function for all current and future workers. This registers a new setup function for workers in this cluster. The function will run immediately on all currently connected workers. It will also be run upon connection by any workers that are added in the - future. Multiple setup functions can be registered - the order they are - called are undefined. + future. Multiple setup functions can be registered + We only keep one function version per name. + + The callback function shall be indempotent, and may be called several + times. The order callback functions are called are undefined. If the function takes an input argument named ``dask_worker`` then that variable will be populated with the worker itself. Parameters ---------- - name : string - A unique identifier of the callbacks. If a set of callbacks - of the same name are already registered, they will be replaced. setup : callable(dask_worker: Worker) -> None Function to register and run on all workers + name : string, or None + A unique identifier for the callbacks. If None, a default identifier + will be generated from the type of the callback and the function. + A good candidate value for this argument is the full qualified name + of the function. 
""" return self.sync(self._register_worker_callbacks, setup=setup, name=name) + @gen.coroutine + def _unregsiter_worker_callbacks(self, setup=None): + names = {} + if setup is not None: + names['setup'] = setup + + result = yield self.scheduler.unregister_worker_callbacks(names=names) + raise gen.Return(result) + + + def unregister_worker_callbacks(self, setup=None): + """ """ + return self.sync(self._unregister_worker_callbacks, setup=setup) + class Executor(Client): """ Deprecated: see Client """ diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 13b3e4f33ce..d174d6b83ec 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3090,13 +3090,27 @@ def get_task_stream(self, comm=None, start=None, stop=None, count=None): return ts.collect(start=start, stop=stop, count=count) @gen.coroutine - def register_worker_callbacks(self, comm, name, callbacks): - """ Registers a setup function, and call it on every worker; + def register_worker_callbacks(self, comm, callbacks, names): + """ Registers a set of event driven callback functions on workers for the given name. + + Parameters + ---------- + 'callbacks': dict + a dictionary of serialized callback functions. Currently + supported callbacks are: + + - 'setup' : a function that shall be called on workers when they first + connect to the scheduler. The function is called on every existing worker. + + 'names' : dict + a dictionary of function names for each entry in callbacks. + """ responses = {} if 'setup' in callbacks: setup = callbacks['setup'] + name = names['setup'] # add the setup function to the list to run them on new clients. self.worker_setups[name] = setup @@ -3106,6 +3120,15 @@ def register_worker_callbacks(self, comm, name, callbacks): raise gen.Return(responses) + @gen.coroutine + def unregister_worker_callbacks(self, comm, names): + # add the setup function to the list to run them on new clients. + if 'setup' in names: + name = names['setup'] + worker_setups.pop(name) + + raise gen.Return(None) + ##################### # State Transitions # ##################### From f6913c2dca407bd84d3d7719308cb09de6dcc6e0 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Mon, 3 Dec 2018 10:06:17 -0800 Subject: [PATCH 03/17] Update tests. 
--- distributed/client.py | 11 +++++------ distributed/scheduler.py | 5 +++-- distributed/tests/test_worker.py | 33 ++++++++++++++++++++++++++++---- 3 files changed, 37 insertions(+), 12 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index b147f686ff2..d975bafcfb7 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3549,15 +3549,14 @@ def _get_task_stream(self, start=None, stop=None, count=None, plot=False, def _register_worker_callbacks(self, setup=None, name=None): callbacks = {} names = {} - + from hashlib import md5 # prepare the callbacks and their names if setup is not None: serialized = dumps(setup) - callbacks['setup'] = setup_serialized + callbacks['setup'] = serialized if name is None: - name = funcname(setup) + \ - '-' + uuid.uuid5('dask-worker-callbacks', - serialized) + h = md5(serialized) + name = funcname(setup) + '-' + h.hexdigest() names['setup'] = name responses = yield self.scheduler.register_worker_callbacks(callbacks=callbacks, names=names) @@ -3598,7 +3597,7 @@ def register_worker_callbacks(self, setup=None, name=None): return self.sync(self._register_worker_callbacks, setup=setup, name=name) @gen.coroutine - def _unregsiter_worker_callbacks(self, setup=None): + def _unregister_worker_callbacks(self, setup=None): names = {} if setup is not None: names['setup'] = setup diff --git a/distributed/scheduler.py b/distributed/scheduler.py index d174d6b83ec..33bd18e6931 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -963,7 +963,8 @@ def __init__( 'heartbeat_worker': self.heartbeat_worker, 'get_task_status': self.get_task_status, 'get_task_stream': self.get_task_stream, - 'register_worker_callbacks': self.register_worker_callbacks + 'register_worker_callbacks': self.register_worker_callbacks, + 'unregister_worker_callbacks': self.unregister_worker_callbacks } self._transitions = { @@ -3125,7 +3126,7 @@ def unregister_worker_callbacks(self, comm, names): # add the setup function to the list to run them on new clients. if 'setup' in names: name = names['setup'] - worker_setups.pop(name) + self.worker_setups.pop(name) raise gen.Return(None) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 2ee1a988b7b..c47eba55465 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1211,6 +1211,10 @@ def mystartup2(): os.environ['MY_ENV_VALUE'] = 'WORKER_ENV_VALUE' return "Env set." 
+ #preload function to run + def mystartup3(dask_worker): + dask_worker.init_variable2 = 1 + #Check that preload function has been run def test_import(dask_worker): return hasattr(dask_worker, 'init_variable') @@ -1220,6 +1224,9 @@ def test_startup2(): import os return os.getenv('MY_ENV_VALUE', None) == 'WORKER_ENV_VALUE' + def test_startup3(dask_worker): + return hasattr(dask_worker, 'init_variable2') + # Nothing has been run yet assert len(s.worker_setups) == 0 result = yield c.run(test_import) @@ -1235,8 +1242,14 @@ def test_startup2(): yield worker._close() # Add a preload function - response = yield c.register_worker_callbacks(setup=mystartup) - assert len(response) == 2 + names = yield c.register_worker_callbacks(setup=mystartup) + assert len(names) == 1 + assert len(s.worker_setups) == 1 + + # Add the same preload function, again + names_again = yield c.register_worker_callbacks(setup=mystartup) + assert len(names_again) == 1 + assert names_again['setup'] == names['setup'] assert len(s.worker_setups) == 1 # Check it has been ran on existing worker @@ -1251,21 +1264,33 @@ def test_startup2(): yield worker._close() # Register another preload function - response = yield c.register_worker_callbacks(setup=mystartup2) - assert len(response) == 2 + names2 = yield c.register_worker_callbacks(setup=mystartup2) + assert len(names2) == 1 assert len(s.worker_setups) == 2 # Check it has been run result = yield c.run(test_startup2) assert list(result.values()) == [True] * 2 + # unregister a preload function + names3 = yield c.register_worker_callbacks(setup=mystartup3) + assert len(s.worker_setups) == 3 + yield c.unregister_worker_callbacks(setup=names3['setup']) + assert len(s.worker_setups) == 2 + # Start a worker and check it is ran on it worker = Worker(s.address, loop=s.loop) yield worker._start() result = yield c.run(test_import, workers=[worker.address]) assert list(result.values()) == [True] + result = yield c.run(test_startup2, workers=[worker.address]) assert list(result.values()) == [True] + + # startup3 is not ran, as it is unregistered before worker is added. + result = yield c.run(test_startup3, workers=[worker.address]) + assert list(result.values()) == [False] + yield worker._close() # Final exception test From eb52c33c1790cfbe88c6ff24f5b254d42f842f71 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Mon, 3 Dec 2018 10:07:07 -0800 Subject: [PATCH 04/17] move md5 import to the top. --- distributed/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/client.py b/distributed/client.py index d975bafcfb7..e67077b9e77 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -22,6 +22,7 @@ import socket import warnings import weakref +from hashlib import md5 import dask from dask.base import tokenize, normalize_token, collections_to_dsk @@ -3549,7 +3550,6 @@ def _get_task_stream(self, start=None, stop=None, count=None, plot=False, def _register_worker_callbacks(self, setup=None, name=None): callbacks = {} names = {} - from hashlib import md5 # prepare the callbacks and their names if setup is not None: serialized = dumps(setup) From 69558a54415c12d3e0ac886f95b8564548550d91 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Mon, 3 Dec 2018 10:31:16 -0800 Subject: [PATCH 05/17] fix for lint. 
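For context on the code these two small patches touch: when no name is supplied, the client derives one from the function name plus an md5 digest of the serialized callable (the import that PATCH 04 hoists). A standalone sketch of the scheme, with plain pickle and ``__name__`` standing in for distributed's ``dumps`` and dask's ``funcname``:

    from hashlib import md5
    import pickle

    def default_callback_name(func):
        # "<function name>-<md5 hex digest of the serialized bytes>"
        serialized = pickle.dumps(func)
        return func.__name__ + '-' + md5(serialized).hexdigest()

(PATCH 14 later replaces the digest with dask's ``tokenize``, but the shape of the generated name is unchanged.)
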
--- distributed/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/client.py b/distributed/client.py index e67077b9e77..863953b27c5 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3605,7 +3605,6 @@ def _unregister_worker_callbacks(self, setup=None): result = yield self.scheduler.unregister_worker_callbacks(names=names) raise gen.Return(result) - def unregister_worker_callbacks(self, setup=None): """ """ return self.sync(self._unregister_worker_callbacks, setup=setup) From 6d2b93961810186bcece90930f0afac7d862fa08 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Mon, 3 Dec 2018 18:33:56 -0800 Subject: [PATCH 06/17] Use setup = (name, func). if setup = func is given, generate a name. --- distributed/client.py | 55 ++++++++++++++++++-------------- distributed/scheduler.py | 30 ++++++----------- distributed/tests/test_worker.py | 17 +++++----- 3 files changed, 48 insertions(+), 54 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 863953b27c5..a8d8752507e 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -103,6 +103,21 @@ def _del_global_client(c): except KeyError: pass +def _serialize_named_func(named_func): + if isinstance(named_func, tuple): + name, func = named_func + else: + func = named_func + name = None + + serialized = dumps(func) + + if name is None: + h = md5(serialized) + name = funcname(func) + '-' + h.hexdigest() + + named_func = (name, serialized) + return named_func class Future(WrappedKey): """ A remotely running computation @@ -3547,28 +3562,24 @@ def _get_task_stream(self, start=None, stop=None, count=None, plot=False, raise gen.Return(msgs) @gen.coroutine - def _register_worker_callbacks(self, setup=None, name=None): - callbacks = {} - names = {} + def _register_worker_callbacks(self, setup=None): + # prepare the callbacks and their names if setup is not None: - serialized = dumps(setup) - callbacks['setup'] = serialized - if name is None: - h = md5(serialized) - name = funcname(setup) + '-' + h.hexdigest() - names['setup'] = name - - responses = yield self.scheduler.register_worker_callbacks(callbacks=callbacks, names=names) + setup = _serialize_named_func(setup) + + responses = yield self.scheduler.register_worker_callbacks(setup=setup) + results = {} for key, resp in responses.items(): if resp['status'] == 'OK': results[key] = resp['result'] elif resp['status'] == 'error': six.reraise(*clean_exception(**resp)) - raise gen.Return(names) - def register_worker_callbacks(self, setup=None, name=None): + raise gen.Return(results) + + def register_worker_callbacks(self, setup=None): """ Registers a setup callback function for all current and future workers. @@ -3586,23 +3597,19 @@ def register_worker_callbacks(self, setup=None, name=None): Parameters ---------- - setup : callable(dask_worker: Worker) -> None - Function to register and run on all workers - name : string, or None - A unique identifier for the callbacks. If None, a default identifier - will be generated from the type of the callback and the function. - A good candidate value for this argument is the full qualified name - of the function. + setup : callable(dask_worker: Worker) -> None, or tuple of (name, callable) + Function to register and run on all workers. If a name is not given, + then it is generated from the name and content of the callable. 
""" - return self.sync(self._register_worker_callbacks, setup=setup, name=name) + return self.sync(self._register_worker_callbacks, setup=setup) @gen.coroutine def _unregister_worker_callbacks(self, setup=None): - names = {} + if setup is not None: - names['setup'] = setup + setup = _serialize_named_func(setup) - result = yield self.scheduler.unregister_worker_callbacks(names=names) + result = yield self.scheduler.unregister_worker_callbacks(setup=setup) raise gen.Return(result) def unregister_worker_callbacks(self, setup=None): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 33bd18e6931..c807f3dc672 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3091,41 +3091,29 @@ def get_task_stream(self, comm=None, start=None, stop=None, count=None): return ts.collect(start=start, stop=stop, count=count) @gen.coroutine - def register_worker_callbacks(self, comm, callbacks, names): + def register_worker_callbacks(self, comm, setup=None): """ Registers a set of event driven callback functions on workers for the given name. - Parameters - ---------- - 'callbacks': dict - a dictionary of serialized callback functions. Currently - supported callbacks are: - - - 'setup' : a function that shall be called on workers when they first - connect to the scheduler. The function is called on every existing worker. - - 'names' : dict - a dictionary of function names for each entry in callbacks. - + setup must be a tuple of (name, serialized_function) """ responses = {} - if 'setup' in callbacks: - setup = callbacks['setup'] - name = names['setup'] + if setup is not None: + name, func = setup # add the setup function to the list to run them on new clients. - self.worker_setups[name] = setup + self.worker_setups[name] = func # trigger the setup function on the existing clients. - responses.update((yield self.broadcast(msg=dict(op='run', function=setup)))) + responses.update((yield self.broadcast(msg=dict(op='run', function=func)))) raise gen.Return(responses) @gen.coroutine - def unregister_worker_callbacks(self, comm, names): + def unregister_worker_callbacks(self, comm, setup=None): # add the setup function to the list to run them on new clients. 
- if 'setup' in names: - name = names['setup'] + if setup is not None: + name, func = setup self.worker_setups.pop(name) raise gen.Return(None) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index c47eba55465..4c094f4f62a 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1242,14 +1242,13 @@ def test_startup3(dask_worker): yield worker._close() # Add a preload function - names = yield c.register_worker_callbacks(setup=mystartup) - assert len(names) == 1 + result = yield c.register_worker_callbacks(setup=mystartup) + assert len(result) == 2 assert len(s.worker_setups) == 1 # Add the same preload function, again - names_again = yield c.register_worker_callbacks(setup=mystartup) - assert len(names_again) == 1 - assert names_again['setup'] == names['setup'] + result = yield c.register_worker_callbacks(setup=mystartup) + assert len(result) == 2 assert len(s.worker_setups) == 1 # Check it has been ran on existing worker @@ -1264,8 +1263,8 @@ def test_startup3(dask_worker): yield worker._close() # Register another preload function - names2 = yield c.register_worker_callbacks(setup=mystartup2) - assert len(names2) == 1 + result = yield c.register_worker_callbacks(setup=mystartup2) + assert len(result) == 2 assert len(s.worker_setups) == 2 # Check it has been run @@ -1273,9 +1272,9 @@ def test_startup3(dask_worker): assert list(result.values()) == [True] * 2 # unregister a preload function - names3 = yield c.register_worker_callbacks(setup=mystartup3) + yield c.register_worker_callbacks(setup=mystartup3) assert len(s.worker_setups) == 3 - yield c.unregister_worker_callbacks(setup=names3['setup']) + yield c.unregister_worker_callbacks(setup=mystartup3) assert len(s.worker_setups) == 2 # Start a worker and check it is ran on it From 86d20ec67c8aa4f1b648364f753091bbc85616a3 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Tue, 4 Dec 2018 08:54:07 -0800 Subject: [PATCH 07/17] Add docstring for unregister, and lint fixes. --- distributed/client.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index a8d8752507e..59be485bdb7 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -103,7 +103,12 @@ def _del_global_client(c): except KeyError: pass + def _serialize_named_func(named_func): + """ takes a function or a tuple of (name, func) + + returns a tuple of (name, serialized_func) + """ if isinstance(named_func, tuple): name, func = named_func else: @@ -119,6 +124,7 @@ def _serialize_named_func(named_func): named_func = (name, serialized) return named_func + class Future(WrappedKey): """ A remotely running computation @@ -3598,8 +3604,8 @@ def register_worker_callbacks(self, setup=None): Parameters ---------- setup : callable(dask_worker: Worker) -> None, or tuple of (name, callable) - Function to register and run on all workers. If a name is not given, - then it is generated from the name and content of the callable. + Function to register and run on all workers. + If a name is not given, then it is generated from the callable. """ return self.sync(self._register_worker_callbacks, setup=setup) @@ -3613,7 +3619,14 @@ def _unregister_worker_callbacks(self, setup=None): raise gen.Return(result) def unregister_worker_callbacks(self, setup=None): - """ """ + """ Unregisters a worker callback. 
+ + Parameters + ---------- + setup : callable(dask_worker: Worker) -> None, or tuple of (name, callable) + The setup callback to remove; it will be matched by the name. + If a name is not given, then it is generated from the callable. + """ return self.sync(self._unregister_worker_callbacks, setup=setup) From bbc7f09fe2b49cea7121a226b436adef02a58b59 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Tue, 4 Dec 2018 09:33:30 -0800 Subject: [PATCH 08/17] Change comments. --- distributed/client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 59be485bdb7..d22df88b2f5 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3593,10 +3593,10 @@ def register_worker_callbacks(self, setup=None): function will run immediately on all currently connected workers. It will also be run upon connection by any workers that are added in the future. Multiple setup functions can be registered - We only keep one function version per name. + We only keep one function version per name. The callback function shall be indempotent, and may be called several - times. The order callback functions are called are undefined. + times. The order of invocation of callback functions is undefined. If the function takes an input argument named ``dask_worker`` then that variable will be populated with the worker itself. @@ -3606,6 +3606,8 @@ def register_worker_callbacks(self, setup=None): setup : callable(dask_worker: Worker) -> None, or tuple of (name, callable) Function to register and run on all workers. If a name is not given, then it is generated from the callable. + + """ return self.sync(self._register_worker_callbacks, setup=setup) From 011a7d9745470157977664af823b4aabd4bb0d6c Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Tue, 4 Dec 2018 09:33:40 -0800 Subject: [PATCH 09/17] Do not run the same function twice. --- distributed/scheduler.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index c807f3dc672..8303c2f3a7b 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3101,11 +3101,14 @@ def register_worker_callbacks(self, comm, setup=None): if setup is not None: name, func = setup - # add the setup function to the list to run them on new clients. - self.worker_setups[name] = func + oldfunc = self.worker_setups.get(name, "") - # trigger the setup function on the existing clients. - responses.update((yield self.broadcast(msg=dict(op='run', function=func)))) + if oldfunc != func: + # add the setup function to the list to run them on new clients. + self.worker_setups[name] = func + + # trigger the setup function on the existing clients. + responses.update((yield self.broadcast(msg=dict(op='run', function=func)))) raise gen.Return(responses) From e3b095a14afbf3e342cd1d76f992b0b5a205a070 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Tue, 4 Dec 2018 09:58:21 -0800 Subject: [PATCH 10/17] Stop asserting the return value of register. There is no longer a guarentee the setup function is always ran together with the register call: If the same function is registered twice it will not run the second time. 
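In practice this means that registering byte-identical setup code under the same name a second time is a no-op; the follow-up test change pins this down with a per-worker counter. An illustrative client-side sketch (``client`` is an assumed connected Client):

    def bump_counter(dask_worker):
        # visible effect of the setup running: a per-worker counter
        dask_worker.counter = getattr(dask_worker, 'counter', 0) + 1

    client.register_worker_callbacks(setup=bump_counter)
    client.register_worker_callbacks(setup=bump_counter)  # same bytes, same name: not broadcast again
    # each currently connected worker ends up with counter == 1, not 2
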
--- distributed/tests/test_worker.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 4c094f4f62a..bcf20d3b152 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1242,13 +1242,11 @@ def test_startup3(dask_worker): yield worker._close() # Add a preload function - result = yield c.register_worker_callbacks(setup=mystartup) - assert len(result) == 2 + yield c.register_worker_callbacks(setup=mystartup) assert len(s.worker_setups) == 1 # Add the same preload function, again - result = yield c.register_worker_callbacks(setup=mystartup) - assert len(result) == 2 + yield c.register_worker_callbacks(setup=mystartup) assert len(s.worker_setups) == 1 # Check it has been ran on existing worker @@ -1263,8 +1261,7 @@ def test_startup3(dask_worker): yield worker._close() # Register another preload function - result = yield c.register_worker_callbacks(setup=mystartup2) - assert len(result) == 2 + yield c.register_worker_callbacks(setup=mystartup2) assert len(s.worker_setups) == 2 # Check it has been run From a963509dee27d600dd095cccc05804b37db06608 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Tue, 4 Dec 2018 10:02:24 -0800 Subject: [PATCH 11/17] Update tests to assert the same function is ran once. --- distributed/tests/test_worker.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index bcf20d3b152..048663a6721 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1203,8 +1203,10 @@ def test_custom_metrics(c, s, a, b): @gen_cluster(client=True) def test_register_worker_callbacks(c, s, a, b): #preload function to run - def mystartup(dask_worker): - dask_worker.init_variable = 1 + def mystartup1(dask_worker): + if not hasattr(dask_worker, "init_variable"): + dask_worker.init_variable = 0 + dask_worker.init_variable = dask_worker.init_variable + 1 def mystartup2(): import os @@ -1216,9 +1218,12 @@ def mystartup3(dask_worker): dask_worker.init_variable2 = 1 #Check that preload function has been run - def test_import(dask_worker): + def test_startup1(dask_worker): return hasattr(dask_worker, 'init_variable') - # and dask_worker.init_variable == 1 + + def test_run_once(dask_worker): + return (hasattr(dask_worker, 'init_variable') + and dask_worker.init_variable == 1) def test_startup2(): import os @@ -1229,7 +1234,7 @@ def test_startup3(dask_worker): # Nothing has been run yet assert len(s.worker_setups) == 0 - result = yield c.run(test_import) + result = yield c.run(test_startup1) assert list(result.values()) == [False] * 2 result = yield c.run(test_startup2) assert list(result.values()) == [False] * 2 @@ -1237,26 +1242,29 @@ def test_startup3(dask_worker): # Start a worker and check that startup is not run worker = Worker(s.address, loop=s.loop) yield worker._start() - result = yield c.run(test_import, workers=[worker.address]) + result = yield c.run(test_startup1, workers=[worker.address]) assert list(result.values()) == [False] yield worker._close() # Add a preload function - yield c.register_worker_callbacks(setup=mystartup) + yield c.register_worker_callbacks(setup=mystartup1) assert len(s.worker_setups) == 1 # Add the same preload function, again - yield c.register_worker_callbacks(setup=mystartup) + yield c.register_worker_callbacks(setup=mystartup1) assert len(s.worker_setups) == 1 # Check it has been ran on 
existing worker - result = yield c.run(test_import) + result = yield c.run(test_startup1) + assert list(result.values()) == [True] * 2 + + result = yield c.run(test_run_once) assert list(result.values()) == [True] * 2 # Start a worker and check it is ran on it worker = Worker(s.address, loop=s.loop) yield worker._start() - result = yield c.run(test_import, workers=[worker.address]) + result = yield c.run(test_startup1, workers=[worker.address]) assert list(result.values()) == [True] yield worker._close() @@ -1277,7 +1285,7 @@ def test_startup3(dask_worker): # Start a worker and check it is ran on it worker = Worker(s.address, loop=s.loop) yield worker._start() - result = yield c.run(test_import, workers=[worker.address]) + result = yield c.run(test_startup1, workers=[worker.address]) assert list(result.values()) == [True] result = yield c.run(test_startup2, workers=[worker.address]) From 0a5653e737167d655eb4a7cd56c38d54adc2525d Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Tue, 4 Dec 2018 10:05:14 -0800 Subject: [PATCH 12/17] Add more assertions about named callbacks. --- distributed/tests/test_worker.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 048663a6721..d66e09406e2 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1268,8 +1268,12 @@ def test_startup3(dask_worker): assert list(result.values()) == [True] yield worker._close() - # Register another preload function - yield c.register_worker_callbacks(setup=mystartup2) + # Register another preload function, twice with a name + yield c.register_worker_callbacks(setup=('mystartup2', mystartup2)) + assert 'mystartup2' in s.worker_setups + yield c.register_worker_callbacks(setup=('mystartup2', mystartup2)) + assert 'mystartup2' in s.worker_setups + assert len(s.worker_setups) == 2 assert len(s.worker_setups) == 2 # Check it has been run @@ -1282,6 +1286,14 @@ def test_startup3(dask_worker): yield c.unregister_worker_callbacks(setup=mystartup3) assert len(s.worker_setups) == 2 + # unregister a preload function with name + yield c.register_worker_callbacks(setup=('mystartup3', mystartup3)) + assert len(s.worker_setups) == 3 + assert 'mystartup3' in s.worker_setups + yield c.unregister_worker_callbacks(setup=('mystartup3', None)) + assert 'mystartup3' not in s.worker_setups + assert len(s.worker_setups) == 2 + # Start a worker and check it is ran on it worker = Worker(s.address, loop=s.loop) yield worker._start() From bf31ecff87990dfc23773b462e27c97ba2698243 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Sun, 9 Dec 2018 16:07:56 -0800 Subject: [PATCH 13/17] Update format of docstring. --- distributed/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/client.py b/distributed/client.py index d22df88b2f5..0201fe0bb61 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3603,7 +3603,7 @@ def register_worker_callbacks(self, setup=None): Parameters ---------- - setup : callable(dask_worker: Worker) -> None, or tuple of (name, callable) + setup : callable(dask_worker: Worker) -> None or tuple of (name, callable) Function to register and run on all workers. If a name is not given, then it is generated from the callable. 
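As the assertions above show, explicitly named callbacks follow the same one-entry-per-name rule and can be removed by name alone. A condensed sketch of that usage (function body and names are illustrative; ``client`` is an assumed connected Client):

    def warm_cache(dask_worker):
        dask_worker.cache = {}  # illustrative setup body

    client.register_worker_callbacks(setup=('warm-cache', warm_cache))
    client.register_worker_callbacks(setup=('warm-cache', warm_cache))  # same name: still a single entry
    client.unregister_worker_callbacks(setup=('warm-cache', None))      # removal is matched by name only
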
From c1fc6170955f15f4baf3513f6d5ef87aa1eba613 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Sun, 9 Dec 2018 16:08:07 -0800 Subject: [PATCH 14/17] Switch to tokenize --- distributed/client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 0201fe0bb61..bc3d60a404d 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -22,7 +22,6 @@ import socket import warnings import weakref -from hashlib import md5 import dask from dask.base import tokenize, normalize_token, collections_to_dsk @@ -118,8 +117,8 @@ def _serialize_named_func(named_func): serialized = dumps(func) if name is None: - h = md5(serialized) - name = funcname(func) + '-' + h.hexdigest() + h = tokenize(serialized) + name = funcname(func) + '-' + h named_func = (name, serialized) return named_func From 654a11421476d95278d6b74dc76efe5aa1f83561 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Sun, 9 Dec 2018 16:08:17 -0800 Subject: [PATCH 15/17] More docstring fixes. --- distributed/scheduler.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8303c2f3a7b..bf9fabc2766 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3092,9 +3092,10 @@ def get_task_stream(self, comm=None, start=None, stop=None, count=None): @gen.coroutine def register_worker_callbacks(self, comm, setup=None): - """ Registers a set of event driven callback functions on workers for the given name. + """ + Registers a set of event driven callback functions on workers for the given name. - setup must be a tuple of (name, serialized_function) + setup must be a tuple of (name, serialized_function) """ responses = {} From 61d1555b54d56f19b023d4efe12238b94da987cf Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Sun, 9 Dec 2018 16:08:48 -0800 Subject: [PATCH 16/17] unregister_worker_callbacks no need to be explicit coroutine. --- distributed/scheduler.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index bf9fabc2766..af440096124 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3113,14 +3113,13 @@ def register_worker_callbacks(self, comm, setup=None): raise gen.Return(responses) - @gen.coroutine def unregister_worker_callbacks(self, comm, setup=None): # add the setup function to the list to run them on new clients. if setup is not None: name, func = setup self.worker_setups.pop(name) - raise gen.Return(None) + return None ##################### # State Transitions # From a5a6a304ea9a536c14627e5574f49a89ba14e7a1 Mon Sep 17 00:00:00 2001 From: Yu Feng Date: Sun, 9 Dec 2018 16:23:54 -0800 Subject: [PATCH 17/17] remove _unregister_worker_callbacks from Client; update docstrings. --- distributed/client.py | 48 +++++++++++++++++++++------------------- distributed/scheduler.py | 14 ++++++++---- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index bc3d60a404d..ace70689013 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -3588,11 +3588,15 @@ def register_worker_callbacks(self, setup=None): """ Registers a setup callback function for all current and future workers. - This registers a new setup function for workers in this cluster. The - function will run immediately on all currently connected workers. It - will also be run upon connection by any workers that are added in the - future. 
Multiple setup functions can be registered - We only keep one function version per name. + This registers a new setup function for workers in this cluster. + The function will run immediately on all currently connected workers. + It will also be run upon connection by any workers that are added in the + future. + Multiple setup functions can be registered. + An optional callback name can be provided by passing a tuple as + the argument. We only keep one function version per callback name. + If the callback name is not given, a unique name is generated by + tokenizing the function. The callback function shall be indempotent, and may be called several times. The order of invocation of callback functions is undefined. @@ -3602,33 +3606,31 @@ def register_worker_callbacks(self, setup=None): Parameters ---------- - setup : callable(dask_worker: Worker) -> None or tuple of (name, callable) + setup : callable(dask_worker: Worker) -> None, + or tuple of (name, callable) Function to register and run on all workers. - If a name is not given, then it is generated from the callable. - + If a name is not given, then a name is generated from the callable. """ return self.sync(self._register_worker_callbacks, setup=setup) - @gen.coroutine - def _unregister_worker_callbacks(self, setup=None): + def unregister_worker_callbacks(self, setup=None): + """ + Unregisters a worker callback registered via `register_worker_callbacks`. + See `register_worker_callbacks` for the definition of the arguments. + + Parameters + ---------- + setup : callable(dask_worker: Worker) -> None, + or tuple of (name, callable) + The setup callback to remove; it will be matched by the name. + If a name is not given, then a name is generated from the callable. + """ if setup is not None: setup = _serialize_named_func(setup) - result = yield self.scheduler.unregister_worker_callbacks(setup=setup) - raise gen.Return(result) - - def unregister_worker_callbacks(self, setup=None): - """ Unregisters a worker callback. - - Parameters - ---------- - setup : callable(dask_worker: Worker) -> None, or tuple of (name, callable) - The setup callback to remove; it will be matched by the name. - If a name is not given, then it is generated from the callable. - """ - return self.sync(self._unregister_worker_callbacks, setup=setup) + return self.sync(self.scheduler.unregister_worker_callbacks, setup=setup) class Executor(Client): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index af440096124..3c06b7db46f 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3093,7 +3093,8 @@ def get_task_stream(self, comm=None, start=None, stop=None, count=None): @gen.coroutine def register_worker_callbacks(self, comm, setup=None): """ - Registers a set of event driven callback functions on workers for the given name. + Registers a set of event driven callback functions + on workers for the given name. setup must be a tuple of (name, serialized_function) """ @@ -3114,13 +3115,18 @@ def register_worker_callbacks(self, comm, setup=None): raise gen.Return(responses) def unregister_worker_callbacks(self, comm, setup=None): - # add the setup function to the list to run them on new clients. + """ + Unregisters a set of event driven callback functions on workers + for the given name. + + setup must be a tuple of (name, serialized_function). + The value of serialized_function is unused. 
+ + """ if setup is not None: name, func = setup self.worker_setups.pop(name) - return None - ##################### # State Transitions # #####################