From 35fac73bcb753b44be48356690d324267c95cde1 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Thu, 16 Nov 2023 00:33:13 +0300 Subject: [PATCH] fix: fix squashing :( Signed-off-by: Vladislav Oleshko --- src/facade/dragonfly_connection.cc | 31 +++++++++++++++++------------- src/facade/dragonfly_connection.h | 2 ++ src/facade/ok_main.cc | 3 ++- src/facade/service_interface.h | 4 +++- src/server/main_service.cc | 15 +++++++++++++-- src/server/main_service.h | 4 ++-- 6 files changed, 40 insertions(+), 19 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 7b08fde53cce..9b0d25c1c863 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -272,6 +272,7 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get()); self->last_interaction_ = time(nullptr); + self->skip_next_squashing_ = false; } void Connection::DispatchOperations::operator()(const MigrationRequestMessage& msg) { @@ -960,24 +961,19 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) { DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_); vector squash_cmds; - vector squash_msgs; - squash_cmds.reserve(dispatch_q_.size()); - squash_msgs.reserve(dispatch_q_.size()); - while (!dispatch_q_.empty()) { - auto& msg = dispatch_q_.front(); + for (auto& msg : dispatch_q_) { CHECK(holds_alternative(msg.handle)) - << "Found " << msg.handle.index() << " on " << DebugInfo(); + << msg.handle.index() << " on " << DebugInfo(); - squash_msgs.push_back(std::move(std::get(msg.handle))); - squash_cmds.push_back(absl::MakeSpan(squash_msgs.back()->args)); - dispatch_q_.pop_front(); + auto& pmsg = get(msg.handle); + squash_cmds.push_back(absl::MakeSpan(pmsg->args)); } cc_->async_dispatch = true; - service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get()); + size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get()); if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared builder->FlushBatch(); @@ -986,8 +982,17 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) { cc_->async_dispatch = false; - for (auto& msg : squash_msgs) - RecycleMessage(MessageHandle{std::move(msg)}); + auto it = dispatch_q_.begin(); + while (it->IsIntrusive()) // Skip all newly received intrusive messages + ++it; + + for (auto rit = it; rit != it + dispatched; ++rit) + RecycleMessage(std::move(*rit)); + + dispatch_q_.erase(it, it + dispatched); + + // If interrupted due to pause, fall back to regular dispatch + skip_next_squashing_ = dispatched != squash_cmds.size(); } void Connection::ClearPipelinedMessages() { @@ -1068,7 +1073,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) { bool squashing_enabled = squashing_threshold > 0; bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold; bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size(); - if (squashing_enabled && threshold_reached && are_all_plain_cmds) { + if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) { SquashPipeline(builder); } else { MessageHandle msg = move(dispatch_q_.front()); diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index ac0de8c563df..b1952aa896e5 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -347,6 +347,8 @@ class Connection : public util::Connection { bool migration_enabled_; util::fb2::ProactorBase* migration_request_ = nullptr; + bool skip_next_squashing_ = false; // Forcefully skip next squashing + // Pooled pipeline messages per-thread // Aggregated while handling pipelines, gradually released while handling regular commands. static thread_local std::vector pipeline_req_pool_; diff --git a/src/facade/ok_main.cc b/src/facade/ok_main.cc index 0d4a5803b9b6..925977ebfe16 100644 --- a/src/facade/ok_main.cc +++ b/src/facade/ok_main.cc @@ -27,9 +27,10 @@ class OkService : public ServiceInterface { (*cntx)->SendOk(); } - void DispatchManyCommands(absl::Span args_lists, ConnectionContext* cntx) final { + size_t DispatchManyCommands(absl::Span args_lists, ConnectionContext* cntx) final { for (auto args : args_lists) DispatchCommand(args, cntx); + return args_lists.size(); } void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, diff --git a/src/facade/service_interface.h b/src/facade/service_interface.h index 7f02604cc552..97420e2e1853 100644 --- a/src/facade/service_interface.h +++ b/src/facade/service_interface.h @@ -27,7 +27,9 @@ class ServiceInterface { virtual void DispatchCommand(CmdArgList args, ConnectionContext* cntx) = 0; - virtual void DispatchManyCommands(absl::Span args_list, ConnectionContext* cntx) = 0; + // Returns number of processed commands + virtual size_t DispatchManyCommands(absl::Span args_list, + ConnectionContext* cntx) = 0; virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, ConnectionContext* cntx) = 0; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index ad9cd931c50e..47ab80bc1050 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1228,14 +1228,16 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo return true; } -void Service::DispatchManyCommands(absl::Span args_list, - facade::ConnectionContext* cntx) { +size_t Service::DispatchManyCommands(absl::Span args_list, + facade::ConnectionContext* cntx) { ConnectionContext* dfly_cntx = static_cast(cntx); DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning()); vector stored_cmds; intrusive_ptr dist_trans; + size_t dispatched = 0; + auto perform_squash = [&] { if (stored_cmds.empty()) return; @@ -1251,10 +1253,16 @@ void Service::DispatchManyCommands(absl::Span args_list, dfly_cntx->transaction = dist_trans.get(); MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds), dfly_cntx, this, true, false); dfly_cntx->transaction = nullptr; + + dispatched += stored_cmds.size(); stored_cmds.clear(); }; for (auto args : args_list) { + // Stop accumulating when a pause is requested, fall back to regular dispatch + if (dfly::ServerState::tlocal()->IsPaused()) + break; + ToUpper(&args[0]); const auto [cid, tail_args] = FindCmd(args); @@ -1281,12 +1289,15 @@ void Service::DispatchManyCommands(absl::Span args_list, // Dispatch non squashed command only after all squshed commands were executed and replied DispatchCommand(args, cntx); + dispatched++; } perform_squash(); if (dist_trans) dist_trans->UnlockMulti(); + + return dispatched; } void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, diff --git a/src/server/main_service.h b/src/server/main_service.h index a745f11851af..8a7559359723 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -48,8 +48,8 @@ class Service : public facade::ServiceInterface { void DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) final; // Execute multiple consecutive commands, possibly in parallel by squashing - void DispatchManyCommands(absl::Span args_list, - facade::ConnectionContext* cntx) final; + size_t DispatchManyCommands(absl::Span args_list, + facade::ConnectionContext* cntx) final; // Check VerifyCommandExecution and invoke command with args bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx,