Skip to content

Commit

Permalink
chore: decouple reply_builder from ConnectionContext
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Nov 5, 2024
1 parent aa4a618 commit 81b578c
Show file tree
Hide file tree
Showing 22 changed files with 118 additions and 125 deletions.
27 changes: 2 additions & 25 deletions src/facade/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,9 @@
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"

ABSL_FLAG(bool, experimental_new_io, true,
"Use new replying code - should "
"reduce latencies for pipelining");

namespace facade {

ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
if (owner) {
protocol_ = owner->protocol();
}

if (stream) {
switch (protocol_) {
case Protocol::REDIS: {
RedisReplyBuilder* rb = absl::GetFlag(FLAGS_experimental_new_io)
? new RedisReplyBuilder2(stream)
: new RedisReplyBuilder(stream);
rbuilder_.reset(rb);
break;
}
case Protocol::MEMCACHE:
rbuilder_.reset(new MCReplyBuilder(stream));
break;
}
}

ConnectionContext::ConnectionContext(Connection* owner) : owner_(owner) {
conn_closing = false;
req_auth = false;
replica_conn = false;
Expand All @@ -49,7 +26,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
}

size_t ConnectionContext::UsedMemory() const {
return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands);
return dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands);
}

} // namespace facade
12 changes: 1 addition & 11 deletions src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class Connection;

class ConnectionContext {
public:
ConnectionContext(::io::Sink* stream, Connection* owner);
explicit ConnectionContext(Connection* owner);

virtual ~ConnectionContext() {
}
Expand All @@ -32,14 +32,6 @@ class ConnectionContext {
return owner_;
}

Protocol protocol() const {
return protocol_;
}

SinkReplyBuilder* reply_builder_old() {
return rbuilder_.get();
}

virtual size_t UsedMemory() const;

// connection state / properties.
Expand Down Expand Up @@ -71,8 +63,6 @@ class ConnectionContext {

private:
Connection* owner_;
Protocol protocol_ = Protocol::REDIS;
std::unique_ptr<SinkReplyBuilder> rbuilder_;
};

} // namespace facade
57 changes: 36 additions & 21 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ ABSL_FLAG(bool, migrate_connections, true,
"they operate. Currently this is only supported for Lua script invocations, and can "
"happen at most once per connection.");

ABSL_FLAG(bool, experimental_new_io, true,
"Use new replying code - should reduce latencies for pipelining");

using namespace util;
using absl::GetFlag;
using nonstd::make_unexpected;
Expand Down Expand Up @@ -487,14 +490,15 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg
DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());

self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()},
self->reply_builder_, self->cc_.get());
self->reply_builder_.get(), self->cc_.get());

self->last_interaction_ = time(nullptr);
self->skip_next_squashing_ = false;
}

void Connection::DispatchOperations::operator()(const Connection::MCPipelineMessage& msg) {
self->service_->DispatchMC(msg.cmd, msg.value, static_cast<MCReplyBuilder*>(self->reply_builder_),
self->service_->DispatchMC(msg.cmd, msg.value,
static_cast<MCReplyBuilder*>(self->reply_builder_.get()),
self->cc_.get());
self->last_interaction_ = time(nullptr);
}
Expand Down Expand Up @@ -535,25 +539,24 @@ void UpdateLibNameVerMap(const string& name, const string& ver, int delta) {
}
} // namespace

Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
Connection::Connection(Protocol proto, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service)
: io_buf_(kMinReadSize),
protocol_(proto),
http_listener_(http_listener),
ssl_ctx_(ctx),
service_(service),
flags_(0) {
static atomic_uint32_t next_id{1};

protocol_ = protocol;

constexpr size_t kReqSz = sizeof(Connection::PipelineMessage);
static_assert(kReqSz <= 256 && kReqSz >= 200);

switch (protocol) {
case Protocol::REDIS:
switch (proto) {
case REDIS:
redis_parser_.reset(new RedisParser(GetFlag(FLAGS_max_multi_bulk_len)));
break;
case Protocol::MEMCACHE:
case MEMCACHE:
memcache_parser_.reset(new MemcacheParser);
break;
}
Expand Down Expand Up @@ -724,8 +727,7 @@ void Connection::HandleRequests() {
// because both Write and Recv internally check if the socket was shut
// down and return with an error accordingly.
if (http_res && socket_->IsOpen()) {
cc_.reset(service_->CreateContext(socket_.get(), this));
reply_builder_ = cc_->reply_builder_old();
cc_.reset(service_->CreateContext(this));

if (*http_res) {
VLOG(1) << "HTTP1.1 identified";
Expand All @@ -749,15 +751,26 @@ void Connection::HandleRequests() {
if (breaker_cb_) {
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
}

switch (protocol_) {
case REDIS: {
RedisReplyBuilder* rb = absl::GetFlag(FLAGS_experimental_new_io)
? new RedisReplyBuilder2(socket_.get())
: new RedisReplyBuilder(socket_.get());
reply_builder_.reset(rb);
break;
}
case MEMCACHE:
reply_builder_.reset(new MCReplyBuilder(socket_.get()));
break;
}
ConnectionFlow();

socket_->CancelOnErrorCb(); // noop if nothing is registered.
}
VLOG(1) << "Closed connection for peer "
<< GetClientInfo(fb2::ProactorBase::me()->GetPoolIndex());
cc_.reset();
reply_builder_ = nullptr;
reply_builder_.reset();
}
}

Expand Down Expand Up @@ -929,6 +942,8 @@ io::Result<bool> Connection::CheckForHttpProto() {
}

void Connection::ConnectionFlow() {
DCHECK(reply_builder_);

++stats_->num_conns;
++stats_->conn_received_cnt;
stats_->read_buf_capacity += io_buf_.Capacity();
Expand Down Expand Up @@ -986,7 +1001,7 @@ void Connection::ConnectionFlow() {
VLOG(1) << "Error parser status " << parser_error_;

if (redis_parser_) {
SendProtocolError(RedisParser::Result(parser_error_), reply_builder_);
SendProtocolError(RedisParser::Result(parser_error_), reply_builder_.get());
} else {
DCHECK(memcache_parser_);
reply_builder_->SendProtocolError("bad command line format");
Expand Down Expand Up @@ -1089,7 +1104,7 @@ Connection::ParserStatus Connection::ParseRedis() {

auto dispatch_sync = [this, &parse_args, &cmd_vec] {
RespExpr::VecToArgList(parse_args, &cmd_vec);
service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_, cc_.get());
service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_.get(), cc_.get());
};
auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle {
return {FromArgs(std::move(parse_args), tlh)};
Expand Down Expand Up @@ -1134,14 +1149,14 @@ auto Connection::ParseMemcache() -> ParserStatus {
string_view value;

auto dispatch_sync = [this, &cmd, &value] {
service_->DispatchMC(cmd, value, static_cast<MCReplyBuilder*>(reply_builder_), cc_.get());
service_->DispatchMC(cmd, value, static_cast<MCReplyBuilder*>(reply_builder_.get()), cc_.get());
};

auto dispatch_async = [&cmd, &value]() -> MessageHandle {
return {make_unique<MCPipelineMessage>(std::move(cmd), value)};
};

MCReplyBuilder* builder = static_cast<MCReplyBuilder*>(reply_builder_);
MCReplyBuilder* builder = static_cast<MCReplyBuilder*>(reply_builder_.get());

do {
string_view str = ToSV(io_buf_.InputBuffer());
Expand Down Expand Up @@ -1358,7 +1373,7 @@ bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) {

void Connection::SquashPipeline() {
DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_);
DCHECK_EQ(reply_builder_->type(), SinkReplyBuilder::REDIS); // Only Redis is supported.
DCHECK_EQ(reply_builder_->type(), REDIS); // Only Redis is supported.

vector<ArgSlice> squash_cmds;
squash_cmds.reserve(dispatch_q_.size());
Expand All @@ -1374,7 +1389,7 @@ void Connection::SquashPipeline() {
cc_->async_dispatch = true;

size_t dispatched =
service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_, cc_.get());
service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_.get(), cc_.get());

if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared
reply_builder_->FlushBatch();
Expand All @@ -1397,7 +1412,7 @@ void Connection::SquashPipeline() {
}

void Connection::ClearPipelinedMessages() {
DispatchOperations dispatch_op{reply_builder_, this};
DispatchOperations dispatch_op{reply_builder_.get(), this};

// Recycle messages even from disconnecting client to keep properly track of memory stats
// As well as to avoid pubsub backpressure leakege.
Expand Down Expand Up @@ -1445,7 +1460,7 @@ std::string Connection::DebugInfo() const {
void Connection::ExecutionFiber() {
ThisFiber::SetName("ExecutionFiber");

DispatchOperations dispatch_op{reply_builder_, this};
DispatchOperations dispatch_op{reply_builder_.get(), this};

size_t squashing_threshold = GetFlag(FLAGS_pipeline_squash);

Expand Down Expand Up @@ -1809,7 +1824,7 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const {
size_t mem = sizeof(*this) + dfly::HeapSize(dispatch_q_) + dfly::HeapSize(name_) +
dfly::HeapSize(tmp_parse_args_) + dfly::HeapSize(tmp_cmd_vec_) +
dfly::HeapSize(memcache_parser_) + dfly::HeapSize(redis_parser_) +
dfly::HeapSize(cc_);
dfly::HeapSize(cc_) + dfly::HeapSize(reply_builder_);

// We add a hardcoded 9k value to accomodate for the part of the Fiber stack that is in use.
// The allocated stack is actually larger (~130k), but only a small fraction of that (9k
Expand Down
10 changes: 2 additions & 8 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class Connection : public util::Connection {
struct QueueBackpressure;

public:
Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
Connection(Protocol type, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service);
~Connection();

Expand Down Expand Up @@ -269,10 +269,6 @@ class Connection : public util::Connection {

bool IsMain() const;

Protocol protocol() const {
return protocol_;
}

void SetName(std::string name);

void SetLibName(std::string name);
Expand Down Expand Up @@ -404,9 +400,7 @@ class Connection : public util::Connection {
Protocol protocol_;
ConnectionStats* stats_ = nullptr;

// cc_->reply_builder may change during the lifetime of the connection, due to injections.
// This is a pointer to the original, socket based reply builder that never changes.
SinkReplyBuilder* reply_builder_ = nullptr;
std::unique_ptr<SinkReplyBuilder> reply_builder_;
util::HttpListenerBase* http_listener_;
SSL_CTX* ssl_ctx_;

Expand Down
4 changes: 4 additions & 0 deletions src/facade/dragonfly_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ class Listener : public util::ListenerInterface {
bool IsPrivilegedInterface() const;
bool IsMainInterface() const;

Protocol protocol() const {
return protocol_;
}

private:
util::Connection* NewConnection(ProactorBase* proactor) final;
ProactorBase* PickConnectionProactor(util::FiberSocketBase* sock) final;
Expand Down
13 changes: 13 additions & 0 deletions src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,17 @@ ostream& operator<<(ostream& os, facade::RespSpan ras) {
return os;
}

ostream& operator<<(ostream& os, facade::Protocol p) {
switch (p) {
case facade::REDIS:
os << "REDIS";
break;
case facade::MEMCACHE:
os << "MEMCACHE";
break;
}

return os;
}

} // namespace std
4 changes: 2 additions & 2 deletions src/facade/facade_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ constexpr size_t kSanitizerOverhead = 0u;
#endif
#endif

enum class Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 };
enum Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 };

using MutableSlice = std::string_view;
using CmdArgList = absl::Span<const std::string_view>;
Expand Down Expand Up @@ -189,5 +189,5 @@ void ResetStats();

namespace std {
ostream& operator<<(ostream& os, facade::CmdArgList args);

ostream& operator<<(ostream& os, facade::Protocol proto);
} // namespace std
6 changes: 3 additions & 3 deletions src/facade/ok_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ class OkService : public ServiceInterface {
builder->SendError("");
}

ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) final {
return new ConnectionContext{peer, owner};
ConnectionContext* CreateContext(Connection* owner) final {
return new ConnectionContext{owner};
}
};

void RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
OkService service;
pool->Await([](auto*) { tl_facade_stats = new FacadeStats; });

acceptor->AddListener(GetFlag(FLAGS_port), new Listener{Protocol::REDIS, &service});
acceptor->AddListener(GetFlag(FLAGS_port), new Listener{REDIS, &service});

acceptor->Run();
acceptor->Wait();
Expand Down
3 changes: 2 additions & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ void SinkReplyBuilder2::NextVec(std::string_view str) {
vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()});
}

MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink, MC), noreply_(false) {
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink)
: SinkReplyBuilder(sink, MEMCACHE), noreply_(false) {
}

void MCReplyBuilder::SendSimpleString(std::string_view str) {
Expand Down
2 changes: 1 addition & 1 deletion src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class SinkReplyBuilder {
SinkReplyBuilder(const SinkReplyBuilder&) = delete;
void operator=(const SinkReplyBuilder&) = delete;

enum Type { REDIS, MC };
using Type = Protocol;

explicit SinkReplyBuilder(::io::Sink* sink, Type t);

Expand Down
2 changes: 1 addition & 1 deletion src/facade/service_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class ServiceInterface {
virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
MCReplyBuilder* builder, ConnectionContext* cntx) = 0;

virtual ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) = 0;
virtual ConnectionContext* CreateContext(Connection* owner) = 0;

virtual void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) {
}
Expand Down
Loading

0 comments on commit 81b578c

Please sign in to comment.