Skip to content

Commit

Permalink
fix: fix2 + enable pause test
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Nov 27, 2023
1 parent dc93c3d commit 5cc767d
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class Connection : public util::Connection {
void SendAclUpdateAsync(AclUpdateMessage msg);

// If any dispatch is currently in progress, increment counter and send checkpoint message to
// decrement it once finished.
// decrement it once finished. It ignore_paused is true, paused dispatches are ignored.
void SendCheckpoint(util::fb2::BlockingCounter bc, bool ignore_paused = false);

// Must be called before sending pubsub messages to ensure the threads pipeline queue limit is not
Expand Down
7 changes: 5 additions & 2 deletions src/facade/dragonfly_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ class Listener : public util::ListenerInterface {
};

// Dispatch tracker allows tracking the dispatch state of connections and blocking until all
// detected busy connections finished dispatching. Ignores issuer if set.
// detected busy connections finished dispatching. Ignores issuer connection.
//
// Mostly used to detect when global state changes (takeover, pause, cluster config update) are
// visible to all commands and no commands are still running according to the old state / config.
class DispatchTracker {
public:
DispatchTracker(absl::Span<facade::Listener* const>, facade::Connection* issuer = nullptr,
Expand All @@ -93,7 +96,7 @@ class DispatchTracker {

// Wait until all tracked connections finished dispatching.
// Returns true on success, false if timeout was reached.
bool Wait(absl::Duration);
bool Wait(absl::Duration timeout);

private:
void Handle(unsigned thread_index, util::Connection* conn);
Expand Down
5 changes: 2 additions & 3 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1059,9 +1059,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
bool dispatching_in_multi = under_script || under_multi;

if (VLOG_IS_ON(2) && cntx->conn() /* no owner in replica context */) {
const char* lua = under_script ? "LUA " : "";
LOG(INFO) << "Got (" << cntx->conn()->GetClientId() << "): " << lua << args
<< " in dbid=" << dfly_cntx->conn_state.db_index;
LOG(INFO) << "Got (" << cntx->conn()->GetClientId() << "): " << (under_script ? "LUA " : "")
<< args << " in dbid=" << dfly_cntx->conn_state.db_index;
}

if (!dispatching_in_multi) { // Don't interrupt running multi commands
Expand Down
2 changes: 2 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1294,6 +1294,8 @@ void ServerFamily::ClientPause(CmdArgList args, ConnectionContext* cntx) {
// Exlude already paused commands from the busy count.
DispatchTracker tracker{GetListeners(), cntx->conn(), true /* ignore paused commands */};
service_.proactor_pool().Await([&tracker, pause_state](util::ProactorBase* pb) {
// Commands don't suspend before checking the pause state, so
// it's impossible to deadlock on waiting for a command that will be paused.
tracker.TrackOnThread();
ServerState::tlocal()->SetPauseState(pause_state, true);
});
Expand Down
2 changes: 0 additions & 2 deletions src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,6 @@ class ServerState { // public struct - to allow initialization.
// notified when the break is over.
int client_pauses_[2] = {};
EventCount client_pause_ec_;
bool pause_dispatch_ = false;
EventCount pause_dispatch_ec_;

using Counter = util::SlidingCounter<7>;
Counter qps_;
Expand Down
1 change: 1 addition & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1763,6 +1763,7 @@ async def test_search(df_local_factory):
].id == "k0"


# @pytest.mark.slow
@pytest.mark.asyncio
async def test_client_pause_with_replica(df_local_factory, df_seeder_factory):
master = df_local_factory.create(proactor_threads=4)
Expand Down

0 comments on commit 5cc767d

Please sign in to comment.