Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Switch to stable state replication #473

Merged
merged 9 commits into from
Nov 17, 2022
24 changes: 20 additions & 4 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,12 +571,13 @@ OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
return OpStatus::OK;
}

pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update) noexcept(false) {
DCHECK(!obj.IsRef());

pair<PrimeIterator, bool> res = AddOrFind(cntx, key);
if (!res.second) // have not inserted.
if (!res.second && !force_update) // have not inserted.
return res;

auto& db = *db_arr_[cntx.db_index];
Expand All @@ -588,12 +589,27 @@ pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key
if (expire_at_ms) {
it->second.SetExpire(true);
uint64_t delta = expire_at_ms - expire_base_[0];
CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
auto [eit, inserted] = db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta));
CHECK(inserted || force_update);
if (!inserted) {
eit->second = ExpirePeriod(delta);
}
}

return res;
}

// Upsert counterpart of AddEntry: delegates to AddOrUpdateInternal with
// force_update enabled, so an already existing key gets its value (and expiry,
// when expire_at_ms is non-zero) overwritten instead of being left untouched.
// The returned bool is true only if a new entry was inserted.
pair<PrimeIterator, bool> DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj,
                                               uint64_t expire_at_ms) noexcept(false) {
  constexpr bool kForceUpdate = true;
  return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, kForceUpdate);
}


pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
uint64_t expire_at_ms) noexcept(false) {
return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false);
}

size_t DbSlice::DbSize(DbIndex db_ind) const {
DCHECK_LT(db_ind, db_array_size());

Expand Down
9 changes: 9 additions & 0 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ class DbSlice {
std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(const Context& cntx,
std::string_view key) noexcept(false);

// Same as AddEntry, but overwrites in case entry exists. Returns second=true
// if insertion took place.
std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false);

// Returns second=true if insertion took place, false otherwise.
// expire_at_ms equal to 0 - means no expiry.
// throws: bad_alloc if insertion could not happen due to out of memory.
Expand Down Expand Up @@ -285,6 +290,10 @@ class DbSlice {
void InvalidateDbWatches(DbIndex db_indx);

private:
std::pair<PrimeIterator, bool> AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update) noexcept(false);

void CreateDb(DbIndex index);
size_t EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* table);

Expand Down
113 changes: 93 additions & 20 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
return Sync(args, cntx);
}

if (sub_cmd == "STARTSTABLE" && args.size() == 3) {
return StartStable(args, cntx);
}

if (sub_cmd == "EXPIRE") {
return Expire(args, cntx);
}
Expand All @@ -101,6 +105,8 @@ void DflyCmd::OnClose(ConnectionContext* cntx) {
if (!session_id)
return;

VLOG(0) << "Disconnected !!! " << flow_id;

if (flow_id == kuint32max) {
DeleteSyncSession(session_id);
} else {
Expand Down Expand Up @@ -258,16 +264,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
return;

unique_lock lk(sync_info->mu);
if (sync_info->state != SyncState::PREPARATION)
return rb->SendError(kInvalidState);

// Check all flows are connected.
// This might happen if a flow abruptly disconnected before sending the SYNC request.
for (const FlowInfo& flow : sync_info->flows) {
if (!flow.conn) {
return rb->SendError(kInvalidState);
}
}
if (!CheckReplicaStateOrReply(*sync_info, SyncState::PREPARATION, rb))
return;

// Start full sync.
{
Expand All @@ -288,6 +286,38 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
return rb->SendOk();
}

// DFLY STARTSTABLE <syncid>
// Moves a sync session from FULL_SYNC to STABLE_SYNC.
// Replies with kInvalidState if the session is not in FULL_SYNC, if one of its
// flows has no connection, or if switching any flow fails; replies OK otherwise.
void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
  auto* reply = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
  string_view sync_id_str = ArgS(args, 2);

  VLOG(0) << "Got DFLY STARTSTABLE " << sync_id_str;

  auto [sync_id, sync_info] = GetSyncInfoOrReply(sync_id_str, reply);
  if (!sync_id)
    return;  // GetSyncInfoOrReply has already replied.

  // The session mutex is held for the whole state transition.
  unique_lock lk(sync_info->mu);
  if (!CheckReplicaStateOrReply(*sync_info, SyncState::FULL_SYNC, reply))
    return;

  {
    TransactionGuard tg{cntx->transaction};
    AggregateStatus status;

    // Structured bindings cannot be captured directly, hence the init-capture.
    shard_set->pool()->AwaitFiberOnAll(
        [this, &status, sync_info = sync_info](unsigned index, auto*) {
          FlowInfo* flow = &sync_info->flows[index];
          status = StartStableSyncInThread(flow, EngineShard::tlocal());
          return OpStatus::OK;
        });

    if (*status != OpStatus::OK)
      return reply->SendError(kInvalidState);
  }

  sync_info->state = SyncState::STABLE_SYNC;
  return reply->SendOk();
}

void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
cntx->transaction->ScheduleSingleHop([](Transaction* t, EngineShard* shard) {
Expand All @@ -305,13 +335,39 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
flow->saver.reset(new RdbSaver(flow->conn->socket(), save_mode, false));

if (shard != nullptr) {
flow->saver->StartSnapshotInShard(false, shard);
auto ec = sf_->journal()->OpenInThread(false, string_view());
CHECK(!ec);
flow->saver->StartSnapshotInShard(true, shard);
}

flow->fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow);
return OpStatus::OK;
}

OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) {
if (shard != nullptr) {
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
flow->saver->StopSnapshotInShard(shard);
}

// Wait for full sync to finish.
if (flow->fb.joinable()) {
flow->fb.join();
}

if (shard != nullptr) {
flow->saver.reset();
romange marked this conversation as resolved.
Show resolved Hide resolved

// TODO: Add cancellation.
auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) {
// TODO: Serialize event.
ReqSerializer serializer{flow->conn->socket()};
serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString()));
});
}

return OpStatus::OK;
}

void DflyCmd::FullSyncFb(FlowInfo* flow) {
error_code ec;
RdbSaver* saver = flow->saver.get();
Expand All @@ -328,22 +384,20 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) {
return;
}

if (saver->Mode() != SaveMode::SUMMARY) {
// TODO: we should be able to stop earlier if requested.
ec = saver->SaveBody(nullptr);
if (ec) {
LOG(ERROR) << ec;
return;
}
// TODO: we should be able to stop earlier if requested.
romange marked this conversation as resolved.
Show resolved Hide resolved
ec = saver->SaveBody(nullptr);
if (ec) {
LOG(ERROR) << ec;
return;
}

VLOG(1) << "Sending full sync EOF";

ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
LOG(ERROR) << ec;
return;
}

ec = flow->conn->socket()->Shutdown(SHUT_RDWR);
}

uint32_t DflyCmd::CreateSyncSession() {
Expand Down Expand Up @@ -429,6 +483,25 @@ pair<uint32_t, shared_ptr<DflyCmd::SyncInfo>> DflyCmd::GetSyncInfoOrReply(std::s
return {sync_id, sync_it->second};
}

bool DflyCmd::CheckReplicaStateOrReply(const SyncInfo& sync_info, SyncState expected,
RedisReplyBuilder* rb) {
if (sync_info.state != expected) {
rb->SendError(kInvalidState);
return false;
}

// Check all flows are connected.
// This might happen if a flow abruptly disconnected before sending the SYNC request.
for (const FlowInfo& flow : sync_info.flows) {
if (!flow.conn) {
rb->SendError(kInvalidState);
return false;
}
}

return true;
}

// Shutdown hook. As written it only emits a verbose (level 1) log line and
// does not touch any sync session or flow.
void DflyCmd::BreakOnShutdown() {
VLOG(1) << "BreakOnShutdown";
}
Expand Down
17 changes: 13 additions & 4 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Journal;

class DflyCmd {
public:
enum class SyncState { PREPARATION, FULL_SYNC, CANCELLED };
enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED };

struct FlowInfo {
FlowInfo() = default;
Expand Down Expand Up @@ -80,18 +80,24 @@ class DflyCmd {
// Register connection as flow for sync session.
void Flow(CmdArgList args, ConnectionContext* cntx);

// SYNC <masterid> <syncid> <flowid>
// Migrate connection to required flow thread.
// Stub: will be replaced with full sync.
// SYNC <syncid>
// Initiate full sync.
void Sync(CmdArgList args, ConnectionContext* cntx);

// STARTSTABLE <syncid>
// Switch to stable state replication.
void StartStable(CmdArgList args, ConnectionContext* cntx);

// EXPIRE
// Check all keys for expiry.
void Expire(CmdArgList args, ConnectionContext* cntx);

// Start full sync in thread. Start FullSyncFb. Called for each flow.
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, EngineShard* shard);

// Start stable sync in thread. Called for each flow.
facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard);

// Fiber that runs full sync for each flow.
void FullSyncFb(FlowInfo* flow);

Expand All @@ -108,6 +114,9 @@ class DflyCmd {
std::pair<uint32_t, std::shared_ptr<SyncInfo>> GetSyncInfoOrReply(std::string_view id,
facade::RedisReplyBuilder* rb);

bool CheckReplicaStateOrReply(const SyncInfo& si, SyncState expected,
facade::RedisReplyBuilder* rb);

ServerFamily* sf_;

util::ListenerInterface* listener_;
Expand Down
11 changes: 11 additions & 0 deletions src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <cstdint>  // uint8_t; keeps this header self-contained.

/*
Opcode range 230-240 is used by DF extensions.
*/

// Marks the end of the full-sync part of a stream. It is written by
// RdbSerializer::SendFullSyncCut; on reading it, RdbLoader::Load invokes the
// full-sync-cut callback (if one is set) and continues loading.
constexpr uint8_t RDB_OPCODE_FULLSYNC_END = 230;
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 8 additions & 2 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C" {
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/hset_family.h"
#include "server/rdb_extensions.h"
#include "server/script_mgr.h"
#include "server/server_state.h"
#include "server/set_family.h"
Expand Down Expand Up @@ -1553,6 +1554,12 @@ error_code RdbLoader::Load(io::Source* src) {
break;
}

if (type == RDB_OPCODE_FULLSYNC_END) {
if (full_sync_cut_cb)
full_sync_cut_cb();
continue;
}

if (type == RDB_OPCODE_SELECTDB) {
unsigned dbid = 0;

Expand Down Expand Up @@ -1815,8 +1822,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms)
continue;

auto [it, added] = db_slice.AddEntry(db_cntx, item.key, std::move(pv), item.expire_ms);

auto [it, added] = db_slice.AddOrUpdate(db_cntx, item.key, std::move(pv), item.expire_ms);
if (!added) {
LOG(WARNING) << "RDB has duplicated key '" << item.key << "' in DB " << db_ind;
}
Expand Down
6 changes: 6 additions & 0 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ class RdbLoader : protected RdbLoaderBase {
return load_time_;
}

// Installs the callback stored in full_sync_cut_cb, replacing any previous one.
// Load() calls it each time it reads RDB_OPCODE_FULLSYNC_END; an empty
// std::function means the opcode is consumed without any notification.
void SetFullSyncCutCb(std::function<void()> cb) {
full_sync_cut_cb = std::move(cb);
}

private:
struct ObjSettings;
std::error_code LoadKeyValPair(int type, ObjSettings* settings);
Expand All @@ -194,6 +198,8 @@ class RdbLoader : protected RdbLoaderBase {
::boost::fibers::mutex mu_;
std::error_code ec_; // guarded by mu_
std::atomic_bool stop_early_{false};

std::function<void()> full_sync_cut_cb;
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace dfly
21 changes: 15 additions & 6 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C" {
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/snapshot.h"
#include "server/rdb_extensions.h"
#include "util/fibers/simple_channel.h"

namespace dfly {
Expand Down Expand Up @@ -581,6 +582,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
return error_code{};
}

error_code RdbSerializer::SendFullSyncCut() {
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));
return FlushMem();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need FlushMem here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I want it to be sent immediately to the replica. Can't it be stuck inside the buffer if I don't flush it? It seems like it can

Copy link
Collaborator

@romange romange Nov 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not really matter, but you call this twice: once in the summary flow, where it does not matter, and a second time in a place that calls FlushMem right after, as far as I remember.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I call it twice for the SUMMARY flow, right. But don't forget about the snapshot that sends a FS cut as well. It can be stuck there

}

// TODO: if buf is large enough, it makes sense to write both mem_buf and buf
// directly to sink_.
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
Expand Down Expand Up @@ -918,12 +924,15 @@ error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
RETURN_ON_ERR(impl_->serializer()->FlushMem());

VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();

error_code io_error = impl_->ConsumeChannel();
if (io_error) {
VLOG(1) << "io error " << io_error;
return io_error;
if (save_mode_ == SaveMode::SUMMARY) {
impl_->serializer()->SendFullSyncCut();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to send it here as well? it's not for a flow channel, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The io thread runs the rdb saver in SUMMARY mode to transfer only the header and lua scripts. I just decided it'll be more consistent if all threads use this opcode without any corner cases. So I need the summary mode to write it as well.

} else {
VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
error_code io_error = impl_->ConsumeChannel();
if (io_error) {
VLOG(1) << "io error " << io_error;
return io_error;
}
}

RETURN_ON_ERR(SaveEpilog());
Expand Down
Loading