diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 66d8fdaac90..eb4f235248d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -819,6 +819,9 @@ class Scheduler(ServerNode): report results * **task_duration:** ``{key-prefix: time}`` Time we expect certain functions to take, e.g. ``{'sum': 0.25}`` + * **task_net_nbytes:** ``{key-prefix: int}`` + Expected change in cluster memory usage, in bytes, from completing + a task with a given prefix * **coroutines:** ``[Futures]``: A list of active futures that control operation """ @@ -953,6 +956,7 @@ def __init__( # Prefix-keyed containers self.task_duration = {prefix: 0.00001 for prefix in fast_tasks} + self.task_net_nbytes = defaultdict(lambda: 0) self.unknown_durations = defaultdict(set) # Client state @@ -2294,6 +2298,7 @@ def send_task_to_worker(self, worker, key): "key": key, "priority": ts.priority, "duration": self.get_task_duration(ts), + "net_nbytes": self.task_net_nbytes[ts.prefix], } if ts.resource_restrictions: msg["resource_restrictions"] = ts.resource_restrictions @@ -3449,6 +3454,8 @@ def _add_to_memory( ): """ Add *ts* to the set of in-memory tasks. + + Mutates recommendations in place. 
""" if self.validate: assert ts not in ws.has_what @@ -3472,9 +3479,13 @@ def _add_to_memory( s.discard(ts) if not s and not dts.who_wants: recommendations[dts.key] = "released" + if dts.nbytes: + self.task_net_nbytes[ts.prefix] -= dts.nbytes if not ts.waiters and not ts.who_wants: recommendations[ts.key] = "released" + if ts.nbytes: + self.task_net_nbytes[ts.prefix] -= ts.nbytes else: msg = {"op": "key-in-memory", "key": ts.key} if type is not None: @@ -3780,6 +3791,7 @@ def transition_processing_memory( ############################ if nbytes is not None: ts.set_nbytes(nbytes) + self.task_net_nbytes[ts.prefix] += nbytes recommendations = OrderedDict() diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e8d2a96ee60..0b24668ad80 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1609,6 +1609,21 @@ async def test_async_context_manager(): assert not s.workers +@gen_cluster(client=True) +def test_net_nbytes(c, s, a, b): + dsk = { + "a-1": (bytearray, 1000), + "a-2": (bytearray, 1000), + "b-2": (lambda t, u: t[:10], "a-1", "a-2"), + } + x = c.get(dsk, "b-2", sync=False) + result = yield x + assert result == bytearray(10) + assert s.task_net_nbytes["a"] > 0 + assert s.task_net_nbytes["b"] < 0 + # TODO: Add asserts on the worker's properties + + @pytest.mark.asyncio async def test_allowed_failures_config(): async with Scheduler(port=0, allowed_failures=10) as s: diff --git a/distributed/worker.py b/distributed/worker.py index e124ba6ab1f..1c35afcd91c 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -167,7 +167,7 @@ class Worker(ServerNode): * **data_needed**: deque(keys) The keys whose data we still lack, arranged in a deque * **waiting_for_data**: ``{kep: {deps}}`` - A dynamic verion of dependencies. All dependencies that we still don't + A dynamic version of dependencies. All dependencies that we still don't have for a particular key. 
* **ready**: [keys] Keys that are ready to run. Stored in a LIFO stack @@ -355,6 +355,7 @@ def __init__( self.priorities = dict() self.generation = 0 self.durations = dict() + self.net_nbytes = defaultdict(lambda: 0) self.startstops = defaultdict(list) self.resource_restrictions = dict() @@ -1233,6 +1234,7 @@ def add_task( nbytes=None, priority=None, duration=None, + net_nbytes=None, resource_restrictions=None, actor=False, **kwargs2 @@ -1257,6 +1259,9 @@ def add_task( priority = tuple(priority) + (self.generation,) self.generation -= 1 + if net_nbytes is not None: + self.net_nbytes[key_split(key)] = net_nbytes + if self.dep_state.get(key) == "memory": self.task_state[key] = "memory" self.send_task_state_to_scheduler(key)