
feat(server): Support limiting the number of open connections. #1670

Merged · 3 commits · Aug 27, 2023
2 changes: 1 addition & 1 deletion helio
4 changes: 2 additions & 2 deletions src/facade/dragonfly_connection.cc
@@ -11,6 +11,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "facade/conn_context.h"
#include "facade/dragonfly_listener.h"
#include "facade/memcache_parser.h"
#include "facade/redis_parser.h"
#include "facade/service_interface.h"
@@ -399,8 +400,7 @@ uint32_t Connection::GetClientId() const {
}

bool Connection::IsAdmin() const {
-  uint16_t admin_port = absl::GetFlag(FLAGS_admin_port);
-  return socket_->LocalEndpoint().port() == admin_port;
+  return static_cast<Listener*>(owner())->IsAdminInterface();
Collaborator:
Can we add an IsAdminInterface() API to ListenerInterface?
I realize it's perhaps not clean, but having a downcast is messy and can lead to issues down the road.

Collaborator:
IsAdminInterface is Dragonfly-specific. On the other hand, facade::Connection is always issued by facade::Listener.

Collaborator:
I'm open to any suggestions for removing hardcoded downcasts :)
We could even template-ize Listener to have facade::Listener return its own connection type.

Collaborator:
My personal opinion is that we should avoid casts when there is a reasonable design choice that allows it. And if not, we can have them and not feel bad about it. Specifically, any virtual inheritance implies the possibility of downcasts, and that's fine.

Contributor Author:
I don't like the cast either. If we start doing this too much I think we should find a better way, but for now this was the shortest way I found :)

}

io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
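The change under discussion moves the admin check from a port comparison onto the owning listener. A toy Python model of that delegation (class and method names here are illustrative stand-ins, not the real facade API):

```python
class Listener:
    """Minimal model of facade::Listener: the admin flag lives on the listener."""
    def __init__(self, is_admin=False):
        self._is_admin = is_admin

    def is_admin_interface(self):
        return self._is_admin


class Connection:
    """A connection asks its owning listener instead of inspecting its local port."""
    def __init__(self, owner):
        self._owner = owner

    def is_admin(self):
        # Delegates to the listener; no FLAGS_admin_port comparison needed.
        return self._owner.is_admin_interface()


admin_conn = Connection(Listener(is_admin=True))
plain_conn = Connection(Listener())
print(admin_conn.is_admin(), plain_conn.is_admin())  # True False
```

In the C++ code this delegation is what forces the `static_cast<Listener*>(owner())` downcast, since `owner()` returns the base `ListenerInterface`.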
12 changes: 12 additions & 0 deletions src/facade/dragonfly_listener.cc
@@ -202,6 +202,14 @@ bool Listener::AwaitDispatches(absl::Duration timeout,
return false;
}

bool Listener::IsAdminInterface() const {
return is_admin_;
}

void Listener::SetAdminInterface(bool is_admin) {
is_admin_ = is_admin;
}

void Listener::PreShutdown() {
// Iterate on all connections and allow them to finish their commands for
// a short period.
@@ -267,6 +275,10 @@ void Listener::OnConnectionClose(util::Connection* conn) {
}
}

void Listener::OnMaxConnectionsReached(util::FiberSocketBase* sock) {
royjacobson marked this conversation as resolved.
sock->Write(io::Buffer("-ERR max number of clients reached\r\n"));
Collaborator:
How will this be handled by the client?
I imagine that it will receive an error before even sending a command, so will it even know how to treat it?
Also, don't we want to close the connection? (or is it implicitly done by the framework?)

Contributor Author:
The clients I tested (Python, redis-cli) seemed to handle it fine. Redis does behave a bit differently (the socket doesn't close immediately and the error is delayed until the first command).

But then I tried to DDoS Redis by opening a bunch of connections and leaving them open, and apparently it then starts doing what we do.

And yeah, the framework closes the connection :)

}

// We can limit number of threads handling dragonfly connections.
ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) {
util::ProactorPool* pp = pool();
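The `-ERR max number of clients reached\r\n` reply written above is a standard RESP simple error, which is why existing clients handle it: anything starting with `-` is surfaced as an error before the first command's reply. A minimal sketch of that parsing (a hypothetical helper, not redis-py internals):

```python
def parse_resp_line(data: bytes):
    """Parse a single RESP reply line; a leading '-' marks a simple error."""
    line, _, _rest = data.partition(b"\r\n")
    if line.startswith(b"-"):
        # Clients typically raise on simple errors, even pre-command.
        raise ConnectionError(line[1:].decode())
    if line.startswith(b"+"):
        return line[1:].decode()
    raise ValueError("unsupported reply type in this sketch")


try:
    parse_resp_line(b"-ERR max number of clients reached\r\n")
except ConnectionError as e:
    print(e)  # ERR max number of clients reached
```

This mirrors Redis's own behavior when it hits its `maxclients` limit: write a simple error, then close the socket.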
6 changes: 6 additions & 0 deletions src/facade/dragonfly_listener.h
@@ -35,12 +35,16 @@ class Listener : public util::ListenerInterface {
bool AwaitDispatches(absl::Duration timeout,
const std::function<bool(util::Connection*)>& filter);

bool IsAdminInterface() const;
void SetAdminInterface(bool is_admin = true);
Comment on lines +38 to +39

Collaborator:
We already have Connection::IsAdmin(), could we maybe consolidate them to use the same logic? (either inspect the port or explicitly set like here)

Contributor Author:
Oooo, I did NOT expect the implementation of Connection::IsAdmin to be what it is 😆
Changed it to query the Listener.


private:
util::Connection* NewConnection(ProactorBase* proactor) final;
ProactorBase* PickConnectionProactor(util::FiberSocketBase* sock) final;

void OnConnectionStart(util::Connection* conn) final;
void OnConnectionClose(util::Connection* conn) final;
void OnMaxConnectionsReached(util::FiberSocketBase* sock) final;
void PreAcceptLoop(ProactorBase* pb) final;

void PreShutdown() final;
@@ -58,6 +62,8 @@ class Listener : public util::ListenerInterface {

std::atomic_uint32_t next_id_{0};

bool is_admin_ = false;

uint32_t conn_cnt_{0};
uint32_t min_cnt_thread_id_{0};
int32_t min_cnt_{0};
11 changes: 11 additions & 0 deletions src/server/config_registry.cc
@@ -39,6 +39,17 @@ bool ConfigRegistry::Set(std::string_view config_name, std::string_view value) {
return cb(*flag);
}

std::optional<std::string> ConfigRegistry::Get(std::string_view config_name) {
unique_lock lk(mu_);
if (!registry_.contains(config_name))
return std::nullopt;
lk.unlock();

absl::CommandLineFlag* flag = absl::FindCommandLineFlag(config_name);
CHECK(flag);
return flag->CurrentValue();
}

void ConfigRegistry::Reset() {
unique_lock lk(mu_);
registry_.clear();
2 changes: 2 additions & 0 deletions src/server/config_registry.h
@@ -19,6 +19,8 @@ class ConfigRegistry {
// Returns true if the value was updated.
bool Set(std::string_view config_name, std::string_view value);

std::optional<std::string> Get(std::string_view config_name);

void Reset();

private:
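The Register/Set/Get flow above can be modeled in a few lines of Python (a toy sketch only: the real registry is C++, holds an absl::Mutex, and reads values through `absl::FindCommandLineFlag`; names and the callback contract here are illustrative):

```python
import threading


class ConfigRegistry:
    """Toy model: Set runs the registered on-change callback; Get returns
    the current value only for registered names, None otherwise."""

    def __init__(self):
        self._mu = threading.Lock()
        self._callbacks = {}  # name -> on_change callback
        self._values = {}     # name -> current value

    def register(self, name, default, on_change):
        with self._mu:
            self._callbacks[name] = on_change
            self._values[name] = default

    def set(self, name, value):
        with self._mu:
            cb = self._callbacks.get(name)
        if cb is None or not cb(value):
            return False  # unknown name, or callback rejected the value
        self._values[name] = value
        return True

    def get(self, name):
        with self._mu:
            if name not in self._callbacks:
                return None
        return self._values[name]


reg = ConfigRegistry()
reg.register("maxclients", "64000", lambda v: v.isdigit())
assert reg.set("maxclients", "3")
assert reg.get("maxclients") == "3"
assert reg.get("unknown_flag") is None  # unregistered names return nothing
```

Note the same pattern as the C++ `Get`: the lock only guards the membership check, while the value itself is read from the flag afterwards.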
1 change: 1 addition & 0 deletions src/server/dfly_main.cc
@@ -388,6 +388,7 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
const std::string printable_addr =
absl::StrCat("admin socket ", interface_addr ? interface_addr : "any", ":", admin_port);
Listener* admin_listener = new Listener{Protocol::REDIS, &service};
admin_listener->SetAdminInterface();
error_code ec = acceptor->AddListener(interface_addr, admin_port, admin_listener);

if (ec) {
23 changes: 21 additions & 2 deletions src/server/server_family.cc
@@ -59,6 +59,8 @@ ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the
ABSL_FLAG(string, requirepass, "",
"password for AUTH authentication. "
"If empty can also be set with DFLY_PASSWORD environment variable.");
ABSL_FLAG(uint32_t, maxclients, 64000, "Maximum number of concurrent clients allowed.");

ABSL_FLAG(string, save_schedule, "",
"glob spec for the UTC time to save a snapshot which matches HH:MM 24h time");
ABSL_FLAG(string, snapshot_cron, "",
@@ -563,12 +565,28 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
ServerFamily::~ServerFamily() {
}

void SetMaxClients(std::vector<facade::Listener*>& listeners, uint32_t maxclients) {
for (auto* listener : listeners) {
if (!listener->IsAdminInterface()) {
listener->SetMaxClients(maxclients);
}
}
}

void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners) {
CHECK(acceptor_ == nullptr);
acceptor_ = acceptor;
listeners_ = std::move(listeners);
dfly_cmd_ = make_unique<DflyCmd>(this);

SetMaxClients(listeners_, absl::GetFlag(FLAGS_maxclients));
config_registry.Register("maxclients", [this](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<uint32_t>();
if (res.has_value())
SetMaxClients(listeners_, res.value());
return res.has_value();
});

pb_task_ = shard_set->pool()->GetNextProactor();
if (pb_task_->GetKind() == ProactorBase::EPOLL) {
fq_threadpool_.reset(new FiberQueueThreadPool(absl::GetFlag(FLAGS_epoll_file_threads)));
@@ -1514,9 +1532,10 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
if (param == "databases") {
res.emplace_back(param);
res.push_back(absl::StrCat(absl::GetFlag(FLAGS_dbnum)));
-  } else if (param == "maxmemory") {
+  } else if (auto value_from_registry = config_registry.Get(param);
+             value_from_registry.has_value()) {
     res.emplace_back(param);
-    res.push_back(absl::StrCat(max_memory_limit));
+    res.push_back(*value_from_registry);
}

return (*cntx)->SendStringArr(res, RedisReplyBuilder::MAP);
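The `SetMaxClients` helper above applies the limit only to non-admin listeners, so operators can still connect on the admin port when the main interface is saturated. A Python sketch of that filtering (listener objects are stand-ins for the C++ class):

```python
class Listener:
    """Stand-in for facade::Listener with just the fields this sketch needs."""
    def __init__(self, is_admin=False):
        self.is_admin = is_admin
        self.max_clients = None  # None models "no limit set"


def set_max_clients(listeners, maxclients):
    # Admin interfaces stay unlimited; only regular listeners get capped.
    for listener in listeners:
        if not listener.is_admin:
            listener.max_clients = maxclients


main, admin = Listener(), Listener(is_admin=True)
set_max_clients([main, admin], 1)
print(main.max_clients, admin.max_clients)  # 1 None
```

Registering the same helper as the `maxclients` config callback is what makes `CONFIG SET maxclients` take effect at runtime, which the test below exercises.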
27 changes: 27 additions & 0 deletions tests/dragonfly/config_test.py
@@ -0,0 +1,27 @@
import pytest
import redis
from redis.asyncio import Redis as RedisClient
from .utility import *
from . import DflyStartException


async def test_maxclients(df_factory):
# Needs some authentication
server = df_factory.create(port=1111, maxclients=1, admin_port=1112)
server.start()

async with server.client() as client1:
assert [b"maxclients", b"1"] == await client1.execute_command("CONFIG GET maxclients")

with pytest.raises(redis.exceptions.ConnectionError):
async with server.client() as client2:
await client2.get("test")

# Check that admin connections are not limited.
async with RedisClient(port=server.admin_port) as admin_client:
await admin_client.get("test")

await client1.execute_command("CONFIG SET maxclients 3")
royjacobson marked this conversation as resolved.
assert [b"maxclients", b"3"] == await client1.execute_command("CONFIG GET maxclients")
async with server.client() as client2:
await client2.get("test")