Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Switch to stable state replication #473

Merged
merged 9 commits into from
Nov 17, 2022
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
default_stages: [commit]
exclude: 'src\/redis\/.*'
repos:
- repo: local
hooks:
Expand Down
2 changes: 2 additions & 0 deletions src/redis/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@
/* Test if a type is an object type. */
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))

/* Range 200-240 is used by Dragonfly specific opcodes */

/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_FUNCTION 246 /* engine data */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
Expand Down
26 changes: 21 additions & 5 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const {

PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
auto [it, added] = AddEntry(cntx, key, std::move(obj), expire_at_ms);
auto [it, added] = AddOrSkip(cntx, key, std::move(obj), expire_at_ms);
CHECK(added);

return it;
Expand Down Expand Up @@ -571,12 +571,14 @@ OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
return OpStatus::OK;
}

pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,
std::string_view key, PrimeValue obj,
uint64_t expire_at_ms,
bool force_update) noexcept(false) {
DCHECK(!obj.IsRef());

pair<PrimeIterator, bool> res = AddOrFind(cntx, key);
if (!res.second) // have not inserted.
if (!res.second && !force_update) // have not inserted.
return res;

auto& db = *db_arr_[cntx.db_index];
Expand All @@ -588,12 +590,26 @@ pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key
if (expire_at_ms) {
it->second.SetExpire(true);
uint64_t delta = expire_at_ms - expire_base_[0];
CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
auto [eit, inserted] = db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta));
CHECK(inserted || force_update);
if (!inserted) {
eit->second = ExpirePeriod(delta);
}
}

return res;
}

pair<PrimeIterator, bool> DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true);
}

pair<PrimeIterator, bool> DbSlice::AddOrSkip(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false);
}

size_t DbSlice::DbSize(DbIndex db_ind) const {
DCHECK_LT(db_ind, db_array_size());

Expand Down
13 changes: 11 additions & 2 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,16 @@ class DbSlice {
std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(const Context& cntx,
std::string_view key) noexcept(false);

// Same as AddOrSkip, but overwrites in case entry exists.
// Returns second=true if insertion took place.
std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms) noexcept(false);

// Returns second=true if insertion took place, false otherwise.
// expire_at_ms equal to 0 - means no expiry.
// throws: bad_alloc is insertion could not happen due to out of memory.
std::pair<PrimeIterator, bool> AddEntry(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false);
std::pair<PrimeIterator, bool> AddOrSkip(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms) noexcept(false);

// Adds a new entry. Requires: key does not exist in this slice.
// Returns the iterator to the newly added entry.
Expand Down Expand Up @@ -285,6 +290,10 @@ class DbSlice {
void InvalidateDbWatches(DbIndex db_indx);

private:
std::pair<PrimeIterator, bool> AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update) noexcept(false);

void CreateDb(DbIndex index);
size_t EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* table);

Expand Down
113 changes: 93 additions & 20 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
return Sync(args, cntx);
}

if (sub_cmd == "STARTSTABLE" && args.size() == 3) {
return StartStable(args, cntx);
}

if (sub_cmd == "EXPIRE") {
return Expire(args, cntx);
}
Expand Down Expand Up @@ -258,16 +262,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
return;

unique_lock lk(sync_info->mu);
if (sync_info->state != SyncState::PREPARATION)
return rb->SendError(kInvalidState);

// Check all flows are connected.
// This might happen if a flow abruptly disconnected before sending the SYNC request.
for (const FlowInfo& flow : sync_info->flows) {
if (!flow.conn) {
return rb->SendError(kInvalidState);
}
}
if (!CheckReplicaStateOrReply(*sync_info, SyncState::PREPARATION, rb))
return;

// Start full sync.
{
Expand All @@ -288,6 +284,38 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
return rb->SendOk();
}

void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
string_view sync_id_str = ArgS(args, 2);

VLOG(1) << "Got DFLY STARTSTABLE " << sync_id_str;

auto [sync_id, sync_info] = GetSyncInfoOrReply(sync_id_str, rb);
if (!sync_id)
return;

unique_lock lk(sync_info->mu);
if (!CheckReplicaStateOrReply(*sync_info, SyncState::FULL_SYNC, rb))
return;

{
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) {
status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal());
return OpStatus::OK;
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));

if (*status != OpStatus::OK)
return rb->SendError(kInvalidState);
}

sync_info->state = SyncState::STABLE_SYNC;
return rb->SendOk();
}

void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
cntx->transaction->ScheduleSingleHop([](Transaction* t, EngineShard* shard) {
Expand All @@ -304,14 +332,42 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
SaveMode save_mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
flow->saver.reset(new RdbSaver(flow->conn->socket(), save_mode, false));

// Shard can be null for io thread.
if (shard != nullptr) {
flow->saver->StartSnapshotInShard(false, shard);
auto ec = sf_->journal()->OpenInThread(false, string_view());
CHECK(!ec);
flow->saver->StartSnapshotInShard(true, shard);
}

flow->fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow);
return OpStatus::OK;
}

OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) {
// Shard can be null for io thread.
if (shard != nullptr) {
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
flow->saver->StopSnapshotInShard(shard);
}

// Wait for full sync to finish.
if (flow->fb.joinable()) {
flow->fb.join();
}

if (shard != nullptr) {
flow->saver.reset();
romange marked this conversation as resolved.
Show resolved Hide resolved

// TODO: Add cancellation.
auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) {
// TODO: Serialize event.
ReqSerializer serializer{flow->conn->socket()};
serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString()));
});
}

return OpStatus::OK;
}

void DflyCmd::FullSyncFb(FlowInfo* flow) {
error_code ec;
RdbSaver* saver = flow->saver.get();
Expand All @@ -328,22 +384,20 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) {
return;
}

if (saver->Mode() != SaveMode::SUMMARY) {
// TODO: we should be able to stop earlier if requested.
ec = saver->SaveBody(nullptr);
if (ec) {
LOG(ERROR) << ec;
return;
}
// TODO: we should be able to stop earlier if requested.
romange marked this conversation as resolved.
Show resolved Hide resolved
ec = saver->SaveBody(nullptr);
if (ec) {
LOG(ERROR) << ec;
return;
}

VLOG(1) << "Sending full sync EOF";

ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
LOG(ERROR) << ec;
return;
}

ec = flow->conn->socket()->Shutdown(SHUT_RDWR);
}

uint32_t DflyCmd::CreateSyncSession() {
Expand Down Expand Up @@ -429,6 +483,25 @@ pair<uint32_t, shared_ptr<DflyCmd::SyncInfo>> DflyCmd::GetSyncInfoOrReply(std::s
return {sync_id, sync_it->second};
}

bool DflyCmd::CheckReplicaStateOrReply(const SyncInfo& sync_info, SyncState expected,
RedisReplyBuilder* rb) {
if (sync_info.state != expected) {
rb->SendError(kInvalidState);
return false;
}

// Check all flows are connected.
// This might happen if a flow abruptly disconnected before sending the SYNC request.
for (const FlowInfo& flow : sync_info.flows) {
if (!flow.conn) {
rb->SendError(kInvalidState);
return false;
}
}

return true;
}

void DflyCmd::BreakOnShutdown() {
VLOG(1) << "BreakOnShutdown";
}
Expand Down
17 changes: 13 additions & 4 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Journal;

class DflyCmd {
public:
enum class SyncState { PREPARATION, FULL_SYNC, CANCELLED };
enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED };

struct FlowInfo {
FlowInfo() = default;
Expand Down Expand Up @@ -80,18 +80,24 @@ class DflyCmd {
// Register connection as flow for sync session.
void Flow(CmdArgList args, ConnectionContext* cntx);

// SYNC <masterid> <syncid> <flowid>
// Migrate connection to required flow thread.
// Stub: will be replcaed with full sync.
// SYNC <syncid>
// Initiate full sync.
void Sync(CmdArgList args, ConnectionContext* cntx);

// STARTSTABLE <syncid>
// Switch to stable state replication.
void StartStable(CmdArgList args, ConnectionContext* cntx);

// EXPIRE
// Check all keys for expiry.
void Expire(CmdArgList args, ConnectionContext* cntx);

// Start full sync in thread. Start FullSyncFb. Called for each flow.
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, EngineShard* shard);

// Start stable sync in thread. Called for each flow.
facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard);

// Fiber that runs full sync for each flow.
void FullSyncFb(FlowInfo* flow);

Expand All @@ -108,6 +114,9 @@ class DflyCmd {
std::pair<uint32_t, std::shared_ptr<SyncInfo>> GetSyncInfoOrReply(std::string_view id,
facade::RedisReplyBuilder* rb);

bool CheckReplicaStateOrReply(const SyncInfo& si, SyncState expected,
facade::RedisReplyBuilder* rb);

ServerFamily* sf_;

util::ListenerInterface* listener_;
Expand Down
2 changes: 1 addition & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ bool RdbRestoreValue::Add(std::string_view data, std::string_view key, DbSlice&
return false;
}
DbContext context{.db_index = index, .time_now_ms = GetCurrentTimeMs()};
auto [it, added] = db_slice.AddEntry(context, key, std::move(pv), item.expire_ms);
auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), item.expire_ms);
return added;
}

Expand Down
12 changes: 12 additions & 0 deletions src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

// Range 200-240 is used by DF extensions.

// This opcode is sent by the master Dragonfly instance to a replica
// to notify that it finished streaming static data and is ready
// to switch to the stable state replication phase.
const uint8_t RDB_OPCODE_FULLSYNC_END = 200;
10 changes: 8 additions & 2 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C" {
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/hset_family.h"
#include "server/rdb_extensions.h"
#include "server/script_mgr.h"
#include "server/server_state.h"
#include "server/set_family.h"
Expand Down Expand Up @@ -1553,6 +1554,12 @@ error_code RdbLoader::Load(io::Source* src) {
break;
}

if (type == RDB_OPCODE_FULLSYNC_END) {
if (full_sync_cut_cb)
full_sync_cut_cb();
continue;
}

if (type == RDB_OPCODE_SELECTDB) {
unsigned dbid = 0;

Expand Down Expand Up @@ -1815,8 +1822,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms)
continue;

auto [it, added] = db_slice.AddEntry(db_cntx, item.key, std::move(pv), item.expire_ms);

auto [it, added] = db_slice.AddOrUpdate(db_cntx, item.key, std::move(pv), item.expire_ms);
if (!added) {
LOG(WARNING) << "RDB has duplicated key '" << item.key << "' in DB " << db_ind;
}
Expand Down
10 changes: 10 additions & 0 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ class RdbLoader : protected RdbLoaderBase {
return load_time_;
}

// Set callback for receiving RDB_OPCODE_FULLSYNC_END.
// This opcode is used by a master instance to notify it finished streaming static data
// and is ready to switch to stable state sync.
void SetFullSyncCutCb(std::function<void()> cb) {
full_sync_cut_cb = std::move(cb);
}

private:
struct ObjSettings;
std::error_code LoadKeyValPair(int type, ObjSettings* settings);
Expand All @@ -194,6 +201,9 @@ class RdbLoader : protected RdbLoaderBase {
::boost::fibers::mutex mu_;
std::error_code ec_; // guarded by mu_
std::atomic_bool stop_early_{false};

// Callback when receiving RDB_OPCODE_FULLSYNC_END
std::function<void()> full_sync_cut_cb;
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace dfly
Loading