Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Scheduler and Worker memory tracking #2847

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,9 @@ class Scheduler(ServerNode):
report results
* **task_duration:** ``{key-prefix: time}``
Time we expect certain functions to take, e.g. ``{'sum': 0.25}``
* **task_net_nbytes:** ``{key-prefix: int}``
Expected change in cluster memory usage, in bytes, from completing
a task with a given prefix
* **coroutines:** ``[Futures]``:
A list of active futures that control operation
"""
Expand Down Expand Up @@ -953,6 +956,7 @@ def __init__(

# Prefix-keyed containers
self.task_duration = {prefix: 0.00001 for prefix in fast_tasks}
self.task_net_nbytes = defaultdict(lambda: 0)
self.unknown_durations = defaultdict(set)

# Client state
Expand Down Expand Up @@ -2294,6 +2298,7 @@ def send_task_to_worker(self, worker, key):
"key": key,
"priority": ts.priority,
"duration": self.get_task_duration(ts),
"net_nbytes": self.task_net_nbytes[ts.prefix],
}
if ts.resource_restrictions:
msg["resource_restrictions"] = ts.resource_restrictions
Expand Down Expand Up @@ -3449,6 +3454,8 @@ def _add_to_memory(
):
"""
Add *ts* to the set of in-memory tasks.

Mutates recommendations inplace.
"""
if self.validate:
assert ts not in ws.has_what
Expand All @@ -3472,9 +3479,13 @@ def _add_to_memory(
s.discard(ts)
if not s and not dts.who_wants:
recommendations[dts.key] = "released"
if dts.nbytes:
self.task_net_nbytes[ts.prefix] -= dts.nbytes

if not ts.waiters and not ts.who_wants:
recommendations[ts.key] = "released"
if ts.nbytes:
self.task_net_nbytes[ts.prefix] -= ts.nbytes
else:
msg = {"op": "key-in-memory", "key": ts.key}
if type is not None:
Expand Down Expand Up @@ -3780,6 +3791,7 @@ def transition_processing_memory(
############################
if nbytes is not None:
ts.set_nbytes(nbytes)
self.task_net_nbytes[ts.prefix] += nbytes

recommendations = OrderedDict()

Expand Down
15 changes: 15 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,21 @@ async def test_async_context_manager():
assert not s.workers


@gen_cluster(client=True)
def test_net_nbytes(c, s, a, b):
    # Two 1 kB allocations feed a task that keeps only the first 10 bytes,
    # so completing "a"-prefixed tasks should grow cluster memory while
    # completing the "b"-prefixed task should shrink it.
    graph = {
        "a-1": (bytearray, 1000),
        "a-2": (bytearray, 1000),
        "b-2": (lambda t, u: t[:10], "a-1", "a-2"),
    }
    future = c.get(graph, "b-2", sync=False)
    out = yield future
    assert out == bytearray(10)
    assert s.task_net_nbytes["a"] > 0
    assert s.task_net_nbytes["b"] < 0
    # TODO: Add asserts on the worker's properties


@pytest.mark.asyncio
async def test_allowed_failures_config():
async with Scheduler(port=0, allowed_failures=10) as s:
Expand Down
7 changes: 6 additions & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class Worker(ServerNode):
* **data_needed**: deque(keys)
The keys whose data we still lack, arranged in a deque
* **waiting_for_data**: ``{key: {deps}}``
A dynamic verion of dependencies. All dependencies that we still don't
A dynamic version of dependencies. All dependencies that we still don't
have for a particular key.
* **ready**: [keys]
Keys that are ready to run. Stored in a LIFO stack
Expand Down Expand Up @@ -355,6 +355,7 @@ def __init__(
self.priorities = dict()
self.generation = 0
self.durations = dict()
self.net_nbytes = defaultdict(lambda: 0)
self.startstops = defaultdict(list)
self.resource_restrictions = dict()

Expand Down Expand Up @@ -1233,6 +1234,7 @@ def add_task(
nbytes=None,
priority=None,
duration=None,
net_nbytes=None,
resource_restrictions=None,
actor=False,
**kwargs2
Expand All @@ -1257,6 +1259,9 @@ def add_task(
priority = tuple(priority) + (self.generation,)
self.generation -= 1

if net_nbytes is not None:
self.net_nbytes[key_split(key)] = net_nbytes

if self.dep_state.get(key) == "memory":
self.task_state[key] = "memory"
self.send_task_state_to_scheduler(key)
Expand Down