Skip to content

Commit

Permalink
got websockets roughly working
Browse files Browse the repository at this point in the history
  • Loading branch information
bgreni committed Jan 14, 2024
1 parent 2a13928 commit faad86f
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 7 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ add_subdirectory(Core)

if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/main.cpp")
add_executable(app main.cpp)
target_link_libraries(app PRIVATE Solana.Rpc Solana.Core.Programs)
target_link_libraries(app PRIVATE Solana.Rpc Solana.Core.Programs Solana.Core.Network)
endif()

set(BUILD_TEST ON CACHE BOOL "Whether to build tests")
Expand Down
8 changes: 6 additions & 2 deletions Core/Common/include/Solana/Core/Util/Util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ namespace Solana {
static constexpr u64 SolToLamports(f64 sol) { return static_cast<u64>(sol * static_cast<f64>(LAMPORTS_PER_SOL)); }
};
template<typename T>
void print(const T & thing) {
static void print(const T & thing) {
std::cout << thing << "\n";
}

static void print(bool b) {
print(b ? "true" : "false");
}

template<typename ...Types>
void print(const Types & ... things) {
static void print(const Types & ... things) {
(print(things), ...);
}
}
4 changes: 3 additions & 1 deletion Core/Network/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ file(
"${CMAKE_CURRENT_SOURCE_DIR}/include/include")
file(GLOB SOURCES CONFIGURE_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)

add_library(${PROJECT_NAME} STATIC ${SOURCES} ${HEADER_LIST})
add_library(${PROJECT_NAME} STATIC ${SOURCES} ${HEADER_LIST}
Websocket.cpp
include/Solana/Core/Network/Websocket.hpp)

target_link_libraries(${PROJECT_NAME} PUBLIC
Solana.Core.Common
Expand Down
143 changes: 143 additions & 0 deletions Core/Network/Websocket.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#include "Solana/Core/Network/Websocket.hpp"
#include <iostream>
#include <future>

using namespace Solana::Network;

namespace {
void fail(beast::error_code ec, char const* what) {
throw std::runtime_error(std::string("ERROR: ") + what + " : " + ec.message());
}

static const std::unordered_map<std::string, std::string> unsubMethods = {
{"slotSubscribe", "slotUnsubscribe"}
};
}

Websocket::Websocket(net::io_context& ioc, ssl::context& ctx)
: ws(make_strand(ioc), ctx)
, resolver(make_strand(ioc))
{
}

std::future<bool> Websocket::unsubscribe(json & message) {
const auto rpcId = message["id"].get<int>();
const auto subId = message["params"][0].get<int>();
auto & sub = activeHandlers.at(subId);
message["method"] = unsubMethods.at(sub.method);
std::promise<bool> prom;
auto fut = prom.get_future();
pendingUnsubs[rpcId] = std::move(prom);
doWrite(message);
activeHandlers.erase(subId);
return fut;
}

void Websocket::unsubscribeAll() {
auto message = json::object();
message["jsonrpc"] = "2.0";
message["id"] = 1;
for (const auto & pair : std::as_const(activeHandlers)) {
message["method"] = unsubMethods.at(pair.second.method);
message["params"] = json::array({pair.first});
doWrite(message);
}
activeHandlers.clear();
}

void Websocket::run(const Url & url) {

auto host = url.endpoint;
auto port = url.service;

auto ep = resolver.resolve({host, port});

if(!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), host.c_str()))
throw beast::system_error(
beast::error_code(
static_cast<int>(::ERR_get_error()),
net::error::get_ssl_category()),
"Failed to set SNI Hostname");

get_lowest_layer(ws).connect(ep);

host += ':' + port;

ws.next_layer().handshake(ssl::stream_base::client);
ws.handshake(host, url.targetBase);
doRead();
}

void Websocket::doRead() {
ws.async_read(
buffer,
beast::bind_front_handler(
&Websocket::onRead,
shared_from_this()));
}

std::future<int> Websocket::subscribe(const json & message, MessageHandler && callback) {
std::promise<int> res;
auto fut = res.get_future();
pendingHandlers[message["id"].get<int>()] = {
std::move(res),
{
message["method"],
callback
}
};
doWrite(message);
return fut;
}

void Websocket::onRead(beast::error_code ec,
std::size_t bytes_transferred) {

if (!ws.is_open()) return;

json message;
try {
message = json::parse(
std::string((char *)buffer.data().data(), buffer.size()));
} catch (...) {
print("failed to parse message", beast::make_printable(buffer.data()));
}

if (!message.contains("params")) {
if (!message["result"].is_boolean()) {
const auto subId = message["result"].get<int>();
const auto messageId = message["id"].get<int>();
auto & pending = pendingHandlers[messageId];
activeHandlers[subId] = pending.sub;
pending.prom.set_value(subId);
pendingHandlers.erase(messageId);
} else {
const auto success = message["result"].get<bool>();
const auto rpcId = message["id"].get<int>();
if (pendingUnsubs.contains(rpcId)) {
pendingUnsubs.at(rpcId).set_value(success);
}
}
} else {
const auto subId = message["params"]["subscription"].get<int>();
if (!activeHandlers.contains(subId)) {
print("Could not find handler for subscription:", subId);
} else {
activeHandlers[subId].handler(message);
}
}

buffer.clear();

if (cancelled) return;

ws.async_read(
buffer,
beast::bind_front_handler(
&Websocket::onRead,
shared_from_this()));
}

void Websocket::doWrite(const json & message) {
ws.write(net::buffer(message.dump()));
}
4 changes: 3 additions & 1 deletion Core/Network/include/Solana/Core/Network/HttpClient.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace Solana::Network {
auto url = *parse_uri(endpoint);
service = url.scheme();
this->endpoint = url.host();
targetBase = url.path() + (url.has_query() ? ("?" + url.query()) : "");
targetBase = url.path() + (url.has_query() ? ("?" + url.query()) : "/");
}
std::string endpoint;
std::string service;
Expand All @@ -58,6 +58,8 @@ namespace Solana::Network {
ioc.stop();
}

Url getUrl() { return url; }

template<typename T>
std::future<T> post(const json & body) {
Request req{};
Expand Down
94 changes: 94 additions & 0 deletions Core/Network/include/Solana/Core/Network/Websocket.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#pragma once

#define BOOST_ASIO_DISABLE_CONCEPTS

#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include "nlohmann/json.hpp"
#include <thread>
#include <atomic>
#include "Solana/Core/Network/HttpClient.hpp"
#include <unordered_map>
#include "Solana/Core/Util/Util.hpp"

namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
namespace ssl = boost::asio::ssl; // from <boost/asio/ssl.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>

using json = nlohmann::json;

namespace Solana {
using MessageHandler = std::function<void(const json &)>;
}

namespace Solana::Network {

class Websocket : public std::enable_shared_from_this<Websocket> {
public:
Websocket(net::io_context & ioc, ssl::context & ctx);
~Websocket() = default;

void shutdown() {
unsubscribeAll();
cancelled = true;
try {
ws.close(websocket::close_code::normal);
} catch (...) {
/* sometimes "stream truncated" gets thrown here
but vinnie says you can ignore it
https://github.com/boostorg/beast/issues/824#issuecomment-338412225
frankly this is the end of the websockets lifetime anyway
so who really cares anyway.
*/
}

}

void run(const Url & url);

std::future<int> subscribe(const json & message, MessageHandler && callback);
std::future<bool> unsubscribe(json & message);

private:
void doRead();
void doWrite(const json & message);

void onRead(beast::error_code ec, std::size_t bytes_transferred);

void unsubscribeAll();

websocket::stream<
beast::ssl_stream<beast::tcp_stream>> ws;
tcp::resolver resolver;
beast::flat_buffer buffer;

struct Subscription {
std::string method;
MessageHandler handler;
};

struct PendingSubscription {
std::promise<int> prom;
Subscription sub;
};

std::unordered_map<int, PendingSubscription> pendingHandlers;
std::unordered_map<int, Subscription> activeHandlers;
std::unordered_map<int, std::promise<bool>> pendingUnsubs;

std::atomic_bool cancelled = false;
};
}

64 changes: 64 additions & 0 deletions Rpc/Rpc.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,67 @@
#include "Solana/Rpc/Rpc.hpp"
#include <chrono>

using namespace Solana;

namespace {
net::io_context ioc = {};
std::atomic_int messageCounter = 0;
}

Rpc Rpc::DefaultMainnet() {
return Rpc("https://api.mainnet-beta.solana.com");
}

Rpc::~Rpc() {
ws->shutdown();
wsThread.join();
}

void Rpc::runWs() {
ssl::context ctx(boost::asio::ssl::context::tlsv13_client);
ctx.set_default_verify_paths();
ctx.set_options(
boost::asio::ssl::context::default_workarounds
| boost::asio::ssl::context::no_sslv2
| boost::asio::ssl::context::no_sslv3);

ws = std::make_shared<Network::Websocket>(ioc, ctx);
auto url = client.getUrl();
url.service = "443";
ws->run(url);

if (ioc.stopped()) ioc.restart();
ready = true;
cv.notify_all();

ioc.run();
}

std::future<bool> Rpc::removeSubscription(int subId) {
auto message = json::object();
message["jsonrpc"] = "2.0";
message["id"] = ++messageCounter;
message["params"] = json::array({subId});

return ws->unsubscribe(message);
}

std::future<int> Rpc::createSubscription(
const json & message,
MessageHandler && handler) {

std::unique_lock lock(wsMutex);

cv.wait(lock, [this](){return ready.load();});

return ws->subscribe(message, std::move(handler));
}

std::future<int> Rpc::onSlot(MessageHandler && handler) {
auto message = json::object();
message["jsonrpc"] = "2.0";
message["id"] = ++messageCounter;
message["method"] = "slotSubscribe";

return createSubscription(message, std::move(handler));
}
Loading

0 comments on commit faad86f

Please sign in to comment.