diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 27eae00e0782..b3149f4234bb 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -25,6 +25,9 @@ #include "server/server_state.h" ABSL_FLAG(std::string, cluster_announce_ip, "", "ip that cluster commands announce to the client"); +ABSL_FLAG(std::string, cluster_node_id, "", + "ID within a cluster, used for slot assignment. MUST be unique. If empty, uses master " + "replication ID (random string)"); ABSL_DECLARE_FLAG(int32_t, port); @@ -49,7 +52,16 @@ thread_local shared_ptr tl_cluster_config; ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(server_family) { CHECK_NOTNULL(server_family_); + ClusterConfig::Initialize(); + + id_ = absl::GetFlag(FLAGS_cluster_node_id); + if (id_.empty()) { + id_ = server_family_->master_replid(); + } else if (ClusterConfig::IsEmulated()) { + LOG(ERROR) << "Setting --cluster_node_id in emulated mode is unsupported"; + exit(1); + } } ClusterConfig* ClusterFamily::cluster_config() { @@ -70,7 +82,7 @@ ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const std::string preferred_endpoint = cluster_announce_ip.empty() ? cntx->conn()->LocalBindAddress() : cluster_announce_ip; - info.master = {.id = server_family_->master_id(), + info.master = {.id = id_, .ip = preferred_endpoint, .port = static_cast(absl::GetFlag(FLAGS_port))}; @@ -82,7 +94,7 @@ ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const } else { info.master = { .id = etl.remote_client_id_, .ip = replication_info->host, .port = replication_info->port}; - info.replicas.push_back({.id = server_family_->master_id(), + info.replicas.push_back({.id = id_, .ip = cntx->conn()->LocalBindAddress(), .port = static_cast(absl::GetFlag(FLAGS_port))}); } @@ -254,9 +266,9 @@ void ClusterNodesImpl(const ClusterShards& config, string_view my_id, Connection void ClusterFamily::ClusterNodes(ConnectionContext* cntx) { if (ClusterConfig::IsEmulated()) { - return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, server_family_->master_id(), cntx); + return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, cntx); } else if (tl_cluster_config != nullptr) { - return ClusterNodesImpl(tl_cluster_config->GetConfig(), server_family_->master_id(), cntx); + return ClusterNodesImpl(tl_cluster_config->GetConfig(), id_, cntx); } else { return cntx->SendError(kClusterNotConfigured); } @@ -406,7 +418,7 @@ void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError(WrongNumArgsError("DFLYCLUSTER MYID")); } auto* rb = static_cast(cntx->reply_builder()); - rb->SendBulkString(server_family_->master_id()); + rb->SendBulkString(id_); } namespace { @@ -481,8 +493,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) return cntx->SendError("Invalid JSON cluster config", kSyntaxErrType); } - shared_ptr new_config = - ClusterConfig::CreateFromConfig(server_family_->master_id(), json.value()); + shared_ptr new_config = ClusterConfig::CreateFromConfig(id_, json.value()); if (new_config == nullptr) { LOG(WARNING) << "Can't set cluster config"; return cntx->SendError("Invalid cluster configuration."); diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 204536ca2884..8a4391361420 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -97,6 +97,8 @@ class ClusterFamily { private: ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; + std::string id_; + ServerFamily* server_family_ = nullptr; }; diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 9de3976f4d61..bcacd13f7786 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -190,7 +190,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "Got DFLY FLOW master_id: " << master_id << " sync_id: " << sync_id_str << " flow: " << flow_id_str << " seq: " << seqid.value_or(-1); - if (master_id != sf_->master_id()) { + if (master_id != sf_->master_replid()) { return rb->SendError(kBadMasterId); } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 6a119e98cb92..a5e591049efd 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -5,7 +5,7 @@ #include "server/server_family.h" #include -#include // for master_id_ generation. +#include // for master_replid_ generation. #include #include #include @@ -615,8 +615,8 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) { { absl::InsecureBitGen eng; - master_id_ = GetRandomHex(eng, CONFIG_RUN_ID_SIZE); - DCHECK_EQ(CONFIG_RUN_ID_SIZE, master_id_.size()); + master_replid_ = GetRandomHex(eng, CONFIG_RUN_ID_SIZE); + DCHECK_EQ(CONFIG_RUN_ID_SIZE, master_replid_.size()); } if (auto ec = @@ -2036,7 +2036,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append(StrCat("slave", i), StrCat("ip=", r.address, ",port=", r.listening_port, ",state=", r.state, ",lag=", r.lsn_lag)); } - append("master_replid", master_id_); + append("master_replid", master_replid_); } else { append("role", "replica"); @@ -2273,7 +2273,7 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn } // Create a new replica and assing it - auto new_replica = make_shared(string(host), port, &service_, master_id()); + auto new_replica = make_shared(string(host), port, &service_, master_replid()); replica_ = new_replica; // TODO: disconnect pending blocked clients (pubsub, blocking commands) @@ -2385,7 +2385,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { // The response for 'capa dragonfly' is: auto* rb = static_cast(cntx->reply_builder()); rb->StartArray(4); - rb->SendSimpleString(master_id_); + rb->SendSimpleString(master_replid_); rb->SendSimpleString(sync_id); rb->SendLong(replica_info->flows.size()); rb->SendLong(unsigned(DflyVersion::CURRENT_VER)); diff --git a/src/server/server_family.h b/src/server/server_family.h index 15d1b0a2aa42..2c38e1c0328b 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -187,8 +187,8 @@ class ServerFamily { void PauseReplication(bool pause); std::optional GetReplicaOffsetInfo(); - const std::string& master_id() const { - return master_id_; + const std::string& master_replid() const { + return master_replid_; } journal::Journal* journal() { @@ -282,7 +282,7 @@ class ServerFamily { std::unique_ptr journal_; std::unique_ptr dfly_cmd_; - std::string master_id_; + std::string master_replid_; time_t start_time_ = 0; // in seconds, epoch time. diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index e34a6f861533..87d25e68172e 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -174,6 +174,17 @@ async def test_cluster_nodes(df_server, async_client): """ +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cluster_node_id": "inigo montoya"}) +async def test_cluster_node_id(df_local_factory: DflyInstanceFactory): + node = df_local_factory.create(port=BASE_PORT) + df_local_factory.start_all([node]) + + conn = node.client() + assert "inigo montoya" == await get_node_id(conn) + + await close_clients(conn) + + @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFactory): # Start and configure cluster with 2 nodes @@ -306,7 +317,7 @@ async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFact # Tests that master commands to the replica are applied regardless of slot ownership @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) -async def test_cluster_replica_sets_non_owned_keys(df_local_factory): +async def test_cluster_replica_sets_non_owned_keys(df_local_factory: DflyInstanceFactory): # Start and configure cluster with 1 master and 1 replica, both own all slots master = df_local_factory.create(admin_port=BASE_PORT + 1000) replica = df_local_factory.create(admin_port=BASE_PORT + 1001) @@ -570,14 +581,20 @@ async def test_cluster_blocking_command(df_server): await close_clients(c_master, c_master_admin) +@pytest.mark.parametrize("set_cluster_node_id", [True, False]) @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_native_client( df_local_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory, + set_cluster_node_id: bool, ): # Start and configure cluster with 3 masters and 3 replicas masters = [ - df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) + df_local_factory.create( + port=BASE_PORT + i, + admin_port=BASE_PORT + i + 1000, + cluster_node_id=f"master{i}" if set_cluster_node_id else "", + ) for i in range(3) ] df_local_factory.start_all(masters) @@ -586,11 +603,17 @@ async def test_cluster_native_client( master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin)) replicas = [ - df_local_factory.create(port=BASE_PORT + 100 + i, admin_port=BASE_PORT + i + 1100) + df_local_factory.create( + port=BASE_PORT + 100 + i, + admin_port=BASE_PORT + i + 1100, + cluster_node_id=f"replica{i}" if set_cluster_node_id else "", + replicaof=f"localhost:{BASE_PORT + i}", + ) for i in range(3) ] df_local_factory.start_all(replicas) c_replicas = [replica.client() for replica in replicas] + await asyncio.gather(*(wait_available_async(c) for c in c_replicas)) c_replicas_admin = [replica.admin_client() for replica in replicas] replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas_admin)) @@ -678,10 +701,12 @@ async def test_random_keys(): await asyncio.gather(*(wait_available_async(c) for c in c_replicas)) # Make sure that getting a value from a replica works as well. - replica_response = await client.execute_command( - "get", "key0", target_nodes=aioredis.RedisCluster.REPLICAS - ) - assert "value" in replica_response.values() + # We use connections directly to NOT follow 'MOVED' error, as that will redirect to the master. + for c in c_replicas: + try: + assert await c.get("key0") + except redis.exceptions.ResponseError as e: + assert e.args[0].startswith("MOVED") # Push new config config = f""" diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index b2176dad5bf3..47c42c91440f 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -46,12 +46,8 @@ async def wait_available_async(client: aioredis.Redis, timeout=10): start = time.time() while (time.time() - start) < timeout: try: - await client.get("key") + await client.ping() return - except aioredis.ResponseError as e: - if "MOVED" in str(e): - # MOVED means we *can* serve traffic, but 'key' does not belong to an owned slot - return except aioredis.BusyLoadingError as e: assert "Dragonfly is loading the dataset in memory" in str(e)