Skip to content

Commit

Permalink
feat(server): Implement robust error & cancellation on replica (#531)
Browse files Browse the repository at this point in the history
  • Loading branch information
dranikpg authored Dec 11, 2022
1 parent a0fe3c3 commit f98d6f3
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 156 deletions.
37 changes: 37 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <absl/strings/str_cat.h>
#include <mimalloc.h>

#include <system_error>

extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
Expand Down Expand Up @@ -242,6 +244,14 @@ bool ScanOpts::Matches(std::string_view val_name) const {
return stringmatchlen(pattern.data(), pattern.size(), val_name.data(), val_name.size(), 0) == 1;
}

GenericError::operator std::error_code() const {
return ec_;
}

GenericError::operator bool() const {
return bool(ec_);
}

std::string GenericError::Format() const {
if (!ec_)
return "";
Expand All @@ -252,4 +262,31 @@ std::string GenericError::Format() const {
return absl::StrCat(ec_.message(), ":", details_);
}

GenericError Context::GetError() {
std::lock_guard lk(mu_);
return err_;
}

const Cancellation* Context::GetCancellation() const {
return this;
}

void Context::Cancel() {
Error(std::make_error_code(errc::operation_canceled), "Context cancelled");
}

void Context::Reset(ErrHandler handler) {
std::lock_guard lk{mu_};
err_ = {};
err_handler_ = std::move(handler);
Cancellation::flag_.store(false, std::memory_order_relaxed);
}

GenericError Context::Switch(ErrHandler handler) {
std::lock_guard lk{mu_};
if (!err_)
err_handler_ = std::move(handler);
return err_;
}

} // namespace dfly
94 changes: 60 additions & 34 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ template <typename RandGen> std::string GetRandomHex(RandGen& gen, size_t len) {
}

// AggregateValue is a thread safe utility to store the first
// non-default value.
// truthy value;
template <typename T> struct AggregateValue {
bool operator=(T val) {
std::lock_guard l{mu_};
if (current_ == T{} && val != T{}) {
if (!bool(current_) && bool(val)) {
current_ = val;
}
return val != T{};
return bool(val);
}

T operator*() {
Expand All @@ -184,23 +184,27 @@ template <typename T> struct AggregateValue {
}

operator bool() {
return **this != T{};
return bool(**this);
}

private:
::boost::fibers::mutex mu_{};
T current_{};
};

// Thread safe utility to store the first non null error.
using AggregateError = AggregateValue<std::error_code>;

// Thread safe utility to store the first non OK status.
using AggregateStatus = AggregateValue<facade::OpStatus>;
static_assert(facade::OpStatus::OK == facade::OpStatus{},
"Default intitialization should be OK value");
static_assert(bool(facade::OpStatus::OK) == false,
"Default intitialization should be a falsy OK value");

// Re-usable component for signaling cancellation.
// Simple wrapper around atomic flag.
// Simple wrapper interface around atomic cancellation flag.
struct Cancellation {
Cancellation() : flag_{false} {
}

void Cancel() {
flag_.store(true, std::memory_order_relaxed);
}
Expand All @@ -209,7 +213,7 @@ struct Cancellation {
return flag_.load(std::memory_order_relaxed);
}

private:
protected:
std::atomic_bool flag_;
};

Expand All @@ -222,51 +226,73 @@ class GenericError {
GenericError(std::error_code ec, std::string details) : ec_{ec}, details_{std::move(details)} {
}

std::error_code GetError() const {
return ec_;
}
operator std::error_code() const;
operator bool() const;

const std::string& GetDetails() const {
return details_;
}

operator bool() const {
return bool(ec_);
}

// Get string representation of error.
std::string Format() const;
std::string Format() const; // Get string representation of error.

private:
std::error_code ec_;
std::string details_;
};

// Thread safe utility to store the first non null generic error.
using AggregateGenericError = AggregateValue<GenericError>;

// Contest combines Cancellation and AggregateGenericError in one class.
// Allows setting an error_handler to run on errors.
class Context : public Cancellation {
// Context is a utility for managing error reporting and cancellation for complex tasks.
//
// When submitting an error with `Error`, only the first is stored (as in aggregate values).
// Then a special error handler is run, if present, and the context is cancelled.
//
// Manual cancellation with `Cancel` is simulated by reporting an `errc::operation_canceled` error.
// This allows running the error handler and representing this scenario as an error.
class Context : protected Cancellation {
public:
// The error handler should return false if this error is ignored.
using ErrHandler = std::function<bool(const GenericError&)>;
using ErrHandler = std::function<void(const GenericError&)>;

Context() = default;
Context(ErrHandler err_handler) : Cancellation{}, err_handler_{std::move(err_handler)} {
Context(ErrHandler err_handler) : Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
}

template <typename... T> void Error(T... ts) {
// Cancels the context by submitting an `errc::operation_canceled` error.
void Cancel();
using Cancellation::IsCancelled;

const Cancellation* GetCancellation() const;

GenericError GetError();

// Report an error by submitting arguments for GenericError.
// If this is the first error that occured, then the error handler is run
// and the context is cancelled.
//
// Note: this function blocks when called from inside an error handler.
template <typename... T> GenericError Error(T... ts) {
std::lock_guard lk{mu_};
if (err_)
return;
return err_;

GenericError new_err{std::forward<T>(ts)...};
if (!err_handler_ || err_handler_(new_err)) {
err_ = std::move(new_err);
Cancel();
}
if (err_handler_)
err_handler_(new_err);

err_ = std::move(new_err);
Cancellation::Cancel();

return err_;
}

// Reset error and cancellation flag, assign new error handler.
void Reset(ErrHandler handler);

// Atomically replace the error handler if no error is present, and return the
// current stored error. This function can be used to transfer cleanup responsibility safely
//
// Beware, never do this manually in two steps. If you check for cancellation,
// set the error handler and initialize resources, then the new error handler
// will never run if the context was cancelled between the first two steps.
GenericError Switch(ErrHandler handler);

private:
GenericError err_;
ErrHandler err_handler_;
Expand Down
33 changes: 23 additions & 10 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
// Use explicit assignment for replica_ptr, because capturing structured bindings is C++20.
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
status = StartFullSyncInThread(&replica_ptr->flows[index], &replica_ptr->cntx,
EngineShard::tlocal());
};
Expand Down Expand Up @@ -283,7 +284,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, replica_ptr](unsigned index, auto*) {
auto cb = [this, &status, replica_ptr = replica_ptr](unsigned index, auto*) {
EngineShard* shard = EngineShard::tlocal();
FlowInfo* flow = &replica_ptr->flows[index];

Expand Down Expand Up @@ -325,7 +326,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
// Shard can be null for io thread.
if (shard != nullptr) {
CHECK(!sf_->journal()->OpenInThread(false, ""sv)); // can only happen in persistent mode.
flow->saver->StartSnapshotInShard(true, cntx, shard);
flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard);
}

flow->full_sync_fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
Expand Down Expand Up @@ -380,16 +381,19 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
}

if (ec) {
return cntx->Error(ec);
cntx->Error(ec);
return;
}

if (ec = saver->SaveBody(cntx, nullptr); ec) {
return cntx->Error(ec);
if (ec = saver->SaveBody(cntx->GetCancellation(), nullptr); ec) {
cntx->Error(ec);
return;
}

ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
return cntx->Error(ec);
cntx->Error(ec);
return;
}
}

Expand All @@ -405,8 +409,6 @@ uint32_t DflyCmd::CreateSyncSession() {
// StopReplication needs to run async to prevent blocking
// the error handler.
::boost::fibers::fiber{&DflyCmd::StopReplication, this, sync_id}.detach();

return true; // Cancel context
};

auto replica_ptr = make_shared<ReplicaInfo>(flow_count, std::move(err_handler));
Expand Down Expand Up @@ -532,7 +534,18 @@ bool DflyCmd::CheckReplicaStateOrReply(const ReplicaInfo& sync_info, SyncState e
}

void DflyCmd::BreakOnShutdown() {
VLOG(1) << "BreakOnShutdown";
}

void DflyCmd::Shutdown() {
ReplicaInfoMap pending;
{
std::lock_guard lk(mu_);
pending = std::move(replica_infos_);
}

for (auto [sync_id, replica_ptr] : pending) {
CancelReplication(sync_id, replica_ptr);
}
}

void DflyCmd::FlowInfo::TryShutdownSocket() {
Expand Down
8 changes: 6 additions & 2 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,11 @@ class DflyCmd {

void OnClose(ConnectionContext* cntx);

// Stop all background processes so we can exit in orderly manner.
void BreakOnShutdown();

// Stop all background processes so we can exit in orderly manner.
void Shutdown();

// Create new sync session.
uint32_t CreateSyncSession();

Expand Down Expand Up @@ -185,7 +187,9 @@ class DflyCmd {
TxId journal_txid_ = 0;

uint32_t next_sync_id_ = 1;
absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>> replica_infos_;

using ReplicaInfoMap = absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>>;
ReplicaInfoMap replica_infos_;

::boost::fibers::mutex mu_; // Guard global operations. See header top for locking levels.
};
Expand Down
3 changes: 2 additions & 1 deletion src/server/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include <atomic>
#include <string>
#include <system_error>

#include "facade/error.h"

Expand All @@ -21,7 +22,7 @@ using facade::kWrongTypeErr;

#define RETURN_ON_ERR(x) \
do { \
auto __ec = (x); \
std::error_code __ec = (x); \
if (__ec) { \
LOG(ERROR) << "Error " << __ec << " while calling " #x; \
return __ec; \
Expand Down
Loading

0 comments on commit f98d6f3

Please sign in to comment.