Skip to content

Commit

Permalink
fix(server): Small fixes in replication code
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Nov 14, 2022
1 parent aa18ee2 commit d57a17e
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 27 deletions.
2 changes: 2 additions & 0 deletions src/redis/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@
/* Test if a type is an object type. */
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))

/* Range 200-240 is used by Dragonfly specific opcodes */

/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_FUNCTION 246 /* engine data */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
Expand Down
10 changes: 5 additions & 5 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -571,9 +571,10 @@ OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
return OpStatus::OK;
}

std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update) noexcept(false) {
std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,
std::string_view key, PrimeValue obj,
uint64_t expire_at_ms,
bool force_update) noexcept(false) {
DCHECK(!obj.IsRef());

pair<PrimeIterator, bool> res = AddOrFind(cntx, key);
Expand All @@ -600,11 +601,10 @@ std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,
}

pair<PrimeIterator, bool> DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
uint64_t expire_at_ms) noexcept(false) {
return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true);
}


pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false) {
return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false);
Expand Down
8 changes: 4 additions & 4 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ class DbSlice {
std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(const Context& cntx,
std::string_view key) noexcept(false);

// Same as AddEntry, but overwrites in case entry exists. Returns second=true
// if insertion took place.
std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false);
// Same as AddEntry, but overwrites in case entry exists.
// Returns second=true if insertion took place.
std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms) noexcept(false);

// Returns second=true if insertion took place, false otherwise.
// expire_at_ms equal to 0 - means no expiry.
Expand Down
4 changes: 1 addition & 3 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ void DflyCmd::OnClose(ConnectionContext* cntx) {
if (!session_id)
return;

VLOG(0) << "Disconnected !!! " << flow_id;

if (flow_id == kuint32max) {
DeleteSyncSession(session_id);
} else {
Expand Down Expand Up @@ -290,7 +288,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
string_view sync_id_str = ArgS(args, 2);

VLOG(0) << "Got DFLY STARTSTABLE " << sync_id_str;
VLOG(1) << "Got DFLY STARTSTABLE " << sync_id_str;

auto [sync_id, sync_info] = GetSyncInfoOrReply(sync_id_str, rb);
if (!sync_id)
Expand Down
4 changes: 2 additions & 2 deletions src/server/rdb_extensions.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#pragma once

/*
Opcode range 230-240 is used by DF extensions.
Range 200-240 is used by DF extensions.
*/

const uint8_t RDB_OPCODE_FULLSYNC_END = 230;
const uint8_t RDB_OPCODE_FULLSYNC_END = 200;
2 changes: 1 addition & 1 deletion src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ extern "C" {
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/snapshot.h"
#include "server/rdb_extensions.h"
#include "server/snapshot.h"
#include "util/fibers/simple_channel.h"

namespace dfly {
Expand Down
9 changes: 2 additions & 7 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -450,10 +450,9 @@ error_code Replica::InitiateDflySync() {

// Wait for all flows to receive full sync cut.
{
VLOG(0) << "BEFORE FULL SYNC WAIT";
VLOG(1) << "Blocking before full sync cut";
std::unique_lock lk(sb.mu_);
sb.cv_.wait(lk, [&]() { return sb.flows_left == 0; });
VLOG(0) << "AFTER FULL SYNC WAIT";
}

LOG(INFO) << "Full sync finished";
Expand Down Expand Up @@ -520,8 +519,6 @@ error_code Replica::ConsumeDflyStream() {
for (auto& sub_repl : shard_flows_)
sub_repl->sync_fb_.join();

VLOG(0) << "FULL SYNC FIBERS JOINED";

AggregateError all_ec;
vector<vector<unsigned>> partition = Partition(num_df_flows_);
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
Expand Down Expand Up @@ -636,7 +633,7 @@ void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) {
}
}

// Keep laoder leftover.
// Keep loader leftover.
io::Bytes unused = chained_tail.unused_prefix();
if (unused.size() > 0) {
leftover_buf_.reset(new base::IoBuf{unused.size()});
Expand Down Expand Up @@ -679,8 +676,6 @@ void Replica::StableSyncDflyFb() {
ec = ParseAndExecute(&io_buf);
}

VLOG(0) << "GOT EC " << ec.message();

return;
}

Expand Down
6 changes: 5 additions & 1 deletion src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Replica {
R_SYNC_OK = 0x10,
};

// A generic barrier that is used for waiting for
// flow fibers to become ready for the stable state switch.
struct SyncBlock {
SyncBlock(unsigned flows) : flows_left{flows} {
}
Expand Down Expand Up @@ -87,11 +89,13 @@ class Replica {
// Start replica initialized as dfly flow.
std::error_code StartFullSyncFlow(SyncBlock* block);

// Transition into stable state mode as dfly flow.
std::error_code StartStableSyncFlow();

// Single flow Dragonfly full sync fiber spawned by StartFullSyncFlow.
// Single flow full sync fiber spawned by StartFullSyncFlow.
void FullSyncDflyFb(SyncBlock* block, std::string eof_token);

// Single flow stable state sync fiber spawned by StartStableSyncFlow.
void StableSyncDflyFb();

private: /* Utility */
Expand Down
8 changes: 4 additions & 4 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
import pytest
import asyncio
import aioredis
import redis

from .utility import *
from . import dfly_args


BASE_PORT = 1111
Expand All @@ -22,6 +20,7 @@
(4, [1] * 12, 10000, 4000),
]


@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_replicas, n_keys, n_stream_keys", replication_cases)
async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n_stream_keys):
Expand All @@ -46,7 +45,7 @@ async def stream_data():
""" Stream data during stable state replication phase and afterwards """
gen = gen_test_data(n_stream_keys, seed=2)
for chunk in grouper(3, gen):
await c_master.mset({k:v for k,v in chunk})
await c_master.mset({k: v for k, v in chunk})

async def run_replication(c_replica):
await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
Expand All @@ -65,7 +64,8 @@ async def check_replication(c_replica):
await asyncio.gather(*(asyncio.create_task(run_replication(c))
for c in c_replicas))

assert not stream_fut.done(), "Weak testcase. Increase number of streamed keys to surpass full sync"
assert not stream_fut.done(
), "Weak testcase. Increase number of streamed keys to surpass full sync"
await stream_fut

# Check full sync results
Expand Down

0 comments on commit d57a17e

Please sign in to comment.