Skip to content

Commit

Permalink
feat(cluster): Add --cluster_id flag (#2695)
Browse files Browse the repository at this point in the history
* feat(cluster): Add `--cluster_id` flag

This flag sets the unique ID of a node in a cluster.

It is undefined behavior (and bad) to assign the same ID to multiple nodes in the
same cluster.

If unset (default), the `master_replid` (previously known as `master_id`) is used.

Fixes #2643
Related to #2636

* gh comments

* oops - revert line removal

* fix

* replica

* disallow cluster_node_id in emulated mode

* fix replica test
  • Loading branch information
chakaz authored Mar 10, 2024
1 parent 7f02d40 commit 8b31195
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 29 deletions.
25 changes: 18 additions & 7 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
#include "server/server_state.h"

// IP address that cluster commands announce to clients (see the flag's help text).
ABSL_FLAG(std::string, cluster_announce_ip, "", "ip that cluster commands announce to the client");
// Explicit node ID for this instance. When left empty, the ClusterFamily constructor
// falls back to ServerFamily::master_replid().
ABSL_FLAG(std::string, cluster_node_id, "",
"ID within a cluster, used for slot assignment. MUST be unique. If empty, uses master "
"replication ID (random string)");

// Only declared here (the definition lives elsewhere); read in this file to report
// this node's port in shard info.
ABSL_DECLARE_FLAG(int32_t, port);

Expand All @@ -49,7 +52,16 @@ thread_local shared_ptr<ClusterConfig> tl_cluster_config;

// Binds this family to its ServerFamily and resolves the node ID stored in id_.
//
// The ID comes from --cluster_node_id when that flag is non-empty; otherwise the
// server's master replication ID is used. Supplying the flag while the cluster runs
// in emulated mode is not supported: an error is logged and the process exits.
ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(server_family) {
  CHECK_NOTNULL(server_family_);

  ClusterConfig::Initialize();

  const std::string configured_id = absl::GetFlag(FLAGS_cluster_node_id);

  // No explicit ID requested: reuse the replication ID and finish.
  if (configured_id.empty()) {
    id_ = server_family_->master_replid();
    return;
  }

  // An explicit ID only makes sense for a real (non-emulated) cluster.
  if (ClusterConfig::IsEmulated()) {
    LOG(ERROR) << "Setting --cluster_node_id in emulated mode is unsupported";
    exit(1);
  }

  id_ = configured_id;
}

ClusterConfig* ClusterFamily::cluster_config() {
Expand All @@ -70,7 +82,7 @@ ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->conn()->LocalBindAddress() : cluster_announce_ip;

info.master = {.id = server_family_->master_id(),
info.master = {.id = id_,
.ip = preferred_endpoint,
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))};

Expand All @@ -82,7 +94,7 @@ ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const
} else {
info.master = {
.id = etl.remote_client_id_, .ip = replication_info->host, .port = replication_info->port};
info.replicas.push_back({.id = server_family_->master_id(),
info.replicas.push_back({.id = id_,
.ip = cntx->conn()->LocalBindAddress(),
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))});
}
Expand Down Expand Up @@ -254,9 +266,9 @@ void ClusterNodesImpl(const ClusterShards& config, string_view my_id, Connection

void ClusterFamily::ClusterNodes(ConnectionContext* cntx) {
if (ClusterConfig::IsEmulated()) {
return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, server_family_->master_id(), cntx);
return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, cntx);
} else if (tl_cluster_config != nullptr) {
return ClusterNodesImpl(tl_cluster_config->GetConfig(), server_family_->master_id(), cntx);
return ClusterNodesImpl(tl_cluster_config->GetConfig(), id_, cntx);
} else {
return cntx->SendError(kClusterNotConfigured);
}
Expand Down Expand Up @@ -406,7 +418,7 @@ void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(WrongNumArgsError("DFLYCLUSTER MYID"));
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
rb->SendBulkString(server_family_->master_id());
rb->SendBulkString(id_);
}

namespace {
Expand Down Expand Up @@ -481,8 +493,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
return cntx->SendError("Invalid JSON cluster config", kSyntaxErrType);
}

shared_ptr<ClusterConfig> new_config =
ClusterConfig::CreateFromConfig(server_family_->master_id(), json.value());
shared_ptr<ClusterConfig> new_config = ClusterConfig::CreateFromConfig(id_, json.value());
if (new_config == nullptr) {
LOG(WARNING) << "Can't set cluster config";
return cntx->SendError("Invalid cluster configuration.");
Expand Down
2 changes: 2 additions & 0 deletions src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class ClusterFamily {
private:
ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const;

std::string id_;

ServerFamily* server_family_ = nullptr;
};

Expand Down
2 changes: 1 addition & 1 deletion src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Got DFLY FLOW master_id: " << master_id << " sync_id: " << sync_id_str
<< " flow: " << flow_id_str << " seq: " << seqid.value_or(-1);

if (master_id != sf_->master_id()) {
if (master_id != sf_->master_replid()) {
return rb->SendError(kBadMasterId);
}

Expand Down
12 changes: 6 additions & 6 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
#include "server/server_family.h"

#include <absl/cleanup/cleanup.h>
#include <absl/random/random.h> // for master_id_ generation.
#include <absl/random/random.h> // for master_replid_ generation.
#include <absl/strings/match.h>
#include <absl/strings/str_join.h>
#include <absl/strings/str_replace.h>
Expand Down Expand Up @@ -615,8 +615,8 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {

{
absl::InsecureBitGen eng;
master_id_ = GetRandomHex(eng, CONFIG_RUN_ID_SIZE);
DCHECK_EQ(CONFIG_RUN_ID_SIZE, master_id_.size());
master_replid_ = GetRandomHex(eng, CONFIG_RUN_ID_SIZE);
DCHECK_EQ(CONFIG_RUN_ID_SIZE, master_replid_.size());
}

if (auto ec =
Expand Down Expand Up @@ -2036,7 +2036,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append(StrCat("slave", i), StrCat("ip=", r.address, ",port=", r.listening_port,
",state=", r.state, ",lag=", r.lsn_lag));
}
append("master_replid", master_id_);
append("master_replid", master_replid_);
} else {
append("role", "replica");

Expand Down Expand Up @@ -2273,7 +2273,7 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
}

// Create a new replica and assign it
auto new_replica = make_shared<Replica>(string(host), port, &service_, master_id());
auto new_replica = make_shared<Replica>(string(host), port, &service_, master_replid());
replica_ = new_replica;

// TODO: disconnect pending blocked clients (pubsub, blocking commands)
Expand Down Expand Up @@ -2385,7 +2385,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
// The response for 'capa dragonfly' is: <masterid> <syncid> <numthreads> <version>
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
rb->StartArray(4);
rb->SendSimpleString(master_id_);
rb->SendSimpleString(master_replid_);
rb->SendSimpleString(sync_id);
rb->SendLong(replica_info->flows.size());
rb->SendLong(unsigned(DflyVersion::CURRENT_VER));
Expand Down
6 changes: 3 additions & 3 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ class ServerFamily {
void PauseReplication(bool pause);
std::optional<ReplicaOffsetInfo> GetReplicaOffsetInfo();

const std::string& master_id() const {
return master_id_;
const std::string& master_replid() const {
return master_replid_;
}

journal::Journal* journal() {
Expand Down Expand Up @@ -282,7 +282,7 @@ class ServerFamily {
std::unique_ptr<journal::Journal> journal_;
std::unique_ptr<DflyCmd> dfly_cmd_;

std::string master_id_;
std::string master_replid_;

time_t start_time_ = 0; // in seconds, epoch time.

Expand Down
39 changes: 32 additions & 7 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ async def test_cluster_nodes(df_server, async_client):
"""


@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cluster_node_id": "inigo montoya"})
async def test_cluster_node_id(df_local_factory: DflyInstanceFactory):
    """Check that a node started with an explicit cluster_node_id reports exactly that ID."""
    instance = df_local_factory.create(port=BASE_PORT)
    df_local_factory.start_all([instance])

    client = instance.client()
    reported_id = await get_node_id(client)
    assert "inigo montoya" == reported_id

    await close_clients(client)


@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFactory):
# Start and configure cluster with 2 nodes
Expand Down Expand Up @@ -306,7 +317,7 @@ async def test_cluster_slot_ownership_changes(df_local_factory: DflyInstanceFact

# Tests that master commands to the replica are applied regardless of slot ownership
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_replica_sets_non_owned_keys(df_local_factory):
async def test_cluster_replica_sets_non_owned_keys(df_local_factory: DflyInstanceFactory):
# Start and configure cluster with 1 master and 1 replica, both own all slots
master = df_local_factory.create(admin_port=BASE_PORT + 1000)
replica = df_local_factory.create(admin_port=BASE_PORT + 1001)
Expand Down Expand Up @@ -570,14 +581,20 @@ async def test_cluster_blocking_command(df_server):
await close_clients(c_master, c_master_admin)


@pytest.mark.parametrize("set_cluster_node_id", [True, False])
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_native_client(
df_local_factory: DflyInstanceFactory,
df_seeder_factory: DflySeederFactory,
set_cluster_node_id: bool,
):
# Start and configure cluster with 3 masters and 3 replicas
masters = [
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
df_local_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
cluster_node_id=f"master{i}" if set_cluster_node_id else "",
)
for i in range(3)
]
df_local_factory.start_all(masters)
Expand All @@ -586,11 +603,17 @@ async def test_cluster_native_client(
master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin))

replicas = [
df_local_factory.create(port=BASE_PORT + 100 + i, admin_port=BASE_PORT + i + 1100)
df_local_factory.create(
port=BASE_PORT + 100 + i,
admin_port=BASE_PORT + i + 1100,
cluster_node_id=f"replica{i}" if set_cluster_node_id else "",
replicaof=f"localhost:{BASE_PORT + i}",
)
for i in range(3)
]
df_local_factory.start_all(replicas)
c_replicas = [replica.client() for replica in replicas]
await asyncio.gather(*(wait_available_async(c) for c in c_replicas))
c_replicas_admin = [replica.admin_client() for replica in replicas]
replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas_admin))

Expand Down Expand Up @@ -678,10 +701,12 @@ async def test_random_keys():
await asyncio.gather(*(wait_available_async(c) for c in c_replicas))

# Make sure that getting a value from a replica works as well.
replica_response = await client.execute_command(
"get", "key0", target_nodes=aioredis.RedisCluster.REPLICAS
)
assert "value" in replica_response.values()
# We use connections directly to NOT follow 'MOVED' error, as that will redirect to the master.
for c in c_replicas:
try:
assert await c.get("key0")
except redis.exceptions.ResponseError as e:
assert e.args[0].startswith("MOVED")

# Push new config
config = f"""
Expand Down
6 changes: 1 addition & 5 deletions tests/dragonfly/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,8 @@ async def wait_available_async(client: aioredis.Redis, timeout=10):
start = time.time()
while (time.time() - start) < timeout:
try:
await client.get("key")
await client.ping()
return
except aioredis.ResponseError as e:
if "MOVED" in str(e):
# MOVED means we *can* serve traffic, but 'key' does not belong to an owned slot
return
except aioredis.BusyLoadingError as e:
assert "Dragonfly is loading the dataset in memory" in str(e)

Expand Down

0 comments on commit 8b31195

Please sign in to comment.