-
-
Notifications
You must be signed in to change notification settings - Fork 727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP Allowing named worker-setups #2391
base: main
Are you sure you want to change the base?
Changes from all commits
1862c9e
13322a8
f6913c2
eb52c33
69558a5
6d2b939
86d20ec
bbc7f09
011a7d9
e3b095a
a963509
0a5653e
bf31ecf
c1fc617
654a114
61d1555
a5a6a30
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1203,69 +1203,110 @@ def test_custom_metrics(c, s, a, b): | |
@gen_cluster(client=True) | ||
def test_register_worker_callbacks(c, s, a, b): | ||
#preload function to run | ||
def mystartup(dask_worker): | ||
dask_worker.init_variable = 1 | ||
def mystartup1(dask_worker): | ||
if not hasattr(dask_worker, "init_variable"): | ||
dask_worker.init_variable = 0 | ||
dask_worker.init_variable = dask_worker.init_variable + 1 | ||
|
||
def mystartup2(): | ||
import os | ||
os.environ['MY_ENV_VALUE'] = 'WORKER_ENV_VALUE' | ||
return "Env set." | ||
|
||
#preload function to run | ||
def mystartup3(dask_worker): | ||
dask_worker.init_variable2 = 1 | ||
|
||
#Check that preload function has been run | ||
def test_import(dask_worker): | ||
def test_startup1(dask_worker): | ||
return hasattr(dask_worker, 'init_variable') | ||
# and dask_worker.init_variable == 1 | ||
|
||
def test_run_once(dask_worker): | ||
return (hasattr(dask_worker, 'init_variable') | ||
and dask_worker.init_variable == 1) | ||
|
||
def test_startup2(): | ||
import os | ||
return os.getenv('MY_ENV_VALUE', None) == 'WORKER_ENV_VALUE' | ||
|
||
def test_startup3(dask_worker): | ||
return hasattr(dask_worker, 'init_variable2') | ||
|
||
# Nothing has been run yet | ||
assert len(s.worker_setups) == 0 | ||
result = yield c.run(test_import) | ||
result = yield c.run(test_startup1) | ||
assert list(result.values()) == [False] * 2 | ||
result = yield c.run(test_startup2) | ||
assert list(result.values()) == [False] * 2 | ||
|
||
# Start a worker and check that startup is not run | ||
worker = Worker(s.address, loop=s.loop) | ||
yield worker._start() | ||
result = yield c.run(test_import, workers=[worker.address]) | ||
result = yield c.run(test_startup1, workers=[worker.address]) | ||
assert list(result.values()) == [False] | ||
yield worker._close() | ||
|
||
# Add a preload function | ||
response = yield c.register_worker_callbacks(setup=mystartup) | ||
assert len(response) == 2 | ||
yield c.register_worker_callbacks(setup=mystartup1) | ||
assert len(s.worker_setups) == 1 | ||
|
||
# Add the same preload function, again | ||
yield c.register_worker_callbacks(setup=mystartup1) | ||
assert len(s.worker_setups) == 1 | ||
|
||
# Check it has been ran on existing worker | ||
result = yield c.run(test_import) | ||
result = yield c.run(test_startup1) | ||
assert list(result.values()) == [True] * 2 | ||
|
||
result = yield c.run(test_run_once) | ||
assert list(result.values()) == [True] * 2 | ||
|
||
# Start a worker and check it is ran on it | ||
worker = Worker(s.address, loop=s.loop) | ||
yield worker._start() | ||
result = yield c.run(test_import, workers=[worker.address]) | ||
result = yield c.run(test_startup1, workers=[worker.address]) | ||
assert list(result.values()) == [True] | ||
yield worker._close() | ||
|
||
# Register another preload function | ||
response = yield c.register_worker_callbacks(setup=mystartup2) | ||
assert len(response) == 2 | ||
# Register another preload function, twice with a name | ||
yield c.register_worker_callbacks(setup=('mystartup2', mystartup2)) | ||
assert 'mystartup2' in s.worker_setups | ||
yield c.register_worker_callbacks(setup=('mystartup2', mystartup2)) | ||
assert 'mystartup2' in s.worker_setups | ||
assert len(s.worker_setups) == 2 | ||
assert len(s.worker_setups) == 2 | ||
|
||
# Check it has been run | ||
result = yield c.run(test_startup2) | ||
assert list(result.values()) == [True] * 2 | ||
|
||
# unregister a preload function | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I changed 'register_worker_callbacks' to return the registered names to allow unregisterring callback functions with inferred names. |
||
yield c.register_worker_callbacks(setup=mystartup3) | ||
assert len(s.worker_setups) == 3 | ||
yield c.unregister_worker_callbacks(setup=mystartup3) | ||
assert len(s.worker_setups) == 2 | ||
|
||
# unregister a preload function with name | ||
yield c.register_worker_callbacks(setup=('mystartup3', mystartup3)) | ||
assert len(s.worker_setups) == 3 | ||
assert 'mystartup3' in s.worker_setups | ||
yield c.unregister_worker_callbacks(setup=('mystartup3', None)) | ||
assert 'mystartup3' not in s.worker_setups | ||
assert len(s.worker_setups) == 2 | ||
|
||
# Start a worker and check it is ran on it | ||
worker = Worker(s.address, loop=s.loop) | ||
yield worker._start() | ||
result = yield c.run(test_import, workers=[worker.address]) | ||
result = yield c.run(test_startup1, workers=[worker.address]) | ||
assert list(result.values()) == [True] | ||
|
||
result = yield c.run(test_startup2, workers=[worker.address]) | ||
assert list(result.values()) == [True] | ||
|
||
# startup3 is not ran, as it is unregistered before worker is added. | ||
result = yield c.run(test_startup3, workers=[worker.address]) | ||
assert list(result.values()) == [False] | ||
|
||
yield worker._close() | ||
|
||
# Final exception test | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -513,6 +513,8 @@ def __init__(self, scheduler_ip=None, scheduler_port=None, | |
io_loop=self.io_loop) | ||
self.periodic_callbacks['profile-cycle'] = pc | ||
|
||
self.worker_setups = {} | ||
|
||
_global_workers.append(weakref.ref(self)) | ||
|
||
################## | ||
|
@@ -610,23 +612,26 @@ def _register_with_scheduler(self): | |
raise ValueError("Unexpected response from register: %r" % | ||
(response,)) | ||
else: | ||
# Retrieve eventual init functions and run them | ||
for function_bytes in response['worker-setups']: | ||
setup_function = pickle.loads(function_bytes) | ||
if has_arg(setup_function, 'dask_worker'): | ||
result = setup_function(dask_worker=self) | ||
else: | ||
result = setup_function() | ||
logger.info('Init function %s ran: output=%s' % (setup_function, result)) | ||
|
||
logger.info(' Registered to: %26s', self.scheduler.address) | ||
logger.info('-' * 49) | ||
|
||
# Retrieve eventual init functions (only worker-setups for now) | ||
for name, function_bytes in response['worker-setups'].items(): | ||
self.worker_setups[name] = pickle.loads(function_bytes) | ||
|
||
self.batched_stream = BatchedSend(interval='2ms', loop=self.loop) | ||
self.batched_stream.start(comm) | ||
self.periodic_callbacks['heartbeat'].start() | ||
self.loop.add_callback(self.handle_scheduler, comm) | ||
|
||
# run eventual init functions (only worker-setups for now) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved this to run the worker init functions after heartbeat; also I am using the locak worker_setups directionary rather than the unpickled function directly. |
||
for name, setup_function in self.worker_setups.items(): | ||
if has_arg(setup_function, 'dask_worker'): | ||
result = setup_function(dask_worker=self) | ||
else: | ||
result = setup_function() | ||
logger.info('Init function %s : %s ran: output=%s' % (name, setup_function, result)) | ||
|
||
@gen.coroutine | ||
def heartbeat(self): | ||
if not self.heartbeat_active: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For spacing I recommend the following: