From 5cc767d89f54475cf3bd68896c969e252fd891be Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Wed, 15 Nov 2023 23:12:13 +0300 Subject: [PATCH] fix: fix2 + enable pause test Signed-off-by: Vladislav Oleshko --- src/facade/dragonfly_connection.h | 2 +- src/facade/dragonfly_listener.h | 7 +++++-- src/server/main_service.cc | 5 ++--- src/server/server_family.cc | 2 ++ src/server/server_state.h | 2 -- tests/dragonfly/replication_test.py | 1 + 6 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 98d08b83e87b..ac0de8c563df 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -155,7 +155,7 @@ class Connection : public util::Connection { void SendAclUpdateAsync(AclUpdateMessage msg); // If any dispatch is currently in progress, increment counter and send checkpoint message to - // decrement it once finished. + // decrement it once finished. If ignore_paused is true, paused dispatches are ignored. void SendCheckpoint(util::fb2::BlockingCounter bc, bool ignore_paused = false); // Must be called before sending pubsub messages to ensure the threads pipeline queue limit is not diff --git a/src/facade/dragonfly_listener.h b/src/facade/dragonfly_listener.h index a2baf539f8fe..163fa435f2ed 100644 --- a/src/facade/dragonfly_listener.h +++ b/src/facade/dragonfly_listener.h @@ -82,7 +82,10 @@ class Listener : public util::ListenerInterface { }; // Dispatch tracker allows tracking the dispatch state of connections and blocking until all -// detected busy connections finished dispatching. Ignores issuer if set. +// detected busy connections finished dispatching. Ignores issuer connection. +// +// Mostly used to detect when global state changes (takeover, pause, cluster config update) are +// visible to all commands and no commands are still running according to the old state / config. 
class DispatchTracker { public: DispatchTracker(absl::Span, facade::Connection* issuer = nullptr, @@ -93,7 +96,7 @@ class DispatchTracker { // Wait until all tracked connections finished dispatching. // Returns true on success, false if timeout was reached. - bool Wait(absl::Duration); + bool Wait(absl::Duration timeout); private: void Handle(unsigned thread_index, util::Connection* conn); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 26161bdba7fe..ad9cd931c50e 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1059,9 +1059,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) bool dispatching_in_multi = under_script || under_multi; if (VLOG_IS_ON(2) && cntx->conn() /* no owner in replica context */) { - const char* lua = under_script ? "LUA " : ""; - LOG(INFO) << "Got (" << cntx->conn()->GetClientId() << "): " << lua << args - << " in dbid=" << dfly_cntx->conn_state.db_index; + LOG(INFO) << "Got (" << cntx->conn()->GetClientId() << "): " << (under_script ? "LUA " : "") + << args << " in dbid=" << dfly_cntx->conn_state.db_index; } if (!dispatching_in_multi) { // Don't interrupt running multi commands diff --git a/src/server/server_family.cc b/src/server/server_family.cc index beaf49d96c36..288ec206fa62 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1294,6 +1294,8 @@ void ServerFamily::ClientPause(CmdArgList args, ConnectionContext* cntx) { // Exlude already paused commands from the busy count. DispatchTracker tracker{GetListeners(), cntx->conn(), true /* ignore paused commands */}; service_.proactor_pool().Await([&tracker, pause_state](util::ProactorBase* pb) { + // Commands don't suspend before checking the pause state, so + // it's impossible to deadlock on waiting for a command that will be paused. 
tracker.TrackOnThread(); ServerState::tlocal()->SetPauseState(pause_state, true); }); diff --git a/src/server/server_state.h b/src/server/server_state.h index a74850134eb1..47d17f0179fa 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -254,8 +254,6 @@ class ServerState { // public struct - to allow initialization. // notified when the break is over. int client_pauses_[2] = {}; EventCount client_pause_ec_; - bool pause_dispatch_ = false; - EventCount pause_dispatch_ec_; using Counter = util::SlidingCounter<7>; Counter qps_; diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 6111e5976a0c..36146988a991 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1763,6 +1763,7 @@ async def test_search(df_local_factory): ].id == "k0" +# @pytest.mark.slow @pytest.mark.asyncio async def test_client_pause_with_replica(df_local_factory, df_seeder_factory): master = df_local_factory.create(proactor_threads=4)