feat(server): Support limiting the number of open connections. #1670

Merged 3 commits on Aug 27, 2023.
2 changes: 1 addition & 1 deletion helio
4 changes: 4 additions & 0 deletions src/facade/dragonfly_listener.cc
@@ -267,6 +267,10 @@ void Listener::OnConnectionClose(util::Connection* conn) {
}
}

void Listener::OnMaxConnectionsReached(util::FiberSocketBase* sock) {
  sock->Write(io::Buffer("-ERR max number of clients reached\r\n"));
Collaborator:
How will this be handled by the client?
I imagine that it will receive an error before even sending a command, so will it even know how to treat it?
Also, don't we want to close the connection? (or is it implicitly done by the framework?)

Contributor Author:
The clients I tested (python, redis-cli) seemed to handle it fine. redis does behave a bit differently (the socket doesn't close immediately and the message is delayed until the first command).

But then I tried to DDoS redis by opening a bunch of connections and leaving them open, and apparently it will start doing what we do.

and yeah, the framework is closing the connection :)

}

// We can limit number of threads handling dragonfly connections.
ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) {
  util::ProactorPool* pp = pool();
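The reviewer's question above is whether a client can even interpret an error that arrives before it has sent any command. Since the reply is a standard RESP simple error, any conforming client parses it the same way it parses an error reply to a command. Below is a minimal sketch in Python of that classification step; the helper is hypothetical and not part of this PR or of any client library.

```python
# Minimal sketch: classify a single RESP reply line, such as the
# "-ERR max number of clients reached\r\n" pushed by the server before
# any command is sent. Real clients (redis-py, redis-cli) do the
# equivalent parsing on every reply, which is why they handle it fine.

def parse_resp_line(line: bytes):
    """Return (kind, message) for a RESP simple-error or simple-string line."""
    if line.startswith(b"-"):   # RESP simple error, e.g. b"-ERR ..."
        return ("error", line[1:].rstrip(b"\r\n").decode())
    if line.startswith(b"+"):   # RESP simple string, e.g. b"+OK"
        return ("status", line[1:].rstrip(b"\r\n").decode())
    raise ValueError("unexpected reply type")

kind, msg = parse_resp_line(b"-ERR max number of clients reached\r\n")
# kind == "error", msg == "ERR max number of clients reached"
```

Because the line is self-describing, the client does not need to have sent a command first to know the connection was rejected.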
1 change: 1 addition & 0 deletions src/facade/dragonfly_listener.h
@@ -41,6 +41,7 @@ class Listener : public util::ListenerInterface {

  void OnConnectionStart(util::Connection* conn) final;
  void OnConnectionClose(util::Connection* conn) final;
  void OnMaxConnectionsReached(util::FiberSocketBase* sock) final;
  void PreAcceptLoop(ProactorBase* pb) final;

  void PreShutdown() final;
13 changes: 13 additions & 0 deletions src/server/config_registry.cc
@@ -39,6 +39,19 @@ bool ConfigRegistry::Set(std::string_view config_name, std::string_view value) {
  return cb(*flag);
}

std::optional<std::string> ConfigRegistry::Get(std::string_view config_name) {
  unique_lock lk(mu_);
  auto it = registry_.find(config_name);
  if (it == registry_.end())
    return {};
  auto cb = it->second;
  lk.unlock();

  absl::CommandLineFlag* flag = absl::FindCommandLineFlag(config_name);
  CHECK(flag);
  return flag->CurrentValue();
}

void ConfigRegistry::Reset() {
  unique_lock lk(mu_);
  registry_.clear();
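Note the lock scope in Get above: the registry lookup happens under the mutex, but the mutex is released before the call into absl's flag machinery. A rough Python analogue of that pattern, with hypothetical names, purely for illustration:

```python
# Sketch of the locking pattern in ConfigRegistry::Get: check membership
# under the mutex, release it, then resolve the current value outside
# the lock so slower lookups never block concurrent Set/Reset calls.
import threading

class ConfigRegistry:
    def __init__(self):
        self._mu = threading.Lock()
        self._registry = {}   # name -> update callback
        self._flags = {}      # stand-in for absl's global flag storage

    def get(self, name):
        with self._mu:
            if name not in self._registry:
                return None
        # Lock released here: safe to do the flag lookup without it.
        return str(self._flags[name])

reg = ConfigRegistry()
reg._registry["maxclients"] = lambda v: True
reg._flags["maxclients"] = 64000
```

This mirrors why the C++ code copies `it->second` and calls `lk.unlock()` before `absl::FindCommandLineFlag`.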
2 changes: 2 additions & 0 deletions src/server/config_registry.h
@@ -19,6 +19,8 @@ class ConfigRegistry {
  // Returns true if the value was updated.
  bool Set(std::string_view config_name, std::string_view value);

  std::optional<std::string> Get(std::string_view config_name);

  void Reset();

 private:
22 changes: 20 additions & 2 deletions src/server/server_family.cc
@@ -59,6 +59,8 @@ ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the
ABSL_FLAG(string, requirepass, "",
"password for AUTH authentication. "
"If empty can also be set with DFLY_PASSWORD environment variable.");
ABSL_FLAG(uint32_t, maxclients, 64000, "Maximum number of concurrent clients allowed.");

ABSL_FLAG(string, save_schedule, "",
"glob spec for the UTC time to save a snapshot which matches HH:MM 24h time");
ABSL_FLAG(string, snapshot_cron, "",
@@ -569,6 +571,21 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
  listeners_ = std::move(listeners);
  dfly_cmd_ = make_unique<DflyCmd>(this);

  for (auto* listener : listeners_) {
    listener->SetMaxClients(absl::GetFlag(FLAGS_maxclients));
  }
  config_registry.Register("maxclients", [this](const absl::CommandLineFlag& flag) {
    auto res = flag.TryGet<uint32_t>();
    if (!res)
      return false;

    for (auto* listener : listeners_) {
Collaborator:
It should only apply to the main port listener, certainly not to the admin listener.

Contributor Author:
Changed (and added a test accordingly).

I added IsAdmin/SetAdmin methods to Listener, but I don't think it's overkill; we already wanted something similar when we spoke about hiding admin connections from CLIENT LIST.

      listener->SetMaxClients(res.value());
    }

    return true;
  });

  pb_task_ = shard_set->pool()->GetNextProactor();
  if (pb_task_->GetKind() == ProactorBase::EPOLL) {
    fq_threadpool_.reset(new FiberQueueThreadPool(absl::GetFlag(FLAGS_epoll_file_threads)));

@@ -1514,9 +1531,10 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -1514,9 +1531,10 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
   if (param == "databases") {
     res.emplace_back(param);
     res.push_back(absl::StrCat(absl::GetFlag(FLAGS_dbnum)));
-  } else if (param == "maxmemory") {
+  } else if (auto value_from_registry = config_registry.Get(param);
+             value_from_registry.has_value()) {
     res.emplace_back(param);
-    res.push_back(absl::StrCat(max_memory_limit));
+    res.push_back(*value_from_registry);
   }

   return (*cntx)->SendStringArr(res, RedisReplyBuilder::MAP);
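With the registry in place, CONFIG GET no longer needs a hardcoded branch per flag: anything registered (here "maxclients") is answered from its current flag value. A hypothetical Python analogue of the new dispatch, with a plain dict standing in for ConfigRegistry, just to show the shape of the change:

```python
# Sketch of the generalized CONFIG GET dispatch: "databases" keeps its
# special case, any registered flag is served from the registry, and an
# unknown parameter yields an empty reply. Names are illustrative only.

def config_get(param, registry, dbnum=16):
    res = []
    if param == "databases":
        res += [param, str(dbnum)]
    elif (value := registry.get(param)) is not None:
        res += [param, value]
    return res  # empty list if the parameter is unknown

registry = {"maxclients": "64000", "maxmemory": "0"}
```

The old code's `maxmemory` branch becomes just another registry entry under this scheme.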
21 changes: 21 additions & 0 deletions tests/dragonfly/config_test.py
@@ -0,0 +1,21 @@
import pytest
import redis
from .utility import *
from . import DflyStartException


async def test_maxclients(df_factory):
    # Needs some authentication
    server = df_factory.create(port=1111, maxclients=1)
    server.start()

    async with server.client() as client1:
        assert [b"maxclients", b"1"] == await client1.execute_command("CONFIG GET maxclients")

        with pytest.raises(redis.exceptions.ConnectionError):
            async with server.client() as client2:
                await client2.get("test")

        await client1.execute_command("CONFIG SET maxclients 3")

        async with server.client() as client2:
            await client2.get("test")