Skip to content

Commit

Permalink
Merge branch 'main' into fix_blocking_transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Borys <[email protected]>
  • Loading branch information
BorysTheDev authored Jan 2, 2024
2 parents 92652f6 + 03f69ff commit cf15451
Show file tree
Hide file tree
Showing 35 changed files with 624 additions and 475 deletions.
2 changes: 1 addition & 1 deletion src/core/simple_lru_counter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void SimpleLruCounter::BumpToHead(uint32_t index) {
node_arr_[node.prev].next = node.next;
node_arr_[node.next].prev = node.prev;
node.prev = tail;
node.prev = head_;
node.next = head_;
head_ = index;
}
}; // namespace dfly
28 changes: 27 additions & 1 deletion src/core/simple_lru_counter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ namespace dfly {

class SimpleLruTest : public ::testing::Test {
protected:
SimpleLruTest() : cache_(4) {
SimpleLruTest() : cache_(kSize) {
}

const size_t kSize = 4;
SimpleLruCounter cache_;
};

Expand Down Expand Up @@ -45,4 +46,29 @@ TEST_F(SimpleLruTest, Basic) {
ASSERT_EQ(6, cache_.Get("f"));
}

TEST_F(SimpleLruTest, DifferentOrder) {
for (uint32_t i = 0; i < kSize * 2; ++i) {
cache_.Put(absl::StrCat(i), i);
}

for (uint32_t i = 0; i < kSize; ++i) {
EXPECT_EQ(nullopt, cache_.Get(absl::StrCat(i)));
}
for (uint32_t i = kSize; i < kSize * 2; ++i) {
EXPECT_EQ(i, cache_.Get(absl::StrCat(i)));
}

for (uint32_t i = kSize; i > 0; --i) {
cache_.Put(absl::StrCat(i), i);
}
cache_.Put("0", 0);

for (uint32_t i = 0; i < kSize; ++i) {
EXPECT_EQ(i, cache_.Get(absl::StrCat(i)));
}
for (uint32_t i = kSize; i < kSize * 2; ++i) {
EXPECT_EQ(nullopt, cache_.Get(absl::StrCat(i)));
}
}

} // namespace dfly
2 changes: 1 addition & 1 deletion src/facade/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
add_library(dfly_facade dragonfly_listener.cc dragonfly_connection.cc facade.cc
add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc
memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc
reply_capture.cc resp_expr.cc cmd_arg_parser.cc tls_error.cc)

Expand Down
44 changes: 44 additions & 0 deletions src/facade/conn_context.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "facade/conn_context.h"

#include "facade/dragonfly_connection.h"

namespace facade {

ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
if (owner) {
protocol_ = owner->protocol();
}

if (stream) {
switch (protocol_) {
case Protocol::REDIS:
rbuilder_.reset(new RedisReplyBuilder(stream));
break;
case Protocol::MEMCACHE:
rbuilder_.reset(new MCReplyBuilder(stream));
break;
}
}

conn_closing = false;
req_auth = false;
replica_conn = false;
authenticated = false;
async_dispatch = false;
sync_dispatch = false;
journal_emulated = false;
paused = false;
blocked = false;

subscriptions = 0;
}

size_t ConnectionContext::UsedMemory() const {
return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands);
}

} // namespace facade
9 changes: 4 additions & 5 deletions src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,7 @@ class ConnectionContext {
rbuilder_->SendProtocolError(str);
}

virtual size_t UsedMemory() const {
return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) +
dfly::HeapSize(acl_commands);
}
virtual size_t UsedMemory() const;

// connection state / properties.
bool conn_closing : 1;
Expand All @@ -101,7 +98,9 @@ class ConnectionContext {
bool sync_dispatch : 1; // whether this connection is amid a sync dispatch
bool journal_emulated : 1; // whether it is used to dispatch journal commands
bool paused : 1; // whether this connection is paused due to CLIENT PAUSE
bool blocked; // whether it's blocked on blocking commands like BLPOP, needs to be addressable

// whether it's blocked on blocking commands like BLPOP, needs to be addressable
bool blocked;

// How many async subscription sources are active: monitor and/or pubsub - at most 2.
uint8_t subscriptions;
Expand Down
29 changes: 9 additions & 20 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,6 @@ void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
}
}

void FetchBuilderStats(ConnectionStats* stats, SinkReplyBuilder* builder) {
stats->io_write_cnt += builder->io_write_cnt();
stats->io_write_bytes += builder->io_write_bytes();

for (const auto& k_v : builder->err_count()) {
stats->err_count_map[k_v.first] += k_v.second;
}
builder->reset_io_stats();
}

// TODO: to implement correct matcher according to HTTP spec
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html
// One place to find a good implementation would be https://github.com/h2o/picohttpparser
Expand All @@ -114,6 +104,9 @@ constexpr size_t kMinReadSize = 256;

thread_local uint32_t free_req_release_weight = 0;

const char* kPhaseName[Connection::NUM_PHASES] = {"SETUP", "READ", "PROCESS", "SHUTTING_DOWN",
"PRECLOSE"};

} // namespace

thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
Expand Down Expand Up @@ -513,13 +506,15 @@ std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() co
string after;
absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id));
absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_);
absl::StrAppend(&after, " phase=", PHASE_NAMES[phase_]);
string_view phase_name = PHASE_NAMES[phase_];

if (cc_) {
string cc_info = service_->GetContextInfo(cc_.get());
if (cc_->reply_builder()->IsSendActive())
phase_name = "send";
absl::StrAppend(&after, " ", cc_info);
}

absl::StrAppend(&after, " phase=", phase_name);
return {std::move(before), std::move(after)};
}

Expand Down Expand Up @@ -684,14 +679,12 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
}
}

FetchBuilderStats(stats_, orig_builder);
}

if (ec && !FiberSocketBase::IsConnClosed(ec)) {
string conn_info = service_->GetContextInfo(cc_.get());
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() << ": " << ec
<< " " << ec.message();
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName()
<< " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message();
}
}

Expand Down Expand Up @@ -899,8 +892,6 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
size_t max_iobfuf_len = absl::GetFlag(FLAGS_max_client_iobuf_len);

do {
FetchBuilderStats(stats_, orig_builder);

HandleMigrateRequest();

io::MutableBytes append_buf = io_buf_.AppendBuffer();
Expand Down Expand Up @@ -975,8 +966,6 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
ec = orig_builder->GetError();
} while (peer->IsOpen() && !ec);

FetchBuilderStats(stats_, orig_builder);

if (ec)
return ec;

Expand Down
44 changes: 2 additions & 42 deletions src/facade/facade.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,20 @@

#include "base/logging.h"
#include "facade/command_id.h"
#include "facade/conn_context.h"
#include "facade/dragonfly_connection.h"
#include "facade/error.h"
#include "facade/resp_expr.h"

namespace facade {

using namespace std;

#define ADD(x) (x) += o.x
#define ADD_M(m) \
do { \
for (const auto& k_v : o.m) { \
m[k_v.first] += k_v.second; \
} \
} while (0)

constexpr size_t kSizeConnStats = sizeof(ConnectionStats);

ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct.
static_assert(kSizeConnStats == 144u);
static_assert(kSizeConnStats == 96u);

ADD(read_buf_capacity);
ADD(dispatch_queue_entries);
Expand All @@ -36,17 +29,13 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(pipeline_cmd_cache_bytes);
ADD(io_read_cnt);
ADD(io_read_bytes);
ADD(io_write_cnt);
ADD(io_write_bytes);
ADD(command_cnt);
ADD(pipelined_cmd_cnt);
ADD(conn_received_cnt);
ADD(num_conns);
ADD(num_replicas);
ADD(num_blocked_clients);

ADD_M(err_count_map);

return *this;
}

Expand Down Expand Up @@ -110,35 +99,6 @@ const char* RespExpr::TypeName(Type t) {
ABSL_UNREACHABLE();
}

ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
if (owner) {
protocol_ = owner->protocol();
}

if (stream) {
switch (protocol_) {
case Protocol::REDIS:
rbuilder_.reset(new RedisReplyBuilder(stream));
break;
case Protocol::MEMCACHE:
rbuilder_.reset(new MCReplyBuilder(stream));
break;
}
}

conn_closing = false;
req_auth = false;
replica_conn = false;
authenticated = false;
async_dispatch = false;
sync_dispatch = false;
journal_emulated = false;
paused = false;
blocked = false;

subscriptions = 0;
}

CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first_key,
int8_t last_key, uint32_t acl_categories)
: name_(name),
Expand Down
4 changes: 0 additions & 4 deletions src/facade/facade_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ struct CmdArgListFormatter {
};

struct ConnectionStats {
absl::flat_hash_map<std::string, uint64_t> err_count_map;

size_t read_buf_capacity = 0; // total capacity of input buffers
size_t dispatch_queue_entries = 0; // total number of dispatch queue entries
size_t dispatch_queue_bytes = 0; // total size of all dispatch queue entries
Expand All @@ -48,8 +46,6 @@ struct ConnectionStats {

size_t io_read_cnt = 0;
size_t io_read_bytes = 0;
size_t io_write_cnt = 0;
size_t io_write_bytes = 0;

uint64_t command_cnt = 0;
uint64_t pipelined_cmd_cnt = 0;
Expand Down
Loading

0 comments on commit cf15451

Please sign in to comment.