Skip to content

Commit

Permalink
fix: fix squashing :(
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Nov 15, 2023
1 parent 7092a47 commit 80c011b
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 21 deletions.
30 changes: 17 additions & 13 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg

self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
self->last_interaction_ = time(nullptr);
self->skip_next_squashing_ = false;
}

void Connection::DispatchOperations::operator()(const MigrationRequestMessage& msg) {
Expand Down Expand Up @@ -937,23 +938,17 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
DCHECK_EQ(dispatch_q_.size(), dispatch_q_cmds_count_);

vector<CmdArgList> squash_cmds;
vector<PipelineMessagePtr> squash_msgs;

squash_cmds.reserve(dispatch_q_.size());
squash_msgs.reserve(dispatch_q_.size());

while (!dispatch_q_.empty()) {
auto& msg = dispatch_q_.front();
for (auto& msg : dispatch_q_) {
CHECK(holds_alternative<PipelineMessagePtr>(msg.handle));

squash_msgs.push_back(std::move(std::get<PipelineMessagePtr>(msg.handle)));
squash_cmds.push_back(absl::MakeSpan(squash_msgs.back()->args));
dispatch_q_.pop_front();
auto& pmsg = get<PipelineMessage>(msg.handle);
squash_cmds.push_back(absl::MakeSpan(pmsg.args));
}

cc_->async_dispatch = true;

service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());

if (dispatch_q_cmds_count_ == squash_cmds.size()) { // Flush if no new commands appeared
builder->FlushBatch();
Expand All @@ -962,8 +957,17 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {

cc_->async_dispatch = false;

for (auto& msg : squash_msgs)
RecycleMessage(MessageHandle{std::move(msg)});
auto it = dispatch_q_.begin();
while (it->IsIntrusive()) // Skip all newly received intrusive messages
++it;

for (auto rit = it; rit != it + dispatched; ++rit)
RecycleMessage(std::move(*rit));

dispatch_q_.erase(it, it + dispatched);

// If interrupted due to pause, fall back to regular dispatch
skip_next_squashing_ = dispatched != squash_cmds.size();
}

// DispatchFiber handles commands coming from the InputLoop.
Expand Down Expand Up @@ -1008,7 +1012,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
bool squashing_enabled = squashing_threshold > 0;
bool threshold_reached = dispatch_q_cmds_count_ > squashing_threshold;
bool are_all_plain_cmds = dispatch_q_cmds_count_ == dispatch_q_.size();
if (squashing_enabled && threshold_reached && are_all_plain_cmds) {
if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
SquashPipeline(builder);
} else {
MessageHandle msg = move(dispatch_q_.front());
Expand Down
2 changes: 2 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ class Connection : public util::Connection {
bool migration_enabled_;
util::fb2::ProactorBase* migration_request_ = nullptr;

bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Pooled pipeline messages per-thread
// Aggregated while handling pipelines, gradually released while handling regular commands.
static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
Expand Down
1 change: 1 addition & 0 deletions src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
sync_dispatch = false;
journal_emulated = false;
paused = false;
running_cmd = false;

subscriptions = 0;
}
Expand Down
3 changes: 2 additions & 1 deletion src/facade/ok_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class OkService : public ServiceInterface {
(*cntx)->SendOk();
}

void DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
size_t DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
for (auto args : args_lists)
DispatchCommand(args, cntx);
return args_lists.size();
}

void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
Expand Down
4 changes: 3 additions & 1 deletion src/facade/service_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class ServiceInterface {

virtual void DispatchCommand(CmdArgList args, ConnectionContext* cntx) = 0;

virtual void DispatchManyCommands(absl::Span<CmdArgList> args_list, ConnectionContext* cntx) = 0;
// Returns number of processed commands
virtual size_t DispatchManyCommands(absl::Span<CmdArgList> args_list,
ConnectionContext* cntx) = 0;

virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
ConnectionContext* cntx) = 0;
Expand Down
18 changes: 14 additions & 4 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1225,14 +1225,16 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
return true;
}

void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
facade::ConnectionContext* cntx) {
size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
facade::ConnectionContext* cntx) {
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning());

vector<StoredCmd> stored_cmds;
intrusive_ptr<Transaction> dist_trans;

size_t dispatched = 0;

auto perform_squash = [&] {
if (stored_cmds.empty())
return;
Expand All @@ -1245,10 +1247,16 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
dfly_cntx->transaction = dist_trans.get();
MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds), dfly_cntx, this, true, false);
dfly_cntx->transaction = nullptr;

dispatched += stored_cmds.size();
stored_cmds.clear();
};

for (auto args : args_list) {
// Stop accumulating when a pause is requested, fall back to regular dispatch
if (dfly::ServerState::tlocal()->IsPaused())
break;

ToUpper(&args[0]);
const auto [cid, tail_args] = FindCmd(args);

Expand All @@ -1263,9 +1271,8 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
// invocations, we can potentially execute multiple eval in parallel, which is very powerful
// paired with shardlocal eval
const bool is_eval = CO::IsEvalKind(ArgS(args, 0));
const bool is_pause = dfly::ServerState::tlocal()->IsPaused();

if (!is_multi && !is_eval && cid != nullptr && !is_pause) {
if (!is_multi && !is_eval && cid != nullptr) {
stored_cmds.reserve(args_list.size());
stored_cmds.emplace_back(cid, tail_args);
continue;
Expand All @@ -1276,12 +1283,15 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,

// Dispatch non squashed command only after all squshed commands were executed and replied
DispatchCommand(args, cntx);
dispatched++;
}

perform_squash();

if (dist_trans)
dist_trans->UnlockMulti();

return dispatched;
}

void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
Expand Down
4 changes: 2 additions & 2 deletions src/server/main_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ class Service : public facade::ServiceInterface {
void DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) final;

// Execute multiple consecutive commands, possibly in parallel by squashing
void DispatchManyCommands(absl::Span<CmdArgList> args_list,
facade::ConnectionContext* cntx) final;
size_t DispatchManyCommands(absl::Span<CmdArgList> args_list,
facade::ConnectionContext* cntx) final;

// Check VerifyCommandExecution and invoke command with args
bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx,
Expand Down

0 comments on commit 80c011b

Please sign in to comment.