From 64d2d0b4f193a1907226952def4902b976891caa Mon Sep 17 00:00:00 2001 From: shahar Date: Tue, 13 Aug 2024 15:54:03 +0300 Subject: [PATCH 1/9] feat(cluster): Allow appending RDB to existing store This change makes the following changes: 1. Removes `DEBUG LOAD`, as we already have `DFLY LOAD` 2. Adds `APPEND` option to `DFLY LOAD` that loads an RDB _without_ first flushing the data store, overriding existing keys 3. Does not load keys belonging to unowned slots, if in cluster mode --- src/server/debugcmd.cc | 64 ++++++++----------------------------- src/server/debugcmd.h | 3 -- src/server/dflycmd.cc | 57 ++++++++++++++++++++++++++++++--- src/server/dflycmd.h | 2 ++ src/server/rdb_load.cc | 14 ++++++-- src/server/rdb_load.h | 4 ++- src/server/replica.cc | 4 +-- src/server/server_family.cc | 45 ++++++++++++++++++++------ src/server/server_family.h | 8 +++-- 9 files changed, 127 insertions(+), 74 deletions(-) diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 2f66b9a2cc1b..5bdc6ebc73ac 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -431,10 +431,6 @@ void DebugCmd::Run(CmdArgList args) { return Watched(); } - if (subcmd == "LOAD" && args.size() == 2) { - return Load(ArgS(args, 1)); - } - if (subcmd == "OBJECT" && args.size() >= 2) { string_view key = ArgS(args, 1); args.remove_prefix(2); @@ -500,7 +496,19 @@ void DebugCmd::Reload(CmdArgList args) { } string last_save_file = sf_.GetLastSaveInfo().file_name; - Load(last_save_file); + + sf_.FlushAll(cntx_); + + if (auto fut_ec = sf_.Load(last_save_file, RdbLoader::ExistingKeys::kFail); fut_ec) { + GenericError ec = fut_ec->Get(); + if (ec) { + string msg = ec.Format(); + LOG(WARNING) << "Could not load file " << msg; + return cntx_->SendError(msg); + } + } + + cntx_->SendOk(); } void DebugCmd::Replica(CmdArgList args) { @@ -529,52 +537,6 @@ void DebugCmd::Replica(CmdArgList args) { return cntx_->SendError(UnknownSubCmd("replica", "DEBUG")); } -void DebugCmd::Load(string_view filename) { - if (!ServerState::tlocal()->is_master) { - return cntx_->SendError("Replica cannot load data"); - } - - auto new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); - - if (new_state != GlobalState::LOADING) { - LOG(WARNING) << new_state << " in progress, ignored"; - return cntx_->SendError("Could not load file"); - } - - absl::Cleanup rev_state = [this] { - sf_.service().SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); - }; - - const CommandId* cid = sf_.service().FindCmd("FLUSHALL"); - intrusive_ptr flush_trans(new Transaction{cid}); - flush_trans->InitByArgs(cntx_->ns, 0, {}); - VLOG(1) << "Performing flush"; - error_code ec = sf_.Drakarys(flush_trans.get(), DbSlice::kDbAll); - if (ec) { - LOG(ERROR) << "Error flushing db " << ec.message(); - } - - fs::path path(filename); - - if (filename.empty()) { - fs::path dir_path(GetFlag(FLAGS_dir)); - string filename = GetFlag(FLAGS_dbfilename); - dir_path.append(filename); - path = dir_path; - } - - if (auto fut_ec = sf_.Load(path.generic_string()); fut_ec) { - GenericError ec = fut_ec->Get(); - if (ec) { - string msg = ec.Format(); - LOG(WARNING) << "Could not load file " << msg; - return cntx_->SendError(msg); - } - } - - cntx_->SendOk(); -} - optional DebugCmd::ParsePopulateArgs(CmdArgList args) { if (args.size() < 2) { cntx_->SendError(UnknownSubCmd("populate", "DEBUG")); diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h index 607e0ecb9a30..de8ead10e04f 100644 --- a/src/server/debugcmd.h +++ b/src/server/debugcmd.h @@ -30,9 +30,6 @@ class DebugCmd { void Run(CmdArgList args); - // A public function that loads a snapshot. - void Load(std::string_view filename); - static void Shutdown(); private: diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 82b15b6a95fd..e412a9bb104d 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -163,11 +163,25 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) { return ReplicaOffset(args, cntx); } - if (sub_cmd == "LOAD" && args.size() == 2) { - DebugCmd debug_cmd{sf_, cntx}; - debug_cmd.Load(ArgS(args, 1)); - return; + if (sub_cmd == "LOAD") { + return Load(args, cntx); + } + + if (sub_cmd == "HELP") { + // TODO: expand this to all sub commands + string_view help_arr[] = { + "DFLY [ [value] [opt] ...]. Subcommands are:", + "LOAD [APPEND]", + " Loads RDB/DFS file into the data store.", + " * APPEND: Existing keys are NOT removed before loading the file, conflicting ", + " keys (that exist in both data store and in file) are overridden.", + "HELP", + " Prints this help.", + }; + auto* rb = static_cast(cntx->reply_builder()); + return rb->SendSimpleStrArr(help_arr); } + cntx->SendError(kSyntaxErr); } @@ -500,6 +514,41 @@ void DflyCmd::ReplicaOffset(CmdArgList args, ConnectionContext* cntx) { } } +void DflyCmd::Load(CmdArgList args, ConnectionContext* cntx) { + CmdArgParser parser{args}; + parser.ExpectTag("LOAD"); + string_view filename = parser.Next(); + RdbLoader::ExistingKeys existing_keys = RdbLoader::ExistingKeys::kFail; + + if (parser.HasNext()) { + parser.ExpectTag("APPEND"); + existing_keys = RdbLoader::ExistingKeys::kOverride; + } + + if (parser.HasNext()) { + parser.Error(); + } + + if (parser.HasError()) { + return cntx->SendError(kSyntaxErr); + } + + if (existing_keys == RdbLoader::ExistingKeys::kFail) { + sf_->FlushAll(cntx); + } + + if (auto fut_ec = sf_->Load(filename, existing_keys); fut_ec) { + GenericError ec = fut_ec->Get(); + if (ec) { + string msg = ec.Format(); + LOG(WARNING) << "Could not load file " << msg; + return cntx->SendError(msg); + } + } + + cntx->SendOk(); +} + OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(!flow->full_sync_fb.IsJoinable()); DCHECK(shard); diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 0112871a3a89..6f52dff733fb 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -199,6 +199,8 @@ class DflyCmd { // Return journal records num sent for each flow of replication. void ReplicaOffset(CmdArgList args, ConnectionContext* cntx); + void Load(CmdArgList args, ConnectionContext* cntx); + // Start full sync in thread. Start FullSyncFb. Called for each flow. facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 926f16af5d6c..bb011baecd15 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -35,6 +35,8 @@ extern "C" { #include "core/sorted_map.h" #include "core/string_map.h" #include "core/string_set.h" +#include "server/cluster/cluster_defs.h" +#include "server/cluster/cluster_family.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/hset_family.h" @@ -1917,8 +1919,9 @@ struct RdbLoader::ObjSettings { ObjSettings() = default; }; -RdbLoader::RdbLoader(Service* service) +RdbLoader::RdbLoader(Service* service, ExistingKeys existing_keys) : service_{service}, + existing_keys_{existing_keys}, script_mgr_{service == nullptr ? nullptr : service->script_mgr()}, shard_buf_{shard_set->size()} { } @@ -2481,7 +2484,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { auto& res = *op_res; res.it->first.SetSticky(item->is_sticky); - if (!res.is_new) { + if (existing_keys_ == ExistingKeys::kFail && !res.is_new) { LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind; } @@ -2520,6 +2523,13 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) { return ec; } + if (cluster::IsClusterEnabled()) { + const cluster::ClusterConfig* cluster_config = cluster::ClusterFamily::cluster_config(); + if (cluster_config != nullptr && !cluster_config->IsMySlot(item->key)) { + return kOk; // Ignoring item + } + } + /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 60fa0e1940fe..640d8fe331ec 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -177,7 +177,8 @@ class RdbLoaderBase { class RdbLoader : protected RdbLoaderBase { public: - explicit RdbLoader(Service* service); + enum class ExistingKeys { kFail, kOverride }; + explicit RdbLoader(Service* service, ExistingKeys existing_keys); ~RdbLoader(); @@ -273,6 +274,7 @@ class RdbLoader : protected RdbLoaderBase { private: Service* service_; + ExistingKeys existing_keys_; ScriptMgr* script_mgr_; std::vector shard_buf_; diff --git a/src/server/replica.cc b/src/server/replica.cc index 2189f703ecf6..d146b369f83f 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -423,7 +423,7 @@ error_code Replica::InitiatePSync() { JournalExecutor{&service_}.FlushAll(); } - RdbLoader loader(NULL); + RdbLoader loader(NULL, RdbLoader::ExistingKeys::kFail); loader.set_source_limit(snapshot_size); // TODO: to allow registering callbacks within loader to send '\n' pings back to master. // Also to allow updating last_io_time_. @@ -933,7 +933,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m multi_shard_exe_(multi_shard_exe), flow_id_(flow_id) { executor_ = std::make_unique(service); - rdb_loader_ = std::make_unique(&service_); + rdb_loader_ = std::make_unique(&service_, RdbLoader::ExistingKeys::kFail); } DflyShardReplica::~DflyShardReplica() { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 32eb7cca66e8..c57860916627 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -50,7 +50,6 @@ extern "C" { #include "server/main_service.h" #include "server/memory_cmd.h" #include "server/protocol_client.h" -#include "server/rdb_load.h" #include "server/rdb_save.h" #include "server/script_mgr.h" #include "server/server_state.h" @@ -874,7 +873,7 @@ void ServerFamily::LoadFromSnapshot() { if (load_path_result) { const std::string load_path = *load_path_result; if (!load_path.empty()) { - load_result_ = Load(load_path); + load_result_ = Load(load_path, RdbLoader::ExistingKeys::kFail); } } else { if (std::error_code(load_path_result.error()) == std::errc::no_such_file_or_directory) { @@ -935,13 +934,40 @@ struct AggregateLoadResult { std::atomic keys_read; }; +void ServerFamily::FlushAll(ConnectionContext* cntx) { + const CommandId* cid = service_.FindCmd("FLUSHALL"); + boost::intrusive_ptr flush_trans(new Transaction{cid}); + flush_trans->InitByArgs(cntx->ns, 0, {}); + VLOG(1) << "Performing flush"; + error_code ec = Drakarys(flush_trans.get(), DbSlice::kDbAll); + if (ec) { + LOG(ERROR) << "Error flushing db " << ec.message(); + } +} + // Load starts as many fibers as there are files to load each one separately. // It starts one more fiber that waits for all load fibers to finish and returns the first // error (if any occured) with a future. -std::optional> ServerFamily::Load(const std::string& load_path) { +std::optional> ServerFamily::Load(string_view load_path, + RdbLoader::ExistingKeys existing_keys) { + fs::path path(load_path); + + if (load_path.empty()) { + fs::path dir_path(GetFlag(FLAGS_dir)); + string filename = GetFlag(FLAGS_dbfilename); + dir_path.append(filename); + path = dir_path; + } + DCHECK_GT(shard_count(), 0u); - auto paths_result = snapshot_storage_->LoadPaths(load_path); + if (!ServerState::tlocal()->is_master) { + fb2::Future future; + future.Resolve(string("Replica cannot load data")); + return future; + } + + auto paths_result = snapshot_storage_->LoadPaths(path.generic_string()); if (!paths_result) { LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format(); @@ -952,7 +978,7 @@ std::optional> ServerFamily::Load(const std::string& l std::vector paths = *paths_result; - LOG(INFO) << "Loading " << load_path; + LOG(INFO) << "Loading " << path.generic_string(); auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); if (new_state != GlobalState::LOADING) { @@ -979,8 +1005,8 @@ std::optional> ServerFamily::Load(const std::string& l proactor = pool.GetNextProactor(); } - auto load_fiber = [this, aggregated_result, path = std::move(path)]() { - auto load_result = LoadRdb(path); + auto load_fiber = [this, aggregated_result, existing_keys, path = std::move(path)]() { + auto load_result = LoadRdb(path, existing_keys); if (load_result.has_value()) aggregated_result->keys_read.fetch_add(*load_result); else @@ -1040,13 +1066,14 @@ void ServerFamily::SnapshotScheduling() { } } -io::Result ServerFamily::LoadRdb(const std::string& rdb_file) { +io::Result ServerFamily::LoadRdb(const std::string& rdb_file, + RdbLoader::ExistingKeys existing_keys) { error_code ec; io::ReadonlyFileOrError res = snapshot_storage_->OpenReadFile(rdb_file); if (res) { io::FileSource fs(*res); - RdbLoader loader{&service_}; + RdbLoader loader{&service_, existing_keys}; ec = loader.Load(&fs); if (!ec) { VLOG(1) << "Done loading RDB from " << rdb_file << ", keys loaded: " << loader.keys_loaded(); diff --git a/src/server/server_family.h b/src/server/server_family.h index 5c38bc4406fd..1c0ae08a8526 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -16,6 +16,7 @@ #include "server/dflycmd.h" #include "server/engine_shard_set.h" #include "server/namespaces.h" +#include "server/rdb_load.h" #include "server/replica.h" #include "server/server_state.h" #include "util/fibers/fiberqueue_threadpool.h" @@ -189,9 +190,12 @@ class ServerFamily { LastSaveInfo GetLastSaveInfo() const; + void FlushAll(ConnectionContext* cntx); + // Load snapshot from file (.rdb file or summary.dfs file) and return // future with error_code. - std::optional> Load(const std::string& file_name); + std::optional> Load(std::string_view file_name, + RdbLoader::ExistingKeys existing_keys); bool TEST_IsSaving() const; @@ -279,7 +283,7 @@ class ServerFamily { void ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, ActionOnConnectionFail on_error); // Returns the number of loaded keys if successful. - io::Result LoadRdb(const std::string& rdb_file); + io::Result LoadRdb(const std::string& rdb_file, RdbLoader::ExistingKeys existing_keys); void SnapshotScheduling(); From 921e3a72bc18a89ebcb0e14f5cd8e1e9475b3184 Mon Sep 17 00:00:00 2001 From: shahar Date: Tue, 13 Aug 2024 21:54:58 +0300 Subject: [PATCH 2/9] fix tests --- src/server/cluster/cluster_family_test.cc | 2 +- src/server/rdb_test.cc | 18 +++++++++--------- tests/dragonfly/snapshot_test.py | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/server/cluster/cluster_family_test.cc b/src/server/cluster/cluster_family_test.cc index feaa1d6d5875..a2489e1e948b 100644 --- a/src/server/cluster/cluster_family_test.cc +++ b/src/server/cluster/cluster_family_test.cc @@ -597,7 +597,7 @@ TEST_F(ClusterFamilyTest, ClusterFirstConfigCallDropsEntriesNotOwnedByNode) { EXPECT_EQ(Run({"save", "df"}), "OK"); auto save_info = service_->server_family().GetLastSaveInfo(); - EXPECT_EQ(Run({"debug", "load", save_info.file_name}), "OK"); + EXPECT_EQ(Run({"dfly", "load", save_info.file_name}), "OK"); EXPECT_EQ(CheckedInt({"dbsize"}), 50000); ConfigSingleNodeCluster("abcd1234"); diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index b56a6162d9f9..32853e82c0fe 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -85,14 +85,14 @@ TEST_F(RdbTest, Crc) { TEST_F(RdbTest, LoadEmpty) { io::FileSource fs = GetSource("empty.rdb"); - RdbLoader loader(NULL); + RdbLoader loader(NULL, RdbLoader::ExistingKeys::kFail); auto ec = loader.Load(&fs); CHECK(!ec); } TEST_F(RdbTest, LoadSmall6) { io::FileSource fs = GetSource("redis6_small.rdb"); - RdbLoader loader{service_.get()}; + RdbLoader loader{service_.get(), RdbLoader::ExistingKeys::kFail}; // must run in proactor thread in order to avoid polluting the serverstate // in the main, testing thread. @@ -129,7 +129,7 @@ TEST_F(RdbTest, LoadSmall6) { TEST_F(RdbTest, Stream) { io::FileSource fs = GetSource("redis6_stream.rdb"); - RdbLoader loader{service_.get()}; + RdbLoader loader{service_.get(), RdbLoader::ExistingKeys::kFail}; // must run in proactor thread in order to avoid polluting the serverstate // in the main, testing thread. @@ -167,7 +167,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) { ASSERT_EQ(resp, "OK"); auto save_info = service_->server_family().GetLastSaveInfo(); - resp = Run({"debug", "load", save_info.file_name}); + resp = Run({"dfly", "load", save_info.file_name}); ASSERT_EQ(resp, "OK"); ASSERT_EQ(50000, CheckedInt({"dbsize"})); } @@ -182,7 +182,7 @@ TEST_F(RdbTest, RdbLoaderOnReadCompressedDataShouldNotEnterEnsureReadFlow) { ASSERT_EQ(resp, "OK"); auto save_info = service_->server_family().GetLastSaveInfo(); - resp = Run({"debug", "load", save_info.file_name}); + resp = Run({"dfly", "load", save_info.file_name}); ASSERT_EQ(resp, "OK"); } @@ -265,7 +265,7 @@ TEST_F(RdbTest, ReloadExpired) { ASSERT_EQ(resp, "OK"); auto save_info = service_->server_family().GetLastSaveInfo(); AdvanceTime(2000); - resp = Run({"debug", "load", save_info.file_name}); + resp = Run({"dfly", "load", save_info.file_name}); ASSERT_EQ(resp, "OK"); resp = Run({"get", "key"}); ASSERT_THAT(resp, ArgType(RespExpr::NIL)); @@ -448,7 +448,7 @@ class HllRdbTest : public RdbTest, public testing::WithParamInterface {} TEST_P(HllRdbTest, Hll) { io::FileSource fs = GetSource("hll.rdb"); - RdbLoader loader{service_.get()}; + RdbLoader loader{service_.get(), RdbLoader::ExistingKeys::kFail}; // must run in proactor thread in order to avoid polluting the serverstate // in the main, testing thread. @@ -479,7 +479,7 @@ TEST_F(RdbTest, LoadSmall7) { // 3. A set called my-set encoded as RDB_TYPE_SET_LISTPACK // 4. A zset called my-zset encoded as RDB_TYPE_ZSET_LISTPACK io::FileSource fs = GetSource("redis7_small.rdb"); - RdbLoader loader{service_.get()}; + RdbLoader loader{service_.get(), RdbLoader::ExistingKeys::kFail}; // must run in proactor thread in order to avoid polluting the serverstate // in the main, testing thread. @@ -521,7 +521,7 @@ TEST_F(RdbTest, RedisJson) { // '{"company":"DragonflyDB","product":"Dragonfly","website":"https://dragondlydb.io","years-active":[2021,2022,2023,2024,"and // more!"]}' io::FileSource fs = GetSource("redis_json.rdb"); - RdbLoader loader{service_.get()}; + RdbLoader loader{service_.get(), RdbLoader::ExistingKeys::kFail}; // must run in proactor thread in order to avoid polluting the serverstate // in the main, testing thread. diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index b96407e3437d..8c5c1655b1e3 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -271,7 +271,7 @@ async def test_s3_snapshot(self, async_client): await async_client.execute_command("SAVE DF snapshot") assert await async_client.flushall() await async_client.execute_command( - "DEBUG LOAD " + "DFLY LOAD " + os.environ["DRAGONFLY_S3_BUCKET"] + str(self.tmp_dir) + "/snapshot-summary.dfs" From f047798a31d2caa02ec2e889dbcc3b1708318825 Mon Sep 17 00:00:00 2001 From: shahar Date: Wed, 14 Aug 2024 08:34:04 +0300 Subject: [PATCH 3/9] fix regtests --- tests/dragonfly/snapshot_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index 984482d27661..a7070f178d18 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -451,7 +451,7 @@ async def test_tiered_entries(async_client: aioredis.Redis): await async_client.execute_command("SAVE", "DF") assert await async_client.flushall() await async_client.execute_command( - "DEBUG", + "DFLY", "LOAD", "tiered-entries-summary.dfs", ) @@ -488,7 +488,7 @@ async def test_tiered_entries_throttle(async_client: aioredis.Redis): load_task = asyncio.create_task( async_client.execute_command( - "DEBUG", + "DFLY", "LOAD", "tiered-entries-summary.dfs", ) From e39b34ff3a34a06cd970712a973f44367116c735 Mon Sep 17 00:00:00 2001 From: shahar Date: Wed, 14 Aug 2024 09:05:49 +0300 Subject: [PATCH 4/9] update HELP --- src/server/debugcmd.cc | 1 - src/server/dflycmd.cc | 10 +++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 5bdc6ebc73ac..e6cdb18c3837 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -373,7 +373,6 @@ void DebugCmd::Run(CmdArgList args) { " arguments. Each descriptor is prefixed by its frequency count", "OBJECT [COMPRESS]", " Show low-level info about `key` and associated value.", - "LOAD ", "RELOAD [option ...]", " Save the RDB on disk and reload it back to memory. Valid