Skip to content

Commit

Permalink
支持对等连接
Browse files Browse the repository at this point in the history
  • Loading branch information
lanthora committed Sep 3, 2023
1 parent 64cac20 commit 72d60cc
Show file tree
Hide file tree
Showing 21 changed files with 1,082 additions and 70 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ include_directories(${spdlog_SOURCE_DIR}/include)
add_subdirectory(${SOURCE}/core)
add_subdirectory(${SOURCE}/websocket)
add_subdirectory(${SOURCE}/tun)
add_subdirectory(${SOURCE}/peer)
add_subdirectory(${SOURCE}/utility)

target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE core)
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE websocket)
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE tun)
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE peer)
target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE utility)

target_link_libraries(${CMAKE_PROJECT_NAME} PRIVATE ixwebsocket)
Expand Down
151 changes: 134 additions & 17 deletions src/core/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ int Client::setDynamicAddress(const std::string &cidr) {
return 0;
}

int Client::setStun(const std::string &stun) {
this->stun = stun;
return 0;
}

std::string Client::getAddress() {
return this->localAddress;
}
Expand Down Expand Up @@ -73,7 +78,11 @@ int Client::shutdown() {
if (this->tunThread.joinable()) {
this->tunThread.join();
}
if (this->dispatcherThread.joinable()) {
this->dispatcherThread.join();
}

this->dispatcher.shutdown();
this->tun.down();
this->ws.disconnect();
return 0;
Expand All @@ -90,7 +99,7 @@ int Client::startWsThread() {
}

// 只需要开 wsThread, 执行过程中会设置 tun 并开 tunThread.
this->wsThread = std::move(std::thread(&Client::handleWebSocketMessage, this));
this->wsThread = std::move(std::thread([&] { this->handleWebSocketMessage(); }));
return 0;
}

Expand All @@ -111,12 +120,35 @@ int Client::startTunThread() {
return -1;
}

this->tunThread = std::move(std::thread(&Client::handleTunMessage, this));
this->tunThread = std::move(std::thread([&] { this->handleTunMessage(); }));

sendAuthMessage();
return 0;
}

int Client::startDispatcherThread() {
if (this->stun.empty()) {
spdlog::info("stun is empty, peer-to-peer connections are not enabled");
return 0;
}
if (this->dispatcher.setPassword(this->password)) {
return -1;
}
if (this->dispatcher.setStun(this->stun)) {
return -1;
}
if (this->dispatcher.setTunIP(this->tun.getIP())) {
return -1;
}
if (this->dispatcher.run()) {
return -1;
}

this->dispatcherThread = std::move(std::thread([&] { this->handleDispatcherMessage(); }));

return 0;
}

void Client::handleWebSocketMessage() {
int error;
WebSocketMessage message;
Expand All @@ -132,17 +164,22 @@ void Client::handleWebSocketMessage() {
break;
}
if (message.type == WebSocketMessageType::Message) {
// TYPE_FORWARD, 拆包后转发给 TUN 设备
if (message.buffer.front() == MessageType::TYPE_FORWARD) {
// FORWARD, 拆包后转发给 TUN 设备
if (message.buffer.front() == MessageType::FORWARD) {
handleForwardMessage(message);
continue;
}
// 收到动态地址响应包,启动 TUN 设备并发送 Auth 包
if (message.buffer.front() == MessageType::TYPE_DYNAMIC_ADDRESS) {
if (message.buffer.front() == MessageType::DHCP) {
handleDynamicAddressMessage(message);
continue;
}
spdlog::warn("unknown message type. type {}", message.buffer.front());
// 收到对端连接请求包
if (message.buffer.front() == MessageType::PEER) {
handlePeerConnMessage(message);
continue;
}
spdlog::warn("unknown message: {:n}", spdlog::to_hex(message.buffer));
continue;
}

Expand All @@ -153,6 +190,11 @@ void Client::handleWebSocketMessage() {
Candy::shutdown();
break;
}
if (startDispatcherThread()) {
spdlog::critical("start dispatcher thread failed");
Candy::shutdown();
break;
}
continue;
}

Expand Down Expand Up @@ -181,7 +223,7 @@ void Client::handleWebSocketMessage() {

void Client::handleTunMessage() {
int error;
WebSocketMessage message;

std::string buffer;
IPv4Header *header;

Expand All @@ -208,17 +250,53 @@ void Client::handleTunMessage() {
continue;
}

// 目前客户端只与服务端通信,所以可以不加判断的直接把数据发给服务端.
// 未来支持端到端通信后,发送数据前先判断到目的地址能否直连,并通过直连的连接发送
message.buffer.clear();
message.buffer.push_back(MessageType::TYPE_FORWARD);
// 获取当前对端状态机的状态
uint32_t peerIp = Address::netToHost(header->daddr);
PeerConnState state = this->dispatcher.getPeerState(peerIp);

// 处于连接状态,直接发送,不需要其他操作
if (state == PeerConnState::CONNECTED) {
this->dispatcher.write(buffer);
continue;
}

// 通过 WebSocket 转发
WebSocketMessage message;
message.buffer.push_back(MessageType::FORWARD);
message.buffer.append(buffer);
ws.write(message);

if (state == PeerConnState::INIT) {
uint32_t pubIp;
uint16_t pubPort;
if (this->dispatcher.fetchPublicInfo(pubIp, pubPort)) {
continue;
}
sendPeerMessage(this->tun.getIP(), peerIp, pubIp, pubPort);
this->dispatcher.createPeerPublicInfo(peerIp);
}
}
Candy::shutdown();
return;
}

void Client::handleDispatcherMessage() {
std::string buffer;
int len;
while (this->running) {
len = this->dispatcher.read(buffer);
if (len == 0) {
continue;
}
if (len < 0) {
spdlog::error("handle dispatcher message error");
continue;
}
this->tun.write(buffer);
}
return;
}

void Client::sendDynamicAddressMessage() {
Address address;
if (address.cidrUpdate(this->dynamicAddress)) {
Expand All @@ -227,11 +305,11 @@ void Client::sendDynamicAddressMessage() {
return;
}

DynamicAddressHeader header(address.getCidr());
DynamicAddressMessage header(address.getCidr());
header.updateHash(this->password);

WebSocketMessage message;
message.buffer.assign((char *)(&header), sizeof(DynamicAddressHeader));
message.buffer.assign((char *)(&header), sizeof(DynamicAddressMessage));
this->ws.write(message);
return;
}
Expand All @@ -253,14 +331,29 @@ void Client::sendAuthMessage() {
return;
}

void Client::sendPeerMessage(uint32_t src, uint32_t dst, uint32_t pubIp, uint16_t pubPort) {
PeerConnMessage header;
header.tunSrcIp = Address::hostToNet(src);
header.tunDestIp = Address::hostToNet(dst);
header.pubIp = Address::hostToNet(pubIp);
header.pubPort = Address::hostToNet(pubPort);

WebSocketMessage message;
message.buffer.assign((char *)(&header), sizeof(PeerConnMessage));
this->ws.write(message);

spdlog::debug("send peer message: src {:x} dst {:x} ip {:x} port {}", src, dst, pubIp, pubPort);
return;
}

void Client::handleDynamicAddressMessage(WebSocketMessage &message) {
if (message.buffer.size() != sizeof(DynamicAddressHeader)) {
spdlog::warn("invalid dynamic address package: len {}", message.buffer.length());
if (message.buffer.size() < sizeof(DynamicAddressMessage)) {
spdlog::warn("invalid dynamic address message: len {}", message.buffer.length());
spdlog::debug("dynamic address buffer: {:n}", spdlog::to_hex(message.buffer));
return;
}

DynamicAddressHeader *header = (DynamicAddressHeader *)message.buffer.c_str();
DynamicAddressMessage *header = (DynamicAddressMessage *)message.buffer.c_str();

Address address;
if (address.cidrUpdate(header->cidr)) {
Expand All @@ -272,17 +365,41 @@ void Client::handleDynamicAddressMessage(WebSocketMessage &message) {
if (startTunThread()) {
spdlog::critical("start tun thread with dynamic address failed");
Candy::shutdown();
return;
}
if (startDispatcherThread()) {
spdlog::critical("start dispatcher thread failed");
Candy::shutdown();
return;
}
}

void Client::handleForwardMessage(WebSocketMessage &message) {
if (message.buffer.size() < sizeof(ForwardHeader)) {
spdlog::warn("invalid forward package: {:n}", spdlog::to_hex(message.buffer));
spdlog::warn("invalid forward message: {:n}", spdlog::to_hex(message.buffer));
}

const char *src = message.buffer.c_str() + sizeof(ForwardHeader::type);
const size_t len = message.buffer.length() - sizeof(ForwardHeader::type);
this->tun.write(std::string(src, len));
}

void Client::handlePeerConnMessage(WebSocketMessage &message) {
if (message.buffer.size() < sizeof(PeerConnMessage)) {
spdlog::warn("invalid peer message: {:n}", spdlog::to_hex(message.buffer));
}
PeerConnMessage *header = (PeerConnMessage *)message.buffer.c_str();

uint32_t tunSrcIp = Address::netToHost(header->tunSrcIp);
uint32_t tunDestIp = Address::netToHost(header->tunDestIp);
uint32_t pubIp = Address::netToHost(header->pubIp);
uint16_t pubPort = Address::netToHost(header->pubPort);

if (tunDestIp != this->tun.getIP()) {
spdlog::warn("peer message dest not match: {:n}", spdlog::to_hex(message.buffer));
}
this->dispatcher.updatePeerPublicInfo(tunSrcIp, pubIp, pubPort);
return;
}

}; // namespace Candy
15 changes: 15 additions & 0 deletions src/core/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#ifndef CANDY_CORE_CLIENT_H
#define CANDY_CORE_CLIENT_H

#include "peer/dispatcher.h"
#include "tun/tun.h"
#include "websocket/client.h"
#include <string>
Expand All @@ -26,6 +27,9 @@ class Client {
// 设置默认动态 IP 地址,向服务端建议使用这个地址,这个地址不可用时服务端将返回一个可用的新地址
int setDynamicAddress(const std::string &cidr);

// 设置 STUN 服务端,用于开启对等连接
int setStun(const std::string &stun);

// 获取当前地址,设置的静态地址或者由服务端分发的动态地址
std::string getAddress();

Expand All @@ -36,30 +40,41 @@ class Client {
private:
int startWsThread();
int startTunThread();
int startDispatcherThread();

// 处理来自 WebSocket 服务端的消息
void handleWebSocketMessage();
// 处理来自 TUN 设备的消息
void handleTunMessage();
// 处理来自 Dispatcher 的消息
void handleDispatcherMessage();

void sendDynamicAddressMessage();
void sendAuthMessage();
void sendPeerMessage(uint32_t src, uint32_t dst, uint32_t pubIp, uint16_t pubPort);

void handleForwardMessage(WebSocketMessage &message);
void handleDynamicAddressMessage(WebSocketMessage &message);
void handlePeerConnMessage(WebSocketMessage &message);

std::string tunName;
std::string password;
std::string wsUri;
// tunThread 和 dispatcherThread 运行依赖正确的 localAddress, 赋值后才能启动这两个线程
std::string localAddress;
std::string dynamicAddress;

std::string stun;

std::thread wsThread;
std::thread tunThread;
std::thread dispatcherThread;

bool running = false;

Tun tun;
WebSocketClient ws;
Dispatcher dispatcher;
};

}; // namespace Candy
Expand Down
16 changes: 10 additions & 6 deletions src/core/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
namespace Candy {

AuthHeader::AuthHeader(uint32_t ip) {
this->type = MessageType::TYPE_AUTH;
this->type = MessageType::AUTH;
this->ip = Address::hostToNet(ip);
this->timestamp = Time::hostToNet(Time::unixTime());
}
Expand Down Expand Up @@ -42,23 +42,23 @@ bool AuthHeader::check(const std::string &password) {
}

ForwardHeader::ForwardHeader() {
this->type = MessageType::TYPE_FORWARD;
this->type = MessageType::FORWARD;
}

DynamicAddressHeader::DynamicAddressHeader(const std::string &cidr) {
this->type = MessageType::TYPE_DYNAMIC_ADDRESS;
DynamicAddressMessage::DynamicAddressMessage(const std::string &cidr) {
this->type = MessageType::DHCP;
this->timestamp = Time::hostToNet(Time::unixTime());
std::strcpy(this->cidr, cidr.c_str());
}

void DynamicAddressHeader::updateHash(const std::string &password) {
void DynamicAddressMessage::updateHash(const std::string &password) {
std::string data;
data.append(password);
data.append((char *)&timestamp, sizeof(timestamp));
SHA256((unsigned char *)data.data(), data.size(), this->hash);
}

bool DynamicAddressHeader::check(const std::string &password) {
bool DynamicAddressMessage::check(const std::string &password) {
// 检查时间戳
if (std::abs(Time::unixTime() - (int64_t)Time::netToHost(this->timestamp)) > 30) {
spdlog::warn("dynamic address header timestamp check failed: timestamp {}", Time::netToHost(this->timestamp));
Expand All @@ -80,4 +80,8 @@ bool DynamicAddressHeader::check(const std::string &password) {
return true;
}

PeerConnMessage::PeerConnMessage() {
this->type = MessageType::PEER;
}

}; // namespace Candy
Loading

0 comments on commit 72d60cc

Please sign in to comment.