Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start slot migration #2218

Merged
merged 10 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/facade/cmd_arg_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,16 @@ template <typename T> T CmdArgParser::Num(size_t idx) {
} else if constexpr (std::is_same_v<T, double>) {
if (absl::SimpleAtod(arg, &out))
return out;
} else if constexpr (std::is_integral_v<T>) {
} else if constexpr (std::is_integral_v<T> && sizeof(T) >= sizeof(int32_t)) {
if (absl::SimpleAtoi(arg, &out))
return out;
} else if constexpr (std::is_integral_v<T> && sizeof(T) < sizeof(int32_t)) {
int32_t tmp;
if (absl::SimpleAtoi(arg, &tmp)) {
out = tmp; // out can not store the whole tmp
if (tmp == out)
return out;
}
}

Report(INVALID_INT, idx);
Expand All @@ -58,6 +65,8 @@ template uint64_t CmdArgParser::Num<uint64_t>(size_t);
template int64_t CmdArgParser::Num<int64_t>(size_t);
template uint32_t CmdArgParser::Num<uint32_t>(size_t);
template int32_t CmdArgParser::Num<int32_t>(size_t);
template uint16_t CmdArgParser::Num<uint16_t>(size_t);
template int16_t CmdArgParser::Num<int16_t>(size_t);

ErrorReply CmdArgParser::ErrorInfo::MakeReply() const {
switch (type) {
Expand Down
3 changes: 2 additions & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc command_registry.
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc
cluster/cluster_family.cc acl/user.cc acl/user_registry.cc acl/acl_family.cc
cluster/cluster_family.cc cluster/cluster_slot_migration.cc
acl/user.cc acl/user_registry.cc acl/acl_family.cc
acl/validator.cc acl/helpers.cc)

cxx_link(dfly_transaction dfly_core strings_lib TRDP::fast_float)
Expand Down
75 changes: 74 additions & 1 deletion src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
#include "base/flags.h"
#include "base/logging.h"
#include "core/json_object.h"
#include "facade/cmd_arg_parser.h"
#include "facade/dragonfly_connection.h"
#include "facade/error.h"
#include "server/acl/acl_commands_def.h"
#include "server/cluster/cluster_slot_migration.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/dflycmd.h"
Expand Down Expand Up @@ -392,6 +394,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return DflyClusterMyId(args, cntx);
} else if (sub_cmd == "FLUSHSLOTS") {
return DflyClusterFlushSlots(args, cntx);
} else if (sub_cmd == "START-SLOT-MIGRATION") {
return DflyClusterStartSlotMigration(args, cntx);
}

return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
Expand Down Expand Up @@ -589,6 +593,72 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn
return rb->SendOk();
}

void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx) {
SinkReplyBuilder* rb = cntx->reply_builder();

args.remove_prefix(1); // Removes "START-SLOT-MIGRATION" subcmd string

CmdArgParser parser(args);
auto [host_ip, port] = parser.Next<std::string_view, uint16_t>();
std::vector<SlotRange> slots;
do {
auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
slots.emplace_back(SlotRange{slot_start, slot_end});
} while (parser.HasNext());

if (auto err = parser.Error(); err)
return (*cntx)->SendError(err->MakeReply());
BorysTheDev marked this conversation as resolved.
Show resolved Hide resolved

ClusterSlotMigration node(std::string(host_ip), port, slots);
node.Start(cntx);

return rb->SendOk();
}

void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
args.remove_prefix(1);
if (sub_cmd == "CONF") {
MigrationConf(args, cntx);
} else {
(*cntx)->SendError(facade::UnknownSubCmd(sub_cmd, "DFLYMIGRATE"), facade::kSyntaxErrType);
}
}

void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Create slot migration config";
CmdArgParser parser{args};
auto port = parser.Next<uint16_t>();
(void)port; // we need it for the next step

std::vector<ClusterConfig::SlotRange> slots;
do {
auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
slots.emplace_back(SlotRange{slot_start, slot_end});
} while (parser.HasNext());

if (!tl_cluster_config) {
(*cntx)->SendError(kClusterNotConfigured);
return;
}

for (const auto& migration_range : slots) {
for (auto i = migration_range.start; i <= migration_range.end; ++i) {
if (!tl_cluster_config->IsMySlot(i)) {
VLOG(1) << "Invalid migration slot rane";
BorysTheDev marked this conversation as resolved.
Show resolved Hide resolved
(*cntx)->SendError("Invalid slots range");
return;
}
}
}

cntx->conn()->SetName("slot_migration_ctrl");

(*cntx)->SendLong(shard_set->size());
return;
}

using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx);

inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) {
Expand All @@ -603,6 +673,7 @@ constexpr uint32_t kCluster = SLOW;
constexpr uint32_t kDflyCluster = ADMIN | SLOW;
constexpr uint32_t kReadOnly = FAST | CONNECTION;
constexpr uint32_t kReadWrite = FAST | CONNECTION;
constexpr uint32_t kDflyMigrate = ADMIN | SLOW | DANGEROUS;
} // namespace acl

void ClusterFamily::Register(CommandRegistry* registry) {
Expand All @@ -612,7 +683,9 @@ void ClusterFamily::Register(CommandRegistry* registry) {
acl::kDflyCluster}
.HFUNC(DflyCluster)
<< CI{"READONLY", CO::READONLY, 1, 0, 0, acl::kReadOnly}.HFUNC(ReadOnly)
<< CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite);
<< CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite)
<< CI{"DFLYMIGRATE", CO::ADMIN | CO::HIDDEN, -1, 0, 0, acl::kDflyMigrate}.HFUNC(
DflyMigrate);
}

} // namespace dfly
4 changes: 4 additions & 0 deletions src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class ClusterFamily {
void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx);
void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx);
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);
void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx);
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);

void MigrationConf(CmdArgList args, ConnectionContext* cntx);

ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const;

Expand Down
80 changes: 80 additions & 0 deletions src/server/cluster/cluster_slot_migration.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/cluster/cluster_slot_migration.h"

#include <absl/flags/flag.h>

#include "base/logging.h"
#include "server/error.h"
#include "server/main_service.h"

ABSL_FLAG(int, source_connect_timeout_ms, 20000,
BorysTheDev marked this conversation as resolved.
Show resolved Hide resolved
"Timeout for establishing connection to a source node");

ABSL_DECLARE_FLAG(int32_t, port);

namespace dfly {

using namespace std;
using namespace facade;
using absl::GetFlag;

ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots)
: ProtocolClient(move(host_ip), port), slots_(std::move(slots)) {
}

ClusterSlotMigration::~ClusterSlotMigration() {
}

error_code ClusterSlotMigration::Start(ConnectionContext* cntx) {
VLOG(1) << "Starting slot migration";

auto check_connection_error = [this, &cntx](error_code ec, const char* msg) -> error_code {
if (ec) {
(*cntx)->SendError(absl::StrCat(msg, ec.message()));
}
return ec;
};

VLOG(1) << "Resolving host DNS";
error_code ec = ResolveHostDns();
RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns"));

VLOG(1) << "Connecting to source";
ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_);
RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source"));

VLOG(1) << "Greeting";
ec = Greet();
RETURN_ON_ERR(check_connection_error(ec, "couldn't greet source "));

(*cntx)->SendOk();

return {};
}

error_code ClusterSlotMigration::Greet() {
ResetParser(false);
VLOG(1) << "greeting message handling";
RETURN_ON_ERR(SendCommandAndReadResponse("PING"));
PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("PONG"));

auto port = absl::GetFlag(FLAGS_port);
auto cmd = absl::StrCat("DFLYMIGRATE CONF ", port);
for (const auto& s : slots_) {
absl::StrAppend(&cmd, " ", s.start, " ", s.end);
}
VLOG(1) << "Migration command: " << cmd;
RETURN_ON_ERR(SendCommandAndReadResponse(cmd));
// Response is: num_shards
if (!CheckRespFirstTypes({RespExpr::INT64}))
return make_error_code(errc::bad_message);

souce_shards_num_ = get<int64_t>(LastResponseArgs()[0].u);

return error_code{};
}

} // namespace dfly
24 changes: 24 additions & 0 deletions src/server/cluster/cluster_slot_migration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once

#include "server/protocol_client.h"

namespace dfly {

class ClusterSlotMigration : ProtocolClient {
public:
ClusterSlotMigration(std::string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);
~ClusterSlotMigration();

std::error_code Start(ConnectionContext* cntx);

private:
std::error_code Greet();
std::vector<ClusterConfig::SlotRange> slots_;
size_t souce_shards_num_ = 0;
};

} // namespace dfly
2 changes: 1 addition & 1 deletion src/server/protocol_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ ProtocolClient::~ProtocolClient() {
#endif
}

error_code ProtocolClient::ResolveMasterDns() {
error_code ProtocolClient::ResolveHostDns() {
char ip_addr[INET6_ADDRSTRLEN];
int resolve_res = ResolveDns(server_context_.host, ip_addr);
if (resolve_res != 0) {
Expand Down
3 changes: 2 additions & 1 deletion src/server/protocol_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include <absl/container/inlined_vector.h>
#include <absl/strings/escaping.h>

#include <boost/fiber/barrier.hpp>
#include <queue>
Expand Down Expand Up @@ -63,7 +64,7 @@ class ProtocolClient {
// the DNS resolution step.
explicit ProtocolClient(ServerContext context);

std::error_code ResolveMasterDns(); // Resolve master dns
std::error_code ResolveHostDns();
// Connect to master and authenticate if needed.
std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx);

Expand Down
4 changes: 2 additions & 2 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ error_code Replica::Start(ConnectionContext* cntx) {

// 1. Resolve dns.
VLOG(1) << "Resolving master DNS";
error_code ec = ResolveMasterDns();
error_code ec = ResolveHostDns();
RETURN_ON_ERR(check_connection_error(ec, "could not resolve master dns"));

// 2. Connect socket.
Expand Down Expand Up @@ -179,7 +179,7 @@ void Replica::MainReplicationFb() {
if (is_paused_)
continue;

ec = ResolveMasterDns();
ec = ResolveHostDns();
if (ec) {
LOG(ERROR) << "Error resolving dns to " << server().host << " " << ec;
continue;
Expand Down
63 changes: 63 additions & 0 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,3 +691,66 @@ async def test_random_keys():

await test_random_keys()
await client.close()


@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
# Check slot migration from one node to another
nodes = [
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
for i in range(2)
]

df_local_factory.start_all(nodes)

c_nodes = [node.client() for node in nodes]
c_nodes_admin = [node.admin_client() for node in nodes]

node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))

config = f"""
[
{{
"slot_ranges": [
{{
"start": 0,
"end": LAST_SLOT_CUTOFF
}}
],
"master": {{
"id": "{node_ids[0]}",
"ip": "localhost",
"port": {nodes[0].port}
}},
"replicas": []
}},
{{
"slot_ranges": [
{{
"start": NEXT_SLOT_CUTOFF,
"end": 16383
}}
],
"master": {{
"id": "{node_ids[1]}",
"ip": "localhost",
"port": {nodes[1].port}
}},
"replicas": []
}}
]
"""

await push_config(
config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
c_nodes_admin,
)

res = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "5200", "5259"
)

assert "OK" == res

await c_nodes_admin[0].close()
await c_nodes_admin[1].close()
Loading