Skip to content

Commit

Permalink
feat(server): Multi instance repl tests + small fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Nov 13, 2022
1 parent e495985 commit 0e08789
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 180 deletions.
3 changes: 0 additions & 3 deletions src/redis/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@
/* Test if a type is an object type. */
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))

// DFLY EXTENSIONS: WHAT FILE TO PUT THEM?
#define RDB_OPCODE_FULLSYNC_END 233

/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_FUNCTION 246 /* engine data */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
Expand Down
53 changes: 9 additions & 44 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,42 +300,20 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
if (!CheckReplicaStateOrReply(*sync_info, SyncState::FULL_SYNC, rb))
return;

// TODO: Temporary solution
{
TransactionGuard tg{cntx->transaction};
AggregateStatus status;

auto cb = [this, &status, sync_info = sync_info](Transaction* t, EngineShard* shard) {
unsigned index = shard->shard_id();
status = StartStableSyncInThread(&sync_info->flows[index], shard);
return OpStatus::OK;
};
cntx->transaction->ScheduleSingleHop(std::move(cb));

auto cb2 = [this, &status, sync_info = sync_info](unsigned index, auto*) {
if (EngineShard::tlocal() != nullptr) return OpStatus::OK;
auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) {
status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal());
return OpStatus::OK;
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb2));
shard_set->pool()->AwaitFiberOnAll(std::move(cb));

if (*status != OpStatus::OK)
return rb->SendError(kInvalidState);
}

//{
// TransactionGuard tg{cntx->transaction};
// AggregateStatus status;

// auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) {
// status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal());
// return OpStatus::OK;
// };
// shard_set->pool()->AwaitFiberOnAll(std::move(cb));

// if (*status != OpStatus::OK)
// return rb->SendError(kInvalidState);
//}

sync_info->state = SyncState::STABLE_SYNC;
return rb->SendOk();
}
Expand Down Expand Up @@ -379,11 +357,12 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) {
if (shard != nullptr) {
flow->saver.reset();

// Start stable state fiber.
flow->fb = boost::fibers::fiber(&DflyCmd::StableSyncFb, this, flow);

// TODO: Temporary solution
flow->fb.join();
// TODO: Add cancellation.
auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) {
// TODO: Serialize event.
ReqSerializer serializer{flow->conn->socket()};
serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString()));
});
}

return OpStatus::OK;
Expand Down Expand Up @@ -419,20 +398,6 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) {
LOG(ERROR) << ec;
return;
}

// ec = flow->conn->socket()->Shutdown(SHUT_RDWR);
}

void DflyCmd::StableSyncFb(FlowInfo* flow) {
auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) {
// TODO: Temporary solution
if (flow->conn == nullptr) return;

// TODO: Serialize event.
ReqSerializer serializer{flow->conn->socket()};
serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString()));
//CHECK(!serializer.ec());
});
}

uint32_t DflyCmd::CreateSyncSession() {
Expand Down
5 changes: 1 addition & 4 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,12 @@ class DflyCmd {
// Start full sync in thread. Start FullSyncFb. Called for each flow.
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, EngineShard* shard);

// Start stable sync in thread. Start StableSyncFB. Called for each flow.
// Start stable sync in thread. Called for each flow.
facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard);

// Fiber that runs full sync for each flow.
void FullSyncFb(FlowInfo* flow);

// Fiber that runs stable sync for each flow.
void StableSyncFb(FlowInfo* flow);

// Unregister flow. Must be called when flow disconnects.
void UnregisterFlow(FlowInfo*);

Expand Down
13 changes: 13 additions & 0 deletions src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

/*
Opcode range 230-240 is used by DF extensions.
*/

const uint8_t RDB_OPCODE_FULLSYNC_END = 230;


7 changes: 3 additions & 4 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extern "C" {
#include "server/script_mgr.h"
#include "server/server_state.h"
#include "server/set_family.h"
#include "server/rdb_extensions.h"
#include "strings/human_readable.h"

ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size);
Expand Down Expand Up @@ -1554,10 +1555,8 @@ error_code RdbLoader::Load(io::Source* src) {
}

if (type == RDB_OPCODE_FULLSYNC_END) {
VLOG(0) << "GOT FULLSYNC OPCODE";
if (fullsyncb)
fullsyncb();
// notify full sync end
if (full_sync_cut_cb)
full_sync_cut_cb();
continue;
}

Expand Down
6 changes: 5 additions & 1 deletion src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ class RdbLoader : protected RdbLoaderBase {
return load_time_;
}

std::function<void()> fullsyncb;
void SetFullSyncCutCb(std::function<void()> cb) {
full_sync_cut_cb = std::move(cb);
}

private:
struct ObjSettings;
Expand All @@ -196,6 +198,8 @@ class RdbLoader : protected RdbLoaderBase {
::boost::fibers::mutex mu_;
std::error_code ec_; // guarded by mu_
std::atomic_bool stop_early_{false};

std::function<void()> full_sync_cut_cb;
};

} // namespace dfly
1 change: 1 addition & 0 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C" {
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/snapshot.h"
#include "server/rdb_extensions.h"
#include "util/fibers/simple_channel.h"

namespace dfly {
Expand Down
7 changes: 3 additions & 4 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -613,15 +613,14 @@ void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) {
io::PrefixSource ps{leftover_buf_->InputBuffer(), &ss};

RdbLoader loader(NULL);
loader.fullsyncb = [this, sb, ran = false]() mutable {
if (ran) return;
{
loader.SetFullSyncCutCb([this, sb, ran = false]() mutable {
if (!ran) {
std::unique_lock lk(sb->mu_);
sb->flows_left--;
ran = true;
}
sb->cv_.notify_all();
};
});
loader.Load(&ps);

// Try finding eof token.
Expand Down
1 change: 1 addition & 0 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ void SliceSnapshot::SerializeEntriesFb() {
mu_.lock();
mu_.unlock();

for (int i = 0; i < 10; i++)
CHECK(!rdb_serializer_->SendFullSyncCut());
FlushSfile(true);

Expand Down
4 changes: 2 additions & 2 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {

// Fast path - for uncontended keys, just run the callback.
// That applies for single key operations like set, get, lpush etc.
if (shard->db_slice().CheckLock(mode, lock_args)) {
if (shard->db_slice().CheckLock(mode, lock_args) && shard->shard_lock()->Check(mode)) {
RunQuickie(shard);
return true;
}
Expand All @@ -814,7 +814,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED);
bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
sd.local_mask |= KEYLOCK_ACQUIRED;
DCHECK(!lock_acquired); // Because CheckLock above failed.
//DCHECK(!lock_acquired); // Because CheckLock above failed.

DVLOG(1) << "Rescheduling into TxQueue " << DebugId();

Expand Down
Loading

0 comments on commit 0e08789

Please sign in to comment.