Skip to content

Commit

Permalink
feat: DispatchTracker to replace everything (#2179)
Browse files Browse the repository at this point in the history
* feat: DispatchTracker

Use a DispatchTracker to track ongoing dispatches for commands that change global state

---------

Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg authored Dec 5, 2023
1 parent f39eac5 commit 11ef662
Show file tree
Hide file tree
Showing 17 changed files with 165 additions and 143 deletions.
1 change: 1 addition & 0 deletions src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class ConnectionContext {
bool async_dispatch : 1; // whether this connection is amid an async dispatch
bool sync_dispatch : 1; // whether this connection is amid a sync dispatch
bool journal_emulated : 1; // whether it is used to dispatch journal commands
bool paused : 1; // whether this connection is paused due to CLIENT PAUSE

// How many async subscription sources are active: monitor and/or pubsub - at most 2.
uint8_t subscriptions;
Expand Down
43 changes: 28 additions & 15 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,16 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg

self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
self->last_interaction_ = time(nullptr);
self->skip_next_squashing_ = false;
}

void Connection::DispatchOperations::operator()(const MigrationRequestMessage& msg) {
  // no-op: the message carries no work by itself — presumably its only purpose
  // is to wake the dispatch fiber so migration can proceed (TODO: confirm).
}

void Connection::DispatchOperations::operator()(CheckpointMessage msg) {
  // Reaching this point means all messages queued before the checkpoint have
  // been dispatched, so the tracker waiting on the counter can be released.
  VLOG(1) << "Decremented checkpoint at " << self->DebugInfo();

  // Release one unit of the blocking counter added by SendCheckpoint().
  msg.bc.Dec();
}

Expand Down Expand Up @@ -881,7 +884,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;
service_->AwaitOnPauseDispatch();

if (redis_parser_) {
parse_status = ParseRedis(orig_builder);
} else {
Expand Down Expand Up @@ -971,24 +974,19 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_);

vector<CmdArgList> squash_cmds;
vector<PipelineMessagePtr> squash_msgs;

squash_cmds.reserve(dispatch_q_.size());
squash_msgs.reserve(dispatch_q_.size());

while (!dispatch_q_.empty()) {
auto& msg = dispatch_q_.front();
for (auto& msg : dispatch_q_) {
CHECK(holds_alternative<PipelineMessagePtr>(msg.handle))
<< "Found " << msg.handle.index() << " on " << DebugInfo();
<< msg.handle.index() << " on " << DebugInfo();

squash_msgs.push_back(std::move(std::get<PipelineMessagePtr>(msg.handle)));
squash_cmds.push_back(absl::MakeSpan(squash_msgs.back()->args));
dispatch_q_.pop_front();
auto& pmsg = get<PipelineMessagePtr>(msg.handle);
squash_cmds.push_back(absl::MakeSpan(pmsg->args));
}

cc_->async_dispatch = true;

service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());

if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared
builder->FlushBatch();
Expand All @@ -997,8 +995,17 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {

cc_->async_dispatch = false;

for (auto& msg : squash_msgs)
RecycleMessage(MessageHandle{std::move(msg)});
auto it = dispatch_q_.begin();
while (it->IsIntrusive()) // Skip all newly received intrusive messages
++it;

for (auto rit = it; rit != it + dispatched; ++rit)
RecycleMessage(std::move(*rit));

dispatch_q_.erase(it, it + dispatched);

// If interrupted due to pause, fall back to regular dispatch
skip_next_squashing_ = dispatched != squash_cmds.size();
}

void Connection::ClearPipelinedMessages() {
Expand All @@ -1020,6 +1027,7 @@ void Connection::ClearPipelinedMessages() {
std::string Connection::DebugInfo() const {
std::string info = "{";

absl::StrAppend(&info, "address=", uint64_t(this), ", ");
absl::StrAppend(&info, "phase=", phase_, ", ");
absl::StrAppend(&info, "dispatch(s/a)=", cc_->sync_dispatch, " ", cc_->async_dispatch, ", ");
absl::StrAppend(&info, "closing=", cc_->conn_closing, ", ");
Expand Down Expand Up @@ -1079,7 +1087,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
bool squashing_enabled = squashing_threshold > 0;
bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold;
bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size();
if (squashing_enabled && threshold_reached && are_all_plain_cmds) {
if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
SquashPipeline(builder);
} else {
MessageHandle msg = move(dispatch_q_.front());
Expand Down Expand Up @@ -1197,10 +1205,15 @@ void Connection::SendAclUpdateAsync(AclUpdateMessage msg) {
SendAsync({make_unique<AclUpdateMessage>(std::move(msg))});
}

void Connection::SendCheckpoint(fb2::BlockingCounter bc) {
// Queue a checkpoint message on this connection if it is busy dispatching.
// The counter is incremented here and decremented once the checkpoint message
// itself is dispatched, i.e. once all preceding messages have concluded.
void Connection::SendCheckpoint(fb2::BlockingCounter bc, bool ignore_paused) {
  // Idle connections have nothing in flight — no checkpoint needed.
  if (!IsCurrentlyDispatching())
    return;

  // Paused dispatches are skipped unless the caller explicitly tracks them.
  if (!ignore_paused && cc_->paused)
    return;

  VLOG(1) << "Sent checkpoint to " << DebugInfo();

  bc.Add(1);
  SendAsync({CheckpointMessage{bc}});
}
Expand Down
6 changes: 4 additions & 2 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ class Connection : public util::Connection {
void SendAclUpdateAsync(AclUpdateMessage msg);

// If any dispatch is currently in progress, increment counter and send checkpoint message to
// decrement it once finished.
void SendCheckpoint(util::fb2::BlockingCounter bc);
// decrement it once finished. If ignore_paused is true, paused dispatches are ignored.
void SendCheckpoint(util::fb2::BlockingCounter bc, bool ignore_paused = false);

// Must be called before sending pubsub messages to ensure the threads pipeline queue limit is not
// reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag.
Expand Down Expand Up @@ -348,6 +348,8 @@ class Connection : public util::Connection {
bool migration_enabled_;
util::fb2::ProactorBase* migration_request_ = nullptr;

bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Pooled pipeline messages per-thread
// Aggregated while handling pipelines, gradually released while handling regular commands.
static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
Expand Down
59 changes: 31 additions & 28 deletions src/facade/dragonfly_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <memory>

#include "absl/functional/bind_front.h"
#include "facade/tls_error.h"

#ifdef DFLY_USE_SSL
Expand Down Expand Up @@ -234,32 +235,6 @@ void Listener::PreAcceptLoop(util::ProactorBase* pb) {
per_thread_.resize(pool()->size());
}

// Block until every connection that is currently dispatching (except `issuer`)
// finishes, or until `timeout` elapses. Returns true if all dispatches
// concluded in time, false on timeout.
bool Listener::AwaitCurrentDispatches(absl::Duration timeout, util::Connection* issuer) {
  // Fill blocking counter with ongoing dispatches
  util::fb2::BlockingCounter bc{0};
  this->TraverseConnections([bc, issuer](unsigned thread_index, util::Connection* conn) {
    // SendCheckpoint increments bc only for connections mid-dispatch.
    if (conn != issuer)
      static_cast<Connection*>(conn)->SendCheckpoint(bc);
  });

  // Shared flag: set by the watchdog fiber on timeout, read after bc.Wait().
  auto cancelled = make_shared<bool>(false);

  // TODO: Add wait with timeout or polling to helio (including cancel flag)
  // Watchdog fiber: holds only a weak_ptr so it exits once this function
  // returns and `cancelled` is destroyed; until then it polls every 10ms and
  // cancels the counter if the deadline passed.
  util::MakeFiber([bc, cancelled = weak_ptr{cancelled}, start = absl::Now(), timeout]() mutable {
    while (!cancelled.expired()) {
      if (absl::Now() - start > timeout) {
        VLOG(1) << "AwaitCurrentDispatches timed out";
        *cancelled.lock() = true;  // same thread, no promotion race
        bc.Cancel();
      }
      ThisFiber::SleepFor(10ms);
    }
  }).Detach();

  // Returns either when all checkpoints were decremented or bc was cancelled.
  bc.Wait();
  return !*cancelled;
}

bool Listener::IsPrivilegedInterface() const {
return role_ == Role::PRIVILEGED;
}
Expand All @@ -276,8 +251,10 @@ void Listener::PreShutdown() {
// This shouldn't take a long time: All clients should reject incoming commands
// at this stage since we're in SHUTDOWN mode.
// If a command is running for too long we give up and proceed.
const absl::Duration kDispatchShutdownTimeout = absl::Milliseconds(10);
if (!AwaitCurrentDispatches(kDispatchShutdownTimeout, nullptr)) {
DispatchTracker tracker{{this}};
tracker.TrackAll();

if (!tracker.Wait(absl::Milliseconds(10))) {
LOG(WARNING) << "Some commands are still being dispatched but didn't conclude in time. "
"Proceeding in shutdown.";
}
Expand Down Expand Up @@ -396,4 +373,30 @@ ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) {
return pp->at(res_id);
}

// Snapshot the listener set; `issuer` (may be null) is excluded from tracking.
DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners,
                                 facade::Connection* issuer, bool ignore_paused)
    : listeners_(listeners.begin(), listeners.end()),
      issuer_(issuer),
      ignore_paused_(ignore_paused) {
}

// Track busy connections owned by the calling thread only.
void DispatchTracker::TrackOnThread() {
  auto handler = absl::bind_front(&DispatchTracker::Handle, this);
  for (auto* listener : listeners_)
    listener->TraverseConnectionsOnThread(handler);
}

bool DispatchTracker::Wait(absl::Duration duration) {
return bc_.WaitFor(absl::ToChronoMilliseconds(duration));
}

// Track busy connections across all threads of every listener.
void DispatchTracker::TrackAll() {
  auto handler = absl::bind_front(&DispatchTracker::Handle, this);
  for (auto* listener : listeners_)
    listener->TraverseConnections(handler);
}

// Per-connection visitor: request a checkpoint from every connection except
// the issuer; only connections mid-dispatch actually increment the counter.
void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) {
  auto* fconn = static_cast<facade::Connection*>(conn);
  if (fconn == issuer_)
    return;  // never wait on the connection that started the tracking
  fconn->SendCheckpoint(bc_, ignore_paused_);
}

} // namespace facade
28 changes: 28 additions & 0 deletions src/facade/dragonfly_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include <absl/base/internal/spinlock.h>
#include <absl/time/time.h>

#include <memory>
#include <system_error>
Expand All @@ -22,6 +23,7 @@ typedef struct ssl_ctx_st SSL_CTX;
namespace facade {

class ServiceInterface;
class Connection;

class Listener : public util::ListenerInterface {
public:
Expand Down Expand Up @@ -79,4 +81,30 @@ class Listener : public util::ListenerInterface {
SSL_CTX* ctx_ = nullptr;
};

// Dispatch tracker allows tracking the dispatch state of connections and blocking until all
// detected busy connections finished dispatching. Ignores issuer connection.
//
// Mostly used to detect when global state changes (takeover, pause, cluster config update) are
// visible to all commands and no commands are still running according to the old state / config.
class DispatchTracker {
 public:
  DispatchTracker(absl::Span<facade::Listener* const>, facade::Connection* issuer = nullptr,
                  bool ignore_paused = false);

  void TrackAll();       // Track busy connections on all threads
  void TrackOnThread();  // Track busy connections on current thread

  // Wait until all tracked connections finished dispatching.
  // Returns true on success, false if timeout was reached.
  bool Wait(absl::Duration timeout);

 private:
  // Visitor passed to TraverseConnections*: sends a checkpoint to `conn`
  // unless it is the issuer.
  void Handle(unsigned thread_index, util::Connection* conn);

  std::vector<facade::Listener*> listeners_;  // listeners whose connections are traversed
  facade::Connection* issuer_;                // connection initiating the change; never tracked
  util::fb2::BlockingCounter bc_{0};          // one unit per in-flight tracked dispatch
  bool ignore_paused_;                        // skip paused connections when tracking
};

} // namespace facade
1 change: 1 addition & 0 deletions src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
async_dispatch = false;
sync_dispatch = false;
journal_emulated = false;
paused = false;

subscriptions = 0;
}
Expand Down
7 changes: 2 additions & 5 deletions src/facade/ok_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class OkService : public ServiceInterface {
(*cntx)->SendOk();
}

void DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
// Dispatch each command in order; this trivial service never stops early,
// so the processed count always equals the input size.
size_t DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
  size_t processed = 0;
  for (auto args : args_lists) {
    DispatchCommand(args, cntx);
    ++processed;
  }
  return processed;
}

void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
Expand All @@ -44,10 +45,6 @@ class OkService : public ServiceInterface {
ConnectionStats* GetThreadLocalConnectionStats() final {
return &tl_stats;
}

void AwaitOnPauseDispatch() {
return;
}
};

void RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
Expand Down
5 changes: 3 additions & 2 deletions src/facade/service_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ class ServiceInterface {

virtual void DispatchCommand(CmdArgList args, ConnectionContext* cntx) = 0;

virtual void DispatchManyCommands(absl::Span<CmdArgList> args_list, ConnectionContext* cntx) = 0;
// Returns number of processed commands
virtual size_t DispatchManyCommands(absl::Span<CmdArgList> args_list,
ConnectionContext* cntx) = 0;

virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
ConnectionContext* cntx) = 0;

virtual ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) = 0;

virtual ConnectionStats* GetThreadLocalConnectionStats() = 0;
virtual void AwaitOnPauseDispatch() = 0;

virtual void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) {
}
Expand Down
10 changes: 9 additions & 1 deletion src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,18 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
before = tl_cluster_config->GetOwnedSlots();
}

auto cb = [&](util::ProactorBase* pb) { tl_cluster_config = new_config; };
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn()};
auto cb = [&tracker, &new_config](util::ProactorBase* pb) {
tl_cluster_config = new_config;
tracker.TrackOnThread();
};
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr);

if (!tracker.Wait(absl::Seconds(1))) {
LOG(WARNING) << "Cluster config change timed out";
}

SlotSet after = tl_cluster_config->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = GetDeletedSlots(is_first_config, before, after);
Expand Down
5 changes: 4 additions & 1 deletion src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,10 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {

// We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
// after this function exits but before the actual shutdown.
if (!sf_->AwaitCurrentDispatches(timeout_dur, cntx->conn())) {
facade::DispatchTracker tracker{sf_->GetListeners(), cntx->conn()};
tracker.TrackAll();

if (!tracker.Wait(timeout_dur)) {
LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur;
status = OpStatus::TIMED_OUT;
}
Expand Down
Loading

0 comments on commit 11ef662

Please sign in to comment.