feat(replica): Support FlushDB command for replication #580 #591

Merged · 4 commits · Dec 25, 2022
2 changes: 1 addition & 1 deletion src/server/journal/executor.cc
@@ -12,7 +12,7 @@ namespace dfly {
JournalExecutor::JournalExecutor(Service* service) : service_{service} {
}

void JournalExecutor::Execute(journal::ParsedEntry&& entry) {
void JournalExecutor::Execute(journal::ParsedEntry& entry) {
if (entry.payload) {
io::NullSink null_sink;
ConnectionContext conn_context{&null_sink, nullptr};
2 changes: 1 addition & 1 deletion src/server/journal/executor.h
@@ -14,7 +14,7 @@ class Service;
class JournalExecutor {
public:
JournalExecutor(Service* service);
void Execute(journal::ParsedEntry&& entry);
void Execute(journal::ParsedEntry& entry);

private:
Service* service_;
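The signature change above (ParsedEntry&& to ParsedEntry&) is discussed in the review thread near the replica.cc hunk below: the caller still reads the entry's fields (txid, shard_cnt) after execution, and the dispatch path may mutate the argument list in place (ToUpper etc.), so a mutable lvalue reference fits better than a consuming rvalue reference. A rough sketch of that calling pattern, using illustrative stand-in types rather than Dragonfly's real ones:

```cpp
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for journal::ParsedEntry; only the fields relevant to the sketch.
struct ParsedEntryLike {
  uint64_t txid;
  uint32_t shard_cnt;
  std::vector<std::string> args;
};

// Takes a mutable reference: it may rewrite the args in place, and the caller
// keeps using the entry afterwards.
void ExecuteLike(ParsedEntryLike& e) {
  std::transform(e.args[0].begin(), e.args[0].end(), e.args[0].begin(),
                 [](unsigned char c) { return std::toupper(c); });
}

int main() {
  ParsedEntryLike e{7, 2, {"flushdb"}};
  ExecuteLike(e);
  // Nothing was moved away, so logging fields after the call stays meaningful.
  std::cout << e.txid << " " << e.shard_cnt << " " << e.args[0] << "\n";  // 7 2 FLUSHDB
}
```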
6 changes: 6 additions & 0 deletions src/server/journal/serializer.cc
@@ -70,6 +70,7 @@ error_code JournalWriter::Write(const journal::Entry& entry) {
return Write(entry.dbid);
case journal::Op::COMMAND:
RETURN_ON_ERR(Write(entry.txid));
RETURN_ON_ERR(Write(entry.shard_cnt));
return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload);
default:
break;
@@ -100,6 +101,10 @@ io::Result<uint16_t> JournalReader::ReadU16(io::Source* source) {
return ReadPackedUIntTyped<uint16_t>(source);
}

io::Result<uint32_t> JournalReader::ReadU32(io::Source* source) {
return ReadPackedUIntTyped<uint32_t>(source);
}

io::Result<uint64_t> JournalReader::ReadU64(io::Source* source) {
return ReadPackedUIntTyped<uint64_t>(source);
}
@@ -153,6 +158,7 @@ io::Result<journal::ParsedEntry> JournalReader::ReadEntry(io::Source* source) {
switch (entry.opcode) {
case journal::Op::COMMAND:
SET_OR_UNEXPECT(ReadU64(source), entry.txid);
SET_OR_UNEXPECT(ReadU32(source), entry.shard_cnt);
entry.payload = CmdArgVec{};
if (auto ec = Read(source, &*entry.payload); ec)
return make_unexpected(ec);
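The writer and reader above must agree on field order for the COMMAND opcode: txid, then the new shard_cnt, then the payload. The actual packed-integer format is whatever Write()/ReadPackedUIntTyped() implement; purely as a self-contained illustration of such a symmetric round trip, here is a generic LEB128-style varint version (not Dragonfly's encoding):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Append v as a little-endian base-128 varint.
static void WriteVarint(std::vector<uint8_t>* out, uint64_t v) {
  while (v >= 0x80) {
    out->push_back(static_cast<uint8_t>(v) | 0x80);
    v >>= 7;
  }
  out->push_back(static_cast<uint8_t>(v));
}

// Decode one varint starting at *pos and advance *pos past it.
static uint64_t ReadVarint(const std::vector<uint8_t>& in, size_t* pos) {
  uint64_t v = 0;
  int shift = 0;
  while (true) {
    uint8_t b = in[(*pos)++];
    v |= static_cast<uint64_t>(b & 0x7F) << shift;
    if (!(b & 0x80)) return v;
    shift += 7;
  }
}

int main() {
  std::vector<uint8_t> buf;
  WriteVarint(&buf, /*txid=*/12345);   // written first
  WriteVarint(&buf, /*shard_cnt=*/2);  // written second, as in this PR

  size_t pos = 0;
  uint64_t txid = ReadVarint(buf, &pos);  // read back in the same order
  auto shard_cnt = static_cast<uint32_t>(ReadVarint(buf, &pos));
  assert(txid == 12345 && shard_cnt == 2);
  (void)txid;
  (void)shard_cnt;
}
```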
1 change: 1 addition & 0 deletions src/server/journal/serializer.h
@@ -55,6 +55,7 @@ struct JournalReader {
// TODO: Templated endian encoding to not repeat...?
io::Result<uint8_t> ReadU8(io::Source* source);
io::Result<uint16_t> ReadU16(io::Source* source);
io::Result<uint32_t> ReadU32(io::Source* source);
io::Result<uint64_t> ReadU64(io::Source* source);

// Read string into internal buffer and return size.
13 changes: 7 additions & 6 deletions src/server/journal/types.h
@@ -22,6 +22,7 @@ struct EntryBase {
TxId txid;
Op opcode;
DbIndex dbid;
uint32_t shard_cnt;
};

// This struct represents a single journal entry.
@@ -34,11 +34,11 @@ struct Entry : public EntryBase {
std::pair<std::string_view, ArgSlice> // Command and its shard parts.
>;

Entry(TxId txid, DbIndex dbid, Payload pl)
: EntryBase{txid, journal::Op::COMMAND, dbid}, payload{pl} {
Entry(TxId txid, DbIndex dbid, Payload pl, uint32_t shard_cnt)
: EntryBase{txid, journal::Op::COMMAND, dbid, shard_cnt}, payload{pl} {
}

Entry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} {
Entry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid, 0}, payload{} {
}

Payload payload;
@@ -50,11 +51,11 @@ struct ParsedEntry : public EntryBase {

ParsedEntry() = default;

ParsedEntry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} {
ParsedEntry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid, 0}, payload{} {
}

ParsedEntry(TxId txid, DbIndex dbid, Payload pl)
: EntryBase{txid, journal::Op::COMMAND, dbid}, payload{pl} {
ParsedEntry(TxId txid, DbIndex dbid, Payload pl, uint32_t shard_cnt)
: EntryBase{txid, journal::Op::COMMAND, dbid, shard_cnt}, payload{pl} {
}

Payload payload;
15 changes: 8 additions & 7 deletions src/server/journal_test.cc
@@ -95,13 +95,14 @@ TEST(Journal, WriteRead) {
auto slice = [v = &slices](auto... ss) { return StoreSlice(v, ss...); };
auto list = [v = &lists](auto... ss) { return StoreList(v, ss...); };

std::vector<journal::Entry> test_entries = {{0, 0, make_pair("MSET", slice("A", "1", "B", "2"))},
{1, 0, make_pair("MSET", slice("C", "3"))},
{2, 0, list("DEL", "A", "B")},
{3, 1, list("LPUSH", "l", "v1", "v2")},
{4, 0, make_pair("MSET", slice("D", "4"))},
{5, 1, list("DEL", "l1")},
{6, 2, list("SET", "E", "2")}};
std::vector<journal::Entry> test_entries = {
{0, 0, make_pair("MSET", slice("A", "1", "B", "2")), 2},
{0, 0, make_pair("MSET", slice("C", "3")), 2},
{1, 0, list("DEL", "A", "B"), 2},
{2, 1, list("LPUSH", "l", "v1", "v2"), 1},
{3, 0, make_pair("MSET", slice("D", "4")), 1},
{4, 1, list("DEL", "l1"), 1},
{5, 2, list("SET", "E", "2"), 1}};

// Write all entries to string file.
io::StringSink ss;
2 changes: 1 addition & 1 deletion src/server/rdb_load.cc
@@ -1983,7 +1983,7 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) {
while (done < num_entries) {
journal::ParsedEntry entry{};
SET_OR_RETURN(journal_reader_.ReadEntry(&bs), entry);
ex.Execute(std::move(entry));
ex.Execute(entry);
done++;
}

65 changes: 58 additions & 7 deletions src/server/replica.cc
@@ -97,9 +97,11 @@ Replica::Replica(string host, uint16_t port, Service* se) : service_(*se) {
master_context_.port = port;
}

Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service)
Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service,
std::shared_ptr<Replica::MultiShardExecution> shared_exe_data)
: service_(*service), master_context_(context) {
master_context_.dfly_flow_id = dfly_flow_id;
multi_shard_exe_ = shared_exe_data;
}

Replica::~Replica() {
@@ -427,13 +429,13 @@ error_code Replica::InitiatePSync() {
// Initialize and start sub-replica for each flow.
error_code Replica::InitiateDflySync() {
DCHECK_GT(num_df_flows_, 0u);

multi_shard_exe_.reset(new MultiShardExecution());
shard_flows_.resize(num_df_flows_);
for (unsigned i = 0; i < num_df_flows_; ++i) {
shard_flows_[i].reset(new Replica(master_context_, i, &service_));
shard_flows_[i].reset(new Replica(master_context_, i, &service_, multi_shard_exe_));
}

// Blocked on untill all flows got full sync cut.
// Blocked on until all flows got full sync cut.
fibers_ext::BlockingCounter sync_block{num_df_flows_};

auto err_handler = [this, sync_block](const auto& ge) mutable {
@@ -705,14 +707,63 @@ void Replica::StableSyncDflyFb(Context* cntx) {
cntx->Error(res.error(), "Journal format error");
return;
}

executor.Execute(std::move(res.value()));

ExecuteEntry(&executor, res.value());
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
}
return;
}

void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entry) {
if (entry.shard_cnt <= 1) { // not multi shard cmd
executor->Execute(entry);
return;
}

// Multi shard command flow:
// step 1: Fibers wait until all the fibers that should execute this transaction have
// reached the journal entry of the transaction.
// step 2: Execute the command (all fibers).
// step 3: Fibers wait until all fibers have finished the execution.
// Step 1 enforces that the replica executes only multi shard commands that finished on the master.
// Step 3 ensures the correctness of flushall/flushdb commands.

// TODO: this implementation does not support atomicity on the replica.
// Although the multi shard executions in step 2 happen very close to each other,
// a user can query the replica between them.
// To support atomicity we should have one fiber in step 2 which executes all the entries of
// the transaction together. In the case of a global command such as flushdb, the command can be
// executed by only one fiber.

// TODO: support error handler in this flow

// Only the first fiber to reach the transaction creates its entry in the map.
multi_shard_exe_->map_mu.lock();
auto [it, was_insert] = multi_shard_exe_->tx_sync_execution.emplace(entry.txid, entry.shard_cnt);

// Note: we must release the mutex before calling wait on the barrier.
multi_shard_exe_->map_mu.unlock();

VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt
<< " was_insert: " << was_insert;

// step 1
it->second.barrier.wait();
// step 2
executor->Execute(entry);
// step 3
it->second.barrier.wait();

// Note: erasing from the map can be done only after all fibers have returned from wait.
// The last fiber to decrement the counter to 0 is the one that erases the entry from the map.
auto val = it->second.counter.fetch_sub(1, std::memory_order_relaxed);
VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt
<< " counter: " << val;
if (val == 1) {
std::lock_guard lg{multi_shard_exe_->map_mu};
multi_shard_exe_->tx_sync_execution.erase(entry.txid);
}
}

Review thread on this hunk:

Collaborator:
1. Since you move the entry above, reading its fields can technically show wrong values.
2. You do not use was_insert to check that only a single fiber called flushdb. Why?
3. Please use fetch_sub when decrementing atomics; you will need to compare val with 1.

Contributor:
1. We can relax the interface of Execute to accept a mut-ref. We need mut because of the mutable CmdArgList in Dispatch (which can be changed with ToUpper etc. inside the command).
2. Because we can't tell right now what kind of command that is, a sharded part or a global one. We can, theoretically, check the number of arguments it has or pass additional info. For now, the other executions of FLUSHDB will be no-ops (but redundant transactions).

Collaborator (author):
Regarding number 2, I will, but not in this PR; there are 2 TODO notes which I added. I will work on another PR to support execution of the commands by a single fiber, which will require a different flow for global and non-global commands. I will also work on another PR to support the cancellation flow, once the PR I created for helio with the barrier is merged.

error_code Replica::ReadRespReply(base::IoBuf* io_buf, uint32_t* consumed) {
DCHECK(parser_);

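To make the three-step flow in ExecuteEntry easier to follow, here is a minimal, self-contained sketch of the same barrier-plus-counter pattern. It uses std::thread and std::barrier (C++20) instead of Boost.Fibers, and illustrative names (TxSync, tx_map, ExecuteShardPart) that are not Dragonfly's; it also shows why the cleanup compares the result of fetch_sub with 1, since fetch_sub returns the previous value.

```cpp
#include <atomic>
#include <barrier>
#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#include <vector>

// One synchronization record per multi-shard transaction, mirroring
// Replica::MultiShardExecution::TxExecutionSync from the diff.
struct TxSync {
  std::barrier<> barrier;
  std::atomic<uint32_t> counter;
  explicit TxSync(uint32_t n) : barrier(n), counter(n) {}
};

std::mutex map_mu;
std::map<uint64_t, TxSync> tx_map;

void ExecuteShardPart(uint64_t txid, uint32_t shard_cnt, uint32_t flow_id) {
  TxSync* sync;
  {
    std::lock_guard lk(map_mu);
    // Only the first flow to arrive actually inserts; the rest find the record.
    auto [it, inserted] = tx_map.try_emplace(txid, shard_cnt);
    (void)inserted;
    sync = &it->second;
  }
  sync->barrier.arrive_and_wait();  // step 1: wait until every flow reached the entry
  std::cout << "tx " << txid << " executed by flow " << flow_id << "\n";  // step 2
  sync->barrier.arrive_and_wait();  // step 3: wait until every flow finished
  // fetch_sub returns the previous value, so the last flow out observes 1
  // and is the one that erases the record.
  if (sync->counter.fetch_sub(1) == 1) {
    std::lock_guard lk(map_mu);
    tx_map.erase(txid);
  }
}

int main() {
  std::vector<std::thread> flows;
  for (uint32_t i = 0; i < 3; ++i)
    flows.emplace_back(ExecuteShardPart, /*txid=*/42, /*shard_cnt=*/3, i);
  for (auto& t : flows) t.join();
  return tx_map.empty() ? 0 : 1;
}
```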
24 changes: 23 additions & 1 deletion src/server/replica.h
@@ -3,13 +3,16 @@
//
#pragma once

#include <boost/fiber/barrier.hpp>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/mutex.hpp>
#include <variant>

#include "base/io_buf.h"
#include "facade/facade_types.h"
#include "facade/redis_parser.h"
#include "server/common.h"
#include "server/journal/types.h"
#include "util/fiber_socket_base.h"
#include "util/fibers/fibers_ext.h"

@@ -21,6 +24,7 @@ namespace dfly {

class Service;
class ConnectionContext;
class JournalExecutor;

class Replica {
private:
@@ -46,6 +50,19 @@ class Replica {
R_SYNC_OK = 0x10,
};

struct MultiShardExecution {
boost::fibers::mutex map_mu;

struct TxExecutionSync {
boost::fibers::barrier barrier;
std::atomic_uint32_t counter;
TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter) {
}
};

std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
};

public:
Replica(std::string master_host, uint16_t port, Service* se);
~Replica();
@@ -81,7 +98,8 @@

private: /* Main dlfly flow mode functions */
// Initialize as single dfly flow.
Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service);
Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service,
std::shared_ptr<MultiShardExecution> shared_exe_data);

// Start replica initialized as dfly flow.
std::error_code StartFullSyncFlow(util::fibers_ext::BlockingCounter block, Context* cntx);
@@ -122,6 +140,8 @@ class Replica {
// Send command, update last_io_time, return error.
std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer);

void ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entry);

public: /* Utility */
struct Info {
std::string host;
@@ -154,6 +174,8 @@ class Replica {
MasterContext master_context_;
std::unique_ptr<util::LinuxSocketBase> sock_;

std::shared_ptr<MultiShardExecution> multi_shard_exe_;

// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
::boost::fibers::fiber sync_fb_;
std::vector<std::unique_ptr<Replica>> shard_flows_;
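On the ownership side, each per-shard flow is constructed with the same std::shared_ptr<MultiShardExecution> (see InitiateDflySync in replica.cc above), so all flows coordinate through one tx_sync_execution map. A minimal sketch of that wiring with stand-in types, not the real Replica classes:

```cpp
#include <memory>
#include <vector>

// Stand-in for Replica::MultiShardExecution (mutex + tx_sync_execution map in the diff).
struct MultiShardExecution {};

// Stand-in for a per-shard flow that stores the shared pointer, as the new
// Replica(context, flow_id, service, shared_exe_data) constructor does.
struct Flow {
  explicit Flow(std::shared_ptr<MultiShardExecution> shared)
      : multi_shard_exe_(std::move(shared)) {}
  std::shared_ptr<MultiShardExecution> multi_shard_exe_;
};

int main() {
  auto multi_shard_exe = std::make_shared<MultiShardExecution>();
  std::vector<Flow> shard_flows;
  for (int i = 0; i < 4; ++i)
    shard_flows.emplace_back(multi_shard_exe);  // every flow shares the same object
  // One owner here plus four inside the flows.
  return shard_flows.front().multi_shard_exe_.use_count() == 5 ? 0 : 1;
}
```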
2 changes: 1 addition & 1 deletion src/server/transaction.cc
@@ -1221,7 +1221,7 @@ void Transaction::LogJournalOnShard(EngineShard* shard) {
entry_payload =
make_pair(facade::ToSV(cmd_with_full_args_.front()), ShardArgsInShard(shard->shard_id()));
}
journal->RecordEntry(journal::Entry{txid_, db_index_, entry_payload});
journal->RecordEntry(journal::Entry{txid_, db_index_, entry_payload, unique_shard_cnt_});
}

void Transaction::BreakOnShutdown() {