Skip to content

Commit

Permalink
add keep alive logic
Browse files Browse the repository at this point in the history
  • Loading branch information
chenxiemin committed Jan 9, 2016
1 parent fdd1d54 commit 150ab1d
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 49 deletions.
3 changes: 2 additions & 1 deletion client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class MyAgent : public IServantClientSink, public IServantClientDataSink,
public: virtual void OnDisconnect()
{
LOGI("OnDisconnect");
mtimer->Stop();
if (NULL != mtimer.get())
mtimer->Stop();
mtimer.reset();
}

Expand Down
28 changes: 17 additions & 11 deletions common/client-state-connected.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ namespace p2p {

void ServantClient::ClientStateConnected::Logout()
{
// fire notify
PClient->FireOnDisconnectNofity();

// change to logouting state
shared_ptr<ServantClient::ClientState> oldState = PClient->SetStateInternal(SERVANT_CLIENT_LOGOUTING);
shared_ptr<ServantClient::ClientState> oldState = PClient->SetStateInternal(SERVANT_CLIENT_DISCONNECTING);
// resend logout event
PClient->meventThread->PutEvent(SERVANT_CLIENT_EVENT_LOGOUT);
}

void ServantClient::ClientStateConnected::Disconnect()
{
shared_ptr<ServantClient::ClientState> oldState = PClient->SetStateInternal(SERVANT_CLIENT_DISCONNECTING);
// resend disconnect event
PClient->meventThread->PutEvent(SERVANT_CLIENT_EVENT_DISCONNECT);
}

int ServantClient::ClientStateConnected::SendTo(const uint8_t *buf, int len)
{
if (len >= TransceiverU::MAX_RECEIVE_BUFFER_SIZE - sizeof(Message)) {
Expand All @@ -31,26 +34,29 @@ int ServantClient::ClientStateConnected::SendTo(const uint8_t *buf, int len)
memcpy(Buffer, &msg, sizeof(Message));
memcpy(Buffer + sizeof(Message), buf, len);

return PClient->mtransport->SendTo(PeerCandidate, Buffer, sizeof(Message) + len);
return PClient->mtransport->SendTo(PClient->PeerCandidate, Buffer, sizeof(Message)+len);
}

int ServantClient::ClientStateConnected::OnMessage(shared_ptr<ReceiveMessage> message)
{
if (!message->GetRemoteCandidate()->Equal(PeerCandidate)) {
if (!message->GetRemoteCandidate()->Equal(PClient->PeerCandidate)) {
LOGE("Unwanted message from peer: %s %s",
message->GetRemoteCandidate()->ToString().c_str(),
PeerCandidate->ToString().c_str());
PClient->PeerCandidate->ToString().c_str());
return -1;
}

const Message *pmsg = message->GetMessage();
if (CXM_P2P_MESSAGE_USER_DATA != pmsg->type) {
if (CXM_P2P_MESSAGE_USER_DATA == pmsg->type) {
shared_ptr<ReceiveData> recvData = message->GetReceiveData();
PClient->FireOnDataNofity(shared_ptr<P2PPacket>(new P2PPacket(recvData)));
} else if (CXM_P2P_MESSAGE_PEER_DISCONNECT == pmsg->type) {
PClient->Disconnect();
} else {
LOGE("Unwant message type: %d", pmsg->type);
return -1;
}

shared_ptr<ReceiveData> recvData = message->GetReceiveData();
PClient->FireOnDataNofity(shared_ptr<P2PPacket>(new P2PPacket(recvData)));

return 0;
}
Expand Down
46 changes: 22 additions & 24 deletions common/client-state-connecting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void ServantClient::ClientStateConnecting::OnTimer()
{
unique_lock<mutex> lock(mmutex);

if (NULL == this->PeerCandidate.get()) {
if (NULL == PClient->PeerCandidate.get()) {
// request peer address from server
Message msg;
msg.type = CXM_P2P_MESSAGE_REQUEST;
Expand Down Expand Up @@ -79,11 +79,11 @@ int ServantClient::ClientStateConnecting::OnMessage(shared_ptr<ReceiveMessage> m
return 0;
}

this->PeerCandidate = shared_ptr<Candidate>(new Candidate(
PClient->PeerCandidate = shared_ptr<Candidate>(new Candidate(
message->GetMessage()->u.client.uc.replyRequest.remoteIp,
message->GetMessage()->u.client.uc.replyRequest.remotePort));
LOGI("Receiving peer address from server: %s",
this->PeerCandidate->ToString().c_str());
PClient->PeerCandidate->ToString().c_str());

return 0;
} case CXM_P2P_MESSAGE_REPLY_CONNECT: {
Expand All @@ -96,8 +96,8 @@ int ServantClient::ClientStateConnecting::OnMessage(shared_ptr<ReceiveMessage> m

for (int i = 0; i < 3; i++) {
for (int j = 0; j < 3; j++) {
shared_ptr<Candidate> sendCandidate = shared_ptr<Candidate>(
new Candidate(PeerCandidate->Ip(), PeerCandidate->Port() + i));
shared_ptr<Candidate> sendCandidate = shared_ptr<Candidate>(new Candidate(
PClient->PeerCandidate->Ip(), PClient->PeerCandidate->Port() + i));
int res = PClient->mtransport->SendTo(sendCandidate,
(uint8_t *)&msg, sizeof(Message));
if (0 != res)
Expand All @@ -113,11 +113,11 @@ int ServantClient::ClientStateConnecting::OnMessage(shared_ptr<ReceiveMessage> m
return 0;
} case CXM_P2P_MESSAGE_DO_P2P_CONNECT: {
// update remote peer candidate if in SYM NAT
if (!PeerCandidate->Equal(message->GetRemoteCandidate())) {
if (!PClient->PeerCandidate->Equal(message->GetRemoteCandidate())) {
LOGI("Receive DO_P2P from different candidate, update %s to %s",
PeerCandidate->ToString().c_str(),
message->GetRemoteCandidate()->ToString().c_str());
PeerCandidate = message->GetRemoteCandidate();
PClient->PeerCandidate->ToString().c_str(),
message->GetRemoteCandidate()->ToString().c_str());
PClient->PeerCandidate = message->GetRemoteCandidate();
}

// send p2p connect
Expand All @@ -126,25 +126,25 @@ int ServantClient::ClientStateConnecting::OnMessage(shared_ptr<ReceiveMessage> m
msg.type = CXM_P2P_MESSAGE_REPLY_P2P_CONNECT;
strncpy(msg.u.p2p.up.p2pReply.key, SERVANT_P2P_REPLY_MESSAGE, CLIENT_NAME_LENGTH);

int res = PClient->mtransport->SendTo(PeerCandidate,
int res = PClient->mtransport->SendTo(PClient->PeerCandidate,
(uint8_t *)&msg, sizeof(Message));
if (0 != res)
LOGE("Cannot send reply p2p connect to %s: %d",
PeerCandidate->ToString().c_str(), res);
PClient->PeerCandidate->ToString().c_str(), res);
else
LOGI("Receiving DO_P2P_CONNECT command from peer %s, "
"send back REPLY_P2P_CONNECT with key: %s",
PeerCandidate->ToString().c_str(), SERVANT_P2P_REPLY_MESSAGE);
PClient->PeerCandidate->ToString().c_str(), SERVANT_P2P_REPLY_MESSAGE);

return 0;
} case CXM_P2P_MESSAGE_REPLY_P2P_CONNECT: {
LOGI("Receive REPLY_P2P_CONNECT from peer: %s",
message->GetRemoteCandidate()->ToString().c_str());
if (!PeerCandidate->Equal(message->GetRemoteCandidate())) {
if (!PClient->PeerCandidate->Equal(message->GetRemoteCandidate())) {
LOGI("Receive REPLY_P2P from different candidate, update %s to %s",
PeerCandidate->ToString().c_str(),
PClient->PeerCandidate->ToString().c_str(),
message->GetRemoteCandidate()->ToString().c_str());
PeerCandidate = message->GetRemoteCandidate();
PClient->PeerCandidate = message->GetRemoteCandidate();
}

// send p2p connect to remote candidate again
Expand All @@ -153,19 +153,19 @@ int ServantClient::ClientStateConnecting::OnMessage(shared_ptr<ReceiveMessage> m
msg.type = CXM_P2P_MESSAGE_REPLY_P2P_CONNECT;
strncpy(msg.u.p2p.up.p2pReply.key, SERVANT_P2P_REPLY_MESSAGE, CLIENT_NAME_LENGTH);

int res = PClient->mtransport->SendTo(PeerCandidate,
int res = PClient->mtransport->SendTo(PClient->PeerCandidate,
(uint8_t *)&msg, sizeof(Message));
if (0 != res)
LOGE("Cannot send REPLY_P2P_CONNECT to %s: %d",
PeerCandidate->ToString().c_str(), res);
PClient->PeerCandidate->ToString().c_str(), res);
else
LOGI("Receiving REPLY_P2P_CONNECT command from peer %s, "
" send again REPLY_P2P_CONNECT with key: %s",
PeerCandidate->ToString().c_str(), SERVANT_P2P_REPLY_MESSAGE);
PClient->PeerCandidate->ToString().c_str(), SERVANT_P2P_REPLY_MESSAGE);

LOGI("P2P connection establis successfully between local %s and peer %s",
PClient->mtransport->GetLocalCandidate()->ToString().c_str(),
PeerCandidate->ToString().c_str());
PClient->PeerCandidate->ToString().c_str());

// stop timer
if (NULL != this->mtimer.get())
Expand All @@ -174,11 +174,9 @@ int ServantClient::ClientStateConnecting::OnMessage(shared_ptr<ReceiveMessage> m
// hold on this reference to prevent self deleted
shared_ptr<ServantClient::ClientState> oldState =
PClient->SetStateInternal(SERVANT_CLIENT_CONNECTED);
// pass peer candidate
shared_ptr<ServantClient::ClientStateConnected> connected =
dynamic_pointer_cast<ServantClient::ClientStateConnected>(PClient->mstate);
assert(NULL != connected.get());
connected->PeerCandidate = this->PeerCandidate;

// start peer keep alive
PClient->StartPeerKeepAlive();

// fire notify
PClient->FireOnConnectNofity();
Expand Down
31 changes: 29 additions & 2 deletions common/client-state-disconnecting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,38 @@ namespace p2p {

void ServantClient::ClientStateDisconnecting::Logout()
{
// change to logouting state
shared_ptr<ServantClient::ClientState> oldState = PClient->SetStateInternal(SERVANT_CLIENT_LOGOUTING);
this->DisconnectInternal();
shared_ptr<ServantClient::ClientState> oldState = PClient->SetStateInternal(SERVANT_CLIENT_LOGIN);

// resend logout event
PClient->meventThread->PutEvent(SERVANT_CLIENT_EVENT_LOGOUT);
}

void ServantClient::ClientStateDisconnecting::Disconnect()
{
this->DisconnectInternal();
shared_ptr<ServantClient::ClientState> oldState = PClient->SetStateInternal(SERVANT_CLIENT_LOGIN);
}

void ServantClient::ClientStateDisconnecting::DisconnectInternal()
{
// send remote peer disconnect message
Message msg;
msg.type = CXM_P2P_MESSAGE_PEER_DISCONNECT;

int res = PClient->mtransport->SendTo(PClient->PeerCandidate,
(uint8_t *)&msg, sizeof(Message));
if (0 != res)
LOGE("Cannot send reply p2p connect to %s: %d",
PClient->PeerCandidate->ToString().c_str(), res);

// stop peer keep alive
PClient->StopPeerKeepAlive();

PClient->FireOnDisconnectNofity();
// clear remote peer after disconnect
PClient->PeerCandidate.reset();
}

}
}
4 changes: 1 addition & 3 deletions common/client-state-login.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ namespace p2p {

void ServantClient::ClientStateLogin::OnStateForeground()
{
// Notify connect
PClient->FireOnLoginNofity();
}

int ServantClient::ClientStateLogin::OnMessage(std::shared_ptr<ReceiveMessage> message)
Expand Down Expand Up @@ -41,7 +39,7 @@ int ServantClient::ClientStateLogin::OnMessage(std::shared_ptr<ReceiveMessage> m
assert(NULL != connectingState.get());

// trigger connecting state to do p2p connect with remote peer
connectingState->PeerCandidate = peerCandidate;
PClient->PeerCandidate = peerCandidate;
connectingState->OnMessage(message);

return 0;
Expand Down
7 changes: 7 additions & 0 deletions common/client-state-loginning.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ int ServantClient::ClientStateLogining::OnMessage(shared_ptr<ReceiveMessage> mes
// receive login success message, goto login state
shared_ptr<ServantClient::ClientState> oldState = PClient->SetStateInternal(SERVANT_CLIENT_LOGIN);

// Notify connect
PClient->FireOnLoginNofity();
// start keep alive for p2p server
PClient->StartServerKeepAlive();

// start login keep alive

#if 0 // do not auto connect
// start p2p connection
PClient->meventThread->PutEvent(SERVANT_CLIENT_EVENT_CONNECT);
Expand Down
8 changes: 7 additions & 1 deletion common/client-state-logouting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ void ServantClient::ClientStateLogouting::Logout()
if (0 != res)
LOGE("Cannot send logout message: %d", res);

// stop p2p server keep alive
PClient->StopServerKeepAlive();

// close server transport
if (NULL != PClient->mtransport.get()) {
PClient->mtransport->Close();
PClient->mtransport.reset();
}

// notify logout
// fire notify
PClient->FireOnDisconnectNofity();

// notify CV
LOGD("Before notify logout success");
unique_lock<mutex> lock(PClient->mlogoutMutex);
PClient->mlogoutCV.notify_one();
Expand Down
4 changes: 3 additions & 1 deletion common/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ typedef enum {
CXM_P2P_MESSAGE_DO_P2P_CONNECT,
CXM_P2P_MESSAGE_REPLY_P2P_CONNECT,
CXM_P2P_MESSAGE_USER_DATA,
CXM_P2P_MESSAGE_SERVER_KEEP_ALIVE,
CXM_P2P_MESSAGE_PEER_KEEP_ALIVE,
CXM_P2P_MESSAGE_PEER_DISCONNECT,
CXM_P2P_MESSAGE_COUNT // remain last
} CXM_P2P_MESSAGE_TYPE;

Expand All @@ -49,7 +52,6 @@ struct Message

union {
struct {

} server;
struct {
char clientName[CLIENT_NAME_LENGTH + 1];
Expand Down
Loading

0 comments on commit 150ab1d

Please sign in to comment.