Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DispatchTracker to replace everything #2179

Merged
merged 7 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class ConnectionContext {
bool async_dispatch : 1; // whether this connection is amid an async dispatch
bool sync_dispatch : 1; // whether this connection is amid a sync dispatch
bool journal_emulated : 1; // whether it is used to dispatch journal commands
bool paused : 1; // whether this connection is paused due to CLIENT PAUSE

// How many async subscription sources are active: monitor and/or pubsub - at most 2.
uint8_t subscriptions;
Expand Down
43 changes: 28 additions & 15 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,16 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg

self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
self->last_interaction_ = time(nullptr);
self->skip_next_squashing_ = false;
}

void Connection::DispatchOperations::operator()(const MigrationRequestMessage& msg) {
  // Nothing to execute for a migration request message here.
}

// Handles a checkpoint message: signals that this connection reached the point in its
// dispatch queue where the checkpoint was enqueued.
void Connection::DispatchOperations::operator()(CheckpointMessage msg) {
  VLOG(1) << "Decremented checkpoint at " << self->DebugInfo();

  // Release one unit of the blocking counter the checkpoint issuer is waiting on.
  msg.bc.Dec();
}

Expand Down Expand Up @@ -870,7 +873,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;
service_->AwaitOnPauseDispatch();

if (redis_parser_) {
parse_status = ParseRedis(orig_builder);
} else {
Expand Down Expand Up @@ -960,24 +963,19 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_);

vector<CmdArgList> squash_cmds;
vector<PipelineMessagePtr> squash_msgs;

squash_cmds.reserve(dispatch_q_.size());
squash_msgs.reserve(dispatch_q_.size());

while (!dispatch_q_.empty()) {
auto& msg = dispatch_q_.front();
for (auto& msg : dispatch_q_) {
CHECK(holds_alternative<PipelineMessagePtr>(msg.handle))
<< "Found " << msg.handle.index() << " on " << DebugInfo();
<< msg.handle.index() << " on " << DebugInfo();

squash_msgs.push_back(std::move(std::get<PipelineMessagePtr>(msg.handle)));
squash_cmds.push_back(absl::MakeSpan(squash_msgs.back()->args));
dispatch_q_.pop_front();
auto& pmsg = get<PipelineMessagePtr>(msg.handle);
squash_cmds.push_back(absl::MakeSpan(pmsg->args));
}

cc_->async_dispatch = true;

service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());

if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared
builder->FlushBatch();
Expand All @@ -986,8 +984,17 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {

cc_->async_dispatch = false;

for (auto& msg : squash_msgs)
RecycleMessage(MessageHandle{std::move(msg)});
auto it = dispatch_q_.begin();
while (it->IsIntrusive()) // Skip all newly received intrusive messages
++it;

for (auto rit = it; rit != it + dispatched; ++rit)
RecycleMessage(std::move(*rit));

dispatch_q_.erase(it, it + dispatched);

// If interrupted due to pause, fall back to regular dispatch
skip_next_squashing_ = dispatched != squash_cmds.size();
}

void Connection::ClearPipelinedMessages() {
Expand All @@ -1009,6 +1016,7 @@ void Connection::ClearPipelinedMessages() {
std::string Connection::DebugInfo() const {
std::string info = "{";

absl::StrAppend(&info, "address=", uint64_t(this), ", ");
absl::StrAppend(&info, "phase=", phase_, ", ");
absl::StrAppend(&info, "dispatch(s/a)=", cc_->sync_dispatch, " ", cc_->async_dispatch, ", ");
absl::StrAppend(&info, "closing=", cc_->conn_closing, ", ");
Expand Down Expand Up @@ -1068,7 +1076,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
bool squashing_enabled = squashing_threshold > 0;
bool threshold_reached = pending_pipeline_cmd_cnt_ > squashing_threshold;
bool are_all_plain_cmds = pending_pipeline_cmd_cnt_ == dispatch_q_.size();
if (squashing_enabled && threshold_reached && are_all_plain_cmds) {
if (squashing_enabled && threshold_reached && are_all_plain_cmds && !skip_next_squashing_) {
SquashPipeline(builder);
} else {
MessageHandle msg = move(dispatch_q_.front());
Expand Down Expand Up @@ -1186,10 +1194,15 @@ void Connection::SendAclUpdateAsync(AclUpdateMessage msg) {
SendAsync({make_unique<AclUpdateMessage>(std::move(msg))});
}

void Connection::SendCheckpoint(fb2::BlockingCounter bc) {
void Connection::SendCheckpoint(fb2::BlockingCounter bc, bool ignore_paused) {
  // Only connections that are amid a dispatch need to be awaited. Paused
  // connections are skipped unless the caller explicitly asks to include them.
  bool skipped_as_paused = cc_->paused && !ignore_paused;
  if (!IsCurrentlyDispatching() || skipped_as_paused)
    return;

  VLOG(1) << "Sent checkpoint to " << DebugInfo();

  // Increment before enqueueing, so the waiter observes this dispatch.
  bc.Add(1);
  SendAsync({CheckpointMessage{bc}});
}
Expand Down
6 changes: 4 additions & 2 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ class Connection : public util::Connection {
void SendAclUpdateAsync(AclUpdateMessage msg);

// If any dispatch is currently in progress, increment counter and send checkpoint message to
// decrement it once finished.
void SendCheckpoint(util::fb2::BlockingCounter bc);
// decrement it once finished. If ignore_paused is true, paused dispatches are ignored.
void SendCheckpoint(util::fb2::BlockingCounter bc, bool ignore_paused = false);

// Must be called before sending pubsub messages to ensure the threads pipeline queue limit is not
// reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag.
Expand Down Expand Up @@ -347,6 +347,8 @@ class Connection : public util::Connection {
bool migration_enabled_;
util::fb2::ProactorBase* migration_request_ = nullptr;

bool skip_next_squashing_ = false; // Forcefully skip next squashing

// Pooled pipeline messages per-thread
// Aggregated while handling pipelines, gradually released while handling regular commands.
static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
Expand Down
59 changes: 31 additions & 28 deletions src/facade/dragonfly_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <memory>

#include "absl/functional/bind_front.h"
#include "facade/tls_error.h"

#ifdef DFLY_USE_SSL
Expand Down Expand Up @@ -234,32 +235,6 @@ void Listener::PreAcceptLoop(util::ProactorBase* pb) {
per_thread_.resize(pool()->size());
}

bool Listener::AwaitCurrentDispatches(absl::Duration timeout, util::Connection* issuer) {
// Fill blocking counter with ongoing dispatches
util::fb2::BlockingCounter bc{0};
this->TraverseConnections([bc, issuer](unsigned thread_index, util::Connection* conn) {
if (conn != issuer)
static_cast<Connection*>(conn)->SendCheckpoint(bc);
});

auto cancelled = make_shared<bool>(false);

// TODO: Add wait with timeout or polling to helio (including cancel flag)
util::MakeFiber([bc, cancelled = weak_ptr{cancelled}, start = absl::Now(), timeout]() mutable {
while (!cancelled.expired()) {
if (absl::Now() - start > timeout) {
VLOG(1) << "AwaitCurrentDispatches timed out";
*cancelled.lock() = true; // same thread, no promotion race
bc.Cancel();
}
ThisFiber::SleepFor(10ms);
}
}).Detach();

bc.Wait();
return !*cancelled;
}

bool Listener::IsPrivilegedInterface() const {
return role_ == Role::PRIVILEGED;
}
Expand All @@ -276,8 +251,10 @@ void Listener::PreShutdown() {
// This shouldn't take a long time: All clients should reject incoming commands
// at this stage since we're in SHUTDOWN mode.
// If a command is running for too long we give up and proceed.
const absl::Duration kDispatchShutdownTimeout = absl::Milliseconds(10);
if (!AwaitCurrentDispatches(kDispatchShutdownTimeout, nullptr)) {
DispatchTracker tracker{{this}};
tracker.TrackAll();

if (!tracker.Wait(absl::Milliseconds(10))) {
LOG(WARNING) << "Some commands are still being dispatched but didn't conclude in time. "
"Proceeding in shutdown.";
}
Expand Down Expand Up @@ -396,4 +373,30 @@ ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) {
return pp->at(res_id);
}

DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners,
                                 facade::Connection* issuer, bool ignore_paused)
    // Copy the span's contents into an owned vector: storing the span itself would
    // dangle when callers pass a temporary initializer list such as `{this}`.
    // This operation is rare, so the vector allocation is acceptable.
    : listeners_{listeners.begin(), listeners.end()},
      issuer_{issuer},
      ignore_paused_{ignore_paused} {
}

void DispatchTracker::TrackOnThread() {
for (auto* listener : listeners_)
listener->TraverseConnectionsOnThread(absl::bind_front(&DispatchTracker::Handle, this));
}

// Block until all tracked dispatches finished or the timeout expired.
// Returns false on timeout.
bool DispatchTracker::Wait(absl::Duration duration) {
  // The fiber counter API expects a std::chrono duration.
  auto chrono_timeout = absl::ToChronoMilliseconds(duration);
  return bc_.WaitFor(chrono_timeout);
}

void DispatchTracker::TrackAll() {
for (auto* listener : listeners_)
listener->TraverseConnections(absl::bind_front(&DispatchTracker::Handle, this));
}

// Per-connection visitor: sends a checkpoint to every connection except the issuer.
void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) {
  auto* fconn = static_cast<facade::Connection*>(conn);
  if (fconn == issuer_)
    return;  // The issuer itself is never tracked.
  fconn->SendCheckpoint(bc_, ignore_paused_);
}

} // namespace facade
28 changes: 28 additions & 0 deletions src/facade/dragonfly_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

#include <absl/base/internal/spinlock.h>
#include <absl/time/time.h>

#include <memory>
#include <system_error>
Expand All @@ -22,6 +23,7 @@ typedef struct ssl_ctx_st SSL_CTX;
namespace facade {

class ServiceInterface;
class Connection;

class Listener : public util::ListenerInterface {
public:
Expand Down Expand Up @@ -79,4 +81,30 @@ class Listener : public util::ListenerInterface {
SSL_CTX* ctx_ = nullptr;
};

// Dispatch tracker allows tracking the dispatch state of connections and blocking until all
// detected busy connections finished dispatching. Ignores issuer connection.
//
// Mostly used to detect when global state changes (takeover, pause, cluster config update) are
// visible to all commands and no commands are still running according to the old state / config.
class DispatchTracker {
 public:
  DispatchTracker(absl::Span<facade::Listener* const>, facade::Connection* issuer = nullptr,
                  bool ignore_paused = false);

  void TrackAll();       // Track busy connections on all threads
  void TrackOnThread();  // Track busy connections on current thread

  // Wait until all tracked connections finished dispatching.
  // Returns true on success, false if timeout was reached.
  bool Wait(absl::Duration timeout);

 private:
  // Visitor for a single connection; skips issuer_ and sends a checkpoint message.
  void Handle(unsigned thread_index, util::Connection* conn);

  std::vector<facade::Listener*> listeners_;  // Owned copy; a stored span could dangle
  facade::Connection* issuer_;                // Connection to exclude from tracking (may be null)
  util::fb2::BlockingCounter bc_{0};          // Counts in-flight dispatches still to finish
  bool ignore_paused_;                        // If true, paused connections are tracked too
};

} // namespace facade
1 change: 1 addition & 0 deletions src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
async_dispatch = false;
sync_dispatch = false;
journal_emulated = false;
paused = false;

subscriptions = 0;
}
Expand Down
7 changes: 2 additions & 5 deletions src/facade/ok_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ class OkService : public ServiceInterface {
(*cntx)->SendOk();
}

void DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
size_t DispatchManyCommands(absl::Span<CmdArgList> args_lists, ConnectionContext* cntx) final {
  // Dispatch each command in order; this service never aborts early,
  // so the processed count equals the input size.
  size_t processed = 0;
  for (CmdArgList args : args_lists) {
    DispatchCommand(args, cntx);
    ++processed;
  }
  return processed;
}

void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
Expand All @@ -44,10 +45,6 @@ class OkService : public ServiceInterface {
// Returns the stats instance (tl_stats; presumably thread-local — declared elsewhere).
ConnectionStats* GetThreadLocalConnectionStats() final {
  return &tl_stats;
}

void AwaitOnPauseDispatch() {
return;
}
};

void RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
Expand Down
5 changes: 3 additions & 2 deletions src/facade/service_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,16 @@ class ServiceInterface {

virtual void DispatchCommand(CmdArgList args, ConnectionContext* cntx) = 0;

virtual void DispatchManyCommands(absl::Span<CmdArgList> args_list, ConnectionContext* cntx) = 0;
// Returns number of processed commands
virtual size_t DispatchManyCommands(absl::Span<CmdArgList> args_list,
ConnectionContext* cntx) = 0;

virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
ConnectionContext* cntx) = 0;

virtual ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) = 0;

virtual ConnectionStats* GetThreadLocalConnectionStats() = 0;
virtual void AwaitOnPauseDispatch() = 0;

virtual void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) {
}
Expand Down
10 changes: 9 additions & 1 deletion src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,10 +502,18 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
before = tl_cluster_config->GetOwnedSlots();
}

auto cb = [&](util::ProactorBase* pb) { tl_cluster_config = new_config; };
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn()};
auto cb = [&tracker, &new_config](util::ProactorBase* pb) {
tl_cluster_config = new_config;
tracker.TrackOnThread();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need TrackOnThread?
why TrackAll after AwaitFiberOnAll is finished to update the config on all threads is not good?

Copy link
Contributor Author

@dranikpg dranikpg Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do it correctly if it takes only a few lines 🙂


TrackAll after AwaitFiberOnAll is finished to update the config on all threads is not good

Because it's possibly waiting for totally different stuff to finish, including commands that would already be running with the new cluster config. The connection has spurious suspends (Yields()), so we might miss operations

};
Comment on lines +505 to +509
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we can now pause writes when replacing the cluster config, to actually return an error without applying. wdyt?

Copy link
Contributor Author

@dranikpg dranikpg Nov 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As in

run {
  paused = true
  tracker.track_on_thread()
}

if not tracker.wait( 1s ): 
  return "Can't replace cluster config with constant ops running"

# no writes
run {
  tl_config = new_config
  paused = false
}

but why should we pause under heavy load without long running ops 😞 ?

One more option would be using a global tx, but that doesn't prevent write commands from still being scheduled

server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr);

if (!tracker.Wait(absl::Seconds(1))) {
LOG(WARNING) << "Cluster config change timed out";
}

SlotSet after = tl_cluster_config->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = GetDeletedSlots(is_first_config, before, after);
Expand Down
5 changes: 4 additions & 1 deletion src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,10 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {

// We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
// after this function exits but before the actual shutdown.
if (!sf_->AwaitCurrentDispatches(timeout_dur, cntx->conn())) {
facade::DispatchTracker tracker{sf_->GetListeners(), cntx->conn()};
tracker.TrackAll();

if (!tracker.Wait(timeout_dur)) {
LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur;
status = OpStatus::TIMED_OUT;
}
Expand Down
Loading
Loading