From faad86f373c609455881a491619e0e57a6356ccf Mon Sep 17 00:00:00 2001 From: bgreni Date: Sun, 14 Jan 2024 00:55:06 -0700 Subject: [PATCH] got websockets roughly working --- CMakeLists.txt | 2 +- Core/Common/include/Solana/Core/Util/Util.hpp | 8 +- Core/Network/CMakeLists.txt | 4 +- Core/Network/Websocket.cpp | 143 ++++++++++++++++++ .../Solana/Core/Network/HttpClient.hpp | 4 +- .../include/Solana/Core/Network/Websocket.hpp | 94 ++++++++++++ Rpc/Rpc.cpp | 64 ++++++++ Rpc/include/Solana/Rpc/Rpc.hpp | 35 ++++- 8 files changed, 347 insertions(+), 7 deletions(-) create mode 100644 Core/Network/Websocket.cpp create mode 100644 Core/Network/include/Solana/Core/Network/Websocket.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 188a736..26fd08f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,7 +21,7 @@ add_subdirectory(Core) if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/main.cpp") add_executable(app main.cpp) - target_link_libraries(app PRIVATE Solana.Rpc Solana.Core.Programs) + target_link_libraries(app PRIVATE Solana.Rpc Solana.Core.Programs Solana.Core.Network) endif() set(BUILD_TEST ON CACHE BOOL "Whether to build tests") diff --git a/Core/Common/include/Solana/Core/Util/Util.hpp b/Core/Common/include/Solana/Core/Util/Util.hpp index fd01d74..17c9728 100644 --- a/Core/Common/include/Solana/Core/Util/Util.hpp +++ b/Core/Common/include/Solana/Core/Util/Util.hpp @@ -9,12 +9,16 @@ namespace Solana { static constexpr u64 SolToLamports(f64 sol) { return static_cast(sol * static_cast(LAMPORTS_PER_SOL)); } }; template - void print(const T & thing) { + static void print(const T & thing) { std::cout << thing << "\n"; } + static void print(bool b) { + print(b ? "true" : "false"); + } + template - void print(const Types & ... things) { + static void print(const Types & ... things) { (print(things), ...); } } diff --git a/Core/Network/CMakeLists.txt b/Core/Network/CMakeLists.txt index 80c6dde..ed697f8 100644 --- a/Core/Network/CMakeLists.txt +++ b/Core/Network/CMakeLists.txt @@ -7,7 +7,9 @@ file( "${CMAKE_CURRENT_SOURCE_DIR}/include/include") file(GLOB SOURCES CONFIGURE_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp) -add_library(${PROJECT_NAME} STATIC ${SOURCES} ${HEADER_LIST}) +add_library(${PROJECT_NAME} STATIC ${SOURCES} ${HEADER_LIST} + Websocket.cpp + include/Solana/Core/Network/Websocket.hpp) target_link_libraries(${PROJECT_NAME} PUBLIC Solana.Core.Common diff --git a/Core/Network/Websocket.cpp b/Core/Network/Websocket.cpp new file mode 100644 index 0000000..a9e00a8 --- /dev/null +++ b/Core/Network/Websocket.cpp @@ -0,0 +1,143 @@ +#include "Solana/Core/Network/Websocket.hpp" +#include +#include + +using namespace Solana::Network; + +namespace { + void fail(beast::error_code ec, char const* what) { + throw std::runtime_error(std::string("ERROR: ") + what + " : " + ec.message()); + } + + static const std::unordered_map unsubMethods = { + {"slotSubscribe", "slotUnsubscribe"} + }; +} + +Websocket::Websocket(net::io_context& ioc, ssl::context& ctx) + : ws(make_strand(ioc), ctx) + , resolver(make_strand(ioc)) +{ +} + +std::future Websocket::unsubscribe(json & message) { + const auto rpcId = message["id"].get(); + const auto subId = message["params"][0].get(); + auto & sub = activeHandlers.at(subId); + message["method"] = unsubMethods.at(sub.method); + std::promise prom; + auto fut = prom.get_future(); + pendingUnsubs[rpcId] = std::move(prom); + doWrite(message); + activeHandlers.erase(subId); + return fut; +} + +void Websocket::unsubscribeAll() { + auto message = json::object(); + message["jsonrpc"] = "2.0"; + message["id"] = 1; + for (const auto & pair : std::as_const(activeHandlers)) { + message["method"] = unsubMethods.at(pair.second.method); + message["params"] = json::array({pair.first}); + doWrite(message); + } + activeHandlers.clear(); +} + +void Websocket::run(const Url & url) { + + auto host = url.endpoint; + auto port = url.service; + + auto ep = resolver.resolve({host, port}); + + if(!SSL_set_tlsext_host_name(ws.next_layer().native_handle(), host.c_str())) + throw beast::system_error( + beast::error_code( + static_cast(::ERR_get_error()), + net::error::get_ssl_category()), + "Failed to set SNI Hostname"); + + get_lowest_layer(ws).connect(ep); + + host += ':' + port; + + ws.next_layer().handshake(ssl::stream_base::client); + ws.handshake(host, url.targetBase); + doRead(); +} + +void Websocket::doRead() { + ws.async_read( + buffer, + beast::bind_front_handler( + &Websocket::onRead, + shared_from_this())); +} + +std::future Websocket::subscribe(const json & message, MessageHandler && callback) { + std::promise res; + auto fut = res.get_future(); + pendingHandlers[message["id"].get()] = { + std::move(res), + { + message["method"], + callback + } + }; + doWrite(message); + return fut; +} + +void Websocket::onRead(beast::error_code ec, + std::size_t bytes_transferred) { + + if (!ws.is_open()) return; + + json message; + try { + message = json::parse( + std::string((char *)buffer.data().data(), buffer.size())); + } catch (...) { + print("failed to parse message", beast::make_printable(buffer.data())); + } + + if (!message.contains("params")) { + if (!message["result"].is_boolean()) { + const auto subId = message["result"].get(); + const auto messageId = message["id"].get(); + auto & pending = pendingHandlers[messageId]; + activeHandlers[subId] = pending.sub; + pending.prom.set_value(subId); + pendingHandlers.erase(messageId); + } else { + const auto success = message["result"].get(); + const auto rpcId = message["id"].get(); + if (pendingUnsubs.contains(rpcId)) { + pendingUnsubs.at(rpcId).set_value(success); + } + } + } else { + const auto subId = message["params"]["subscription"].get(); + if (!activeHandlers.contains(subId)) { + print("Could not find handler for subscription:", subId); + } else { + activeHandlers[subId].handler(message); + } + } + + buffer.clear(); + + if (cancelled) return; + + ws.async_read( + buffer, + beast::bind_front_handler( + &Websocket::onRead, + shared_from_this())); +} + +void Websocket::doWrite(const json & message) { + ws.write(net::buffer(message.dump())); +} diff --git a/Core/Network/include/Solana/Core/Network/HttpClient.hpp b/Core/Network/include/Solana/Core/Network/HttpClient.hpp index 341fa00..474ea72 100644 --- a/Core/Network/include/Solana/Core/Network/HttpClient.hpp +++ b/Core/Network/include/Solana/Core/Network/HttpClient.hpp @@ -32,7 +32,7 @@ namespace Solana::Network { auto url = *parse_uri(endpoint); service = url.scheme(); this->endpoint = url.host(); - targetBase = url.path() + (url.has_query() ? ("?" + url.query()) : ""); + targetBase = url.path() + (url.has_query() ? ("?" + url.query()) : "/"); } std::string endpoint; std::string service; @@ -58,6 +58,8 @@ namespace Solana::Network { ioc.stop(); } + Url getUrl() { return url; } + template std::future post(const json & body) { Request req{}; diff --git a/Core/Network/include/Solana/Core/Network/Websocket.hpp b/Core/Network/include/Solana/Core/Network/Websocket.hpp new file mode 100644 index 0000000..ff22011 --- /dev/null +++ b/Core/Network/include/Solana/Core/Network/Websocket.hpp @@ -0,0 +1,94 @@ +#pragma once + +#define BOOST_ASIO_DISABLE_CONCEPTS + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "nlohmann/json.hpp" +#include +#include +#include "Solana/Core/Network/HttpClient.hpp" +#include +#include "Solana/Core/Util/Util.hpp" + +namespace beast = boost::beast; // from +namespace http = beast::http; // from +namespace websocket = beast::websocket; // from +namespace net = boost::asio; // from +namespace ssl = boost::asio::ssl; // from +using tcp = boost::asio::ip::tcp; // from + +using json = nlohmann::json; + +namespace Solana { + using MessageHandler = std::function; +} + +namespace Solana::Network { + + class Websocket : public std::enable_shared_from_this { + public: + Websocket(net::io_context & ioc, ssl::context & ctx); + ~Websocket() = default; + + void shutdown() { + unsubscribeAll(); + cancelled = true; + try { + ws.close(websocket::close_code::normal); + } catch (...) { + /* sometimes "stream truncated" gets thrown here + but vinnie says you can ignore it + https://github.com/boostorg/beast/issues/824#issuecomment-338412225 + + frankly this is the end of the websockets lifetime anyway + so who really cares anyway. + */ + } + + } + + void run(const Url & url); + + std::future subscribe(const json & message, MessageHandler && callback); + std::future unsubscribe(json & message); + + private: + void doRead(); + void doWrite(const json & message); + + void onRead(beast::error_code ec, std::size_t bytes_transferred); + + void unsubscribeAll(); + + websocket::stream< + beast::ssl_stream> ws; + tcp::resolver resolver; + beast::flat_buffer buffer; + + struct Subscription { + std::string method; + MessageHandler handler; + }; + + struct PendingSubscription { + std::promise prom; + Subscription sub; + }; + + std::unordered_map pendingHandlers; + std::unordered_map activeHandlers; + std::unordered_map> pendingUnsubs; + + std::atomic_bool cancelled = false; + }; +} + diff --git a/Rpc/Rpc.cpp b/Rpc/Rpc.cpp index 29d3fbc..88ac440 100644 --- a/Rpc/Rpc.cpp +++ b/Rpc/Rpc.cpp @@ -1,3 +1,67 @@ #include "Solana/Rpc/Rpc.hpp" +#include +using namespace Solana; +namespace { + net::io_context ioc = {}; + std::atomic_int messageCounter = 0; +} + +Rpc Rpc::DefaultMainnet() { + return Rpc("https://api.mainnet-beta.solana.com"); +} + +Rpc::~Rpc() { + ws->shutdown(); + wsThread.join(); +} + +void Rpc::runWs() { + ssl::context ctx(boost::asio::ssl::context::tlsv13_client); + ctx.set_default_verify_paths(); + ctx.set_options( + boost::asio::ssl::context::default_workarounds + | boost::asio::ssl::context::no_sslv2 + | boost::asio::ssl::context::no_sslv3); + + ws = std::make_shared(ioc, ctx); + auto url = client.getUrl(); + url.service = "443"; + ws->run(url); + + if (ioc.stopped()) ioc.restart(); + ready = true; + cv.notify_all(); + + ioc.run(); +} + +std::future Rpc::removeSubscription(int subId) { + auto message = json::object(); + message["jsonrpc"] = "2.0"; + message["id"] = ++messageCounter; + message["params"] = json::array({subId}); + + return ws->unsubscribe(message); +} + +std::future Rpc::createSubscription( + const json & message, + MessageHandler && handler) { + + std::unique_lock lock(wsMutex); + + cv.wait(lock, [this](){return ready.load();}); + + return ws->subscribe(message, std::move(handler)); +} + +std::future Rpc::onSlot(MessageHandler && handler) { + auto message = json::object(); + message["jsonrpc"] = "2.0"; + message["id"] = ++messageCounter; + message["method"] = "slotSubscribe"; + + return createSubscription(message, std::move(handler)); +} \ No newline at end of file diff --git a/Rpc/include/Solana/Rpc/Rpc.hpp b/Rpc/include/Solana/Rpc/Rpc.hpp index fa0aef3..9b9610d 100644 --- a/Rpc/include/Solana/Rpc/Rpc.hpp +++ b/Rpc/include/Solana/Rpc/Rpc.hpp @@ -18,12 +18,28 @@ #include "Solana/Rpc/Methods/SendTransaction.hpp" #include "Solana/Rpc/Methods/RequestAirdrop.hpp" #include "Solana/Rpc/Methods/WithJsonReply.hpp" +#include "Solana/Core/Network/Websocket.hpp" +#include +#include +#include +#include namespace Solana { class Rpc { public: - Rpc(const std::string & endpoint) - : client(endpoint) {} + + static Rpc DefaultMainnet(); + + explicit Rpc(const std::string & endpoint) + : client(endpoint) + , wsThread(&Rpc::runWs, this) + { + + } + + ~Rpc(); + + Rpc(const Rpc & other) = delete; template std::future> send(const T & req) { @@ -43,8 +59,23 @@ namespace Solana { auto res = client.post>(j); return res; } + + std::future onSlot(MessageHandler && handler); + std::future removeSubscription(int subId); + private: + void runWs(); + std::future createSubscription( + const json & message, + MessageHandler && handler); + private: + Network::HttpClient client; + std::shared_ptr ws; + std::thread wsThread; + std::mutex wsMutex; + std::condition_variable cv; + std::atomic_bool ready = false; }; }