Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cluster): Allow appending RDB to existing store #3505

Merged
merged 11 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,7 @@ TEST_F(ClusterFamilyTest, ClusterFirstConfigCallDropsEntriesNotOwnedByNode) {
EXPECT_EQ(Run({"save", "df"}), "OK");

auto save_info = service_->server_family().GetLastSaveInfo();
EXPECT_EQ(Run({"debug", "load", save_info.file_name}), "OK");
EXPECT_EQ(Run({"dfly", "load", save_info.file_name}), "OK");
EXPECT_EQ(CheckedInt({"dbsize"}), 50000);

ConfigSingleNodeCluster("abcd1234");
Expand Down
65 changes: 13 additions & 52 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,6 @@ void DebugCmd::Run(CmdArgList args) {
" arguments. Each descriptor is prefixed by its frequency count",
"OBJECT <key> [COMPRESS]",
" Show low-level info about `key` and associated value.",
"LOAD <filename>",
"RELOAD [option ...]",
" Save the RDB on disk and reload it back to memory. Valid <option> values:",
" * NOSAVE: the database will be loaded from an existing RDB file.",
Expand Down Expand Up @@ -431,10 +430,6 @@ void DebugCmd::Run(CmdArgList args) {
return Watched();
}

if (subcmd == "LOAD" && args.size() == 2) {
return Load(ArgS(args, 1));
}

if (subcmd == "OBJECT" && args.size() >= 2) {
string_view key = ArgS(args, 1);
args.remove_prefix(2);
Expand Down Expand Up @@ -500,7 +495,19 @@ void DebugCmd::Reload(CmdArgList args) {
}

string last_save_file = sf_.GetLastSaveInfo().file_name;
Load(last_save_file);

sf_.FlushAll(cntx_);

if (auto fut_ec = sf_.Load(last_save_file, ServerFamily::LoadExistingKeys::kFail); fut_ec) {
GenericError ec = fut_ec->Get();
if (ec) {
string msg = ec.Format();
LOG(WARNING) << "Could not load file " << msg;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's error

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this code from below
We shouldn't get alerts for failing debug commands IMO

return cntx_->SendError(msg);
}
}

cntx_->SendOk();
}

void DebugCmd::Replica(CmdArgList args) {
Expand Down Expand Up @@ -529,52 +536,6 @@ void DebugCmd::Replica(CmdArgList args) {
return cntx_->SendError(UnknownSubCmd("replica", "DEBUG"));
}

void DebugCmd::Load(string_view filename) {
if (!ServerState::tlocal()->is_master) {
return cntx_->SendError("Replica cannot load data");
}

auto new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);

if (new_state != GlobalState::LOADING) {
LOG(WARNING) << new_state << " in progress, ignored";
return cntx_->SendError("Could not load file");
}

absl::Cleanup rev_state = [this] {
sf_.service().SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
};

const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
intrusive_ptr<Transaction> flush_trans(new Transaction{cid});
flush_trans->InitByArgs(cntx_->ns, 0, {});
VLOG(1) << "Performing flush";
error_code ec = sf_.Drakarys(flush_trans.get(), DbSlice::kDbAll);
if (ec) {
LOG(ERROR) << "Error flushing db " << ec.message();
}

fs::path path(filename);

if (filename.empty()) {
fs::path dir_path(GetFlag(FLAGS_dir));
string filename = GetFlag(FLAGS_dbfilename);
dir_path.append(filename);
path = dir_path;
}

if (auto fut_ec = sf_.Load(path.generic_string()); fut_ec) {
GenericError ec = fut_ec->Get();
if (ec) {
string msg = ec.Format();
LOG(WARNING) << "Could not load file " << msg;
return cntx_->SendError(msg);
}
}

cntx_->SendOk();
}

optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args) {
if (args.size() < 2) {
cntx_->SendError(UnknownSubCmd("populate", "DEBUG"));
Expand Down
3 changes: 0 additions & 3 deletions src/server/debugcmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ class DebugCmd {

void Run(CmdArgList args);

// A public function that loads a snapshot.
void Load(std::string_view filename);

static void Shutdown();

private:
Expand Down
65 changes: 61 additions & 4 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,33 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
return ReplicaOffset(args, cntx);
}

if (sub_cmd == "LOAD" && args.size() == 2) {
DebugCmd debug_cmd{sf_, cntx};
debug_cmd.Load(ArgS(args, 1));
return;
if (sub_cmd == "LOAD") {
return Load(args, cntx);
}

if (sub_cmd == "HELP") {
string_view help_arr[] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It strange for me that help returns array, please consider to use R"()" string literal

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a usual thing to return an array for commands in valkey

Copy link
Collaborator Author

@chakaz chakaz Aug 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, as a human user it's easier to see errors like this than "line1\nline2" etc

"DFLY <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
"THREAD",
" Returns connection thread index and number of threads",
"THREAD <thread-id>",
" Migrates connection to thread <thread-id>",
"EXPIRE",
" Collects all expired items.",
"REPLICAOFFSET",
" Returns LSN (log sequence number) per shard. These are the sequential ids of the ",
" journal entry.",
"LOAD <filename> [APPEND]",
" Loads <filename> RDB/DFS file into the data store.",
" * APPEND: Existing keys are NOT removed before loading the file, conflicting ",
" keys (that exist in both data store and in file) are overridden.",
"HELP",
" Prints this help.",
};
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
return rb->SendSimpleStrArr(help_arr);
}

cntx->SendError(kSyntaxErr);
}

Expand Down Expand Up @@ -500,6 +522,41 @@ void DflyCmd::ReplicaOffset(CmdArgList args, ConnectionContext* cntx) {
}
}

void DflyCmd::Load(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser{args};
parser.ExpectTag("LOAD");
string_view filename = parser.Next();
ServerFamily::LoadExistingKeys existing_keys = ServerFamily::LoadExistingKeys::kFail;

if (parser.HasNext()) {
parser.ExpectTag("APPEND");
existing_keys = ServerFamily::LoadExistingKeys::kOverride;
}

if (parser.HasNext()) {
parser.Error();
}

if (parser.HasError()) {
return cntx->SendError(kSyntaxErr);
}

if (existing_keys == ServerFamily::LoadExistingKeys::kFail) {
sf_->FlushAll(cntx);
}

if (auto fut_ec = sf_->Load(filename, existing_keys); fut_ec) {
GenericError ec = fut_ec->Get();
if (ec) {
string msg = ec.Format();
LOG(WARNING) << "Could not load file " << msg;
return cntx->SendError(msg);
}
}

cntx->SendOk();
}

OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
DCHECK(!flow->full_sync_fb.IsJoinable());
DCHECK(shard);
Expand Down
2 changes: 2 additions & 0 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ class DflyCmd {
// Return journal records num sent for each flow of replication.
void ReplicaOffset(CmdArgList args, ConnectionContext* cntx);

void Load(CmdArgList args, ConnectionContext* cntx);

// Start full sync in thread. Start FullSyncFb. Called for each flow.
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

Expand Down
11 changes: 10 additions & 1 deletion src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ extern "C" {
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
#include "server/cluster/cluster_defs.h"
#include "server/cluster/cluster_family.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/hset_family.h"
Expand Down Expand Up @@ -2481,7 +2483,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {

auto& res = *op_res;
res.it->first.SetSticky(item->is_sticky);
if (!res.is_new) {
if (!override_existing_keys_ && !res.is_new) {
LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
}

Expand Down Expand Up @@ -2520,6 +2522,13 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
return ec;
}

if (!load_unowned_slots_ && cluster::IsClusterEnabled()) {
const cluster::ClusterConfig* cluster_config = cluster::ClusterFamily::cluster_config();
if (cluster_config != nullptr && !cluster_config->IsMySlot(item->key)) {
return kOk; // Ignoring item
}
}

/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
Expand Down
11 changes: 11 additions & 0 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,16 @@ class RdbLoader : protected RdbLoaderBase {

~RdbLoader();

void SetOverrideExistingKeys(bool override) {
override_existing_keys_ = override;
}

void SetLoadUnownedSlots(bool load_unowned) {
load_unowned_slots_ = load_unowned;
}

std::error_code Load(::io::Source* src);

void set_source_limit(size_t n) {
source_limit_ = n;
}
Expand Down Expand Up @@ -273,6 +282,8 @@ class RdbLoader : protected RdbLoaderBase {

private:
Service* service_;
bool override_existing_keys_ = false;
bool load_unowned_slots_ = false;
ScriptMgr* script_mgr_;
std::vector<ItemsBuf> shard_buf_;

Expand Down
28 changes: 25 additions & 3 deletions src/server/rdb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
ASSERT_EQ(resp, "OK");

auto save_info = service_->server_family().GetLastSaveInfo();
resp = Run({"debug", "load", save_info.file_name});
resp = Run({"dfly", "load", save_info.file_name});
ASSERT_EQ(resp, "OK");
ASSERT_EQ(50000, CheckedInt({"dbsize"}));
}
Expand All @@ -182,7 +182,7 @@ TEST_F(RdbTest, RdbLoaderOnReadCompressedDataShouldNotEnterEnsureReadFlow) {
ASSERT_EQ(resp, "OK");

auto save_info = service_->server_family().GetLastSaveInfo();
resp = Run({"debug", "load", save_info.file_name});
resp = Run({"dfly", "load", save_info.file_name});
ASSERT_EQ(resp, "OK");
}

Expand Down Expand Up @@ -265,7 +265,7 @@ TEST_F(RdbTest, ReloadExpired) {
ASSERT_EQ(resp, "OK");
auto save_info = service_->server_family().GetLastSaveInfo();
AdvanceTime(2000);
resp = Run({"debug", "load", save_info.file_name});
resp = Run({"dfly", "load", save_info.file_name});
ASSERT_EQ(resp, "OK");
resp = Run({"get", "key"});
ASSERT_THAT(resp, ArgType(RespExpr::NIL));
Expand Down Expand Up @@ -543,4 +543,26 @@ TEST_F(RdbTest, SBF) {
EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1));
}

TEST_F(RdbTest, DflyLoadAppend) {
// Create an RDB with (k1,1) value in it saved as `filename`
EXPECT_EQ(Run({"set", "k1", "1"}), "OK");
EXPECT_EQ(Run({"save", "df"}), "OK");
string filename = service_->server_family().GetLastSaveInfo().file_name;

// Without APPEND option - db should be flushed
EXPECT_EQ(Run({"set", "k1", "TO-BE-FLUSHED"}), "OK");
EXPECT_EQ(Run({"set", "k2", "TO-BE-FLUSHED"}), "OK");
EXPECT_EQ(Run({"dfly", "load", filename}), "OK");
EXPECT_THAT(Run({"dbsize"}), IntArg(1));
EXPECT_EQ(Run({"get", "k1"}), "1");

// With APPEND option - db shouldn't be flushed, but k1 should be overridden
EXPECT_EQ(Run({"set", "k1", "TO-BE-OVERRIDDEN"}), "OK");
EXPECT_EQ(Run({"set", "k2", "2"}), "OK");
EXPECT_EQ(Run({"dfly", "load", filename, "append"}), "OK");
EXPECT_THAT(Run({"dbsize"}), IntArg(2));
EXPECT_EQ(Run({"get", "k1"}), "1");
EXPECT_EQ(Run({"get", "k2"}), "2");
}

} // namespace dfly
2 changes: 2 additions & 0 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ error_code Replica::InitiatePSync() {
}

RdbLoader loader(NULL);
loader.SetLoadUnownedSlots(true);
loader.set_source_limit(snapshot_size);
// TODO: to allow registering callbacks within loader to send '\n' pings back to master.
// Also to allow updating last_io_time_.
Expand Down Expand Up @@ -935,6 +936,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
flow_id_(flow_id) {
executor_ = std::make_unique<JournalExecutor>(service);
rdb_loader_ = std::make_unique<RdbLoader>(&service_);
rdb_loader_->SetLoadUnownedSlots(true);
}

DflyShardReplica::~DflyShardReplica() {
Expand Down
Loading
Loading