From 6afd5f396a005dd9f39dde389ca179dc60bb44e5 Mon Sep 17 00:00:00 2001 From: Roy Jacobson Date: Tue, 8 Aug 2023 22:57:02 +0200 Subject: [PATCH 1/3] feat(server): Support limiting the number of open connections. --- helio | 2 +- src/facade/dragonfly_listener.cc | 4 ++++ src/facade/dragonfly_listener.h | 1 + src/server/config_registry.cc | 13 +++++++++++++ src/server/config_registry.h | 2 ++ src/server/server_family.cc | 22 ++++++++++++++++++++-- tests/dragonfly/config_test.py | 21 +++++++++++++++++++++ 7 files changed, 62 insertions(+), 3 deletions(-) create mode 100644 tests/dragonfly/config_test.py diff --git a/helio b/helio index c8ccbbdf9113..47cd5684e499 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit c8ccbbdf9113e5d3f1dc16c6cb96396ac7e3694d +Subproject commit 47cd5684e499efe3e8af4b30446cab2a6510592a diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc index 05e681de1f21..86de83f8a5fc 100644 --- a/src/facade/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -267,6 +267,10 @@ void Listener::OnConnectionClose(util::Connection* conn) { } } +void Listener::OnMaxConnectionsReached(util::FiberSocketBase* sock) { + sock->Write(io::Buffer("-ERR max number of clients reached\r\n")); +} + // We can limit number of threads handling dragonfly connections. ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) { util::ProactorPool* pp = pool(); diff --git a/src/facade/dragonfly_listener.h b/src/facade/dragonfly_listener.h index d81832241870..22a4806b5c37 100644 --- a/src/facade/dragonfly_listener.h +++ b/src/facade/dragonfly_listener.h @@ -41,6 +41,7 @@ class Listener : public util::ListenerInterface { void OnConnectionStart(util::Connection* conn) final; void OnConnectionClose(util::Connection* conn) final; + void OnMaxConnectionsReached(util::FiberSocketBase* sock) final; void PreAcceptLoop(ProactorBase* pb) final; void PreShutdown() final; diff --git a/src/server/config_registry.cc b/src/server/config_registry.cc index c805614badb4..25ff9611cf7c 100644 --- a/src/server/config_registry.cc +++ b/src/server/config_registry.cc @@ -39,6 +39,19 @@ bool ConfigRegistry::Set(std::string_view config_name, std::string_view value) { return cb(*flag); } +std::optional ConfigRegistry::Get(std::string_view config_name) { + unique_lock lk(mu_); + auto it = registry_.find(config_name); + if (it == registry_.end()) + return {}; + auto cb = it->second; + lk.unlock(); + + absl::CommandLineFlag* flag = absl::FindCommandLineFlag(config_name); + CHECK(flag); + return flag->CurrentValue(); +} + void ConfigRegistry::Reset() { unique_lock lk(mu_); registry_.clear(); diff --git a/src/server/config_registry.h b/src/server/config_registry.h index 92eb4d1a568f..ae7c8eacff9b 100644 --- a/src/server/config_registry.h +++ b/src/server/config_registry.h @@ -19,6 +19,8 @@ class ConfigRegistry { // Returns true if the value was updated. bool Set(std::string_view config_name, std::string_view value); + std::optional Get(std::string_view config_name); + void Reset(); private: diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 146b73b6ae77..2aeb99e3452c 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -59,6 +59,8 @@ ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the ABSL_FLAG(string, requirepass, "", "password for AUTH authentication. " "If empty can also be set with DFLY_PASSWORD environment variable."); +ABSL_FLAG(uint32_t, maxclients, 64000, "Maximum number of concurrent clients allowed."); + ABSL_FLAG(string, save_schedule, "", "glob spec for the UTC time to save a snapshot which matches HH:MM 24h time"); ABSL_FLAG(string, snapshot_cron, "", @@ -569,6 +571,21 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector(this); + for (auto* listener : listeners_) { + listener->SetMaxClients(absl::GetFlag(FLAGS_maxclients)); + } + config_registry.Register("maxclients", [this](const absl::CommandLineFlag& flag) { + auto res = flag.TryGet(); + if (!res) + return false; + + for (auto* listener : listeners_) { + listener->SetMaxClients(res.value()); + } + + return true; + }); + pb_task_ = shard_set->pool()->GetNextProactor(); if (pb_task_->GetKind() == ProactorBase::EPOLL) { fq_threadpool_.reset(new FiberQueueThreadPool(absl::GetFlag(FLAGS_epoll_file_threads))); @@ -1514,9 +1531,10 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { if (param == "databases") { res.emplace_back(param); res.push_back(absl::StrCat(absl::GetFlag(FLAGS_dbnum))); - } else if (param == "maxmemory") { + } else if (auto value_from_registry = config_registry.Get(param); + value_from_registry.has_value()) { res.emplace_back(param); - res.push_back(absl::StrCat(max_memory_limit)); + res.push_back(*value_from_registry); } return (*cntx)->SendStringArr(res, RedisReplyBuilder::MAP); diff --git a/tests/dragonfly/config_test.py b/tests/dragonfly/config_test.py new file mode 100644 index 000000000000..e02129705463 --- /dev/null +++ b/tests/dragonfly/config_test.py @@ -0,0 +1,21 @@ +import pytest +import redis +from .utility import * +from . import DflyStartException + + +async def test_maxclients(df_factory): + # Needs some authentication + server = df_factory.create(port=1111, maxclients=1) + server.start() + + async with server.client() as client1: + assert [b"maxclients", b"1"] == await client1.execute_command("CONFIG GET maxclients") + + with pytest.raises(redis.exceptions.ConnectionError): + async with server.client() as client2: + await client2.get("test") + + await client1.execute_command("CONFIG SET maxclients 3") + async with server.client() as client2: + await client2.get("test") From 183457ea925d84389ef9193eac4e488850dc0a99 Mon Sep 17 00:00:00 2001 From: Roy Jacobson Date: Wed, 9 Aug 2023 22:02:00 +0200 Subject: [PATCH 2/3] * Update helio after the small fix was merged to master * Don't limit admin connections (and add a test case) --- helio | 2 +- src/facade/dragonfly_listener.cc | 8 ++++++++ src/facade/dragonfly_listener.h | 5 +++++ src/server/config_registry.cc | 2 +- src/server/dfly_main.cc | 1 + src/server/server_family.cc | 23 ++++++++++++----------- tests/dragonfly/config_test.py | 7 ++++++- 7 files changed, 34 insertions(+), 14 deletions(-) diff --git a/helio b/helio index 47cd5684e499..963304405500 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 47cd5684e499efe3e8af4b30446cab2a6510592a +Subproject commit 96330440550013c69da14ae173049bf80e1e9257 diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc index 86de83f8a5fc..1f2b7f0a412d 100644 --- a/src/facade/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -202,6 +202,14 @@ bool Listener::AwaitDispatches(absl::Duration timeout, return false; } +bool Listener::IsAdminInterface() const { + return is_admin_; +} + +void Listener::SetAdminInterface(bool is_admin) { + is_admin_ = is_admin; +} + void Listener::PreShutdown() { // Iterate on all connections and allow them to finish their commands for // a short period. diff --git a/src/facade/dragonfly_listener.h b/src/facade/dragonfly_listener.h index 22a4806b5c37..b5a4041437f4 100644 --- a/src/facade/dragonfly_listener.h +++ b/src/facade/dragonfly_listener.h @@ -35,6 +35,9 @@ class Listener : public util::ListenerInterface { bool AwaitDispatches(absl::Duration timeout, const std::function& filter); + bool IsAdminInterface() const; + void SetAdminInterface(bool is_admin = true); + private: util::Connection* NewConnection(ProactorBase* proactor) final; ProactorBase* PickConnectionProactor(util::FiberSocketBase* sock) final; @@ -59,6 +62,8 @@ class Listener : public util::ListenerInterface { std::atomic_uint32_t next_id_{0}; + bool is_admin_ = false; + uint32_t conn_cnt_{0}; uint32_t min_cnt_thread_id_{0}; int32_t min_cnt_{0}; diff --git a/src/server/config_registry.cc b/src/server/config_registry.cc index 25ff9611cf7c..a655d202ac67 100644 --- a/src/server/config_registry.cc +++ b/src/server/config_registry.cc @@ -43,7 +43,7 @@ std::optional ConfigRegistry::Get(std::string_view config_name) { unique_lock lk(mu_); auto it = registry_.find(config_name); if (it == registry_.end()) - return {}; + return std::nullopt; auto cb = it->second; lk.unlock(); diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index 829666d08608..4dab9d6a10ea 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -388,6 +388,7 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) { const std::string printable_addr = absl::StrCat("admin socket ", interface_addr ? interface_addr : "any", ":", admin_port); Listener* admin_listener = new Listener{Protocol::REDIS, &service}; + admin_listener->SetAdminInterface(); error_code ec = acceptor->AddListener(interface_addr, admin_port, admin_listener); if (ec) { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 2aeb99e3452c..1fc83089faa9 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -565,25 +565,26 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) { ServerFamily::~ServerFamily() { } +void SetMaxClients(std::vector& listeners, uint32_t maxclients) { + for (auto* listener : listeners) { + if (!listener->IsAdminInterface()) { + listener->SetMaxClients(maxclients); + } + } +} + void ServerFamily::Init(util::AcceptServer* acceptor, std::vector listeners) { CHECK(acceptor_ == nullptr); acceptor_ = acceptor; listeners_ = std::move(listeners); dfly_cmd_ = make_unique(this); - for (auto* listener : listeners_) { - listener->SetMaxClients(absl::GetFlag(FLAGS_maxclients)); - } + SetMaxClients(listeners_, absl::GetFlag(FLAGS_maxclients)); config_registry.Register("maxclients", [this](const absl::CommandLineFlag& flag) { auto res = flag.TryGet(); - if (!res) - return false; - - for (auto* listener : listeners_) { - listener->SetMaxClients(res.value()); - } - - return true; + if (res.has_value()) + SetMaxClients(listeners_, res.value()); + return res.has_value(); }); pb_task_ = shard_set->pool()->GetNextProactor(); diff --git a/tests/dragonfly/config_test.py b/tests/dragonfly/config_test.py index e02129705463..af03ad2af351 100644 --- a/tests/dragonfly/config_test.py +++ b/tests/dragonfly/config_test.py @@ -1,12 +1,13 @@ import pytest import redis +from redis.asyncio import Redis as RedisClient from .utility import * from . import DflyStartException async def test_maxclients(df_factory): # Needs some authentication - server = df_factory.create(port=1111, maxclients=1) + server = df_factory.create(port=1111, maxclients=1, admin_port=1112) server.start() async with server.client() as client1: @@ -16,6 +17,10 @@ async def test_maxclients(df_factory): async with server.client() as client2: await client2.get("test") + # Check that admin connections are not limited. + async with RedisClient(port=server.admin_port) as admin_client: + await admin_client.get("test") + await client1.execute_command("CONFIG SET maxclients 3") async with server.client() as client2: await client2.get("test") From b6a4a9d67bd06085d461d11ad81b39bb17701db0 Mon Sep 17 00:00:00 2001 From: Roy Jacobson Date: Mon, 14 Aug 2023 14:22:52 +0200 Subject: [PATCH 3/3] Resolve CR comments --- src/facade/dragonfly_connection.cc | 4 ++-- src/server/config_registry.cc | 4 +--- tests/dragonfly/config_test.py | 1 + 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index f415b09698c8..5ce02265f09c 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -11,6 +11,7 @@ #include "base/flags.h" #include "base/logging.h" #include "facade/conn_context.h" +#include "facade/dragonfly_listener.h" #include "facade/memcache_parser.h" #include "facade/redis_parser.h" #include "facade/service_interface.h" @@ -399,8 +400,7 @@ uint32_t Connection::GetClientId() const { } bool Connection::IsAdmin() const { - uint16_t admin_port = absl::GetFlag(FLAGS_admin_port); - return socket_->LocalEndpoint().port() == admin_port; + return static_cast(owner())->IsAdminInterface(); } io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { diff --git a/src/server/config_registry.cc b/src/server/config_registry.cc index a655d202ac67..2f99a211355c 100644 --- a/src/server/config_registry.cc +++ b/src/server/config_registry.cc @@ -41,10 +41,8 @@ bool ConfigRegistry::Set(std::string_view config_name, std::string_view value) { std::optional ConfigRegistry::Get(std::string_view config_name) { unique_lock lk(mu_); - auto it = registry_.find(config_name); - if (it == registry_.end()) + if (!registry_.contains(config_name)) return std::nullopt; - auto cb = it->second; lk.unlock(); absl::CommandLineFlag* flag = absl::FindCommandLineFlag(config_name); diff --git a/tests/dragonfly/config_test.py b/tests/dragonfly/config_test.py index af03ad2af351..903e7e535495 100644 --- a/tests/dragonfly/config_test.py +++ b/tests/dragonfly/config_test.py @@ -22,5 +22,6 @@ async def test_maxclients(df_factory): await admin_client.get("test") await client1.execute_command("CONFIG SET maxclients 3") + assert [b"maxclients", b"3"] == await client1.execute_command("CONFIG GET maxclients") async with server.client() as client2: await client2.get("test")