Skip to content

Commit

Permalink
fix: fix squashing :(
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Nov 27, 2023
1 parent 5cc767d commit 35fac73
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 19 deletions.
31 changes: 18 additions & 13 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg

self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
self->last_interaction_ = time(nullptr);
self->skip_next_squashing_ = false;
}

void Connection::DispatchOperations::operator()(const MigrationRequestMessage& msg) {
Expand Down Expand Up @@ -960,24 +961,19 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_);

vector<CmdArgList> squash_cmds;
vector<PipelineMessagePtr> squash_msgs;

squash_cmds.reserve(dispatch_q_.size());
squash_msgs.reserve(dispatch_q_.size());

while (!dispatch_q_.empty()) {
auto& msg = dispatch_q_.front();
for (auto& msg : dispatch_q_) {
CHECK(holds_alternative<PipelineMessagePtr>(msg.handle))
<< "Found " << msg.handle.index() << " on " << DebugInfo();
<< msg.handle.index() << " on " << DebugInfo();

squash_msgs.push_back(std::move(std::get<PipelineMessagePtr>(msg.handle)));
squash_cmds.push_back(absl::MakeSpan(squash_msgs.back()->args));
dispatch_q_.pop_front();
auto& pmsg = get<PipelineMessagePtr>(msg.handle);
squash_cmds.push_back(absl::MakeSpan(pmsg->args));
}

cc_->async_dispatch = true;

service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());

if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared
builder->FlushBatch();
Expand All @@ -986,8 +982,17 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {

cc_->async_dispatch = false;

for (auto& msg : squash_msgs)
RecycleMessage(MessageHandle{std::move(msg)});
auto it = dispatch_q_.begin();
while (it->IsIntrusive()) // Skip all newly received intrusive messages
++it;

for (auto rit = it; rit != it + dispatched; ++rit)
RecycleMessage(std::move(*rit));

dispatch_q_.erase(it, it + dispatched);

// If interrupted due to pause, fall back to regular dispatch
skip_next_squashing_ = dispatched != squash_cmds.size();
}

void Connection::ClearPipelinedMessages() {
Expand Down Expand Up @@ -1068,7 +1073,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
bool squashing_enabled = squashing_threshold > 0;
bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold;
bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size();
if (squashing_enabled && threshold_reached && are_all_plain_cmds) {
if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
SquashPipeline(builder);
} else {
MessageHandle msg = move(dispatch_q_.front());
Expand Down
2 changes: 2 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ class Connection : public util::Connection {
bool migration_enabled_;
util::fb2::ProactorBase* migration_request_ = nullptr;

bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Pooled pipeline messages per-thread
// Aggregated while handling pipelines, gradually released while handling regular commands.
static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
Expand Down
3 changes: 2 additions & 1 deletion src/facade/ok_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class OkService : public ServiceInterface {
(*cntx)->SendOk();
}

void DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
size_t DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
for (auto args : args_lists)
DispatchCommand(args, cntx);
return args_lists.size();
}

void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
Expand Down
4 changes: 3 additions & 1 deletion src/facade/service_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class ServiceInterface {

virtual void DispatchCommand(CmdArgList args, ConnectionContext* cntx) = 0;

virtual void DispatchManyCommands(absl::Span<CmdArgList> args_list, ConnectionContext* cntx) = 0;
// Returns number of processed commands
virtual size_t DispatchManyCommands(absl::Span<CmdArgList> args_list,
ConnectionContext* cntx) = 0;

virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
ConnectionContext* cntx) = 0;
Expand Down
15 changes: 13 additions & 2 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1228,14 +1228,16 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
return true;
}

void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
facade::ConnectionContext* cntx) {
size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
facade::ConnectionContext* cntx) {
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning());

vector<StoredCmd> stored_cmds;
intrusive_ptr<Transaction> dist_trans;

size_t dispatched = 0;

auto perform_squash = [&] {
if (stored_cmds.empty())
return;
Expand All @@ -1251,10 +1253,16 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
dfly_cntx->transaction = dist_trans.get();
MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds), dfly_cntx, this, true, false);
dfly_cntx->transaction = nullptr;

dispatched += stored_cmds.size();
stored_cmds.clear();
};

for (auto args : args_list) {
// Stop accumulating when a pause is requested, fall back to regular dispatch
if (dfly::ServerState::tlocal()->IsPaused())
break;

ToUpper(&args[0]);
const auto [cid, tail_args] = FindCmd(args);

Expand All @@ -1281,12 +1289,15 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,

// Dispatch non squashed command only after all squshed commands were executed and replied
DispatchCommand(args, cntx);
dispatched++;
}

perform_squash();

if (dist_trans)
dist_trans->UnlockMulti();

return dispatched;
}

void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
Expand Down
4 changes: 2 additions & 2 deletions src/server/main_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class Service : public facade::ServiceInterface {
void DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) final;

// Execute multiple consecutive commands, possibly in parallel by squashing
void DispatchManyCommands(absl::Span<CmdArgList> args_list,
facade::ConnectionContext* cntx) final;
size_t DispatchManyCommands(absl::Span<CmdArgList> args_list,
facade::ConnectionContext* cntx) final;

// Check VerifyCommandExecution and invoke command with args
bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx,
Expand Down

0 comments on commit 35fac73

Please sign in to comment.