-
-
Notifications
You must be signed in to change notification settings - Fork 719
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Alternatives for current ensure_communicating #6497
Alternatives for current ensure_communicating #6497
Comments
While I'm warm to deduplicate data_needed vs. data_needed_per_worker, I don't see how any of this removes the problem about when to trigger My suggestion: Remove
Individual GatherDep instructions stop being enriched when their individual nbytes exceed an individual threshold At the top of instructions = list({id(instr): instr for instr in instructions}.values())
flushing This closes #6462 (comment) for both the case of multiple fetches from the same event as well as for the case of multiple fetches from multiple commands in short succession, and will cause calling |
The big difference between my proposal above and both the current system and the proposal to remove e.g. current system will fetch 50 keys from worker a and 40 keys from worker b |
The proposal that includes an instruction |
|
I see this as a major antipattern to the current worker state machine refactoring, as it would require the whole contents of |
Good point. Taking a step back, I'm wondering how we'd approach this from a TDD perspective.
This could be roughly written up as something like the following assert ComputeTaskEvent(key='f1', who_has={"f2": {"workerB"}, "f3", {"workerB"}, ...) in state.event_log
...
assert GatherDep(worker="workerB", to_gather={"f2", "f3"}) in state.instructions_log and also assert AcquireReplicasEvent(who_has={"f2": {"workerB"}, "f3", {"workerB"}, ...) in state.event_log
assert GatherDep(worker="workerB", to_gather={"f2", "f3"}) in state.instructions_log correct? I'm wondering if we are already able to write such a test down, after #6566 About the actual problem, I'm wondering if the order of transitioning cannot be used to influence this. I think right now the above requests would play out like (depth first)
Would it help if we instead performed a breadth-first transitioning?
|
Breadth-first would help. Not necessary with my design above though - could you comment on it? For testing, I envision something like instructions = state.handle_stimulus(AcquireReplicasEvent(...))
assert instructions == [
GatherDep(...),
GatherDep(...),
GatherDep(...),
]
assert state.tasks["x"].state == "flight"
assert state.tasks["y"].state == "fetch" |
sure, I'll need to give it another pass
I would actually prefer to leave the task states out of this for this case. I think the events and instructions should be enough and would make for great high level tests. |
IIUC your algorithm would make poor decisions in the following case Below shows data needed as a sorted list in a simplified format. Omitting key, priority etc. merely nbytes and who_has Worker.comm_threshold_bytes = 100
[
# nbytes, who_has
(1, {"A"}), # -> new GatherDep for A
(1000, {"C"}), # -> new GatherDep for C
# <--- Break since sum(AllGatherDepNbytes) > Worker.comm_thresholds_bytes
(1, {"A"}),
(1, {"A"}),
(1, {"A"}),
(1, {"A"}),
(1, {"A"}),
] I think fetching all tasks from A and then fetching the task from C is better. Your proposal about saturated GatherDep requests is interesting. For the sake of completeness, if we simply limit by number of outgoing connections, we'd be exposed to a similar situation as above with the following queue Worker.total_out_connections = 3
[
# nbytes, who_has
(1, {"A"}), # -> new GatherDep for A
(1, {"B"}), # -> new GatherDep for B
(1, {"C"}), # -> new GatherDep for C
# <--- Break since count(outgoing_requests) >= Worker.total_out_connections
(1, {"A"}),
(1, {"A"}),
(1, {"A"}),
(1, {"A"}),
(1, {"A"}),
] Only filtering by saturated requests would not have this problem. However, I don't feel great about this proposal since it would effectively reduce the It is interesting since it asks the question about what this local/per-request limit should actually be. Do we even need a per-request limit / Zooming out a bit, there are a couple of questions that connect our two proposals which I believe can be discussed mostly separately
1.A) Maintain data_needed_per_worker to_fetch = defaultdict(list)
leftover_data_per_worker = {}
while data_per_worker and not above_global_size_threshold():
worker, tasks = pick_global_best_worker(data_per_worker)
if worker is None: # E.g. all in-flight. Handled by pick_global_best_worker
break
while tasks:
ts = tasks.pop()
if above_local_threshold(worker, ts):
leftover_data_per_worker[worker] = tasks
break
to_fetch[worker].append(ts)
self.data_per_worker.update(leftover_data_per_worker) 1B) Keep data_needed to_fetch = defaultdict(list)
self.skipped_worker_in_flight_or_busy = []
while data_needed and not above_global_size_threshold():
ts = data_needed.pop()
worker = pick_best_worker_for_task(ts)
if above_local_threshold(worker, ts):
self.skipped_worker_in_flight_or_busy.append(ts)
continue
to_fetch[worker].append(ts)
# TODO: IIUC your proposal would envision having skipped_worker_in_flight_or_busy to be consolidated elsewhere?? We could engage in a thorough runtime analysis and try to estimate which is faster. My gut feeling tells me mine should fare better even with the amortized cost of maintaining
We may not need local limits. Not sure. I see room for this in both proposals and we can kick this down the road. I just had the feeling it was more important in your proposal than in mine because you talked about
I think this is the big one. I see a couple of options 3A) Mutate things in place like what you described. I think there are nuances but overall this would amount to either mutating issued instructions or mutating some state the instruction references (e.g. instruction points to a dict/set on the WorkerState we keep updating). Honestly, this feels like cheating and equally opposed to the new design as letting the server perform transition. This proposal would harm replayability / lineage and I would like to avoid this 3B) Delay 3C) Use a breadth first transition. this might benefit us in other areas as well since it would allow us to consider priorities more strictly. FWIW, I encountered an ordering issue about transitions before on scheduler side, see #6248 (comment) 3D) Perform some aggregation before merge_instructions([
GatherDepDecision(worker="A", key="foo"),
GatherDepDecision(worker="A", key="bar"),
GatherDepDecision(worker="B", key="baz"),
])
>> {GatherDep(worker="A", keys={"foo", "bar"}), GatherDep(worker="B", keys={"baz"}) Of course, these three questions are related but I think we can make any combination work with appropriate adjustments Conclusion
FWIW, I've come across use cases for an insertion ordered set a couple of times now. We might want to consider adding such a collection instead of doing any deduplication. Should also be trivial to implement if we re-use the HeapSet. |
I played with this and it doesn't work: https://github.com/crusaderky/distributed/tree/WSMR/breadth_first Story of the new test
Depth-first (without the change to
Breadth-first actually doesn't change anything. We're calling
I appreciate the pragmaticity of this one. However it must be accompanied by a solution to the O(nlogn) issue for unschedulable tasks, since on its own it would further exacerbate it.
This is also reasonable. |
We've recently been refactoring the worker state machine (#5736)
The new system works great for single-task-based transitions but feels awkward as soon as we're trying to treat groups or batches of tasks similarly which is particularly important for fetching remote data. We are trying to not perform a network request per key but rather try to perform a request per worker and try to get as many keys of it as reasonable.
This mechanism worked so far by running
Worker.(_)ensure_communicating
frequently which iterates over all tasks and schedules a fetch/gather_dep (pseudo code below)This has a couple of problems
EnsureCommunicatingAfterTransitions
instruction was introduced (see also Remove EnsureCommunicatingAfterTransitions #6462) to mimic earlier behaviorProposal
I think we can get rid of
Worker.data_needed
entirely. Since we're storing everything in heaps, by now,Worker.data_needed_per_worker
carries everything we need if we change how ensure_communicating works which might have other interesting side effects.To illustrate this, consider the following two helper functions
With these two helpers we should be able to construct a new, simplified version if we switch our loop from tasks to workers
or even change the
GatherDep
instruction to worker-only and delay the decision of what tasks to fetch until this instruction is acted on, which typically only happens after all transitions inWorker.transitions
are worked offcc @crusaderky
The text was updated successfully, but these errors were encountered: