diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 8f6074183799..ebd3cd353d22 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -266,6 +266,7 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg
   self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
   self->last_interaction_ = time(nullptr);
+  self->skip_next_squashing_ = false;
 }
 
 void Connection::DispatchOperations::operator()(const MigrationRequestMessage& msg) {
@@ -937,23 +938,17 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
   DCHECK_EQ(dispatch_q_.size(), dispatch_q_cmds_count_);
 
   vector<CmdArgList> squash_cmds;
-  vector<PipelineMessagePtr> squash_msgs;
-
   squash_cmds.reserve(dispatch_q_.size());
-  squash_msgs.reserve(dispatch_q_.size());
-
-  while (!dispatch_q_.empty()) {
-    auto& msg = dispatch_q_.front();
+  for (auto& msg : dispatch_q_) {
     CHECK(holds_alternative<PipelineMessagePtr>(msg.handle));
-
-    squash_msgs.push_back(std::move(std::get<PipelineMessagePtr>(msg.handle)));
-    squash_cmds.push_back(absl::MakeSpan(squash_msgs.back()->args));
-    dispatch_q_.pop_front();
+    auto& pmsg = get<PipelineMessagePtr>(msg.handle);
+    squash_cmds.push_back(absl::MakeSpan(pmsg->args));
   }
 
   cc_->async_dispatch = true;
 
-  service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
+  size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
 
   if (dispatch_q_cmds_count_ == squash_cmds.size()) {  // Flush if no new commands appeared
     builder->FlushBatch();
@@ -962,8 +957,17 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
 
   cc_->async_dispatch = false;
 
-  for (auto& msg : squash_msgs)
-    RecycleMessage(MessageHandle{std::move(msg)});
+  auto it = dispatch_q_.begin();
+  while (it->IsIntrusive())  // Skip all newly received intrusive messages
+    ++it;
+
+  for (auto rit = it; rit != it + dispatched; ++rit)
+    RecycleMessage(std::move(*rit));
+
+  dispatch_q_.erase(it, it + dispatched);
+
+  // If interrupted due to pause, fall back to regular dispatch
+  skip_next_squashing_ = dispatched != squash_cmds.size();
 }
 
 // DispatchFiber handles commands coming from the InputLoop.
@@ -1008,7 +1012,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
     bool squashing_enabled = squashing_threshold > 0;
     bool threshold_reached = dispatch_q_cmds_count_ > squashing_threshold;
     bool are_all_plain_cmds = dispatch_q_cmds_count_ == dispatch_q_.size();
-    if (squashing_enabled && threshold_reached && are_all_plain_cmds) {
+    if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
       SquashPipeline(builder);
     } else {
       MessageHandle msg = move(dispatch_q_.front());
diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index 9010bcbedfac..f12695c6fb6d 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -332,6 +332,8 @@ class Connection : public util::Connection {
   bool migration_enabled_;
   util::fb2::ProactorBase* migration_request_ = nullptr;
 
+  bool skip_next_squashing_ = false;  // Forcefully skip next squashing
+
   // Pooled pipeline messages per-thread
   // Aggregated while handling pipelines, gradually released while handling regular commands.
   static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
diff --git a/src/facade/facade.cc b/src/facade/facade.cc
index 20db05a63727..02e8e63d56ba 100644
--- a/src/facade/facade.cc
+++ b/src/facade/facade.cc
@@ -133,6 +133,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
   sync_dispatch = false;
   journal_emulated = false;
   paused = false;
+  running_cmd = false;
 
   subscriptions = 0;
 }
diff --git a/src/facade/ok_main.cc b/src/facade/ok_main.cc
index 0d4a5803b9b6..925977ebfe16 100644
--- a/src/facade/ok_main.cc
+++ b/src/facade/ok_main.cc
@@ -27,9 +27,10 @@ class OkService : public ServiceInterface {
     (*cntx)->SendOk();
   }
 
-  void DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
+  size_t DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
     for (auto args : args_lists)
       DispatchCommand(args, cntx);
+    return args_lists.size();
   }
 
   void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
diff --git a/src/facade/service_interface.h b/src/facade/service_interface.h
index 7f02604cc552..97420e2e1853 100644
--- a/src/facade/service_interface.h
+++ b/src/facade/service_interface.h
@@ -27,7 +27,9 @@ class ServiceInterface {
   virtual void DispatchCommand(CmdArgList args, ConnectionContext* cntx) = 0;
 
-  virtual void DispatchManyCommands(absl::Span<CmdArgList> args_list, ConnectionContext* cntx) = 0;
+  // Returns number of processed commands
+  virtual size_t DispatchManyCommands(absl::Span<CmdArgList> args_list,
+                                      ConnectionContext* cntx) = 0;
 
   virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
                           ConnectionContext* cntx) = 0;
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index d05fc311a7ca..304836cb4f13 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -1225,14 +1225,16 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
   return true;
 }
 
-void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
-                                   facade::ConnectionContext* cntx) {
+size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
+                                     facade::ConnectionContext* cntx) {
   ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
   DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning());
 
   vector<StoredCmd> stored_cmds;
   intrusive_ptr<Transaction> dist_trans;
 
+  size_t dispatched = 0;
+
   auto perform_squash = [&] {
     if (stored_cmds.empty())
       return;
@@ -1245,10 +1247,16 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
     dfly_cntx->transaction = dist_trans.get();
     MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds), dfly_cntx, this, true, false);
     dfly_cntx->transaction = nullptr;
+
+    dispatched += stored_cmds.size();
     stored_cmds.clear();
   };
 
   for (auto args : args_list) {
+    // Stop accumulating when a pause is requested, fall back to regular dispatch
+    if (dfly::ServerState::tlocal()->IsPaused())
+      break;
+
     ToUpper(&args[0]);
 
     const auto [cid, tail_args] = FindCmd(args);
@@ -1263,9 +1271,8 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
     // invocations, we can potentially execute multiple eval in parallel, which is very powerful
     // paired with shardlocal eval
     const bool is_eval = CO::IsEvalKind(ArgS(args, 0));
-    const bool is_pause = dfly::ServerState::tlocal()->IsPaused();
 
-    if (!is_multi && !is_eval && cid != nullptr && !is_pause) {
+    if (!is_multi && !is_eval && cid != nullptr) {
       stored_cmds.reserve(args_list.size());
       stored_cmds.emplace_back(cid, tail_args);
       continue;
     }
@@ -1276,12 +1283,15 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
     // Dispatch non squashed command only after all squshed commands were executed and replied
     DispatchCommand(args, cntx);
+    dispatched++;
   }
 
   perform_squash();
 
   if (dist_trans)
     dist_trans->UnlockMulti();
+
+  return dispatched;
 }
 
 void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
diff --git a/src/server/main_service.h b/src/server/main_service.h
index 190cf40c7f69..5a63da97a7a5 100644
--- a/src/server/main_service.h
+++ b/src/server/main_service.h
@@ -48,8 +48,8 @@ class Service : public facade::ServiceInterface {
   void DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) final;
 
   // Execute multiple consecutive commands, possibly in parallel by squashing
-  void DispatchManyCommands(absl::Span<CmdArgList> args_list,
-                            facade::ConnectionContext* cntx) final;
+  size_t DispatchManyCommands(absl::Span<CmdArgList> args_list,
+                              facade::ConnectionContext* cntx) final;
 
   // Check VerifyCommandExecution and invoke command with args
   bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx,
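
For readers unfamiliar with the squashing path, the sketch below (not part of the patch) models the control flow the diff introduces: a batch dispatcher that stops early once a pause flag is set and reports how many commands it actually processed, and a caller that recycles only that dispatched prefix and falls back to one-by-one dispatch for the rest. All names here (`ToyQueue`, `DispatchMany`, `Drain`, ...) are illustrative stand-ins and do not appear in Dragonfly.

```cpp
#include <cstddef>
#include <deque>
#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-in for the Connection/Service pair; not the real API.
struct ToyQueue {
  std::deque<std::string> pending;   // queued pipeline commands
  bool skip_next_squashing = false;  // mirrors Connection::skip_next_squashing_
  bool paused = false;               // mirrors ServerState::IsPaused()

  void DispatchOne(const std::string& cmd) {
    std::cout << "dispatch " << cmd << "\n";
  }

  // Mirrors the new DispatchManyCommands contract: stop accumulating once a
  // pause is requested and return the number of commands actually processed.
  size_t DispatchMany(const std::vector<std::string>& cmds) {
    size_t dispatched = 0;
    for (const auto& cmd : cmds) {
      if (paused)
        break;  // the remainder falls back to regular dispatch
      DispatchOne(cmd);
      ++dispatched;
    }
    return dispatched;
  }

  // Mirrors SquashPipeline: erase only the dispatched prefix from the queue and
  // remember to skip squashing on the next iteration if the batch was cut short.
  void SquashPipeline() {
    std::vector<std::string> batch(pending.begin(), pending.end());
    size_t dispatched = DispatchMany(batch);
    pending.erase(pending.begin(), pending.begin() + dispatched);
    skip_next_squashing = dispatched != batch.size();
  }

  // Mirrors the DispatchFiber loop condition change.
  void Drain() {
    while (!pending.empty()) {
      if (pending.size() > 1 && !skip_next_squashing) {
        SquashPipeline();
      } else {
        DispatchOne(pending.front());
        pending.pop_front();
        skip_next_squashing = false;  // reset after a regular dispatch, as the patch does
      }
    }
  }
};

int main() {
  ToyQueue q;
  q.pending = {"SET a 1", "SET b 2", "GET a"};
  q.Drain();  // with paused == false, the whole batch is squashed
}
```

The key invariant, matching the patch, is that only the prefix actually executed by the batch path is removed from the queue; everything behind it is retried through the regular per-command path on the next loop iteration instead of being dropped or replied to twice.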