Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug(server): reject replicaof while loading from snapshot #2338

Merged
merged 5 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,26 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
if (ReplicaOfFlag flag = GetFlag(FLAGS_replicaof); flag.has_value()) {
service_.proactor_pool().GetNextProactor()->Await(
chakaz marked this conversation as resolved.
Show resolved Hide resolved
[this, &flag]() { this->Replicate(flag.host, flag.port); });
return; // DONT load any snapshots
} else { // load from snapshot only if --replicaof is empty
LoadFromSnapshot();
}

const auto load_path_result = snapshot_storage_->LoadPath(flag_dir, GetFlag(FLAGS_dbfilename));
const auto create_snapshot_schedule_fb = [this] {
snapshot_schedule_fb_ =
service_.proactor_pool().GetNextProactor()->LaunchFiber([this] { SnapshotScheduling(); });
};
config_registry.RegisterMutable(
"snapshot_cron", [this, create_snapshot_schedule_fb](const absl::CommandLineFlag& flag) {
JoinSnapshotSchedule();
create_snapshot_schedule_fb();
return true;
});
create_snapshot_schedule_fb();
}

void ServerFamily::LoadFromSnapshot() {
const auto load_path_result =
snapshot_storage_->LoadPath(GetFlag(FLAGS_dir), GetFlag(FLAGS_dbfilename));
if (load_path_result) {
const std::string load_path = *load_path_result;
if (!load_path.empty()) {
Expand All @@ -507,19 +523,6 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
LOG(ERROR) << "Failed to load snapshot: " << load_path_result.error().Format();
}
}

const auto create_snapshot_schedule_fb = [this] {
snapshot_schedule_fb_ =
service_.proactor_pool().GetNextProactor()->LaunchFiber([this] { SnapshotScheduling(); });
};
config_registry.RegisterMutable(
"snapshot_cron", [this, create_snapshot_schedule_fb](const absl::CommandLineFlag& flag) {
JoinSnapshotSchedule();
create_snapshot_schedule_fb();
return true;
});

create_snapshot_schedule_fb();
}

void ServerFamily::JoinSnapshotSchedule() {
Expand Down Expand Up @@ -1937,9 +1940,14 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, ConnectionContext* cntx,
ActionOnConnectionFail on_err) {
LOG(INFO) << "Replicating " << host << ":" << port_sv;

unique_lock lk(replicaof_mu_); // Only one REPLICAOF command can run at a time

// We should not execute replica of command while loading from snapshot.
if (ServerState::tlocal()->is_master && service_.GetGlobalState() == GlobalState::LOADING) {
cntx->SendError("Can not execute during LOADING");
return;
}

// If NO ONE was supplied, just stop the current replica (if it exists)
if (IsReplicatingNoOne(host, port_sv)) {
if (!ServerState::tlocal()->is_master) {
Expand Down
1 change: 1 addition & 0 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class ServerFamily {

private:
void JoinSnapshotSchedule();
void LoadFromSnapshot();

uint32_t shard_count() const {
return shard_set->size();
Expand Down
30 changes: 30 additions & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1806,3 +1806,33 @@ async def test_client_pause_with_replica(df_local_factory, df_seeder_factory):
assert await seeder.compare(capture, port=replica.port)

await disconnect_clients(c_master, c_replica)


async def test_replicaof_reject_on_load(df_local_factory, df_seeder_factory):
tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
master = df_local_factory.create()
replica = df_local_factory.create(dbfilename=f"dump_{tmp_file_name}")
df_local_factory.start_all([master, replica])

seeder = df_seeder_factory.create(port=replica.port, keys=30000)
await seeder.run(target_deviation=0.1)
c_replica = replica.client()
dbsize = await c_replica.dbsize()
assert dbsize >= 9000

replica.stop()
replica.start()
c_replica = replica.client()
# Check replica of not alowed while loading snapshot
try:
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
assert False
except aioredis.ResponseError as e:
assert "Can not execute during LOADING" in str(e)
# Check one we finish loading snapshot replicaof success
await wait_available_async(c_replica)
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")

await c_replica.close()
master.stop()
replica.stop()
Loading