feat: DispatchTracker to replace everything #2179
Conversation
Force-pushed 613588f to 0d60332
Force-pushed f460e93 to d918f6c
```diff
@@ -396,4 +373,30 @@ ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) {
   return pp->at(res_id);
 }
 
+DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners,
+                                 facade::Connection* issuer, bool ignore_paused)
+    : listeners_{listeners.begin(), listeners.end()},
```
I want to make it safe to pass a single `{this}`, but if we just copy the span, it ends up pointing at a dangling temporary array... So we either store a vector or pull in absl's FixedArray, but given how rare this operation is, we can simply use a vector.
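A minimal standalone sketch of the hazard, using toy types rather than the actual Dragonfly classes: a braced list like `{this}` materializes a temporary backing array, so a stored span dangles once the full expression ends, while a vector copies the pointers out.

```cpp
#include <vector>

#include "absl/types/span.h"

struct Listener {};

// Dangerous: stores the span itself. Constructed from a braced list, the
// span points into a temporary array that dies at the end of the statement.
struct BadTracker {
  explicit BadTracker(absl::Span<Listener* const> listeners) : listeners_(listeners) {}
  absl::Span<Listener* const> listeners_;  // dangles when constructed from {&l}
};

// Safe: copies the elements into an owned vector, so passing a temporary
// braced list is fine.
struct GoodTracker {
  explicit GoodTracker(absl::Span<Listener* const> listeners)
      : listeners_{listeners.begin(), listeners.end()} {}
  std::vector<Listener*> listeners_;
};

int main() {
  Listener l;
  GoodTracker good({&l});  // safe: the pointer is copied into the vector
  BadTracker bad({&l});    // compiles, but bad.listeners_ now dangles
}
```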
```cpp
// Set global pause state and track commands that are running when the pause state is flipped.
// Exclude already paused commands from the busy count.
DispatchTracker tracker{GetListeners(), cntx->conn(), true /* ignore paused commands */};
service_.proactor_pool().Await([&tracker, pause_state](util::ProactorBase* pb) {
  // Commands don't suspend before checking the pause state, so
  // it's impossible to deadlock on waiting for a command that will be paused.
  tracker.TrackOnThread();
  ServerState::tlocal()->SetPauseState(pause_state, true);
});
```
Now I use this instead of the pausing-dispatch fix... It should work as long as we don't suspend before checking the pause state.
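For context, a hedged sketch of how the full sequence fits together, assuming DispatchTracker exposes a Wait-with-timeout as in the pseudocode later in this thread (the timeout and error handling are illustrative):

```cpp
// Snapshot in-flight dispatches on every thread at the exact moment the
// pause flag flips, then wait only for that snapshot.
DispatchTracker tracker{GetListeners(), cntx->conn(), true /* ignore paused commands */};
service_.proactor_pool().Await([&tracker, pause_state](util::ProactorBase* pb) {
  tracker.TrackOnThread();
  ServerState::tlocal()->SetPauseState(pause_state, true);
});

// Commands that start after the flip pause themselves before dispatching,
// so they are deliberately not in the snapshot and cannot deadlock us.
if (!tracker.Wait(absl::Seconds(1))) {
  // ... report an error to the client ...
}
```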
```cpp
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn()};
auto cb = [&tracker, &new_config](util::ProactorBase* pb) {
  tl_cluster_config = new_config;
  tracker.TrackOnThread();
};
```
So we can now pause writes when replacing the cluster config, and actually return an error without applying it. wdyt?
As in

```
run {
  paused = true
  tracker.track_on_thread()
}

if not tracker.wait(1s):
  return "Can't replace cluster config with constant ops running"

# no writes are running at this point

run {
  tl_config = new_config
  paused = false
}
```
But why should we pause under heavy load when there are no long-running ops 😞?
One more option would be using a global tx, but that doesn't prevent write commands from still being scheduled
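A rough C++ rendering of that two-phase idea, reusing names from the snippets in this thread (`Wait`, `ClientPause::WRITE`, and the exact thread hops are assumptions, not the merged implementation):

```cpp
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn()};

// Phase 1: pause writes on every thread and snapshot what is still running.
pool.AwaitFiberOnAll([&tracker](util::ProactorBase*) {
  ServerState::tlocal()->SetPauseState(ClientPause::WRITE, true);  // paused = true
  tracker.TrackOnThread();
});

if (!tracker.Wait(absl::Seconds(1))) {
  // Can't replace the cluster config with long-running ops still active;
  // unpause and report an error (error path is illustrative).
  return;
}

// Phase 2: no writes are running or can start here; swap config, unpause.
pool.AwaitFiberOnAll([&new_config](util::ProactorBase*) {
  tl_cluster_config = new_config;
  ServerState::tlocal()->SetPauseState(ClientPause::WRITE, false);  // paused = false
});
```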
Force-pushed 142bdd5 to 80c011b
src/server/main_service.cc (outdated)
```cpp
// Stop accumulating when a pause is requested, fall back to regular dispatch
if (dfly::ServerState::tlocal()->IsPaused())
  break;
```
The current logic of awaiting dispatches is the following: every command that is dispatching while paused = false will eventually conclude and will reply to our checkpoint request.
That is why we have to abort early here to handle the checkpoint; we then invoke non-squashed dispatch, so the next command becomes paused.
Do we have to break here, or can we change the logic to pause inside InvokeCmd instead of inside DispatchCommand? That way we would pause on squashed commands as well, the same change I made in my last PR.
I prefer breaking here because I don't know how to easily solve it the other way round.

As long as DispatchManyCommands is running, we have async_dispatch=true, even when it's just switching between commands. This implementation doesn't block clients, so we can only track running commands at the moment we set the pause state. If we set paused during pipeline squashing and let it block, we will wait for it forever.

We don't have this problem now, because we:

- block clients
- wait for squashing to finish (with `AwaitCurrentDispatches`)
- yet don't have 100% guarantees with `AwaitCurrentDispatches`: we currently have two balancing `ThisFiber::Yield` calls in `dragonfly_connection`, so `AwaitCurrentDispatches` might see the connection fiber sleeping and not dispatching, even if it really has more work to do... And by the time we reply OK to the client, we could still be running commands that we missed (see also #2179 (comment))
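To illustrate that guarantee gap with a toy loop (not the actual connection code; `HasMoreWork` and `DispatchOne` are hypothetical): a wait that merely samples whether the fiber is dispatching can catch it exactly at a yield point and wrongly conclude it is idle.

```cpp
// Toy dispatch loop: the fiber yields between pipelined commands to keep
// the proactor balanced. An observer that samples "is this connection
// dispatching right now?" during the Yield() sees a sleeping fiber and may
// treat a still-busy connection as finished.
while (HasMoreWork()) {
  DispatchOne();       // run one pipelined command
  ThisFiber::Yield();  // cooperative yield; the fiber briefly sleeps here
}
```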
Force-pushed 80c011b to c8d3dfb
```cpp
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn()};
auto cb = [&tracker, &new_config](util::ProactorBase* pb) {
  tl_cluster_config = new_config;
  tracker.TrackOnThread();
```
Why do we need TrackOnThread?
Why isn't it good enough to call TrackAll after the AwaitFiberOnAll that updates the config on all threads has finished?
Why not do it correctly if it takes only a few lines 🙂

> TrackAll after AwaitFiberOnAll is finished to update the config on all threads is not good

Because it would possibly be waiting for totally different work to finish, including commands already running with the new cluster config. The connection also has spurious suspends (the Yield() calls), so we might miss operations.
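Sketched side by side (hypothetical code built from the names in this thread), the only difference is when the snapshot is taken:

```cpp
// Correct: snapshot in the same hop that flips the state, so only commands
// that started under the old config are tracked.
pool.AwaitFiberOnAll([&](util::ProactorBase*) {
  tl_cluster_config = new_config;
  tracker.TrackOnThread();
});

// Racy: snapshot after the hop. New commands may already be running under
// the new config (and get tracked needlessly), while a connection that was
// Yield()-ed during the hop can be missed entirely.
pool.AwaitFiberOnAll([&](util::ProactorBase*) { tl_cluster_config = new_config; });
tracker.TrackAll();  // the "track everything afterwards" variant from the question above
```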
```diff
@@ -1277,12 +1283,15 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
 
   // Dispatch non-squashed command only after all squashed commands were executed and replied
   DispatchCommand(args, cntx);
```
We are calling DispatchCommand, which in case of pause will make us wait. It could be that SetPause is called between the IsPaused check above and this line, so it would be safer to break here in case IsPaused is true. I understand this flow is not triggered now, as we call DispatchCommand here only for multi/eval, but if for some reason we change the flow and call DispatchCommand in other cases, this will create a bug.
Oh yes, true, good catch!
Force-pushed c8d3dfb to 35fac73
Fixed. I don't like how DispatchManyCommands looks, will try to simplify it in the future 😢 For now I'd suggest moving on.
Signed-off-by: Vladislav Oleshko <[email protected]>
Force-pushed 5ce1b51 to 7ae9c6c
Fixed a bug: I forgot to remove AwaitPauseState from InvokeCmd() and to check the pause state when starting DispatchMany.
I'll merge only after 1.13 is cut.
I see 1.13 is cut, let's give it some mileage on regtests.
Extends AwaitDispatches into a re-usable DispatchTracker.

In contrast to an AwaitDispatches() call, the tracker can track connection dispatches exactly when the state is changed, so we actually wait for the right commands to finish, not for whatever happens to be running at roughly the same time.

- Re-adopts CLIENT PAUSE to use it
- Adds tracking to the cluster family
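The resulting usage pattern, as a hedged sketch (`Wait` and the exact signatures are taken from the snippets above, not from the final API):

```cpp
// 1. Create a tracker over all listeners; the issuing connection is passed,
//    presumably so it is excluded from tracking (assumption).
DispatchTracker tracker{GetListeners(), cntx->conn()};

// 2. In the same per-thread hop that changes the guarded state, snapshot
//    the dispatches in flight on that thread.
service_.proactor_pool().Await([&tracker](util::ProactorBase*) {
  // ... flip some thread-local state here ...
  tracker.TrackOnThread();
});

// 3. Wait only for the snapshotted dispatches; commands started after the
//    flip already observe the new state and are not waited on.
bool all_done = tracker.Wait(absl::Seconds(1));
```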