Scheduler task transition tracing #5954
Conversation
Force-pushed from 30dba93 to c229a65.
Unit Test Results: 18 files ±0, 18 suites ±0, 10h 15m 31s ⏱️ +20m 12s. For more details on these failures, see this check. Results for commit 7bf0117. ± Comparison against base commit 2ff681c. ♻️ This comment has been updated with latest results.
Thanks for the comments @fjetter. I think this is ready for further review. One observation I'd like to make is that there are a number of methods that follow this pattern:

def do_something(..., stimulus_id=None):
    stimulus_id = stimulus_id or f"do-something-{time()}"

For example, reschedule: #5954 (comment). These patterns exist to ensure that a stimulus_id is always set, even when the caller doesn't supply one.
A few minor comments. The biggest thing is that I would like us to try my suggestion about keeping the stimulus_id high level and not passing it down to every transition method. That would be really nice, I think. A similar thing could be done on the worker side, I believe, but I suggest not changing anything about the worker signatures in this PR.
-    a: tuple = parent._transition(key, finish, *args, **kwargs)
+    a: tuple = parent._transition(
+        key, finish, *args, stimulus_id=stimulus_id, **kwargs
+    )
     recommendations, client_msgs, worker_msgs = a
     self.send_all(client_msgs, worker_msgs)
I'm wondering if a cleaner approach to this would be to not add stimulus_id to every transition_X_Y method but instead deal with the required mutations here. The only thing a transition_X_Y method can (should) do with the stimulus_id is attach it to a worker or client message. So why don't we attach it to every worker message here and save ourselves these dirty signatures?
e.g.

def transition_memory_forgotten(self, key):
    ...
    # This is the only place we're actually using the stim ID.
    # _propagate_forgotten only adds it to the worker_msgs; we can add it
    # at a higher level and don't need to pass it down into every method.
    _propagate_forgotten(self, ts, recommendations, worker_msgs)
    return recommendations, client_msgs, worker_msgs
I haven't verified whether this works, but it would be much less invasive.
Adding the stim ID could be performed as part of send_all where we're iterating over the messages anyhow.
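A rough sketch of how that could look (hypothetical; assumes send_all is given the stimulus_id and that client_msgs/worker_msgs map addresses to lists of message dicts):

def send_all(self, client_msgs, worker_msgs, stimulus_id=None):
    # Stamp the stimulus_id on every outgoing message in one place instead
    # of threading it through each transition_X_Y method.
    if stimulus_id is not None:
        for msgs in (*client_msgs.values(), *worker_msgs.values()):
            for msg in msgs:
                msg.setdefault("stimulus_id", stimulus_id)
    ...  # existing batched-send logic unchanged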
I had a look at the 15 transitions in SchedulerState, of which 6 take stimulus_ids while 9 do not. I think the two numbers are close enough that either approach might be ugly, and my vote would be for the ugliness of extra stimulus_id kwargs in the 15 transition functions.
I was thinking about this a bit more and two other approaches occurred to me. Here's the flavour of the first:
import inspect

class TransitionFunction:
    def __init__(self, fn):
        assert callable(fn)
        self.fn = fn
        self.sig = inspect.signature(fn)

    def stimulus_in_sig(self):
        # Does the wrapped transition method accept a stimulus_id kwarg?
        return "stimulus_id" in self.sig.parameters

    def __call__(self, *args, **kw):
        if not self.stimulus_in_sig():
            kw.pop("stimulus_id", None)
        return self.fn(*args, **kw)

class SchedulerState:
    def __init__(self):
        self._transitions_table = {
            ("released", "waiting"): TransitionFunction(self.transition_released_waiting),
            ("waiting", "released"): TransitionFunction(self.transition_waiting_released),
            # ...
            ("released", "erred"): TransitionFunction(self.transition_released_erred),
        }
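For illustration (hypothetical call site), the wrapper would let the transition engine pass stimulus_id unconditionally:

func = self._transitions_table["released", "waiting"]
# The wrapper silently drops stimulus_id if the wrapped method doesn't accept it.
recommendations, client_msgs, worker_msgs = func(key, stimulus_id=stimulus_id)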
The second idea is not yet fully formed, but it might be possible to automatically generate and inject stimulus_ids into the distributed.core.Server class handlers. Then, it might be possible to store the stimulus_ids in ContextVars that can be passed through async/sync call frames and inspected at the point where we need the stimulus_id. This could be combined with Tensorflow's ideas around variable scoping (e.g. see https://www.tensorflow.org/api_docs/python/tf/name_scope).
It also might be possible to track the call frames with inspect.currentframe() back to the distributed.core.Server handlers and automatically derive stimulus_ids.
I think the nice thing about this approach is that it discards the need to generate and pass stimulus_ids around the code base -- one could simply retrieve an appropriately generated stimulus. On the other hand, the logic might be too complicated and magical.
Pinging @graingert in case there's some problem with this approach and I spend too much time down this rabbit hole.
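A minimal sketch of the contextvars part, assuming hypothetical helper names and ignoring how the Server handler dispatch would call them:

import contextvars
from time import time

_current_stimulus = contextvars.ContextVar("stimulus_id", default=None)

def install_stimulus_id(handler_name, stimulus_id=None):
    # Called when the Server dispatches a handler: use the supplied id or
    # generate one, and make it visible to everything the handler calls.
    return _current_stimulus.set(stimulus_id or f"{handler_name}-{time()}")

def current_stimulus_id():
    # Retrieved wherever a transition needs to stamp a worker/client message.
    return _current_stimulus.get()

Contextvar values set in a handler are visible to the code it awaits, and new asyncio Tasks get a copy of the current context, which is what makes this approach asyncio-safe.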
Stated in a simpler way, I'm thinking of an ExitStack-like construct which:
- Would be automatically initialised with supplied or generated stimulus_ids at the Server handlers.
- Supports overriding of existing stimulus_ids throughout Server sub-classes.
- Is safe for use with asyncio (I think contextvars gives us this).
Stated in a simpler way, I'm thinking of an ExitStack-like construct which
Looks like AsyncExitStack is a possibility here.
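For example (a hypothetical sketch, reusing the _current_stimulus variable from the earlier sketch), a scope that an AsyncExitStack in the handler dispatch could enter:

import contextlib

@contextlib.asynccontextmanager
async def stimulus_scope(stimulus_id):
    # Install (or override) the current stimulus_id for the duration of a handler.
    token = _current_stimulus.set(stimulus_id)
    try:
        yield stimulus_id
    finally:
        _current_stimulus.reset(token)

# Hypothetical use inside the Server's handler dispatch:
# async with contextlib.AsyncExitStack() as stack:
#     await stack.enter_async_context(stimulus_scope(stimulus_id or f"{op}-{time()}"))
#     result = await handler(**msg)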
Closed in favour of #6095