forked from dask/distributed
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Replace register_worker_callbacks with worker plugins (dask#2453)
* Add worker plugins * add docstring * Replace legacy worker_callbacks with worker_plugins * add and test name keyword * fix missing import * black * respond to feedback * Handle errors again * Expand docstring
- Loading branch information
Showing
5 changed files
with
206 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
from distributed.utils_test import gen_cluster | ||
from distributed import Worker | ||
|
||
|
||
class MyPlugin: | ||
name = "MyPlugin" | ||
|
||
def __init__(self, data): | ||
self.data = data | ||
|
||
def setup(self, worker): | ||
assert isinstance(worker, Worker) | ||
self.worker = worker | ||
self.worker._my_plugin_status = "setup" | ||
self.worker._my_plugin_data = self.data | ||
|
||
def teardown(self, worker): | ||
assert isinstance(worker, Worker) | ||
self.worker._my_plugin_status = "teardown" | ||
|
||
|
||
@gen_cluster(client=True, ncores=[]) | ||
def test_create_with_client(c, s): | ||
yield c.register_worker_plugin(MyPlugin(123)) | ||
|
||
worker = Worker(s.address, loop=s.loop) | ||
yield worker._start() | ||
assert worker._my_plugin_status == "setup" | ||
assert worker._my_plugin_data == 123 | ||
|
||
yield worker._close() | ||
assert worker._my_plugin_status == "teardown" | ||
|
||
|
||
@gen_cluster(client=True, worker_kwargs={"plugins": [MyPlugin(5)]}) | ||
def test_create_on_construction(c, s, a, b): | ||
assert len(a.plugins) == len(b.plugins) == 1 | ||
assert a._my_plugin_status == "setup" | ||
assert a._my_plugin_data == 5 | ||
|
||
|
||
@gen_cluster(client=True, worker_kwargs={"plugins": [MyPlugin(5)]}) | ||
def test_idempotence_with_name(c, s, a, b): | ||
a._my_plugin_data = 100 | ||
|
||
yield c.register_worker_plugin(MyPlugin(5)) | ||
|
||
assert a._my_plugin_data == 100 # call above has no effect | ||
|
||
|
||
@gen_cluster(client=True, worker_kwargs={"plugins": [MyPlugin(5)]}) | ||
def test_duplicate_with_no_name(c, s, a, b): | ||
assert len(a.plugins) == len(b.plugins) == 1 | ||
|
||
plugin = MyPlugin(10) | ||
plugin.name = "other-name" | ||
|
||
yield c.register_worker_plugin(plugin) | ||
|
||
assert len(a.plugins) == len(b.plugins) == 2 | ||
|
||
assert a._my_plugin_data == 10 | ||
|
||
yield c.register_worker_plugin(plugin) | ||
assert len(a.plugins) == len(b.plugins) == 2 | ||
|
||
yield c.register_worker_plugin(plugin, name="foo") | ||
assert len(a.plugins) == len(b.plugins) == 3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters