diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h
index 968059aee2ea..c2bbf95bc745 100644
--- a/src/facade/conn_context.h
+++ b/src/facade/conn_context.h
@@ -19,7 +19,8 @@ class ConnectionContext {
   // We won't have any virtual methods, probably. However, since we allocate a derived class,
   // we need to declare a virtual d-tor, so we could properly delete it from Connection code.
-  virtual ~ConnectionContext() {}
+  virtual ~ConnectionContext() {
+  }
 
   Connection* owner() {
     return owner_;
@@ -44,12 +45,19 @@ class ConnectionContext {
   }
 
   // connection state / properties.
-  bool async_dispatch: 1;  // whether this connection is currently handled by dispatch fiber.
-  bool conn_closing: 1;
-  bool req_auth: 1;
-  bool replica_conn: 1;
-  bool authenticated: 1;
-  bool force_dispatch: 1;  // whether we should route all requests to the dispatch fiber.
+  bool async_dispatch : 1;  // whether this connection is currently handled by dispatch fiber.
+  bool conn_closing : 1;
+  bool req_auth : 1;
+  bool replica_conn : 1;
+  bool authenticated : 1;
+  bool force_dispatch : 1;  // whether we should route all requests to the dispatch fiber.
+  bool monitor : 1 = false;  // when the connection is a monitor, do not support most commands
+                             // other than QUIT and RESET
+ protected:
+  void EnableMonitoring(bool enable) {
+    force_dispatch = enable;  // required to support monitoring
+    monitor = enable;
+  }
 
  private:
   Connection* owner_;
diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 45a817aad283..631f13f7d294 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -9,6 +9,7 @@
 #include
 #include
+#include
 
 #include "base/flags.h"
 #include "base/logging.h"
@@ -16,6 +17,7 @@
 #include "facade/memcache_parser.h"
 #include "facade/redis_parser.h"
 #include "facade/service_interface.h"
+#include "server/conn_context.h"
 #include "util/fiber_sched_algo.h"
 
 #ifdef DFLY_USE_SSL
@@ -35,6 +37,14 @@ namespace fibers = boost::fibers;
 namespace facade {
 namespace {
 
+// visitor helper - for the message dispatching -
+// https://en.cppreference.com/w/cpp/utility/variant/visit
+// template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
+// // explicit deduction guide (not needed as of C++20)
+// template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;
+
+template <class> inline constexpr bool always_false_v = false;
+
 void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
   string res("-ERR Protocol error: ");
   if (pres == RedisParser::BAD_BULKLEN) {
@@ -85,6 +95,17 @@ constexpr size_t kReqStorageSize = 88;
 constexpr size_t kReqStorageSize = 120;
 #endif
 
+// The format of the message that we are sending is
+// +"time of day" [db-number <connection info>] "command" "arg1" ..
+  struct PipelineMsg {
+    absl::FixedArray<MutableSlice> args;
+
+    // I do not use mi_heap_t explicitly but mi_stl_allocator at the end does the same job
+    // of using the thread's heap.
+    // The capacity is chosen so that we allocate a fully utilized (256 bytes) block.
+    absl::FixedArray<char, kReqStorageSize, mi_stl_allocator<char>> storage;
+
+    PipelineMsg(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
+    }
+  };
+
+  struct MonitorMessage {
+    std::string_view msg;
+    fibers_ext::BlockingCounter bc;
-  // I do not use mi_heap_t explicitly but mi_stl_allocator at the end does the same job
-  // of using the thread's heap.
-  // The capacity is chosen so that we allocate a fully utilized (256 bytes) block.
-  absl::FixedArray<char, kReqStorageSize, mi_stl_allocator<char>> storage;
-  AsyncMsg* async_msg = nullptr;  // allocated and released via mi_malloc.
+    explicit MonitorMessage(std::string_view m, fibers_ext::BlockingCounter token)
+        : msg{m}, bc{std::move(token)} {
+    }
+  };
+
+ private:
+  using MessagePayload = std::variant<PipelineMsg, AsyncMsg, MonitorMessage>;
+
+  Request(PipelineMsg msg) : payload(std::move(msg)) {
+  }
-  Request(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
+  Request(AsyncMsg msg) : payload(std::move(msg)) {
+  }
+
+  Request(MonitorMessage msg) : payload(std::move(msg)) {
   }
 
   Request(const Request&) = delete;
+
+ public:
+  static RequestPtr New(mi_heap_t* heap, RespVec args, size_t capacity);
+
+  static RequestPtr New(const PubMessage& pub_msg, fibers_ext::BlockingCounter bc);
+
+  static RequestPtr New(std::string_view msg, fibers_ext::BlockingCounter bc);
+
+  MessagePayload payload;
 };
 
+Connection::RequestPtr Connection::Request::New(std::string_view data,
+                                                fibers_ext::BlockingCounter bc) {
+  MonitorMessage msg{data, std::move(bc)};
+
+  void* ptr = mi_malloc(sizeof(Request));
+  Request* req = new (ptr) Request(std::move(msg));
+  return Connection::RequestPtr{req, Connection::RequestDeleter{}};
+}
+
+Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, size_t capacity) {
+  constexpr auto kReqSz = sizeof(Request);
+  PipelineMsg req{args.size(), capacity};
+  auto* next = req.storage.data();
+  for (size_t i = 0; i < args.size(); ++i) {
+    auto buf = args[i].GetBuf();
+    size_t s = buf.size();
+    memcpy(next, buf.data(), s);
+    req.args[i] = MutableSlice(next, s);
+    next += s;
+  }
+  void* ptr = mi_heap_malloc_small(heap, kReqSz);
+  Request* msg = new (ptr) Request(req);
+  return Connection::RequestPtr{msg, Connection::RequestDeleter{}};
+}
+
+Connection::RequestPtr Connection::Request::New(const PubMessage& pub_msg,
+                                                fibers_ext::BlockingCounter bc) {
+  AsyncMsg req{pub_msg, std::move(bc)};
+  void* ptr = mi_malloc(sizeof(Request));
+  Request* msg = new (ptr) Request(std::move(req));
+  return Connection::RequestPtr{msg, Connection::RequestDeleter{}};
+}
+
 Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
                        ServiceInterface* service)
     : io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) {
@@ -263,21 +355,30 @@ void Connection::RegisterOnBreak(BreakerCb breaker_cb) {
   breaker_cb_ = breaker_cb;
 }
 
-void Connection::SendMsgVecAsync(const PubMessage& pub_msg, fibers_ext::BlockingCounter bc) {
+void Connection::SendMonitorMsg(std::string_view monitor_msg,
+                                util::fibers_ext::BlockingCounter bc) {
   DCHECK(cc_);
 
   if (cc_->conn_closing) {
     bc.Dec();
     return;
   }
+  RequestPtr req = Request::New(monitor_msg, std::move(bc));
+  dispatch_q_.push_back(std::move(req));
+  if (dispatch_q_.size() == 1) {
+    evc_.notify();
+  }
+}
 
-  void* ptr = mi_malloc(sizeof(AsyncMsg));
-  AsyncMsg* amsg = new (ptr) AsyncMsg(pub_msg, move(bc));
+void Connection::SendMsgVecAsync(const PubMessage& pub_msg, fibers_ext::BlockingCounter bc) {
+  DCHECK(cc_);
 
-  ptr = mi_malloc(sizeof(Request));
-  Request* req = new (ptr) Request(0, 0);
-  req->async_msg = amsg;
-  dispatch_q_.push_back(req);
+  if (cc_->conn_closing) {
+    bc.Dec();
+    return;
+  }
+  RequestPtr req = Request::New(pub_msg, std::move(bc));
+  dispatch_q_.push_back(std::move(req));
   if (dispatch_q_.size() == 1) {
     evc_.notify();
   }
@@ -437,9 +538,9 @@ auto Connection::ParseRedis() -> ParserStatus {
       VLOG(2) << "Dispatch async";
 
       // Dispatch via queue to speedup input reading.
-      Request* req = FromArgs(std::move(parse_args_), tlh);
+      RequestPtr req = FromArgs(std::move(parse_args_), tlh);
 
-      dispatch_q_.push_back(req);
+      dispatch_q_.push_back(std::move(req));
       if (dispatch_q_.size() == 1) {
         evc_.notify();
       } else if (dispatch_q_.size() > 10) {
@@ -596,79 +697,102 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, ParserStatus> {
 
+struct Connection::DispatchOperations {
+  DispatchOperations(SinkReplyBuilder* b, Connection* me)
+      : stats{me->service_->GetThreadLocalConnectionStats()}, builder{b},
+        empty{me->dispatch_q_.empty()}, self(me) {
+  }
+
+  void operator()(AsyncMsg msg);
+  void operator()(Request::MonitorMessage msg);
+  void operator()(Request::PipelineMsg msg);
+
+  ConnectionStats* stats = nullptr;
+  SinkReplyBuilder* builder = nullptr;
+  bool empty = false;
+  Connection* self = nullptr;
+};
+
+void Connection::DispatchOperations::operator()(AsyncMsg msg) {
+  RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
+  ++stats->async_writes_cnt;
+  const PubMessage& pub_msg = msg.pub_msg;
+  string_view arr[4];
+  if (pub_msg.pattern.empty()) {
+    arr[0] = "message";
+    arr[1] = pub_msg.channel;
+    arr[2] = pub_msg.message;
+    rbuilder->SendStringArr(absl::Span<string_view>{arr, 3});
+  } else {
+    arr[0] = "pmessage";
+    arr[1] = pub_msg.pattern;
+    arr[2] = pub_msg.channel;
+    arr[3] = pub_msg.message;
+    rbuilder->SendStringArr(absl::Span<string_view>{arr, 4});
+  }
+  msg.bc.Dec();
+}
+
+void Connection::DispatchOperations::operator()(Request::MonitorMessage msg) {
+  RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
+  rbuilder->SendBulkString(msg.msg);
+  msg.bc.Dec();
+}
+
+void Connection::DispatchOperations::operator()(Request::PipelineMsg msg) {
+  ++stats->pipelined_cmd_cnt;
+
+  builder->SetBatchMode(!empty);
+  self->cc_->async_dispatch = true;
+  self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
+  self->last_interaction_ = time(nullptr);
+  self->cc_->async_dispatch = false;
+}
+
+struct Connection::DispatchCleanup {
+  void operator()(AsyncMsg msg) const {
+    msg.bc.Dec();
+  }
+  void operator()(Connection::Request::MonitorMessage msg) const {
+    msg.bc.Dec();
+  }
+  void operator()(Connection::Request::PipelineMsg) const {
+  }
+};
+
 // DispatchFiber handles commands coming from the InputLoop.
 // Thus, InputLoop can quickly read data from the input buffer, parse it and push
-// into the dispatch queue and DispatchFiber will run those commands asynchronously with InputLoop.
-// Note: in some cases, InputLoop may decide to dispatch directly and bypass the DispatchFiber.
+// into the dispatch queue and DispatchFiber will run those commands asynchronously with
+// InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the
+// DispatchFiber.
 void Connection::DispatchFiber(util::FiberSocketBase* peer) {
   this_fiber::properties().set_name("DispatchFiber");
 
-  ConnectionStats* stats = service_->GetThreadLocalConnectionStats();
   SinkReplyBuilder* builder = cc_->reply_builder();
+  DispatchOperations dispatch_op{builder, this};
 
   while (!builder->GetError()) {
     evc_.await([this] { return cc_->conn_closing || !dispatch_q_.empty(); });
     if (cc_->conn_closing)
       break;
 
-    Request* req = dispatch_q_.front();
+    RequestPtr req{std::move(dispatch_q_.front())};
     dispatch_q_.pop_front();
-
-    if (req->async_msg) {
-      ++stats->async_writes_cnt;
-
-      RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
-      const PubMessage& pub_msg = req->async_msg->pub_msg;
-      string_view arr[4];
-
-      if (pub_msg.pattern.empty()) {
-        arr[0] = "message";
-        arr[1] = pub_msg.channel;
-        arr[2] = pub_msg.message;
-        rbuilder->SendStringArr(absl::Span<string_view>{arr, 3});
-      } else {
-        arr[0] = "pmessage";
-        arr[1] = pub_msg.pattern;
-        arr[2] = pub_msg.channel;
-        arr[3] = pub_msg.message;
-        rbuilder->SendStringArr(absl::Span<string_view>{arr, 4});
-      }
-
-      req->async_msg->bc.Dec();
-
-      req->async_msg->~AsyncMsg();
-      mi_free(req->async_msg);
-    } else {
-      ++stats->pipelined_cmd_cnt;
-
-      builder->SetBatchMode(!dispatch_q_.empty());
-      cc_->async_dispatch = true;
-      service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get());
-      last_interaction_ = time(nullptr);
-      cc_->async_dispatch = false;
-    }
-    req->~Request();
-    mi_free(req);
+    std::visit(dispatch_op, req->payload);
   }
 
   cc_->conn_closing = true;
 
   // Clean up leftovers.
+  DispatchCleanup clean_op;
   while (!dispatch_q_.empty()) {
-    Request* req = dispatch_q_.front();
+    RequestPtr req{std::move(dispatch_q_.front())};
     dispatch_q_.pop_front();
-
-    if (req->async_msg) {
-      req->async_msg->bc.Dec();
-      req->async_msg->~AsyncMsg();
-      mi_free(req->async_msg);
-    }
-    req->~Request();
-    mi_free(req);
+    std::visit(clean_op, req->payload);
   }
 }
 
-auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> Request* {
+auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> RequestPtr {
   DCHECK(!args.empty());
   size_t backed_sz = 0;
   for (const auto& arg : args) {
@@ -680,22 +804,11 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> Request* {
   constexpr auto kReqSz = sizeof(Request);
   static_assert(kReqSz < MI_SMALL_SIZE_MAX);
   static_assert(alignof(Request) == 8);
-  void* ptr = mi_heap_malloc_small(heap, kReqSz);
-
-  Request* req = new (ptr) Request{args.size(), backed_sz};
-  auto* next = req->storage.data();
-  for (size_t i = 0; i < args.size(); ++i) {
-    auto buf = args[i].GetBuf();
-    size_t s = buf.size();
-    memcpy(next, buf.data(), s);
-    req->args[i] = MutableSlice(next, s);
-    next += s;
-  }
+  RequestPtr req = Request::New(heap, args, backed_sz);
 
   return req;
 }
-
 void Connection::ShutdownSelf() {
   util::Connection::Shutdown();
 }
@@ -709,4 +822,39 @@ void RespToArgList(const RespVec& src, CmdArgVec* dest) {
   }
 }
 
+std::string Connection::CreateMonitorMessage(const dfly::ConnectionState& conn_state) const {
+  std::string command_details("+" + MonitorTimestamp() + " [" +
+                              std::to_string(conn_state.db_index) + " ");
+
+  LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
+
+  auto AppendConnDetails = [&]() {
+    if (conn_state.script_info.has_value()) {
+      absl::StrAppend(&command_details, "lua] ");
+    } else {
+      if (lsb->IsUDS()) {  // Unix domain socket
+        absl::StrAppend(&command_details, "unix:");
+      }
+      auto re = lsb->RemoteEndpoint();
+      absl::StrAppend(&command_details, re.address().to_string(), ":", re.port(), "] ");
+    }
+  };
+
+  auto AppendCmd = [&](const CmdArgVec& args) {
+    if (args.empty()) {
+      absl::StrAppend(&command_details, "error - empty cmd list!");
+    } else {
+      command_details = std::accumulate(
+          args.begin(), args.end(), command_details, [](auto str, const auto& cmd) {
+            absl::StrAppend(&str, "\"", std::string_view(cmd.data(), cmd.size()), "\" ");
+            return str;
+          });
+    }
+  };
+
+  AppendConnDetails();
+  AppendCmd(cmd_vec_);
+  return command_details;
+}
+
 }  // namespace facade
diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index ecf3a212de41..16de766049c3 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -19,6 +19,10 @@ typedef struct ssl_ctx_st SSL_CTX;
 
 typedef struct mi_heap_s mi_heap_t;
 
+namespace dfly {
+class ConnectionState;
+}
+
 namespace facade {
 
 class ConnectionContext;
@@ -61,6 +65,9 @@ class Connection : public util::Connection {
 
   virtual void SendMsgVecAsync(const PubMessage& pub_msg, util::fibers_ext::BlockingCounter bc);
 
+  // for monitor
+  virtual void SendMonitorMsg(std::string_view monitor_msg, util::fibers_ext::BlockingCounter bc);
+
   void SetName(std::string_view name) {
     CopyCharBuf(name, sizeof(name_), name_);
   }
@@ -74,6 +81,8 @@ class Connection : public util::Connection {
 
   void ShutdownSelf();
 
+  std::string CreateMonitorMessage(const dfly::ConnectionState& conn_state) const;
+
  protected:
   void OnShutdown() override;
   void OnPreMigrateThread() override;
@@ -116,11 +125,16 @@ class Connection : public util::Connection {
   std::unique_ptr<ConnectionContext> cc_;
 
   struct Request;
+  struct DispatchOperations;
+  struct DispatchCleanup;
+  struct RequestDeleter;
+
+  using RequestPtr = std::unique_ptr<Request, RequestDeleter>;
 
   // args are passed deliberately by value - to pass the ownership.
-  static Request* FromArgs(RespVec args, mi_heap_t* heap);
+  static RequestPtr FromArgs(RespVec args, mi_heap_t* heap);
 
-  std::deque<Request*> dispatch_q_;  // coordinated via evc_.
+  std::deque<RequestPtr> dispatch_q_;  // coordinated via evc_.
   util::fibers_ext::EventCount evc_;
   RespVec parse_args_;
 
diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
index 5a9cb03cf794..ef49df0e5c12 100644
--- a/src/server/CMakeLists.txt
+++ b/src/server/CMakeLists.txt
@@ -12,7 +12,7 @@ add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller
                tiered_storage.cc transaction.cc)
 cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)
 
-add_library(dragonfly_lib channel_slice.cc command_registry.cc
+add_library(dragonfly_lib channel_slice.cc server_state.cc command_registry.cc
             config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc generic_family.cc
             hset_family.cc json_family.cc list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc
             replica.cc
diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc
index be80718d1d36..c30b4bb57479 100644
--- a/src/server/conn_context.cc
+++ b/src/server/conn_context.cc
@@ -6,12 +6,55 @@
 #include "base/logging.h"
 #include "server/engine_shard_set.h"
+#include "server/server_family.h"
+#include "server/server_state.h"
+#include "src/facade/dragonfly_connection.h"
 #include "util/proactor_base.h"
-
 namespace dfly {
 
 using namespace std;
 
+void ConnectionContext::SendMonitorMsg(std::string_view msg,
+                                       util::fibers_ext::BlockingCounter borrows) {
+  CHECK(owner());
+
+  owner()->SendMonitorMsg(msg, borrows);
+}
+
+void ConnectionContext::MonitorCmd() {
+  if (!ServerState::tlocal()->Monitors().Empty()) {
+    util::fibers_ext::BlockingCounter bc(ServerState::tlocal()->Monitors().Size());
+    // We have connections waiting to get the info on the last command, send it to them
+    auto msg = owner()->CreateMonitorMessage(conn_state);
+    shard_set->pool()->Await([&msg, bc](unsigned idx, util::ProactorBase*) {
+      ServerState::tlocal()->Monitors().Send(msg, bc, idx);
+    });
+    bc.Wait();  // Wait for all the messages to be sent.
+    // we need to release the tokens that we borrowed - do it here
+    shard_set->pool()->Await([&msg](unsigned idx, util::ProactorBase*) {
+      ServerState::tlocal()->Monitors().Release(idx);
+    });
+  }
+}
+
+void ConnectionContext::ChangeMonitor(bool start) {
+  // This will either remove or register a new connection
+  // at the "top level" thread --> ServerState context
+  if (start) {
+    conn_state.monitor.reset(new ConnectionState::Monitor);
+    MonitorsRepo::MonitorInfo new_entry(this);
+    shard_set->pool()->Await(
+        [&new_entry](auto*) { ServerState::tlocal()->Monitors().Add(new_entry); });
+  } else {
+    VLOG(1) << "connection " << owner()->GetClientInfo()
+            << " no longer needs to be monitored - removing 0x" << std::hex << (const void*)this;
+    shard_set->pool()->Await(
+        [self = this](auto*) { ServerState::tlocal()->Monitors().Remove(self); });
+    conn_state.monitor.reset();
+  }
+  EnableMonitoring(start);
+}
+
 void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgList args) {
   vector result(to_reply ? args.size() : 0, 0);
diff --git a/src/server/conn_context.h b/src/server/conn_context.h
index 4cc1979eef18..182b105a8cce 100644
--- a/src/server/conn_context.h
+++ b/src/server/conn_context.h
@@ -32,7 +32,9 @@ struct ConnectionState {
     ExecInfo(ExecInfo&&) = delete;
 
     // Return true if ExecInfo is active (after MULTI)
-    bool IsActive() { return state != EXEC_INACTIVE; }
+    bool IsActive() {
+      return state != EXEC_INACTIVE;
+    }
 
     // Resets to blank state after EXEC or DISCARD
     void Clear();
@@ -73,6 +75,10 @@ struct ConnectionState {
     util::fibers_ext::BlockingCounter borrow_token{0};
   };
 
+  struct Monitor {
+    util::fibers_ext::BlockingCounter borrow_token{0};
+  };
+
   enum MCGetMask {
     FETCH_CAS_VER = 1,
   };
@@ -92,6 +98,7 @@ struct ConnectionState {
   ExecInfo exec_info;
   std::optional script_info;
   std::unique_ptr subscribe_info;
+  std::unique_ptr<Monitor> monitor;
 };
 
 class ConnectionContext : public facade::ConnectionContext {
@@ -117,17 +124,21 @@ class ConnectionContext : public facade::ConnectionContext {
     return conn_state.db_index;
   }
 
+  void MonitorCmd();
+
+  void SendMonitorMsg(std::string_view msg, util::fibers_ext::BlockingCounter borrows);
+
   void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args);
   void ChangePSub(bool to_add, bool to_reply, CmdArgList args);
   void UnsubscribeAll(bool to_reply);
   void PUnsubscribeAll(bool to_reply);
+  void ChangeMonitor(bool start);  // either start or stop monitor on a given connection
 
   bool is_replicating = false;
 
  private:
   void SendSubscriptionChangedResponse(std::string_view action,
-                                       std::optional topic,
-                                       unsigned count);
+                                       std::optional topic, unsigned count);
 };
 
 }  // namespace dfly
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index ae34593dfb1f..9385c521ae17 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -448,6 +448,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
     return (*cntx)->SendError("This Redis command is not allowed from script");
   }
 
+  dfly_cntx->MonitorCmd();
+
   bool is_write_cmd = (cid->opt_mask() & CO::WRITE) ||
                       (under_script && dfly_cntx->conn_state.script_info->is_write);
   bool under_multi = dfly_cntx->conn_state.exec_info.IsActive() && !is_trans_cmd;
@@ -973,7 +975,8 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& regis
 
   // The comparison can still be true even if a key expired due to another one being created.
   // So we have to check the watched_dirty flag, which is set if a key expired.
-  return watch_exist_count.load() == exec_info.watched_existed && !exec_info.watched_dirty.load(memory_order_relaxed);
+  return watch_exist_count.load() == exec_info.watched_existed &&
+         !exec_info.watched_dirty.load(memory_order_relaxed);
 }
 
 // Check if exec_info watches keys on dbs other than db_indx.
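Review note: the ConnectionContext::MonitorCmd() fan-out above is easier to follow with the borrow-token flow spelled out. The following is a minimal, self-contained sketch of that flow only; BlockingCounterSketch, MonitorConnSketch and DispatchToMonitors are made-up stand-ins for util::fibers_ext::BlockingCounter, the monitoring facade::Connection and MonitorCmd(), and plain std::thread primitives replace the fiber machinery.

#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

// Stand-in for util::fibers_ext::BlockingCounter: Wait() blocks until Dec() has been
// called once per outstanding message.
class BlockingCounterSketch {
 public:
  explicit BlockingCounterSketch(size_t start) : count_(start) {
  }
  void Dec() {
    std::lock_guard<std::mutex> lk(m_);
    if (--count_ == 0)
      cv_.notify_all();
  }
  void Wait() {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [this] { return count_ == 0; });
  }

 private:
  std::mutex m_;
  std::condition_variable cv_;
  size_t count_;
};

// Stand-in for a monitoring connection; the real code queues the message to the
// connection's dispatch fiber, here we just print it.
struct MonitorConnSketch {
  std::string name;
  void SendMonitorMsg(const std::string& msg, const std::shared_ptr<BlockingCounterSketch>& bc) {
    std::cout << name << " <- " << msg << '\n';
    bc->Dec();  // this monitor has consumed the message
  }
};

// Rough shape of ConnectionContext::MonitorCmd(): build the message once, fan it out to
// every registered monitor, then wait until all of them have taken it.
void DispatchToMonitors(std::vector<MonitorConnSketch>& monitors, const std::string& msg) {
  if (monitors.empty())
    return;
  auto bc = std::make_shared<BlockingCounterSketch>(monitors.size());
  for (auto& m : monitors)
    m.SendMonitorMsg(msg, bc);
  bc->Wait();  // all monitors have been handed the message
}

int main() {
  std::vector<MonitorConnSketch> monitors{{"monitor-1"}, {"monitor-2"}};
  DispatchToMonitors(monitors, "+1665000000.123456 [0 127.0.0.1:6379] \"get\" \"x\"");
}

In the real patch the counter is sized to Monitors().Size(), the fan-out happens per I/O thread via shard_set->pool()->Await, and Dec() runs inside each monitor connection's dispatch fiber after the message is written; the sketch collapses that into a direct call.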
@@ -1180,6 +1183,13 @@ void Service::PubsubPatterns(ConnectionContext* cntx) {
   (*cntx)->SendLong(pattern_count);
 }
 
+void Service::Monitor(CmdArgList args, ConnectionContext* cntx) {
+  VLOG(1) << "starting monitor on this connection: " << cntx->owner()->GetClientInfo();
+  // we register the current connection on all threads so that they are aware of
+  // this connection and can forward any command to it
+  cntx->ChangeMonitor(true /* start */);
+}
+
 void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) {
   if (args.size() < 2) {
     (*cntx)->SendError(WrongNumArgsError(ArgS(args, 0)));
@@ -1273,6 +1283,13 @@ void Service::OnClose(facade::ConnectionContext* cntx) {
     }
   }
 
+  if (conn_state.monitor) {
+    // remove monitor on this connection
+    auto token = conn_state.monitor->borrow_token;
+    server_cntx->ChangeMonitor(false /*start*/);
+    token.Wait();
+  }
+
   server_family_.OnClose(server_cntx);
 }
 
@@ -1316,6 +1333,7 @@ void Service::RegisterCommands() {
             << CI{"PSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PSubscribe)
             << CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe)
             << CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function)
+            << CI{"MONITOR", CO::ADMIN, 1, 0, 0, 0}.MFUNC(Monitor)
             << CI{"PUBSUB", CO::LOADING | CO::FAST, -1, 0, 0, 0}.MFUNC(Pubsub);
 
   StreamFamily::Register(&registry_);
diff --git a/src/server/main_service.h b/src/server/main_service.h
index 07a07e8d9f6f..d22582c41fe4 100644
--- a/src/server/main_service.h
+++ b/src/server/main_service.h
@@ -102,7 +102,7 @@ class Service : public facade::ServiceInterface {
   void PSubscribe(CmdArgList args, ConnectionContext* cntx);
   void PUnsubscribe(CmdArgList args, ConnectionContext* cntx);
   void Function(CmdArgList args, ConnectionContext* cntx);
-
+  void Monitor(CmdArgList args, ConnectionContext* cntx);
   void Pubsub(CmdArgList args, ConnectionContext* cntx);
   void PubsubChannels(std::string_view pattern, ConnectionContext* cntx);
   void PubsubPatterns(ConnectionContext* cntx);
diff --git a/src/server/server_state.cc b/src/server/server_state.cc
new file mode 100644
index 000000000000..3d8537b3821e
--- /dev/null
+++ b/src/server/server_state.cc
@@ -0,0 +1,62 @@
+// Copyright 2022, DragonflyDB authors. All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#include "server/server_state.h"
+
+#include "base/logging.h"
+#include "server/conn_context.h"
+
+namespace dfly {
+
+MonitorsRepo::MonitorInfo::MonitorInfo(ConnectionContext* conn)
+    : connection(conn), token(conn->conn_state.monitor->borrow_token),
+      thread_id(util::ProactorBase::GetIndex()) {
+}
+
+void MonitorsRepo::MonitorInfo::Send(std::string_view msg, std::uint32_t tid,
+                                     util::fibers_ext::BlockingCounter borrows) {
+  if (tid == thread_id && connection) {
+    VLOG(1) << "thread " << tid << " sending monitor message '" << msg << "'";
+    token.Inc();
+    connection->SendMonitorMsg(msg, borrows);
+    VLOG(1) << "thread " << tid << " successfully finished sending the message";
+  }
+}
+
+void MonitorsRepo::Add(const MonitorInfo& info) {
+  VLOG(1) << "register connection "
+          << " at address 0x" << std::hex << (const void*)info.connection << " for thread "
+          << info.thread_id;
+
+  monitors_.push_back(info);
+}
+
+void MonitorsRepo::Send(std::string_view msg, util::fibers_ext::BlockingCounter borrows,
+                        std::uint32_t tid) {
+  std::for_each(monitors_.begin(), monitors_.end(),
+                [msg, tid, borrows](auto& monitor_conn) { monitor_conn.Send(msg, tid, borrows); });
+}
+
+void MonitorsRepo::Release(std::uint32_t tid) {
+  std::for_each(monitors_.begin(), monitors_.end(), [tid](auto& monitor_conn) {
+    if (monitor_conn.thread_id == tid) {
+      VLOG(1) << "thread " << tid << " releasing token";
+      monitor_conn.token.Dec();
+    }
+  });
+}
+
+void MonitorsRepo::Remove(const ConnectionContext* conn) {
+  auto it = std::find_if(monitors_.begin(), monitors_.end(),
+                         [&conn](const auto& val) { return val.connection == conn; });
+  if (it != monitors_.end()) {
+    VLOG(1) << "removing connection 0x" << std::hex << (const void*)conn << " releasing token";
+    monitors_.erase(it);
+  } else {
+    VLOG(1) << "no connection 0x" << std::hex << (const void*)conn
+            << " found in the registered list here";
+  }
+}
+
+}  // end of namespace dfly
diff --git a/src/server/server_state.h b/src/server/server_state.h
index fb58a869de09..5f6878af62de 100644
--- a/src/server/server_state.h
+++ b/src/server/server_state.h
@@ -15,10 +15,57 @@ typedef struct mi_heap_s mi_heap_t;
 
 namespace dfly {
 
+class ConnectionContext;
 namespace journal {
 class Journal;
 }  // namespace journal
 
+// This is used as thread-local storage for sending monitor messages.
+// Each thread has its own list of all the connections that are used for monitoring.
+// When a connection issues MONITOR, it registers itself in this list on all I/O threads.
+// When a new command is dispatched and this list is not empty, the message is sent in the
+// same thread context as the thread on which the monitor connection was registered.
+// Note about performance: we assume that not many connections will be registered here.
+// This is not pub/sub, which must be high performance and may support many-to-many
+// relationships with tens of connections or more. Since monitoring is for debugging only,
+// in most cases we expect no more than one such connection.
+// Also note that we hold this list at the thread level, since this is the context in which
+// it runs; this also minimizes the amount of copying for the list.
+class MonitorsRepo {
+ public:
+  struct MonitorInfo {
+    ConnectionContext* connection = nullptr;
+    util::fibers_ext::BlockingCounter token;
+    std::uint32_t thread_id = 0;
+
+    explicit MonitorInfo(ConnectionContext* conn);
+
+    void Send(std::string_view msg, std::uint32_t tid, util::fibers_ext::BlockingCounter borrows);
+  };
+
+  void Add(const MonitorInfo& info);
+
+  void Send(std::string_view msg, util::fibers_ext::BlockingCounter borrows, std::uint32_t tid);
+
+  void Release(std::uint32_t tid);
+
+  void Remove(const ConnectionContext* conn);
+
+  bool Empty() const {
+    return monitors_.empty();
+  }
+
+  std::size_t Size() const {
+    return monitors_.size();
+  }
+
+ private:
+  using MonitorVec = std::vector<MonitorInfo>;
+  MonitorVec monitors_;
+};
+
 // Present in every server thread. This class differs from EngineShard. The latter manages
 // state around engine shards while the former represents coordinator/connection state.
 // There may be threads that handle engine shards but not IO, there may be threads that handle IO
@@ -94,6 +141,10 @@ class ServerState {  // public struct - to allow initialization.
     journal_ = j;
   }
 
+  constexpr MonitorsRepo& Monitors() {
+    return monitors_;
+  }
+
  private:
   int64_t live_transactions_ = 0;
   mi_heap_t* data_heap_;
@@ -105,6 +156,8 @@ class ServerState {  // public struct - to allow initialization.
   using Counter = util::SlidingCounter<7>;
   Counter qps_;
 
+  MonitorsRepo monitors_;
+
   static thread_local ServerState state_;
 };
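Review note: the dragonfly_connection.cc changes above replace the old Request::async_msg flag with a std::variant payload that DispatchFiber hands to std::visit. Below is a compilable sketch of that dispatch pattern with placeholder payload types; it is not the real PipelineMsg/AsyncMsg/MonitorMessage code.

#include <iostream>
#include <string>
#include <variant>

// Stand-ins for the payload alternatives carried by Connection::Request.
struct PipelineMsgSketch { std::string cmd; };
struct PubSubMsgSketch { std::string channel, message; };
struct MonitorMsgSketch { std::string line; };

using Payload = std::variant<PipelineMsgSketch, PubSubMsgSketch, MonitorMsgSketch>;

// Same role as Connection::DispatchOperations: one operator() per alternative.
struct DispatchSketch {
  void operator()(const PipelineMsgSketch& m) const { std::cout << "exec: " << m.cmd << '\n'; }
  void operator()(const PubSubMsgSketch& m) const {
    std::cout << "publish " << m.channel << ": " << m.message << '\n';
  }
  void operator()(const MonitorMsgSketch& m) const { std::cout << "monitor: " << m.line << '\n'; }
};

int main() {
  Payload queue[] = {PipelineMsgSketch{"GET x"}, MonitorMsgSketch{"+... [0 ...] \"GET\" \"x\""}};
  for (const auto& p : queue)
    std::visit(DispatchSketch{}, p);  // selects the matching operator() for the stored alternative
}

The cleanup pass at connection close works the same way, just with a visitor (DispatchCleanup) that only decrements the blocking counters instead of writing replies.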
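Review note: Request objects are placement-new'ed into memory obtained from mimalloc, which is why RequestPtr is a std::unique_ptr with a custom RequestDeleter rather than a plain unique_ptr. A sketch of the same ownership scheme, using std::malloc/std::free in place of mi_malloc/mi_free; Msg, MsgDeleter and NewMsg are illustrative names, not part of the patch.

#include <cstdlib>
#include <iostream>
#include <memory>
#include <new>
#include <string>

struct Msg {
  std::string text;
};

// The deleter must mirror the allocation: run the destructor explicitly, then return the raw
// memory to the allocator that produced it (the patch calls mi_free here).
struct MsgDeleter {
  void operator()(Msg* m) const {
    m->~Msg();
    std::free(m);
  }
};

using MsgPtr = std::unique_ptr<Msg, MsgDeleter>;

MsgPtr NewMsg(std::string text) {
  void* raw = std::malloc(sizeof(Msg));     // the patch uses mi_malloc / mi_heap_malloc_small
  Msg* m = new (raw) Msg{std::move(text)};  // construct in place in the raw storage
  return MsgPtr{m, MsgDeleter{}};
}

int main() {
  MsgPtr p = NewMsg("hello");
  std::cout << p->text << '\n';
}  // MsgDeleter runs automatically when p goes out of scope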
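Review note: CreateMonitorMessage assembles the line that monitors receive - a '+'-prefixed timestamp, the db index and client info in brackets, then each argument quoted. A rough sketch of that string assembly, with std::string::append standing in for absl::StrAppend; the function name, timestamp and address below are made up for illustration.

#include <iostream>
#include <numeric>
#include <string>
#include <vector>

// Builds a line shaped like the one CreateMonitorMessage produces:
//   +<timestamp> [<db-index> <client info>] "cmd" "arg1" ...
std::string BuildMonitorLine(double ts, int db, const std::string& client,
                             const std::vector<std::string>& args) {
  std::string out = "+" + std::to_string(ts) + " [" + std::to_string(db) + " " + client + "] ";
  // Same accumulate-over-args idea as the AppendCmd lambda in the patch.
  return std::accumulate(args.begin(), args.end(), out, [](std::string acc, const std::string& a) {
    acc.append("\"").append(a).append("\" ");
    return acc;
  });
}

int main() {
  std::cout << BuildMonitorLine(1665000000.123456, 0, "127.0.0.1:51426", {"set", "foo", "bar"})
            << '\n';
  // prints roughly: +1665000000.123456 [0 127.0.0.1:51426] "set" "foo" "bar"
}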