Skip to content

Commit

Permalink
Add new callback event for new session from leader (#115)
Browse files Browse the repository at this point in the history
* We should differentiate opening connection and receiving message
from valid leader, as leader can be changed without closing connection.
  • Loading branch information
greensky00 authored Mar 20, 2020
1 parent ed4eb8d commit cbe1bee
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 5 deletions.
36 changes: 34 additions & 2 deletions include/libnuraft/callback.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ public:
* ctx: pointer to `ConnectionArgs`.
*/
ConnectionClosed = 17,

/**
* Invoked when a session receives a message from the valid leader
* first time. This callback is preceded by `ConnectionOpened`
* event.
* ctx: pointer to `ConnectionArgs`.
*/
NewSessionFromLeader = 18,
};

struct Param {
Expand Down Expand Up @@ -173,11 +181,35 @@ public:
struct ConnectionArgs {
ConnectionArgs(uint64_t id = 0,
const std::string& addr = std::string(),
uint32_t p = 0)
: sessionId(id), address(addr), port(p) {}
uint32_t p = 0,
int32_t srv_id = -1,
bool is_leader = false)
: sessionId(id), address(addr), port(p)
, srvId(srv_id), isLeader(is_leader) {}
/**
* ID of session.
*/
uint64_t sessionId;

/**
* Endpoint address.
*/
std::string address;

/**
* Endpoint port.
*/
uint32_t port;

/**
* Endpoint server ID if given.
*/
int32_t srvId;

/**
* `true` if the endpoint server is leader.
*/
bool isLeader;
};

using func_type = std::function<ReturnCode(Type, Param*)>;
Expand Down
65 changes: 62 additions & 3 deletions src/asio_service.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ class rpc_session : public std::enable_shared_from_this<rpc_session> {
, header_(buffer::alloc(RPC_REQ_HEADER_SIZE))
, l_(logger)
, callback_(callback)
, src_id_(-1)
, is_leader_(false)
{
p_tr("asio rpc session created: %p", this);
}
Expand Down Expand Up @@ -252,7 +254,6 @@ class rpc_session : public std::enable_shared_from_this<rpc_session> {
std::placeholders::_1 ) );
#endif
} else {
invoke_connection_callback(true);
this->start(self);
}
}
Expand All @@ -264,7 +265,6 @@ class rpc_session : public std::enable_shared_from_this<rpc_session> {
session_id_,
socket_.remote_endpoint().address().to_string().c_str(),
socket_.remote_endpoint().port() );
invoke_connection_callback(true);
this->start(self);

} else {
Expand Down Expand Up @@ -388,10 +388,17 @@ class rpc_session : public std::enable_shared_from_this<rpc_session> {

private:
void invoke_connection_callback(bool is_open) {
if (is_leader_ && src_id_ != handler_->get_leader()) {
// Leader has been changed without closing session.
is_leader_ = false;
}

cb_func::ConnectionArgs
args( session_id_,
socket_.remote_endpoint().address().to_string(),
socket_.remote_endpoint().port() );
socket_.remote_endpoint().port(),
src_id_,
is_leader_ );
cb_func::Param cb_param( handler_->get_id(),
handler_->get_leader(),
-1,
Expand Down Expand Up @@ -442,6 +449,44 @@ class rpc_session : public std::enable_shared_from_this<rpc_session> {
ulong last_idx = hdr->get_ulong();
ulong commit_idx = hdr->get_ulong();

if (src_id_ == -1) {
// It means this is the first message on this session.
// Invoke callback function of new connection.
src_id_ = src;
invoke_connection_callback(true);

} else if (is_leader_ && src_id_ != handler_->get_leader()) {
// Leader has been changed without closing session.
is_leader_ = false;
}

if (!is_leader_) {
// If leader flag is not set, we identify whether the endpoint
// server is leader based on the message type (only leader
// can send below message types).
if ( t == msg_type::append_entries_request ||
t == msg_type::sync_log_request ||
t == msg_type::join_cluster_request ||
t == msg_type::leave_cluster_request ||
t == msg_type::install_snapshot_request ||
t == msg_type::priority_change_request ||
t == msg_type::custom_notification_request ) {
is_leader_ = true;
cb_func::ConnectionArgs
args( session_id_,
socket_.remote_endpoint().address().to_string(),
socket_.remote_endpoint().port(),
src_id_,
is_leader_ );
cb_func::Param cb_param( handler_->get_id(),
handler_->get_leader(),
-1,
&args );
handler_->invoke_callback( cb_func::NewSessionFromLeader,
&cb_param );
}
}

std::string meta_str;
ptr<req_msg> req = cs_new<req_msg>
( term, t, src, dst, last_term, last_idx, commit_idx );
Expand Down Expand Up @@ -612,6 +657,20 @@ class rpc_session : public std::enable_shared_from_this<rpc_session> {
ptr<buffer> header_;
ptr<logger> l_;
session_closed_callback callback_;

/**
* Source server (endpoint) ID, used to check whether it is leader.
* This value is `-1` at the beginning, which denotes this session
* hasn't received any message from the endpoint.
* Note that this ID should not be changed throughout the life time
* of the session.
*/
int32 src_id_;

/**
* `true` if the endpoint server was leader when it was last seen.
*/
bool is_leader_;
};

// rpc listener implementation
Expand Down

0 comments on commit cbe1bee

Please sign in to comment.