
feat: DispatchTracker to replace everything #2179

Merged: dranikpg merged 7 commits into dragonflydb:main from update-await-dispatches on Dec 5, 2023

Conversation

dranikpg (Contributor) commented Nov 15, 2023:

Extends AwaitDispatches to be a re-usable DispatchTracker

In contrast to an AwaitDispatches() call, the tracker records connection dispatches exactly when the state is changed, so we actually wait for the right commands to finish, not for whatever happens to be running at roughly the same time (a rough interface sketch follows this description)

Re-adopts CLIENT PAUSE to use it

  • Removes the dispatch pause as we don't need it anymore when we exclude paused commands from tracking
  • Fixes the bug with multiple client pauses not being possible (because the old AwaitDispatches doesn't detect paused ones)

Adds tracking to cluster family

  • Let's decide or put it off again 😅
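
For reference, a minimal sketch of the tracker's intended shape (the constructor matches the diff below; TrackOnThread/TrackAll/Wait are all named in the review, but their exact signatures here are assumptions, e.g. the Wait timeout type is guessed from the tracker.wait(1s) pseudocode further down):

```cpp
#include <vector>
#include "absl/time/time.h"
#include "absl/types/span.h"

namespace facade {
class Listener;
class Connection;
}  // namespace facade

// Sketch only, not the merged code.
class DispatchTracker {
 public:
  DispatchTracker(absl::Span<facade::Listener* const> listeners,
                  facade::Connection* issuer, bool ignore_paused = false);

  // Snapshot the dispatches running on the calling thread right now,
  // so we later wait for exactly those and nothing started afterwards.
  void TrackOnThread();

  // Hop to every listener thread and take the snapshot there.
  void TrackAll();

  // Block until every tracked dispatch has finished; false on timeout.
  bool Wait(absl::Duration timeout);

 private:
  std::vector<facade::Listener*> listeners_;  // owned copy, see span note below
  facade::Connection* issuer_;
  bool ignore_paused_;
};
```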

@dranikpg force-pushed the update-await-dispatches branch from 613588f to 0d60332 on November 15, 2023 16:14
@dranikpg changed the title from "feat: DispatchTracker" to "feat: DispatchTracker to replace everything" on Nov 15, 2023
@dranikpg force-pushed the update-await-dispatches branch from f460e93 to d918f6c on November 15, 2023 17:04
@@ -396,4 +373,30 @@ ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) {
return pp->at(res_id);
}

DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners,
facade::Connection* issuer, bool ignore_paused)
: listeners_{listeners.begin(), listeners.end()},
dranikpg (Contributor, Author):

I want to make it safe to pass a single {this}, but if we just copy the span, it becomes a view of a dangling temporary array... So we either use a vector or pull in absl's FixedArray, but given how rare this operation is, we can simply use a vector
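
To illustrate the dangling-span point with a standalone toy (hypothetical Tracker type, not the real class):

```cpp
#include <vector>
#include "absl/types/span.h"

struct Listener {};

class Tracker {
 public:
  // If we stored `listeners` itself, a call like Tracker{{&l}} would
  // dangle: the braced list materializes a temporary array that dies
  // at the end of the full expression, leaving the span viewing freed
  // stack memory. Copying into a vector owns the elements instead.
  explicit Tracker(absl::Span<Listener* const> listeners)
      : listeners_{listeners.begin(), listeners.end()} {}

 private:
  std::vector<Listener*> listeners_;
};

int main() {
  Listener l;
  Tracker safe{{&l}};  // fine: the pointers were copied out of the temporary
}
```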

Comment on lines +1284 to 1301
// Set global pause state and track commands that are running when the pause state is flipped.
// Exclude already paused commands from the busy count.
DispatchTracker tracker{GetListeners(), cntx->conn(), true /* ignore paused commands */};
service_.proactor_pool().Await([&tracker, pause_state](util::ProactorBase* pb) {
// Commands don't suspend before checking the pause state, so
// it's impossible to deadlock on waiting for a command that will be paused.
tracker.TrackOnThread();
ServerState::tlocal()->SetPauseState(pause_state, true);
});
dranikpg (Contributor, Author):

Now I use this instead of the pausing-dispatch fix... It should work as long as we don't suspend before checking the pause state
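
For context, the full pause sequence then reads roughly like this (a sketch; the Wait call and its timeout are assumptions based on the pseudocode further down in this thread):

```cpp
// Flip the pause flag on every thread while snapshotting in-flight
// dispatches; newly arriving commands observe paused == true before
// suspending, so they never enter the wait set.
DispatchTracker tracker{GetListeners(), cntx->conn(),
                        true /* ignore paused commands */};
service_.proactor_pool().Await([&tracker, pause_state](util::ProactorBase*) {
  tracker.TrackOnThread();
  ServerState::tlocal()->SetPauseState(pause_state, true);
});

// Assumed API: wait only for the snapshot; give up if it doesn't drain.
if (!tracker.Wait(absl::Seconds(1))) {
  // in-flight commands outlived the timeout; report an error to the client
}
```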

Comment on lines +505 to +509
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn()};
auto cb = [&tracker, &new_config](util::ProactorBase* pb) {
tl_cluster_config = new_config;
tracker.TrackOnThread();
};
dranikpg (Contributor, Author):

So we can now pause writes when replacing the cluster config, to actually return an error without applying it. wdyt?

dranikpg (Contributor, Author) commented Nov 15, 2023:

As in:

```
run {
  paused = true
  tracker.track_on_thread()
}

if not tracker.wait(1s):
  return "Can't replace cluster config with constant ops running"

# no writes
run {
  tl_config = new_config
  paused = false
}
```

but why should we pause under heavy load when there are no long-running ops 😞?

One more option would be using a global tx, but that doesn't prevent write commands from still being scheduled

@dranikpg force-pushed the update-await-dispatches branch 2 times, most recently from 142bdd5 to 80c011b on November 15, 2023 21:41
Comment on lines 1256 to 1264
// Stop accumulating when a pause is requested, fall back to regular dispatch
if (dfly::ServerState::tlocal()->IsPaused())
break;
dranikpg (Contributor, Author):

The current logic of await dispatches is the following: every command that is dispatching while paused = false will eventually conclude and reply to our checkpoint request.

That is why we have to abort early here to handle the checkpoint, and then we invoke non-squashed dispatch so that the next command becomes paused

Collaborator:

Do we have to break here, or can we change the logic to pause inside InvokeCmd instead of inside DispatchCommand? That way we would pause on squashed commands as well, the same change that I did in my last PR

dranikpg (Contributor, Author):

I prefer breaking here because I don't know how to easily solve it the other way round.

As long as DispatchManyCommands is running, we have async_dispatch=true, even when it is just switching commands. This implementation doesn't block clients, so we can only track running commands at the moment we set the pause state. If we set paused during pipeline squashing and let it block, we will wait for it forever.

We don't have this problem now, because we:

  • block clients
  • wait for squashing to finish (with AwaitCurrentDispatches)
  • don't have 100% guarantees with AwaitCurrentDispatches. We currently have two balancing ThisFiber::Yield calls in dragonfly_connection, so AwaitCurrentDispatches might see the connection fiber sleeping and not dispatching even if it really has more work to do. And by the time we reply OK to the client, we could still be running commands that we missed (see also the earlier comment in #2179)

@dranikpg force-pushed the update-await-dispatches branch from 80c011b to c8d3dfb on November 15, 2023 22:02
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn()};
auto cb = [&tracker, &new_config](util::ProactorBase* pb) {
tl_cluster_config = new_config;
tracker.TrackOnThread();
Collaborator:

Why do we need TrackOnThread? Why isn't TrackAll good enough after the AwaitFiberOnAll that updates the config on all threads has finished?

dranikpg (Contributor, Author) commented Nov 16, 2023:

Why not do it correctly if it takes only a few lines 🙂


> TrackAll after AwaitFiberOnAll is finished to update the config on all threads is not good

Because it's possibly waiting for totally different stuff to finish, including commands that would already be running with the new cluster config. The connection has spurious suspends (Yield()s), so we might miss operations
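
In code, the two orderings differ roughly like this (a sketch, not the actual patch; `pool` and the lambda shapes are approximations):

```cpp
// Racy variant from the question: the snapshot is taken only after every
// thread has already switched, so it can include commands started under
// the NEW config and, because of the spurious Yield()s, miss ones still
// draining under the old config.
pool.AwaitFiberOnAll([&](util::ProactorBase*) { tl_cluster_config = new_config; });
tracker.TrackAll();  // snapshot and config flip are not atomic per thread

// Variant in the PR: snapshot at the exact moment each thread flips,
// so the wait set is precisely the commands that saw the old config.
pool.AwaitFiberOnAll([&](util::ProactorBase*) {
  tl_cluster_config = new_config;
  tracker.TrackOnThread();
});
```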

@@ -1277,12 +1283,15 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,

// Dispatch the non-squashed command only after all squashed commands were executed and replied to
DispatchCommand(args, cntx);
Collaborator:

We are calling DispatchCommand, which in the case of a pause will make us wait. It could be that SetPause was called between the IsPaused check above and this line, so it would be safer to break here if IsPaused is true. I understand this flow is not triggered now, as we call DispatchCommand here only for multi/eval, but if for some reason we change the flow and call DispatchCommand in other cases, this would create a bug.

dranikpg (Contributor, Author):

Oh yes, true, good catch!
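
The fix presumably amounts to re-checking the flag right before the fallback (sketch only, not the merged diff):

```cpp
// SetPauseState may run between the earlier IsPaused check and this
// point, so re-check before entering DispatchCommand, which would
// otherwise block the squashing loop on the pause.
if (dfly::ServerState::tlocal()->IsPaused())
  break;

// Dispatch the non-squashed command only after all squashed commands
// were executed and replied to.
DispatchCommand(args, cntx);
```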

@dranikpg force-pushed the update-await-dispatches branch from c8d3dfb to 35fac73 on November 27, 2023 19:12
dranikpg (Contributor, Author):

Fixed.

I don't like how DispatchManyCommands looks; I'll try to simplify it in the future 😢 For now I'd suggest moving on

@dranikpg marked this pull request as ready for review on November 28, 2023 06:52
adiholden previously approved these changes Nov 28, 2023
dranikpg (Contributor, Author):

Fixed a bug: I forgot to remove AwaitPauseState from InvokeCmd() and to check the pause state when starting DispatchMany

@dranikpg requested a review from adiholden on November 29, 2023 07:36
dranikpg (Contributor, Author):

I'll merge only after 1.13 is cut

dranikpg (Contributor, Author) commented Dec 5, 2023:

I see 1.13 is cut, let's give it some mileage on regtests

@dranikpg merged commit 11ef662 into dragonflydb:main on Dec 5, 2023
7 checks passed
@dranikpg deleted the update-await-dispatches branch on December 25, 2023 14:38