diff --git a/plugins/http_plugin/http_plugin.cpp b/plugins/http_plugin/http_plugin.cpp index d72ef4c763..5313fc8a6d 100644 --- a/plugins/http_plugin/http_plugin.cpp +++ b/plugins/http_plugin/http_plugin.cpp @@ -28,7 +28,7 @@ namespace eosio { using std::regex; using boost::asio::ip::tcp; using std::shared_ptr; - + static http_plugin_defaults current_http_plugin_defaults; static bool verbose_http_errors = false; @@ -36,6 +36,10 @@ namespace eosio { current_http_plugin_defaults = config; } + std::string http_plugin::get_server_header() { + return current_http_plugin_defaults.server_header; + } + using http_plugin_impl_ptr = std::shared_ptr; class http_plugin_impl : public std::enable_shared_from_this { @@ -47,9 +51,9 @@ namespace eosio { http_plugin_impl& operator=(const http_plugin_impl&) = delete; http_plugin_impl& operator=(http_plugin_impl&&) = delete; - + std::optional listen_endpoint; - + std::optional unix_endpoint; shared_ptr > beast_server; @@ -272,7 +276,7 @@ namespace eosio { my->plugin_state->server_header = current_http_plugin_defaults.server_header; - + //watch out for the returns above when adding new code here } FC_LOG_AND_RETHROW() } @@ -309,7 +313,7 @@ namespace eosio { if(my->unix_endpoint) { try { my->create_beast_server(true); - + my->beast_unix_server->listen(*my->unix_endpoint); my->beast_unix_server->start_accept(); } catch ( const fc::exception& e ){ @@ -335,7 +339,7 @@ namespace eosio { } } }}, appbase::exec_queue::read_only); - + } catch (...) { fc_elog(logger(), "http_plugin startup fails, shutting down"); app().quit(); @@ -457,9 +461,9 @@ namespace eosio { fc::microseconds http_plugin::get_max_response_time()const { return my->plugin_state->max_response_time; } - + size_t http_plugin::get_max_body_size()const { return my->plugin_state->max_body_size; } - + } diff --git a/plugins/http_plugin/include/eosio/http_plugin/http_plugin.hpp b/plugins/http_plugin/include/eosio/http_plugin/http_plugin.hpp index f6d3d5c44d..50c47e038b 100644 --- a/plugins/http_plugin/include/eosio/http_plugin/http_plugin.hpp +++ b/plugins/http_plugin/include/eosio/http_plugin/http_plugin.hpp @@ -79,6 +79,7 @@ namespace eosio { //must be called before initialize static void set_defaults(const http_plugin_defaults& config); + static std::string get_server_header(); APPBASE_PLUGIN_REQUIRES() void set_program_options(options_description&, options_description& cfg) override; @@ -122,7 +123,7 @@ namespace eosio { void register_metrics_listener(chain::plugin_interface::metrics_listener listener); size_t get_max_body_size()const; - + private: std::shared_ptr my; }; @@ -176,7 +177,7 @@ namespace eosio { }; /** - * @brief Used to trim whitespace from body. + * @brief Used to trim whitespace from body. * Returned string_view valid only for lifetime of body */ inline std::string_view make_trimmed_string_view(const std::string& body) { diff --git a/plugins/prometheus_plugin/include/eosio/prometheus_plugin/simple_rest_server.hpp b/plugins/prometheus_plugin/include/eosio/prometheus_plugin/simple_rest_server.hpp new file mode 100644 index 0000000000..f82492df28 --- /dev/null +++ b/plugins/prometheus_plugin/include/eosio/prometheus_plugin/simple_rest_server.hpp @@ -0,0 +1,234 @@ +#pragma once + +#include +#include +#include + +namespace eosio { namespace rest { + + // The majority of the code here are derived from boost source + // libs/beast/example/http/server/async/http_server_async.cpp + // with minimum modification and yet reusable. + + namespace beast = boost::beast; // from + namespace http = beast::http; // from + namespace net = boost::asio; // from + using tcp = boost::asio::ip::tcp; // from + template + class simple_server { + T* self() { return static_cast(this); } + + void fail(beast::error_code ec, char const* what) { self()->log_error(what, ec.message()); } + // Return a response for the given request. + http::response handle_request(http::request&& req) { + auto server_header = self()->server_header(); + // Returns a bad request response + auto const bad_request = [&req, &server_header](std::string_view why) { + http::response res{ http::status::bad_request, req.version() }; + res.set(http::field::server, server_header); + res.set(http::field::content_type, "text/plain"); + res.keep_alive(req.keep_alive()); + res.body() = std::string(why); + res.prepare_payload(); + return res; + }; + + // Returns a not found response + auto const not_found = [&req, &server_header](std::string_view target) { + http::response res{ http::status::not_found, req.version() }; + res.set(http::field::server, server_header); + res.set(http::field::content_type, "text/plain"); + res.keep_alive(req.keep_alive()); + res.body() = "The resource '" + std::string(target) + "' was not found."; + res.prepare_payload(); + return res; + }; + + // Returns a server error response + auto const server_error = [&req, &server_header](std::string_view what) { + http::response res{ http::status::internal_server_error, req.version() }; + res.set(http::field::server, server_header); + res.set(http::field::content_type, "text/plain"); + res.keep_alive(req.keep_alive()); + res.body() = "An error occurred: '" + std::string(what) + "'"; + res.prepare_payload(); + return res; + }; + + // Make sure we can handle the method + if (!self()->allow_method(req.method())) + return bad_request("Unknown HTTP-method"); + + // Request path must be absolute and not contain "..". + std::string_view target{req.target().data(), req.target().size()}; + if (target.empty() || target[0] != '/' || target.find("..") != std::string_view::npos) + return bad_request("Illegal request-target"); + + try { + auto res = self()->on_request(std::move(req)); + if (!res) + not_found(target); + return *res; + } catch (std::exception& ex) { return server_error(ex.what()); } + } + + class session : public std::enable_shared_from_this { + tcp::socket socket_; + boost::asio::io_context::strand strand_; + beast::flat_buffer buffer_; + http::request req_; + simple_server* server_; + std::shared_ptr> res_; + + public: + // Take ownership of the stream + session(net::io_context& ioc, tcp::socket&& socket, simple_server* server) + : socket_(std::move(socket)), strand_(ioc), server_(server) {} + + // Start the asynchronous operation + void run() { do_read(); } + + void do_read() { + // Make the request empty before reading, + // otherwise the operation behavior is undefined. + req_ = {}; + + // Read a request + http::async_read( + socket_, buffer_, req_, + boost::asio::bind_executor(strand_, [self = this->shared_from_this()](beast::error_code ec, + std::size_t bytes_transferred) { + self->on_read(ec, bytes_transferred); + })); + } + + void on_read(beast::error_code ec, std::size_t bytes_transferred) { + boost::ignore_unused(bytes_transferred); + + // This means they closed the connection + if (ec == http::error::end_of_stream) + return do_close(); + + if (ec) + return server_->fail(ec, "read"); + + // Send the response + send_response(server_->handle_request(std::move(req_))); + } + + void send_response(http::response&& msg) { + // The lifetime of the message has to extend + // for the duration of the async operation so + // we use a shared_ptr to manage it. + res_ = std::make_shared>(std::move(msg)); + + // Write the response + http::async_write(socket_, *res_, + boost::asio::bind_executor(socket_.get_executor(), + [self = this->shared_from_this(), close = res_->need_eof()]( + beast::error_code ec, std::size_t bytes_transferred) { + self->on_write(ec, bytes_transferred, close); + })); + } + + void on_write(boost::system::error_code ec, std::size_t bytes_transferred, bool close) { + boost::ignore_unused(bytes_transferred); + + if (ec) + return server_->fail(ec, "write"); + + if (close) { + // This means we should close the connection, usually because + // the response indicated the "Connection: close" semantic. + return do_close(); + } + + // We're done with the response so delete it + res_ = nullptr; + + // Read another request + do_read(); + } + + void do_close() { + // Send a TCP shutdown + beast::error_code ec; + socket_.shutdown(tcp::socket::shutdown_send, ec); + + // At this point the connection is closed gracefully + } + }; + + //------------------------------------------------------------------------------ + + // Accepts incoming connections and launches the sessions + class listener : public std::enable_shared_from_this { + net::io_context& ioc_; + tcp::acceptor acceptor_; + tcp::socket socket_; + simple_server* server_; + + public: + listener(net::io_context& ioc, tcp::endpoint endpoint, simple_server* server) + : ioc_(ioc), acceptor_(ioc), socket_(ioc), server_(server) { + boost::system::error_code ec; + + // Open the acceptor + acceptor_.open(endpoint.protocol(), ec); + if (ec) { + server_->fail(ec, "open"); + return; + } + + // Allow address reuse + acceptor_.set_option(net::socket_base::reuse_address(true), ec); + if (ec) { + server_->fail(ec, "set_option"); + return; + } + + // Bind to the server address + acceptor_.bind(endpoint, ec); + if (ec) { + server_->fail(ec, "bind"); + return; + } + + // Start listening for connections + acceptor_.listen(net::socket_base::max_listen_connections, ec); + if (ec) { + server_->fail(ec, "listen"); + return; + } + } + + // Start accepting incoming connections + void run() { + if (!acceptor_.is_open()) + return; + do_accept(); + } + + private: + void do_accept() { + acceptor_.async_accept( + socket_, [self = this->shared_from_this()](boost::system::error_code ec) { self->on_accept(ec); }); + } + + void on_accept(boost::system::error_code ec) { + if (ec) { + server_->fail(ec, "accept"); + } else { + // Create the session and run it + std::make_shared(ioc_, std::move(socket_), server_)->run(); + } + + // Accept another connection + do_accept(); + } + }; + + public: + void run(net::io_context& ioc, tcp::endpoint endpoint) { std::make_shared(ioc, endpoint, this)->run(); } + }; +}} // namespace eosio::rest \ No newline at end of file diff --git a/plugins/prometheus_plugin/prometheus_plugin.cpp b/plugins/prometheus_plugin/prometheus_plugin.cpp index 4753c6491e..33cc3cbd5a 100644 --- a/plugins/prometheus_plugin/prometheus_plugin.cpp +++ b/plugins/prometheus_plugin/prometheus_plugin.cpp @@ -1,8 +1,11 @@ #include +#include + #include #include #include #include +#include #include #include @@ -14,8 +17,8 @@ #include #include - namespace eosio { + static const char* prometheus_api_name = "/v1/prometheus/metrics"; using namespace prometheus; using namespace chain::plugin_interface; @@ -110,12 +113,41 @@ namespace eosio { } }; - struct prometheus_plugin_impl { + namespace http = boost::beast::http; + struct prometheus_plugin_impl : rest::simple_server { + + std::string server_header() const { + return http_plugin::get_server_header(); + } + + void log_error(char const* what, const std::string& message) { + elog("${what}: ${message}", ("what", what)("message", message)); + } + + bool allow_method(http::verb method) const { + return method == http::verb::get; + } + + std::optional> + on_request(http::request&& req) { + if(req.target() != prometheus_api_name) + return {}; + http::response res{ http::status::ok, req.version() }; + // Respond to GET request + res.set(http::field::server, server_header()); + res.set(http::field::content_type, "text/plain"); + res.keep_alive(req.keep_alive()); + res.body() = metrics(); + res.prepare_payload(); + return res; + } + eosio::chain::named_thread_pool _prometheus_thread_pool; boost::asio::io_context::strand _prometheus_strand; prometheus_plugin_metrics _metrics; map> _plugin_metrics; + boost::asio::ip::tcp::endpoint _endpoint; prometheus_plugin_impl(): _prometheus_strand(_prometheus_thread_pool.get_executor()){ } @@ -179,32 +211,11 @@ namespace eosio { return body; } - void metrics_async(chain::plugin_interface::next_function results) { - _prometheus_strand.post([self=this, results=std::move(results)]() { - results(self->metrics()); - }); + void start() { + run(_prometheus_thread_pool.get_executor(), _endpoint); + _prometheus_thread_pool.start( + 1, [](const fc::exception& e) { elog("Prometheus exception ${e}", ("e", e)); }); } - - }; - - using metrics_params = fc::variant_object; - - struct prometheus_api { - prometheus_plugin_impl& _pp; - fc::microseconds _max_response_time_us; - - fc::time_point start() const { - return fc::time_point::now() + _max_response_time_us; - } - - void metrics(const metrics_params&, chain::plugin_interface::next_function results) { - _pp.metrics_async(std::move(results)); - } - - prometheus_api(prometheus_plugin_impl& plugin, const fc::microseconds& max_response_time) - : _pp(plugin) - , _max_response_time_us(max_response_time){} - }; prometheus_plugin::prometheus_plugin() @@ -214,24 +225,35 @@ namespace eosio { prometheus_plugin::~prometheus_plugin() = default; void prometheus_plugin::set_program_options(options_description&, options_description& cfg) { + cfg.add_options() + ("prometheus-exporter-address", bpo::value()->default_value("127.0.0.1:9101"), + "The local IP and port to listen for incoming prometheus metrics http request."); } void prometheus_plugin::plugin_initialize(const variables_map& options) { my->initialize_metrics(); - auto& _http_plugin = app().get_plugin(); - fc::microseconds max_response_time = _http_plugin.get_max_response_time(); + string lipstr = options.at("prometheus-exporter-address").as(); + EOS_ASSERT(lipstr.size() > 0, chain::plugin_config_exception, "prometheus-exporter-address must have a value"); + + string host = lipstr.substr(0, lipstr.find(':')); + string port = lipstr.substr(host.size() + 1, lipstr.size()); - prometheus_api handle(*my, max_response_time); - app().get_plugin().add_async_api({ - CALL_ASYNC_WITH_400(prometheus, handle, eosio, metrics, std::string, 200, http_params_types::no_params)}, http_content_type::plaintext); + boost::system::error_code ec; + using tcp = boost::asio::ip::tcp; + tcp::resolver resolver(app().get_io_service()); + + my->_endpoint = *resolver.resolve(tcp::v4(), host, port, ec); + if (!ec) { + fc_ilog(logger(), "configured prometheus metrics exporter to listen on ${h}", ("h", lipstr)); + } else { + fc_elog(logger(), "failed to configure prometheus metrics exporter to listen on ${h} (${m})", + ("h", lipstr)("m", ec.message())); + } } void prometheus_plugin::plugin_startup() { - my->_prometheus_thread_pool.start(1, []( const fc::exception& e ) { - elog("Prometheus excpetion ${e}:${l}", ("e", e)); - } ); - + my->start(); ilog("Prometheus plugin started."); } diff --git a/tests/TestHarness/queries.py b/tests/TestHarness/queries.py index ffaf1dd115..e73c3ad06b 100644 --- a/tests/TestHarness/queries.py +++ b/tests/TestHarness/queries.py @@ -596,15 +596,18 @@ def processCleosCmd(self, cmd, cmdDesc, silentErrors=True, exitOnError=False, ex return trans - def processUrllibRequest(self, resource, command, payload={}, silentErrors=False, exitOnError=False, exitMsg=None, returnType=ReturnType.json, endpoint=None): + def processUrllibRequest(self, resource, command, payload={}, silentErrors=False, exitOnError=False, exitMsg=None, returnType=ReturnType.json, method="POST", endpoint=None): if not endpoint: endpoint = self.endpointHttp cmd = f"{endpoint}/v1/{resource}/{command}" - req = urllib.request.Request(cmd, method="POST") - req.add_header('Content-Type', 'application/json') - data = payload - data = json.dumps(data) - data = data.encode() + req = urllib.request.Request(cmd, method=method) + if len(payload): + req.add_header('Content-Type', 'application/json') + data = payload + data = json.dumps(data) + data = data.encode() + else: + data = None if Utils.Debug: Utils.Print("cmd: %s %s" % (cmd, payload)) rtn=None start=time.perf_counter() @@ -772,4 +775,3 @@ def getLatestBlockHeaderState(self): def getActivatedProtocolFeatures(self): latestBlockHeaderState = self.getLatestBlockHeaderState() return latestBlockHeaderState["activated_protocol_features"]["protocol_features"] - diff --git a/tests/plugin_http_api_test.py b/tests/plugin_http_api_test.py index 9573767ae0..b020039817 100755 --- a/tests/plugin_http_api_test.py +++ b/tests/plugin_http_api_test.py @@ -1341,8 +1341,8 @@ def test_DbSizeApi(self) : def test_prometheusApi(self) : resource = "prometheus" command = "metrics" - - ret_text = self.nodeos.processUrllibRequest(resource, command, returnType = ReturnType.raw ).decode() + endpointPrometheus = f'http://{self.nodeos.host}:9101' + ret_text = self.nodeos.processUrllibRequest(resource, command, returnType = ReturnType.raw, method="GET", endpoint=endpointPrometheus).decode() # filter out all empty lines or lines starting with '#' data_lines = filter(lambda line: len(line) > 0 and line[0]!='#', ret_text.split('\n')) # converting each line into a key value pair and then construct a dictionay out of all the pairs