Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

修复状态机 #91

Merged
merged 1 commit into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions src/core/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,10 @@ void Client::handleTunMessage() {

// 获取当前对端状态机的状态
uint32_t peerIp = Address::netToHost(header->daddr);
PeerConnState state = this->dispatcher.getPeerState(peerIp);
PeerConnState peerState = this->dispatcher.getPeerState(peerIp);

// 处于连接状态,直接发送,不需要其他操作
if (state == PeerConnState::CONNECTED) {
if (peerState == PeerConnState::CONNECTED) {
this->dispatcher.write(buffer);
continue;
}
Expand All @@ -266,14 +266,14 @@ void Client::handleTunMessage() {
message.buffer.append(buffer);
ws.write(message);

if (state == PeerConnState::INIT) {
if (peerState == PeerConnState::INIT || peerState == PeerConnState::SYNCHRONIZING) {
uint32_t pubIp;
uint16_t pubPort;
if (this->dispatcher.fetchPublicInfo(pubIp, pubPort)) {
continue;
spdlog::warn("fetch public info failed");
}
sendPeerMessage(this->tun.getIP(), peerIp, pubIp, pubPort);
this->dispatcher.createPeerPublicInfo(peerIp);
sendPeerConnMessage(this->tun.getIP(), peerIp, pubIp, pubPort);
this->dispatcher.updatePeerState(peerIp);
}
}
Candy::shutdown();
Expand Down Expand Up @@ -331,7 +331,7 @@ void Client::sendAuthMessage() {
return;
}

void Client::sendPeerMessage(uint32_t src, uint32_t dst, uint32_t pubIp, uint16_t pubPort) {
void Client::sendPeerConnMessage(uint32_t src, uint32_t dst, uint32_t pubIp, uint16_t pubPort) {
PeerConnMessage header;
header.tunSrcIp = Address::hostToNet(src);
header.tunDestIp = Address::hostToNet(dst);
Expand Down Expand Up @@ -396,8 +396,14 @@ void Client::handlePeerConnMessage(WebSocketMessage &message) {
uint16_t pubPort = Address::netToHost(header->pubPort);

if (tunDestIp != this->tun.getIP()) {
spdlog::warn("peer message dest not match: {:n}", spdlog::to_hex(message.buffer));
spdlog::warn("peer conn message dest not match: {:n}", spdlog::to_hex(message.buffer));
}

if (this->dispatcher.getPeerState(tunSrcIp) == PeerConnState::CONNECTED) {
spdlog::info("ignore peer conn connected message");
return;
}

this->dispatcher.updatePeerPublicInfo(tunSrcIp, pubIp, pubPort);
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Client {

void sendDynamicAddressMessage();
void sendAuthMessage();
void sendPeerMessage(uint32_t src, uint32_t dst, uint32_t pubIp, uint16_t pubPort);
void sendPeerConnMessage(uint32_t src, uint32_t dst, uint32_t pubIp, uint16_t pubPort);

void handleForwardMessage(WebSocketMessage &message);
void handleDynamicAddressMessage(WebSocketMessage &message);
Expand Down
129 changes: 97 additions & 32 deletions src/peer/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <openssl/evp.h>
#include <openssl/rand.h>
#include <openssl/sha.h>
#include <spdlog/fmt/bin_to_hex.h>
#include <spdlog/spdlog.h>
#include <string.h>
#include <utility/uri.h>
Expand Down Expand Up @@ -45,7 +46,7 @@ struct stun_response {
uint16_t length;
uint32_t cookie;
uint8_t id[12];
uint8_t attr[8];
uint8_t attr[0];
};

}; // namespace
Expand Down Expand Up @@ -156,8 +157,9 @@ int Dispatcher::fetchPublicInfo(uint32_t &pubIp, uint16_t &pubPort) {
return -1;
}

this->pubIp = 0;
this->pubPort = 0;
std::unique_lock<std::mutex> lock(this->pubMutex);

this->stunResponded = false;

stun_request request;
int len = sendto(fd, &request, sizeof(request), 0, info->ai_addr, info->ai_addrlen);
Expand All @@ -166,43 +168,68 @@ int Dispatcher::fetchPublicInfo(uint32_t &pubIp, uint16_t &pubPort) {
return -1;
}

std::unique_lock<std::mutex> lock(this->pubMutex);
if (this->pubCondition.wait_for(lock, std::chrono::seconds(1), [&] { return this->pubIp && this->pubPort; })) {
pubIp = this->pubIp;
pubPort = this->pubPort;
return 0;
if (!this->pubCondition.wait_for(lock, std::chrono::seconds(1), [&] { return this->stunResponded; })) {
spdlog::warn("recv stun response timeout");
return -1;
}
if (!this->stunResponded || this->pubIp == 0 || this->pubPort == 0) {
spdlog::warn("invalid public info: ip {:x} port {}", this->pubIp, this->pubPort);
return -1;
}

return -1;
pubIp = this->pubIp;
pubPort = this->pubPort;
return 0;
}
#endif

int Dispatcher::updatePeerPublicInfo(uint32_t tunIp, uint32_t pubIp, uint16_t pubPort) {
if (this->running) {
std::unique_lock<std::shared_mutex> lock(this->ipPeerMapMutex);
Peer &peer = this->ipPeerMap[tunIp];
peer.state = PeerConnState::CONNECTING;

if (peer.state == PeerConnState::INIT) {
peer.state = PeerConnState::SYNCHRONIZING;
spdlog::info("peer state changed: peer {:x} INIT -> SYNCHRONIZING", tunIp);
} else if (peer.state == PeerConnState::FAILED) {
peer.state = PeerConnState::SYNCHRONIZING;
spdlog::info("peer state changed: peer {:x} FAILED -> SYNCHRONIZING", tunIp);
} else if (peer.state == PeerConnState::PERPARING) {
peer.state = PeerConnState::CONNECTING;
spdlog::info("peer state changed: peer {:x} PERPARING -> CONNECTING", tunIp);
} else {
return 0;
}

peer.tunIp = tunIp;
peer.pubIp = pubIp;
peer.pubPort = pubPort;
peer.tickCount = 0;
peer.updateKey(this->password);
spdlog::debug("update peer public info:tun ip {:x} pub ip {:x} pub port {}", tunIp, pubIp, pubPort);
spdlog::debug("update peer public info: tun ip {:x} pub ip {:x} pub port {}", tunIp, pubIp, pubPort);
}
return 0;
}

int Dispatcher::createPeerPublicInfo(uint32_t tunIp) {
int Dispatcher::updatePeerState(uint32_t tunIp) {
if (this->running) {
std::unique_lock<std::shared_mutex> lock(this->ipPeerMapMutex);
Peer &peer = this->ipPeerMap[tunIp];

if (peer.state == PeerConnState::INIT) {
peer.state = PeerConnState::PERPARING;
peer.tunIp = tunIp;
peer.tickCount = 0;
peer.updateKey(this->password);
spdlog::debug("create peer public info: tun ip {:x}", tunIp);
spdlog::info("peer state changed: peer {:x} INIT -> PERPARING", tunIp);
} else if (peer.state == PeerConnState::SYNCHRONIZING) {
peer.state = PeerConnState::CONNECTING;
spdlog::info("peer state changed: peer {:x} SYNCHRONIZING -> CONNECTING", tunIp);
} else {
return 0;
}

peer.tunIp = tunIp;
peer.tickCount = 0;
peer.updateKey(this->password);
spdlog::debug("create peer public info: tun ip {:x}", tunIp);
}
return 0;
}
Expand All @@ -212,8 +239,6 @@ PeerConnState Dispatcher::getPeerState(uint32_t ip) {
return PeerConnState::FAILED;
}

std::shared_lock<std::shared_mutex> lock(this->ipPeerMapMutex);

auto it = this->ipPeerMap.find(ip);
if (it == this->ipPeerMap.end()) {
return PeerConnState::INIT;
Expand Down Expand Up @@ -384,21 +409,43 @@ int Dispatcher::tick() {
while (this->running) {
std::this_thread::sleep_for(std::chrono::seconds(1));

std::shared_lock<std::shared_mutex> lock(this->ipPeerMapMutex);
std::unique_lock<std::shared_mutex> lock(this->ipPeerMapMutex);
for (auto &[ip, peer] : this->ipPeerMap) {
// 初始状态或者已经确定连接失败的不再做处理
if (peer.state == PeerConnState::INIT || peer.state == PeerConnState::FAILED) {
continue;
}
// 处于主动连接或被动连接状态,在当前状态超过了 60 秒,进入失败状态
if ((peer.state == PeerConnState::PERPARING || peer.state == PeerConnState::CONNECTING) && peer.tickCount > 60) {
peer.state = PeerConnState::FAILED;
continue;
// 主动待连接状态,此时不发包
if (peer.state == PeerConnState::PERPARING) {
if (peer.tickCount > 5) {
peer.state = PeerConnState::INIT;
spdlog::info("peer state changed: peer {:x} PERPARING -> INIT", ip);
continue;
}
}
// 处于连接状态,当收到心跳后, tickCount 会重置,已经超过 3 秒没有重置,标记为断开连接
if (peer.state == PeerConnState::CONNECTED && peer.tickCount > 3) {
peer.state = PeerConnState::INIT;
continue;
// 被动待连接状态,此时不发包
if (peer.state == PeerConnState::SYNCHRONIZING) {
if (peer.tickCount > 3) {
peer.state = PeerConnState::INIT;
spdlog::info("peer state changed: peer {:x} SYNCHRONIZING -> INIT", ip);
continue;
}
}
// 尝试连接状态,连续发包,最多尝试 5 次
if (peer.state == PeerConnState::CONNECTING) {
if (peer.tickCount > 5) {
peer.state = PeerConnState::FAILED;
spdlog::info("peer state changed: peer {:x} CONNECTING -> FAILED", ip);
continue;
}
}
// 处于连接状态,当收到心跳后, tickCount 会重置,已经超过 2 秒没有重置,标记为初始状态,有新包会重新开始连接
if (peer.state == PeerConnState::CONNECTED) {
if (peer.tickCount > 2) {
peer.state = PeerConnState::INIT;
spdlog::info("peer state changed: peer {:x} CONNECTED -> INIT", ip);
continue;
}
}
// 发送心跳
if (peer.state == PeerConnState::CONNECTING || peer.state == PeerConnState::CONNECTED) {
Expand Down Expand Up @@ -456,7 +503,7 @@ int Dispatcher::handleHeartbeatMsg(const std::string &msg, uint32_t pubIp, uint1
return -1;
}

std::shared_lock<std::shared_mutex> lock(this->ipPeerMapMutex);
std::unique_lock<std::shared_mutex> lock(this->ipPeerMapMutex);

PeerMessageHeartbeat *heartbeat = (PeerMessageHeartbeat *)msg.c_str();
if (!this->ipPeerMap.contains(Address::netToHost(heartbeat->tunIp))) {
Expand All @@ -465,11 +512,15 @@ int Dispatcher::handleHeartbeatMsg(const std::string &msg, uint32_t pubIp, uint1
spdlog::debug("peer heartbeat unknown tun ip: {}", address.getIpStr());
return -1;
}

Peer &peer = this->ipPeerMap[Address::netToHost(heartbeat->tunIp)];
if (pubIp != peer.pubIp || pubPort != peer.pubPort) {
spdlog::debug("the source address does not match: ip {:x} {:x} port {} {}", pubIp, peer.pubIp, pubPort, peer.pubPort);
return -1;
}
if (peer.state != PeerConnState::CONNECTED) {
spdlog::info("peer state changed: peer {:x} CONNECTED", peer.tunIp);
}
peer.state = PeerConnState::CONNECTED;
peer.tickCount = 0;
return 0;
Expand Down Expand Up @@ -567,6 +618,9 @@ int Dispatcher::recvRawUdp(uint32_t &ip, uint16_t &port, std::string &msg) {

#if defined(__linux__) || defined(__linux)
int Dispatcher::handleStunResponse(const std::string &msg) {
uint32_t ip = 0;
uint16_t port = 0;

if (msg.length() < sizeof(stun_response)) {
spdlog::warn("invalid stun response length: {}", msg.length());
return -1;
Expand All @@ -582,24 +636,35 @@ int Dispatcher::handleStunResponse(const std::string &msg) {
// mapped address
if (Address::netToHost(*(uint16_t *)(attr + pos)) == 0x0001) {
pos += 6; // 跳过 2 字节类型, 2 字节长度, 1 字节保留, 1 字节IP版本号,指向端口号
this->pubPort = Address::netToHost(*(uint16_t *)(attr + pos));
port = Address::netToHost(*(uint16_t *)(attr + pos));
pos += 2; // 跳过2字节端口号,指向地址
this->pubIp = Address::netToHost(*(uint32_t *)(attr + pos));
ip = Address::netToHost(*(uint32_t *)(attr + pos));
break;
}
// xor mapped address
if (Address::netToHost(*(uint16_t *)(attr + pos)) == 0x0020) {
pos += 6; // 跳过 2 字节类型, 2 字节长度, 1 字节保留, 1 字节IP版本号,指向端口号
this->pubPort = Address::netToHost(*(uint16_t *)(attr + pos)) ^ 0x2112;
port = Address::netToHost(*(uint16_t *)(attr + pos)) ^ 0x2112;
pos += 2; // 跳过2字节端口号,指向地址
this->pubIp = Address::netToHost(*(uint32_t *)(attr + pos)) ^ 0x2112a442;
ip = Address::netToHost(*(uint32_t *)(attr + pos)) ^ 0x2112a442;
break;
}
// 跳过 2 字节类型,指向属性长度
pos += 2;
// 跳过 2 字节长度和用该属性其他内容
pos += 2 + Address::netToHost(*(uint16_t *)(attr + pos));
}
if (ip && port) {
spdlog::info("stun response: ip {:x} port {}", ip, port);
this->pubIp = ip;
this->pubPort = port;
} else {
spdlog::warn("stun response parse failed: {:n}", spdlog::to_hex(msg));
}
{
std::lock_guard<std::mutex> lock(this->pubMutex);
this->stunResponded = true;
}
pubCondition.notify_one();
return 0;
}
Expand Down
9 changes: 5 additions & 4 deletions src/peer/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ class Dispatcher {
// 从 STUN 服务端获取公网地址和端口
int fetchPublicInfo(uint32_t &pubIp, uint16_t &pubPort);

// 状态从 INIT 切换到 PREPARING,只在对端为 INIT 状态的时候从 STUN 获取自己的公网信息并发送给对方,
// PREPARING 状态时不再从 STUN 服务器获取公网信息.
int createPeerPublicInfo(uint32_t tunIp);

// 收到对端发送的公网信息,连接状态切换为 CONNECTING, 并开始向对方发包尝试建立连接,
int updatePeerPublicInfo(uint32_t tunIp, uint32_t pubIp, uint16_t pubPort);

// 状态从 INIT 切换到 PREPARING,只在对端为 INIT 状态的时候从 STUN 获取自己的公网信息并发送给对方,
// PREPARING 状态时不再从 STUN 服务器获取公网信息.
int updatePeerState(uint32_t tunIp);

// Client 获取对端状态,当状态为 INIT 时,调用 fetchPublicInfo 获取本机地址,并通过服务端发送给目标机器,尝试建立连接.
// 只有状态为 CONNECTED 时,直接发送,否则都要通过 Server 转发.
PeerConnState getPeerState(uint32_t ip);
Expand Down Expand Up @@ -88,6 +88,7 @@ class Dispatcher {
uint32_t stunIp;
uint16_t stunPort;

bool stunResponded;
uint32_t pubIp;
uint16_t pubPort;
std::mutex pubMutex;
Expand Down
16 changes: 6 additions & 10 deletions src/peer/peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,13 @@

namespace Candy {

// 主动尝试连接: INIT -> PERPARING
// 收到连接请求: * -> CONNECTING
// 连接成功: CONNECTING -> CONNECTED
// 连接失败: CONNECTING -> FAILED
// 成功的连接丢失心跳: CONNECTED -> INIT
enum class PeerConnState {
INIT, // 初始状态
PERPARING, // 准备连接
CONNECTING, // 尝试连接
CONNECTED, // 处于连接状态
FAILED, // 连接失败
INIT,
PERPARING,
SYNCHRONIZING,
CONNECTING,
CONNECTED,
FAILED,
};

struct Peer {
Expand Down