Skip to content

Commit

Permalink
feat(server): report slaves ip and listening port in INFO REPLICATION
Browse files Browse the repository at this point in the history
See #706 for more context

Signed-off-by: ashotland <[email protected]>
  • Loading branch information
ashotland committed Jan 22, 2023
1 parent ae017db commit e7c97a4
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 18 deletions.
15 changes: 10 additions & 5 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ struct ConnectionState {
util::fibers_ext::BlockingCounter borrow_token{0};
};

struct ReplicationInfo {
// If this server is master, and this connection is from a secondary replica,
// then it holds positive sync session id.
uint32_t repl_session_id = 0;
uint32_t repl_flow_id = kuint32max;
uint32_t repl_listening_port = 0;
};

enum MCGetMask {
FETCH_CAS_VER = 1,
};
Expand All @@ -86,12 +94,9 @@ struct ConnectionState {
// For get op - we use it as a mask of MCGetMask values.
uint32_t memcache_flag = 0;

// If this server is master, and this connection is from a secondary replica,
// then it holds positive sync session id.
uint32_t repl_session_id = 0;
uint32_t repl_flow_id = kuint32max;

ExecInfo exec_info;
ReplicationInfo replicaiton_info;

std::optional<ScriptInfo> script_info;
std::unique_ptr<SubscribeInfo> subscribe_info;
};
Expand Down
15 changes: 9 additions & 6 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ void JournalStreamer::WriterFb(io::Sink* dest) {

} // namespace

DflyCmd::ReplicaRoleInfo::ReplicaRoleInfo(std::string address, SyncState sync_state)
: address(address) {
DflyCmd::ReplicaRoleInfo::ReplicaRoleInfo(std::string address, uint32_t listening_port,
SyncState sync_state)
: address(address), listening_port(listening_port) {
switch (sync_state) {
case SyncState::PREPARATION:
state = "preparation";
Expand Down Expand Up @@ -296,8 +297,8 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {

// Set meta info on connection.
cntx->owner()->SetName(absl::StrCat("repl_flow_", sync_id));
cntx->conn_state.repl_session_id = sync_id;
cntx->conn_state.repl_flow_id = flow_id;
cntx->conn_state.replicaiton_info.repl_session_id = sync_id;
cntx->conn_state.replicaiton_info.repl_flow_id = flow_id;

absl::InsecureBitGen gen;
string eof_token = GetRandomHex(gen, 40);
Expand Down Expand Up @@ -491,6 +492,7 @@ uint32_t DflyCmd::CreateSyncSession(ConnectionContext* cntx) {
};

auto replica_ptr = make_shared<ReplicaInfo>(flow_count, cntx->owner()->RemoteEndpointAddress(),
cntx->conn_state.replicaiton_info.repl_listening_port,
std::move(err_handler));
auto [it, inserted] = replica_infos_.emplace(sync_id, std::move(replica_ptr));
CHECK(inserted);
Expand All @@ -499,7 +501,7 @@ uint32_t DflyCmd::CreateSyncSession(ConnectionContext* cntx) {
}

void DflyCmd::OnClose(ConnectionContext* cntx) {
unsigned session_id = cntx->conn_state.repl_session_id;
unsigned session_id = cntx->conn_state.replicaiton_info.repl_session_id;
if (!session_id)
return;

Expand Down Expand Up @@ -582,7 +584,8 @@ std::vector<DflyCmd::ReplicaRoleInfo> DflyCmd::GetReplicasRoleInfo() {
std::vector<ReplicaRoleInfo> vec;
unique_lock lk(mu_);
for (const auto& info : replica_infos_) {
vec.emplace_back(info.second->address, info.second->state.load(memory_order_relaxed));
vec.emplace_back(info.second->address, info.second->listening_port,
info.second->state.load(memory_order_relaxed));
}
return vec;
}
Expand Down
11 changes: 7 additions & 4 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,24 @@ class DflyCmd {

// Stores information related to a single replica.
struct ReplicaInfo {
ReplicaInfo(unsigned flow_count, std::string address, Context::ErrHandler err_handler)
: state{SyncState::PREPARATION}, address{address}, cntx{std::move(err_handler)},
flows{flow_count} {
ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port,
Context::ErrHandler err_handler)
: state{SyncState::PREPARATION}, address{address},
listening_port(listening_port), cntx{std::move(err_handler)}, flows{flow_count} {
}

std::atomic<SyncState> state;
std::string address;
uint32_t listening_port;
Context cntx;

std::vector<FlowInfo> flows;
::boost::fibers::mutex mu; // See top of header for locking levels.
};
struct ReplicaRoleInfo {
ReplicaRoleInfo(std::string address, SyncState sync_state);
ReplicaRoleInfo(std::string address, uint32_t listening_port, SyncState sync_state);
std::string address;
uint32_t listening_port;
std::string state;
};

Expand Down
14 changes: 12 additions & 2 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ extern "C" {
ABSL_FLAG(bool, enable_multi_shard_sync, true,
"Execute multi shards commands on replica syncrhonized");

ABSL_DECLARE_FLAG(uint32_t, port);

namespace dfly {

using namespace std;
Expand Down Expand Up @@ -288,8 +290,16 @@ error_code Replica::Greet() {

io_buf.ConsumeInput(consumed);

// TODO: we may also send REPLCONF listening-port, ip-address
// See server.repl_state == REPL_STATE_SEND_PORT condition in replication.c
// Corresponds to server.repl_state == REPL_STATE_SEND_HANDSHAKE condition in replication.c
auto port = absl::GetFlag(FLAGS_port);
RETURN_ON_ERR(SendCommand(StrCat("REPLCONF listening-port ", port), &serializer));
RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed));
if (!CheckRespIsSimpleReply("OK")) {
LOG(ERROR) << "Bad REPLCONF response " << ToSV(io_buf.InputBuffer());
return make_error_code(errc::bad_message);
}

io_buf.ConsumeInput(consumed);

// Corresponds to server.repl_state == REPL_STATE_SEND_CAPA
RETURN_ON_ERR(SendCommand("REPLCONF capa eof capa psync2", &serializer));
Expand Down
14 changes: 13 additions & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,12 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
if (etl.is_master) {
append("role", "master");
append("connected_slaves", m.conn_stats.num_replicas);
auto replicas = dfly_cmd_->GetReplicasRoleInfo();
for (auto i = 0; i < replicas.size(); i++) {
auto& r = replicas[i];
// e.g. slave0:ip=172.19.0.3,port=6379
append(StrCat("slave", i), StrCat("ip=", r.address, ",port=", r.listening_port));
}
append("master_replid", master_id_);
} else {
append("role", "replica");
Expand Down Expand Up @@ -1549,7 +1555,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
cntx->owner()->SetName(absl::StrCat("repl_ctrl_", sid));

string sync_id = absl::StrCat("SYNC", sid);
cntx->conn_state.repl_session_id = sid;
cntx->conn_state.replicaiton_info.repl_session_id = sid;

if (!cntx->replica_conn) {
ServerState::tl_connection_stats()->num_replicas += 1;
Expand All @@ -1563,6 +1569,12 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(shard_set->pool()->size());
return;
}
} else if (cmd == "LISTENING-PORT") {
uint32_t replica_listening_port;
if (!absl::SimpleAtoi(arg, &replica_listening_port)) {
(*cntx)->SendError(kInvalidIntErr);
}
cntx->conn_state.replicaiton_info.repl_listening_port = replica_listening_port;
} else {
VLOG(1) << cmd << " " << arg;
}
Expand Down

0 comments on commit e7c97a4

Please sign in to comment.