From a5694337bdb944439af26c424b3d36311deccc57 Mon Sep 17 00:00:00 2001 From: Borys Date: Sun, 19 Nov 2023 12:45:48 +0200 Subject: [PATCH 01/10] feat: add new command START-SLOT-MIGRATION --- src/server/cluster/cluster_family.cc | 40 ++++++++++++++++++++++++++++ src/server/cluster/cluster_family.h | 3 ++- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 9dcce7987915..da7baa6bf8fe 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -12,6 +12,7 @@ #include "base/flags.h" #include "base/logging.h" #include "core/json_object.h" +#include "facade/cmd_arg_parser.h" #include "facade/dragonfly_connection.h" #include "facade/error.h" #include "server/acl/acl_commands_def.h" @@ -392,6 +393,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { return DflyClusterMyId(args, cntx); } else if (sub_cmd == "FLUSHSLOTS") { return DflyClusterFlushSlots(args, cntx); + } else if (sub_cmd == "START-SLOT-MIGRATION") { + return DflyClusterStartSlotMigration(args, cntx); } return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType); @@ -589,6 +592,43 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn return rb->SendOk(); } +void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx) { + SinkReplyBuilder* rb = cntx->reply_builder(); + + args.remove_prefix(1); // Removes "START-SLOT-MIGRATION" subcmd string + + // [ .. + CmdArgParser parser(args); + auto host_ip = parser.Next(); + auto port = parser.Next(); + std::vector slots; + tl_cluster_config->Initialize do { + auto slot_start = parser.Next(); + auto slot_end = parser.Next(); + slots.emplace_back(slot_start, slot_end); + } + while (parser.HasNext()) + + if (auto err = parser.Error(); err) + return (*cntx)->SendError(err->MakeReply()); + + SlotSet slots; + slots.reserve(args.size()); + for (size_t i = 0; i < args.size(); ++i) { + unsigned slot; + if (!absl::SimpleAtoi(ArgS(args, i), &slot) || (slot > ClusterConfig::kMaxSlotNum)) { + return rb->SendError(kSyntaxErrType); + } + slots.insert(static_cast(slot)); + } + + VLOG(1) << "Connecting to master"; + ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &cntx_); + RETURN_ON_ERR(check_connection_error(ec, kConnErr)); + + return rb->SendOk(); +} + using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx); inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) { diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 6a60f3bc7e77..bbd7061e0272 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -45,8 +45,9 @@ class ClusterFamily { void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx); void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx); void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); + void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx) - ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; + ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; ServerFamily* server_family_ = nullptr; }; From 2ca27659209e28f32141fb0b1c7179367edb5296 Mon Sep 17 00:00:00 2001 From: Borys Date: Sun, 26 Nov 2023 11:24:36 +0200 Subject: [PATCH 02/10] feat: add new command START_SLOT_MIGRATION --- src/server/CMakeLists.txt | 3 +- src/server/cluster/cluster_family.cc | 34 +++------- src/server/cluster/cluster_family.h | 4 +- src/server/cluster/cluster_slot_migration.cc | 64 +++++++++++++++++++ src/server/cluster/cluster_slot_migration.h | 21 ++++++ src/server/protocol_client.h | 1 + tests/dragonfly/cluster_test.py | 67 ++++++++++++++++++++ 7 files changed, 167 insertions(+), 27 deletions(-) create mode 100644 src/server/cluster/cluster_slot_migration.cc create mode 100644 src/server/cluster/cluster_slot_migration.h diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 9f9a1caaa256..01be21810e43 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -39,7 +39,8 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc command_registry. set_family.cc stream_family.cc string_family.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc - cluster/cluster_family.cc acl/user.cc acl/user_registry.cc acl/acl_family.cc + cluster/cluster_family.cc cluster/cluster_slot_migration.cc + acl/user.cc acl/user_registry.cc acl/acl_family.cc acl/validator.cc acl/helpers.cc) cxx_link(dfly_transaction dfly_core strings_lib TRDP::fast_float) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index da7baa6bf8fe..fddf81d6efbc 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -16,6 +16,7 @@ #include "facade/dragonfly_connection.h" #include "facade/error.h" #include "server/acl/acl_commands_def.h" +#include "server/cluster/cluster_slot_migration.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/dflycmd.h" @@ -597,34 +598,19 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon args.remove_prefix(1); // Removes "START-SLOT-MIGRATION" subcmd string - // [ .. CmdArgParser parser(args); - auto host_ip = parser.Next(); - auto port = parser.Next(); + auto [host_ip, port] = parser.Next(); std::vector slots; - tl_cluster_config->Initialize do { - auto slot_start = parser.Next(); - auto slot_end = parser.Next(); + do { + auto [slot_start, slot_end] = parser.Next(); slots.emplace_back(slot_start, slot_end); - } - while (parser.HasNext()) - - if (auto err = parser.Error(); err) - return (*cntx)->SendError(err->MakeReply()); - - SlotSet slots; - slots.reserve(args.size()); - for (size_t i = 0; i < args.size(); ++i) { - unsigned slot; - if (!absl::SimpleAtoi(ArgS(args, i), &slot) || (slot > ClusterConfig::kMaxSlotNum)) { - return rb->SendError(kSyntaxErrType); - } - slots.insert(static_cast(slot)); - } + } while (parser.HasNext()); + (void)slots; + if (auto err = parser.Error(); err) + return (*cntx)->SendError(err->MakeReply()); - VLOG(1) << "Connecting to master"; - ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &cntx_); - RETURN_ON_ERR(check_connection_error(ec, kConnErr)); + ClusterSlotMigration node(std::string(host_ip), port); + node.Start(cntx); return rb->SendOk(); } diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index bbd7061e0272..ce838c63cfbb 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -45,9 +45,9 @@ class ClusterFamily { void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx); void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx); void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); - void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx) + void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx); - ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; + ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; ServerFamily* server_family_ = nullptr; }; diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc new file mode 100644 index 000000000000..5b2698bd0f91 --- /dev/null +++ b/src/server/cluster/cluster_slot_migration.cc @@ -0,0 +1,64 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#include "server/cluster/cluster_slot_migration.h" + +#include + +#include "base/logging.h" +#include "server/error.h" +#include "server/main_service.h" + +ABSL_FLAG(int, source_connect_timeout_ms, 20000, + "Timeout for establishing connection to a replication master"); + +namespace dfly { + +using namespace std; +using absl::GetFlag; + +ClusterSlotMigration::ClusterSlotMigration(string host, uint16_t port) + : ProtocolClient(std::move(host), port) { +} + +ClusterSlotMigration::~ClusterSlotMigration() { +} + +error_code ClusterSlotMigration::Start(ConnectionContext* cntx) { + VLOG(1) << "Starting slot migration"; + + auto check_connection_error = [this, &cntx](error_code ec, const char* msg) -> error_code { + if (ec) { + (*cntx)->SendError(absl::StrCat(msg, ec.message())); + cntx_.Cancel(); + } + return ec; + }; + + VLOG(1) << "Resolving source host DNS"; + error_code ec = ResolveMasterDns(); + RETURN_ON_ERR(check_connection_error(ec, "could not resolve master dns")); + + VLOG(1) << "Connecting to source"; + ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); + RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source")); + + VLOG(1) << "Greeting"; + ec = Greet(); + RETURN_ON_ERR(check_connection_error(ec, "could not greet master ")); + + (*cntx)->SendOk(); + + return {}; +} + +error_code ClusterSlotMigration::Greet() { + ResetParser(false); + VLOG(1) << "greeting message handling"; + RETURN_ON_ERR(SendCommandAndReadResponse("PING")); + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("PONG")); + + return error_code{}; +} + +} // namespace dfly diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h new file mode 100644 index 000000000000..cee2747e5d31 --- /dev/null +++ b/src/server/cluster/cluster_slot_migration.h @@ -0,0 +1,21 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +#include "server/protocol_client.h" + +namespace dfly { + +class ClusterSlotMigration : ProtocolClient { + public: + ClusterSlotMigration(std::string source_host, uint16_t port); + ~ClusterSlotMigration(); + + std::error_code Start(ConnectionContext* cntx); + + private: + std::error_code Greet(); +}; + +} // namespace dfly diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index e553782e8c86..a3e6bff85f68 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -4,6 +4,7 @@ #pragma once #include +#include #include #include diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 416395d168e8..d78d83c82f2e 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -224,6 +224,10 @@ async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFact c_nodes_admin, ) + res = await c_nodes_admin[1].execute_command( + "DFLYCLUSTER", "START-SLOT-MIGRATION", "localhost", str(BASE_PORT + 1000), "5200", "5259" + ) + # Slot for "KEY1" is 5259 # Insert a key that should stay in node0 @@ -691,3 +695,66 @@ async def test_random_keys(): await test_random_keys() await client.close() + + +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory): + # Check slot migration from one node to another + nodes = [ + df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) + for i in range(2) + ] + + df_local_factory.start_all(nodes) + + c_nodes = [node.client() for node in nodes] + c_nodes_admin = [node.admin_client() for node in nodes] + + node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin)) + + config = f""" + [ + {{ + "slot_ranges": [ + {{ + "start": 0, + "end": LAST_SLOT_CUTOFF + }} + ], + "master": {{ + "id": "{node_ids[0]}", + "ip": "localhost", + "port": {nodes[0].port} + }}, + "replicas": [] + }}, + {{ + "slot_ranges": [ + {{ + "start": NEXT_SLOT_CUTOFF, + "end": 16383 + }} + ], + "master": {{ + "id": "{node_ids[1]}", + "ip": "localhost", + "port": {nodes[1].port} + }}, + "replicas": [] + }} + ] + """ + + await push_config( + config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"), + c_nodes_admin, + ) + + res = await c_nodes_admin[1].execute_command( + "DFLYCLUSTER", "START-SLOT-MIGRATION", "localhost", str(BASE_PORT + 1000), "5200", "5259" + ) + + assert "OK" == res + + await c_nodes_admin[0].close() + await c_nodes_admin[1].close() From 79ac9a18161a9c19517f0ca16a5c457533234adc Mon Sep 17 00:00:00 2001 From: Borys Date: Mon, 27 Nov 2023 21:47:02 +0200 Subject: [PATCH 03/10] feat: add MGRTConfig command --- src/facade/cmd_arg_parser.cc | 11 ++++- src/server/cluster/cluster_family.cc | 46 +++++++++++++++++--- src/server/cluster/cluster_family.h | 2 + src/server/cluster/cluster_slot_migration.cc | 28 +++++++++--- src/server/cluster/cluster_slot_migration.h | 5 ++- tests/dragonfly/cluster_test.py | 2 +- 6 files changed, 80 insertions(+), 14 deletions(-) diff --git a/src/facade/cmd_arg_parser.cc b/src/facade/cmd_arg_parser.cc index 01665cf49468..22cda2180c00 100644 --- a/src/facade/cmd_arg_parser.cc +++ b/src/facade/cmd_arg_parser.cc @@ -43,9 +43,16 @@ template T CmdArgParser::Num(size_t idx) { } else if constexpr (std::is_same_v) { if (absl::SimpleAtod(arg, &out)) return out; - } else if constexpr (std::is_integral_v) { + } else if constexpr (std::is_integral_v && sizeof(T) >= sizeof(int32_t)) { if (absl::SimpleAtoi(arg, &out)) return out; + } else if constexpr (std::is_integral_v && sizeof(T) < sizeof(int32_t)) { + int32_t tmp; + if (absl::SimpleAtoi(arg, &tmp)) { + out = tmp; // out can not store the whole tmp + if (tmp == out) + return out; + } } Report(INVALID_INT, idx); @@ -58,6 +65,8 @@ template uint64_t CmdArgParser::Num(size_t); template int64_t CmdArgParser::Num(size_t); template uint32_t CmdArgParser::Num(size_t); template int32_t CmdArgParser::Num(size_t); +template uint16_t CmdArgParser::Num(size_t); +template int16_t CmdArgParser::Num(size_t); ErrorReply CmdArgParser::ErrorInfo::MakeReply() const { switch (type) { diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index fddf81d6efbc..024695ff73ee 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -599,22 +599,55 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon args.remove_prefix(1); // Removes "START-SLOT-MIGRATION" subcmd string CmdArgParser parser(args); - auto [host_ip, port] = parser.Next(); + auto [host_ip, port] = parser.Next(); std::vector slots; do { - auto [slot_start, slot_end] = parser.Next(); + auto [slot_start, slot_end] = parser.Next(); slots.emplace_back(slot_start, slot_end); } while (parser.HasNext()); - (void)slots; + if (auto err = parser.Error(); err) return (*cntx)->SendError(err->MakeReply()); - ClusterSlotMigration node(std::string(host_ip), port); + ClusterSlotMigration node(std::string(host_ip), port, slots); node.Start(cntx); return rb->SendOk(); } +void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { + VLOG(1) << "Create slot migration config"; + CmdArgParser parser{args}; + auto port = parser.Next(); + (void)port; // we need it for the next step + + std::vector slots; + do { + auto [slot_start, slot_end] = parser.Next(); + slots.emplace_back(slot_start, slot_end); + } while (parser.HasNext()); + + if (!tl_cluster_config) { + (*cntx)->SendError(kClusterNotConfigured); + return; + } + + for (const auto& migration_range : slots) { + for (auto i = migration_range.start; i <= migration_range.end; ++i) { + if (!tl_cluster_config->IsMySlot(i)) { + VLOG(1) << "Invalid migration slot rane"; + (*cntx)->SendError("Invalid slots range"); + return; + } + } + } + + cntx->conn()->SetName("slot_migration_ctrl"); + + (*cntx)->SendLong(shard_set->size()); + return; +} + using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx); inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) { @@ -629,6 +662,7 @@ constexpr uint32_t kCluster = SLOW; constexpr uint32_t kDflyCluster = ADMIN | SLOW; constexpr uint32_t kReadOnly = FAST | CONNECTION; constexpr uint32_t kReadWrite = FAST | CONNECTION; +constexpr uint32_t kMGRTConf = ADMIN | SLOW | DANGEROUS; } // namespace acl void ClusterFamily::Register(CommandRegistry* registry) { @@ -638,7 +672,9 @@ void ClusterFamily::Register(CommandRegistry* registry) { acl::kDflyCluster} .HFUNC(DflyCluster) << CI{"READONLY", CO::READONLY, 1, 0, 0, acl::kReadOnly}.HFUNC(ReadOnly) - << CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite); + << CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite) + << CI{"MGRTCONF", CO::ADMIN | CO::LOADING, -3, 0, 0, acl::kMGRTConf}.HFUNC( + MigrationConf); } } // namespace dfly diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index ce838c63cfbb..67203ee10707 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -47,6 +47,8 @@ class ClusterFamily { void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx); + void MigrationConf(CmdArgList args, ConnectionContext* cntx); + ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; ServerFamily* server_family_ = nullptr; diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc index 5b2698bd0f91..12217d12eba6 100644 --- a/src/server/cluster/cluster_slot_migration.cc +++ b/src/server/cluster/cluster_slot_migration.cc @@ -10,15 +10,19 @@ #include "server/main_service.h" ABSL_FLAG(int, source_connect_timeout_ms, 20000, - "Timeout for establishing connection to a replication master"); + "Timeout for establishing connection to a source node"); + +ABSL_DECLARE_FLAG(int32_t, port); namespace dfly { using namespace std; +using namespace facade; using absl::GetFlag; -ClusterSlotMigration::ClusterSlotMigration(string host, uint16_t port) - : ProtocolClient(std::move(host), port) { +ClusterSlotMigration::ClusterSlotMigration(string host, uint16_t port, + std::vector slots) + : ProtocolClient(std::move(host), port), slots_(std::move(slots)) { } ClusterSlotMigration::~ClusterSlotMigration() { @@ -30,14 +34,13 @@ error_code ClusterSlotMigration::Start(ConnectionContext* cntx) { auto check_connection_error = [this, &cntx](error_code ec, const char* msg) -> error_code { if (ec) { (*cntx)->SendError(absl::StrCat(msg, ec.message())); - cntx_.Cancel(); } return ec; }; VLOG(1) << "Resolving source host DNS"; error_code ec = ResolveMasterDns(); - RETURN_ON_ERR(check_connection_error(ec, "could not resolve master dns")); + RETURN_ON_ERR(check_connection_error(ec, "could not resolve source dns")); VLOG(1) << "Connecting to source"; ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); @@ -45,7 +48,7 @@ error_code ClusterSlotMigration::Start(ConnectionContext* cntx) { VLOG(1) << "Greeting"; ec = Greet(); - RETURN_ON_ERR(check_connection_error(ec, "could not greet master ")); + RETURN_ON_ERR(check_connection_error(ec, "couldn't greet source ")); (*cntx)->SendOk(); @@ -58,6 +61,19 @@ error_code ClusterSlotMigration::Greet() { RETURN_ON_ERR(SendCommandAndReadResponse("PING")); PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("PONG")); + auto port = absl::GetFlag(FLAGS_port); + auto cmd = absl::StrCat("MGRTCONF ", port); + for (const auto& s : slots_) { + cmd = absl::StrCat(cmd, " ", s.start, " ", s.end); + } + VLOG(1) << "Migration command: " << cmd; + RETURN_ON_ERR(SendCommandAndReadResponse(cmd)); + // Response is: num_shards + if (!CheckRespFirstTypes({RespExpr::INT64})) + return make_error_code(errc::bad_message); + + flows_num_ = get(LastResponseArgs()[0].u); + return error_code{}; } diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h index cee2747e5d31..3bd3f1ccd6fa 100644 --- a/src/server/cluster/cluster_slot_migration.h +++ b/src/server/cluster/cluster_slot_migration.h @@ -9,13 +9,16 @@ namespace dfly { class ClusterSlotMigration : ProtocolClient { public: - ClusterSlotMigration(std::string source_host, uint16_t port); + ClusterSlotMigration(std::string source_host, uint16_t port, + std::vector slots); ~ClusterSlotMigration(); std::error_code Start(ConnectionContext* cntx); private: std::error_code Greet(); + std::vector slots_; + size_t flows_num_ = 0; }; } // namespace dfly diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index d78d83c82f2e..0c3e33e9de99 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -751,7 +751,7 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory): ) res = await c_nodes_admin[1].execute_command( - "DFLYCLUSTER", "START-SLOT-MIGRATION", "localhost", str(BASE_PORT + 1000), "5200", "5259" + "DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(BASE_PORT + 1000), "5200", "5259" ) assert "OK" == res From d6e2841f9a5e38131b510569ca9d5d350ece2eed Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 28 Nov 2023 08:34:34 +0200 Subject: [PATCH 04/10] fix: build fix --- src/server/cluster/cluster_family.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 024695ff73ee..bb489de05bd3 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -600,10 +600,10 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon CmdArgParser parser(args); auto [host_ip, port] = parser.Next(); - std::vector slots; + std::vector slots; do { auto [slot_start, slot_end] = parser.Next(); - slots.emplace_back(slot_start, slot_end); + slots.emplace_back(SlotRange{slot_start, slot_end}); } while (parser.HasNext()); if (auto err = parser.Error(); err) From 93dfd5a1c2097f7c5c7691ad5baef970187fc30d Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 28 Nov 2023 10:11:24 +0200 Subject: [PATCH 05/10] refactor: add initEndpoint() instead of resolveMasterDns() --- src/server/cluster/cluster_family.cc | 2 +- src/server/cluster/cluster_slot_migration.cc | 6 ++---- src/server/protocol_client.cc | 6 +++++- src/server/protocol_client.h | 3 ++- src/server/replica.cc | 4 ++-- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index bb489de05bd3..4d0d70378dd9 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -624,7 +624,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { std::vector slots; do { auto [slot_start, slot_end] = parser.Next(); - slots.emplace_back(slot_start, slot_end); + slots.emplace_back(SlotRange{slot_start, slot_end}); } while (parser.HasNext()); if (!tl_cluster_config) { diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc index 12217d12eba6..9e3d09df95de 100644 --- a/src/server/cluster/cluster_slot_migration.cc +++ b/src/server/cluster/cluster_slot_migration.cc @@ -38,12 +38,10 @@ error_code ClusterSlotMigration::Start(ConnectionContext* cntx) { return ec; }; - VLOG(1) << "Resolving source host DNS"; - error_code ec = ResolveMasterDns(); - RETURN_ON_ERR(check_connection_error(ec, "could not resolve source dns")); + InitEndpoint(); VLOG(1) << "Connecting to source"; - ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); + auto ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source")); VLOG(1) << "Greeting"; diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index 2dbf95a8749c..ed935d741495 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -212,7 +212,7 @@ ProtocolClient::~ProtocolClient() { #endif } -error_code ProtocolClient::ResolveMasterDns() { +error_code ProtocolClient::InitEndpointWithDns() { char ip_addr[INET6_ADDRSTRLEN]; int resolve_res = ResolveDns(server_context_.host, ip_addr); if (resolve_res != 0) { @@ -228,6 +228,10 @@ error_code ProtocolClient::ResolveMasterDns() { return error_code{}; } +void ProtocolClient::InitEndpoint() { + server_context_.endpoint = {ip::make_address(server_context_.host), server_context_.port}; +} + error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx) { ProactorBase* mythread = ProactorBase::me(); diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index a3e6bff85f68..f26997d5b2ee 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -64,7 +64,8 @@ class ProtocolClient { // the DNS resolution step. explicit ProtocolClient(ServerContext context); - std::error_code ResolveMasterDns(); // Resolve master dns + std::error_code InitEndpointWithDns(); + void InitEndpoint(); // Connect to master and authenticate if needed. std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx); diff --git a/src/server/replica.cc b/src/server/replica.cc index 5c504d3cb93c..b95e08ad332e 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -101,7 +101,7 @@ error_code Replica::Start(ConnectionContext* cntx) { // 1. Resolve dns. VLOG(1) << "Resolving master DNS"; - error_code ec = ResolveMasterDns(); + error_code ec = InitEndpointWithDns(); RETURN_ON_ERR(check_connection_error(ec, "could not resolve master dns")); // 2. Connect socket. @@ -179,7 +179,7 @@ void Replica::MainReplicationFb() { if (is_paused_) continue; - ec = ResolveMasterDns(); + ec = InitEndpointWithDns(); if (ec) { LOG(ERROR) << "Error resolving dns to " << server().host << " " << ec; continue; From e3e4856f7a9a4ff3978aaf21e7761c3e0af7ef37 Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 28 Nov 2023 12:52:13 +0200 Subject: [PATCH 06/10] fix: fix test --- tests/dragonfly/cluster_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 0c3e33e9de99..bc6e8081ee1a 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -224,10 +224,6 @@ async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFact c_nodes_admin, ) - res = await c_nodes_admin[1].execute_command( - "DFLYCLUSTER", "START-SLOT-MIGRATION", "localhost", str(BASE_PORT + 1000), "5200", "5259" - ) - # Slot for "KEY1" is 5259 # Insert a key that should stay in node0 From 0eeffa770703dd683b7706302810de80420dac31 Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 28 Nov 2023 18:53:30 +0200 Subject: [PATCH 07/10] refactor: address comments --- src/server/cluster/cluster_family.cc | 14 ++++++++++++-- src/server/cluster/cluster_family.h | 1 + src/server/cluster/cluster_slot_migration.cc | 12 +++++------- src/server/cluster/cluster_slot_migration.h | 4 ++-- src/server/protocol_client.cc | 11 +++++++---- src/server/protocol_client.h | 3 ++- 6 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 4d0d70378dd9..f31c36ae335c 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -615,6 +615,17 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon return rb->SendOk(); } +void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { + ToUpper(&args[0]); + string_view sub_cmd = ArgS(args, 0); + args.remove_prefix(1); + if (sub_cmd == "CONF") { + MigrationConf(args, cntx); + } else { + (*cntx)->SendError(facade::UnknownSubCmd(sub_cmd, "DFLYMIGRATE"), facade::kSyntaxErrType); + } +} + void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "Create slot migration config"; CmdArgParser parser{args}; @@ -673,8 +684,7 @@ void ClusterFamily::Register(CommandRegistry* registry) { .HFUNC(DflyCluster) << CI{"READONLY", CO::READONLY, 1, 0, 0, acl::kReadOnly}.HFUNC(ReadOnly) << CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite) - << CI{"MGRTCONF", CO::ADMIN | CO::LOADING, -3, 0, 0, acl::kMGRTConf}.HFUNC( - MigrationConf); + << CI{"DFLYMIGRATE", CO::ADMIN, -1, 0, 0, acl::kMGRTConf}.HFUNC(DflyMigrate); } } // namespace dfly diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 67203ee10707..466214d25773 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -46,6 +46,7 @@ class ClusterFamily { void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx); void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx); + void DflyMigrate(CmdArgList args, ConnectionContext* cntx); void MigrationConf(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc index 9e3d09df95de..9846e465b4dc 100644 --- a/src/server/cluster/cluster_slot_migration.cc +++ b/src/server/cluster/cluster_slot_migration.cc @@ -20,9 +20,9 @@ using namespace std; using namespace facade; using absl::GetFlag; -ClusterSlotMigration::ClusterSlotMigration(string host, uint16_t port, +ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port, std::vector slots) - : ProtocolClient(std::move(host), port), slots_(std::move(slots)) { + : ProtocolClient(ServerContext::CreateFromIp(move(host_ip), port)), slots_(std::move(slots)) { } ClusterSlotMigration::~ClusterSlotMigration() { @@ -38,8 +38,6 @@ error_code ClusterSlotMigration::Start(ConnectionContext* cntx) { return ec; }; - InitEndpoint(); - VLOG(1) << "Connecting to source"; auto ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source")); @@ -60,9 +58,9 @@ error_code ClusterSlotMigration::Greet() { PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("PONG")); auto port = absl::GetFlag(FLAGS_port); - auto cmd = absl::StrCat("MGRTCONF ", port); + auto cmd = absl::StrCat("DFLYMIGRATE CONF ", port); for (const auto& s : slots_) { - cmd = absl::StrCat(cmd, " ", s.start, " ", s.end); + absl::StrAppend(&cmd, " ", s.start, " ", s.end); } VLOG(1) << "Migration command: " << cmd; RETURN_ON_ERR(SendCommandAndReadResponse(cmd)); @@ -70,7 +68,7 @@ error_code ClusterSlotMigration::Greet() { if (!CheckRespFirstTypes({RespExpr::INT64})) return make_error_code(errc::bad_message); - flows_num_ = get(LastResponseArgs()[0].u); + souce_shards_num_ = get(LastResponseArgs()[0].u); return error_code{}; } diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h index 3bd3f1ccd6fa..1dbe1eb64200 100644 --- a/src/server/cluster/cluster_slot_migration.h +++ b/src/server/cluster/cluster_slot_migration.h @@ -9,7 +9,7 @@ namespace dfly { class ClusterSlotMigration : ProtocolClient { public: - ClusterSlotMigration(std::string source_host, uint16_t port, + ClusterSlotMigration(std::string host_ip, uint16_t port, std::vector slots); ~ClusterSlotMigration(); @@ -18,7 +18,7 @@ class ClusterSlotMigration : ProtocolClient { private: std::error_code Greet(); std::vector slots_; - size_t flows_num_ = 0; + size_t souce_shards_num_ = 0; }; } // namespace dfly diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index ed935d741495..40fe874953e5 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -154,6 +154,13 @@ std::string ProtocolClient::ServerContext::Description() const { return absl::StrCat(host, ":", port); } +ProtocolClient::ServerContext ProtocolClient::ServerContext::CreateFromIp(std::string ip, + uint16_t port) { + ServerContext res{move(ip), port, {}}; + res.endpoint = {ip::make_address(res.host), res.port}; + return res; +} + void ValidateClientTlsFlags() { if (!absl::GetFlag(FLAGS_tls_replication)) { return; @@ -228,10 +235,6 @@ error_code ProtocolClient::InitEndpointWithDns() { return error_code{}; } -void ProtocolClient::InitEndpoint() { - server_context_.endpoint = {ip::make_address(server_context_.host), server_context_.port}; -} - error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx) { ProactorBase* mythread = ProactorBase::me(); diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index f26997d5b2ee..86898340b7b9 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -53,6 +53,8 @@ class ProtocolClient { protected: struct ServerContext { + static ServerContext CreateFromIp(std::string ip, uint16_t port); + std::string host; uint16_t port; boost::asio::ip::tcp::endpoint endpoint; @@ -65,7 +67,6 @@ class ProtocolClient { explicit ProtocolClient(ServerContext context); std::error_code InitEndpointWithDns(); - void InitEndpoint(); // Connect to master and authenticate if needed. std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx); From f6e3b787a7676228da6789a4c609e52fd9cf2f9b Mon Sep 17 00:00:00 2001 From: Borys Date: Wed, 29 Nov 2023 12:02:49 +0200 Subject: [PATCH 08/10] refactor: address comments --- src/server/cluster/cluster_family.cc | 5 +++-- src/server/cluster/cluster_slot_migration.cc | 8 ++++++-- src/server/protocol_client.cc | 9 +-------- src/server/protocol_client.h | 4 +--- src/server/replica.cc | 4 ++-- tests/dragonfly/cluster_test.py | 2 +- 6 files changed, 14 insertions(+), 18 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index f31c36ae335c..5bb6413eaf03 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -673,7 +673,7 @@ constexpr uint32_t kCluster = SLOW; constexpr uint32_t kDflyCluster = ADMIN | SLOW; constexpr uint32_t kReadOnly = FAST | CONNECTION; constexpr uint32_t kReadWrite = FAST | CONNECTION; -constexpr uint32_t kMGRTConf = ADMIN | SLOW | DANGEROUS; +constexpr uint32_t kDflyMigrate = ADMIN | SLOW | DANGEROUS; } // namespace acl void ClusterFamily::Register(CommandRegistry* registry) { @@ -684,7 +684,8 @@ void ClusterFamily::Register(CommandRegistry* registry) { .HFUNC(DflyCluster) << CI{"READONLY", CO::READONLY, 1, 0, 0, acl::kReadOnly}.HFUNC(ReadOnly) << CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite) - << CI{"DFLYMIGRATE", CO::ADMIN, -1, 0, 0, acl::kMGRTConf}.HFUNC(DflyMigrate); + << CI{"DFLYMIGRATE", CO::ADMIN | CO::HIDDEN, -1, 0, 0, acl::kDflyMigrate}.HFUNC( + DflyMigrate); } } // namespace dfly diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc index 9846e465b4dc..878b858825cb 100644 --- a/src/server/cluster/cluster_slot_migration.cc +++ b/src/server/cluster/cluster_slot_migration.cc @@ -22,7 +22,7 @@ using absl::GetFlag; ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port, std::vector slots) - : ProtocolClient(ServerContext::CreateFromIp(move(host_ip), port)), slots_(std::move(slots)) { + : ProtocolClient(move(host_ip), port), slots_(std::move(slots)) { } ClusterSlotMigration::~ClusterSlotMigration() { @@ -38,8 +38,12 @@ error_code ClusterSlotMigration::Start(ConnectionContext* cntx) { return ec; }; + VLOG(1) << "Resolving host DNS"; + error_code ec = ResolveHostDns(); + RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns")); + VLOG(1) << "Connecting to source"; - auto ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); + ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source")); VLOG(1) << "Greeting"; diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index 40fe874953e5..3ea8fff1f111 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -154,13 +154,6 @@ std::string ProtocolClient::ServerContext::Description() const { return absl::StrCat(host, ":", port); } -ProtocolClient::ServerContext ProtocolClient::ServerContext::CreateFromIp(std::string ip, - uint16_t port) { - ServerContext res{move(ip), port, {}}; - res.endpoint = {ip::make_address(res.host), res.port}; - return res; -} - void ValidateClientTlsFlags() { if (!absl::GetFlag(FLAGS_tls_replication)) { return; @@ -219,7 +212,7 @@ ProtocolClient::~ProtocolClient() { #endif } -error_code ProtocolClient::InitEndpointWithDns() { +error_code ProtocolClient::ResolveHostDns() { char ip_addr[INET6_ADDRSTRLEN]; int resolve_res = ResolveDns(server_context_.host, ip_addr); if (resolve_res != 0) { diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index 86898340b7b9..f57e3482b172 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -53,8 +53,6 @@ class ProtocolClient { protected: struct ServerContext { - static ServerContext CreateFromIp(std::string ip, uint16_t port); - std::string host; uint16_t port; boost::asio::ip::tcp::endpoint endpoint; @@ -66,7 +64,7 @@ class ProtocolClient { // the DNS resolution step. explicit ProtocolClient(ServerContext context); - std::error_code InitEndpointWithDns(); + std::error_code ResolveHostDns(); // Connect to master and authenticate if needed. std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx); diff --git a/src/server/replica.cc b/src/server/replica.cc index b95e08ad332e..f4f629d7c95e 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -101,7 +101,7 @@ error_code Replica::Start(ConnectionContext* cntx) { // 1. Resolve dns. VLOG(1) << "Resolving master DNS"; - error_code ec = InitEndpointWithDns(); + error_code ec = ResolveHostDns(); RETURN_ON_ERR(check_connection_error(ec, "could not resolve master dns")); // 2. Connect socket. @@ -179,7 +179,7 @@ void Replica::MainReplicationFb() { if (is_paused_) continue; - ec = InitEndpointWithDns(); + ec = ResolveHostDns(); if (ec) { LOG(ERROR) << "Error resolving dns to " << server().host << " " << ec; continue; diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index bc6e8081ee1a..65f863583832 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -747,7 +747,7 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory): ) res = await c_nodes_admin[1].execute_command( - "DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(BASE_PORT + 1000), "5200", "5259" + "DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "5200", "5259" ) assert "OK" == res From 0b3c5fb0cafa8b106a3cf3533e02eb0e098c1a97 Mon Sep 17 00:00:00 2001 From: Borys Date: Wed, 29 Nov 2023 12:21:01 +0200 Subject: [PATCH 09/10] refactor: address comments --- src/server/cluster/cluster_family.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 5bb6413eaf03..5bd1d609a085 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -646,7 +646,8 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { for (const auto& migration_range : slots) { for (auto i = migration_range.start; i <= migration_range.end; ++i) { if (!tl_cluster_config->IsMySlot(i)) { - VLOG(1) << "Invalid migration slot rane"; + VLOG(1) << "Invalid migration slot " << i << ' in range ' << migration_range.start << ':' + << migration_range.end; (*cntx)->SendError("Invalid slots range"); return; } From c828bc2a3524f02ff3851e7ef5fec20c44b731b9 Mon Sep 17 00:00:00 2001 From: Borys Date: Wed, 29 Nov 2023 12:28:17 +0200 Subject: [PATCH 10/10] fix: fix build --- src/server/cluster/cluster_family.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 5bd1d609a085..16093e0ea822 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -646,7 +646,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { for (const auto& migration_range : slots) { for (auto i = migration_range.start; i <= migration_range.end; ++i) { if (!tl_cluster_config->IsMySlot(i)) { - VLOG(1) << "Invalid migration slot " << i << ' in range ' << migration_range.start << ':' + VLOG(1) << "Invalid migration slot " << i << " in range " << migration_range.start << ':' << migration_range.end; (*cntx)->SendError("Invalid slots range"); return;