From c9f7cbe0e942e10de1a4a55e3e84a492e284b412 Mon Sep 17 00:00:00 2001 From: Boaz Sade Date: Mon, 24 Oct 2022 18:29:34 +0300 Subject: [PATCH] Monitor command (#427) feat(server): support monitor command - allowing user to debug commands from all connections by using a connection as monitors for the this (#344) Signed-off-by: Boaz Sade --- docs/api_status.md | 2 +- src/facade/conn_context.h | 15 ++-- src/facade/dragonfly_connection.cc | 58 ++++++++++++-- src/facade/dragonfly_connection.h | 8 ++ src/facade/facade.cc | 1 - src/server/CMakeLists.txt | 2 +- src/server/conn_context.cc | 28 +++++++ src/server/conn_context.h | 20 ++++- src/server/main_service.cc | 123 ++++++++++++++++++++++++++++- src/server/main_service.h | 2 +- src/server/server_state.cc | 51 ++++++++++++ src/server/server_state.h | 60 ++++++++++++++ 12 files changed, 346 insertions(+), 24 deletions(-) create mode 100644 src/server/server_state.cc diff --git a/docs/api_status.md b/docs/api_status.md index b091e21e8ac2..ff67265fe1b2 100644 --- a/docs/api_status.md +++ b/docs/api_status.md @@ -104,7 +104,7 @@ with respect to Memcached and Redis APIs. - [X] ZSCORE - [ ] Other - [ ] BGREWRITEAOF - - [ ] MONITOR + - [x] MONITOR - [ ] RANDOMKEY ### API 2 diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index 968059aee2ea..f9abf57cd954 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -19,7 +19,8 @@ class ConnectionContext { // We won't have any virtual methods, probably. However, since we allocate a derived class, // we need to declare a virtual d-tor, so we could properly delete it from Connection code. - virtual ~ConnectionContext() {} + virtual ~ConnectionContext() { + } Connection* owner() { return owner_; @@ -44,12 +45,12 @@ class ConnectionContext { } // connection state / properties. - bool async_dispatch: 1; // whether this connection is currently handled by dispatch fiber. - bool conn_closing: 1; - bool req_auth: 1; - bool replica_conn: 1; - bool authenticated: 1; - bool force_dispatch: 1; // whether we should route all requests to the dispatch fiber. + bool async_dispatch : 1; // whether this connection is currently handled by dispatch fiber. + bool conn_closing : 1; + bool req_auth : 1; + bool replica_conn : 1; + bool authenticated : 1; + bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber. private: Connection* owner_; diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 993681629dbf..a9503bf56fb2 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -109,6 +109,8 @@ struct Connection::RequestDeleter { // Please note: The call to the Dtor is mandatory for this!! // This class contain types that don't have trivial destructed objects struct Connection::Request { + using MonitorMessage = std::string; + struct PipelineMsg { absl::FixedArray args; @@ -122,7 +124,7 @@ struct Connection::Request { }; private: - using MessagePayload = std::variant; + using MessagePayload = std::variant; Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) { } @@ -130,18 +132,30 @@ struct Connection::Request { Request(PubMsgRecord msg) : payload(std::move(msg)) { } + Request(MonitorMessage msg) : payload(std::move(msg)) { + } + Request(const Request&) = delete; public: // Overload to create the a new pipeline message static RequestPtr New(mi_heap_t* heap, RespVec args, size_t capacity); - // overload to create a new pubsub message + // Overload to create a new pubsub message static RequestPtr New(const PubMessage& pub_msg, fibers_ext::BlockingCounter bc); + // Overload to create a new the monitor message + static RequestPtr New(MonitorMessage msg); + MessagePayload payload; }; +Connection::RequestPtr Connection::Request::New(std::string msg) { + void* ptr = mi_malloc(sizeof(Request)); + Request* req = new (ptr) Request(std::move(msg)); + return Connection::RequestPtr{req, Connection::RequestDeleter{}}; +} + Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, size_t capacity) { constexpr auto kReqSz = sizeof(Request); void* ptr = mi_heap_malloc_small(heap, kReqSz); @@ -159,6 +173,7 @@ Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, s pipeline_msg.args[i] = MutableSlice(next, s); next += s; } + return Connection::RequestPtr{req, Connection::RequestDeleter{}}; } @@ -491,8 +506,6 @@ auto Connection::ParseRedis() -> ParserStatus { service_->DispatchCommand(cmd_list, cc_.get()); last_interaction_ = time(nullptr); } else { - VLOG(2) << "Dispatch async"; - // Dispatch via queue to speedup input reading. RequestPtr req = FromArgs(std::move(parse_args_), tlh); @@ -655,19 +668,23 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantservice_->GetThreadLocalConnectionStats()}, builder{b}, - empty{me->dispatch_q_.empty()}, self(me) { + : stats{me->service_->GetThreadLocalConnectionStats()}, builder{b}, self(me) { } void operator()(PubMsgRecord& msg); void operator()(Request::PipelineMsg& msg); + void operator()(const Request::MonitorMessage& msg); ConnectionStats* stats = nullptr; SinkReplyBuilder* builder = nullptr; - bool empty = false; Connection* self = nullptr; }; +void Connection::DispatchOperations::operator()(const Request::MonitorMessage& msg) { + RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; + rbuilder->SendSimpleString(msg); +} + void Connection::DispatchOperations::operator()(PubMsgRecord& msg) { RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; ++stats->async_writes_cnt; @@ -690,7 +707,7 @@ void Connection::DispatchOperations::operator()(PubMsgRecord& msg) { void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) { ++stats->pipelined_cmd_cnt; - + bool empty = self->dispatch_q_.empty(); builder->SetBatchMode(!empty); self->cc_->async_dispatch = true; self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get()); @@ -703,6 +720,9 @@ struct Connection::DispatchCleanup { msg.bc.Dec(); } + void operator()(const Connection::Request::MonitorMessage&) const { + } + void operator()(const Connection::Request::PipelineMsg&) const { } }; @@ -769,4 +789,26 @@ void RespToArgList(const RespVec& src, CmdArgVec* dest) { } } +void Connection::SendMonitorMsg(std::string monitor_msg) { + DCHECK(cc_); + + if (!cc_->conn_closing) { + RequestPtr req = Request::New(std::move(monitor_msg)); + dispatch_q_.push_back(std::move(req)); + if (dispatch_q_.size() == 1) { + evc_.notify(); + } + } +} + +std::string Connection::RemoteEndpointStr() const { + LinuxSocketBase* lsb = static_cast(socket_.get()); + bool unix_socket = lsb->IsUDS(); + std::string connection_str = unix_socket ? "unix:" : std::string{}; + + auto re = lsb->RemoteEndpoint(); + absl::StrAppend(&connection_str, re.address().to_string(), ":", re.port()); + return connection_str; +} + } // namespace facade diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 95b8ce9d882a..1de48d183479 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -59,8 +59,15 @@ class Connection : public util::Connection { std::string_view message; }; + // this function is overriden at test_utils TestConnection virtual void SendMsgVecAsync(const PubMessage& pub_msg, util::fibers_ext::BlockingCounter bc); + // Please note, this accept the message by value, since we really want to + // create a new copy here, so that we would not need to "worry" about memory + // management, we are assuming that we would not have many copy for this, and that + // we would not need in this way to sync on the lifetime of the message + void SendMonitorMsg(std::string monitor_msg); + void SetName(std::string_view name) { CopyCharBuf(name, sizeof(name_), name_); } @@ -70,6 +77,7 @@ class Connection : public util::Connection { } std::string GetClientInfo() const; + std::string RemoteEndpointStr() const; uint32 GetClientId() const; void ShutdownSelf(); diff --git a/src/facade/facade.cc b/src/facade/facade.cc index b8f2cd7d61dd..06e31bdb6165 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -63,7 +63,6 @@ string UnknownSubCmd(string_view subcmd, string_view cmd) { cmd, " HELP."); } - const char kSyntaxErr[] = "syntax error"; const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value"; const char kKeyNotFoundErr[] = "no such key"; diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 5a9cb03cf794..bbeedda33b44 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -13,7 +13,7 @@ add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib) add_library(dragonfly_lib channel_slice.cc command_registry.cc - config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc + config_flags.cc conn_context.cc debugcmd.cc server_state.cc dflycmd.cc generic_family.cc hset_family.cc json_family.cc list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index be80718d1d36..095222845c6a 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -6,12 +6,40 @@ #include "base/logging.h" #include "server/engine_shard_set.h" +#include "server/server_family.h" +#include "server/server_state.h" +#include "src/facade/dragonfly_connection.h" #include "util/proactor_base.h" namespace dfly { using namespace std; +void ConnectionContext::SendMonitorMsg(std::string msg) { + CHECK(owner()); + + owner()->SendMonitorMsg(std::move(msg)); +} + +void ConnectionContext::ChangeMonitor(bool start) { + // This will either remove or register a new connection + // at the "top level" thread --> ServerState context + // note that we are registering/removing this connection to the thread at which at run + // then notify all other threads that there is a change in the number of monitors + auto& my_monitors = ServerState::tlocal()->Monitors(); + if (start) { + my_monitors.Add(this); + } else { + VLOG(1) << "connection " << owner()->GetClientInfo() + << " no longer needs to be monitored - removing 0x" << std::hex << (const void*)this; + my_monitors.Remove(this); + } + // Tell other threads that about the change in the number of connection that we monitor + shard_set->pool()->Await( + [start](auto*) { ServerState::tlocal()->Monitors().NotifyChangeCount(start); }); + EnableMonitoring(start); +} + void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgList args) { vector result(to_reply ? args.size() : 0, 0); diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 4cc1979eef18..1a0e25e46195 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -32,7 +32,9 @@ struct ConnectionState { ExecInfo(ExecInfo&&) = delete; // Return true if ExecInfo is active (after MULTI) - bool IsActive() { return state != EXEC_INACTIVE; } + bool IsActive() { + return state != EXEC_INACTIVE; + } // Resets to blank state after EXEC or DISCARD void Clear(); @@ -117,17 +119,29 @@ class ConnectionContext : public facade::ConnectionContext { return conn_state.db_index; } + // Note that this is accepted by value for lifetime reasons + // we want to have our own copy since we are assuming that + // 1. there will be not to many connections that we in monitor state + // 2. we need to have for each of them each own copy for thread safe reasons + void SendMonitorMsg(std::string msg); + void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args); void ChangePSub(bool to_add, bool to_reply, CmdArgList args); void UnsubscribeAll(bool to_reply); void PUnsubscribeAll(bool to_reply); + void ChangeMonitor(bool start); // either start or stop monitor on a given connection bool is_replicating = false; + bool monitor = false; // when a monitor command is sent over a given connection, we need to aware + // of it as a state for the connection private: + void EnableMonitoring(bool enable) { + force_dispatch = enable; // required to support the monitoring + monitor = enable; + } void SendSubscriptionChangedResponse(std::string_view action, - std::optional topic, - unsigned count); + std::optional topic, unsigned count); }; } // namespace dfly diff --git a/src/server/main_service.cc b/src/server/main_service.cc index ae34593dfb1f..3440c0d95a01 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -68,6 +68,105 @@ std::optional engine_varz; constexpr size_t kMaxThreadSize = 1024; +void DeactivateMonitoring(ConnectionContext* server_ctx) { + if (server_ctx->monitor) { + // remove monitor on this connection + server_ctx->ChangeMonitor(false /*start*/); + } +} + +// The format of the message that are sending is +// +"time of day" [db-number std::string { + // This code is based on Redis impl for it at sdscatrepr@sds.c + std::string result = absl::StrCat("\""); + + for (auto c : str) { + switch (c) { + case '\\': + absl::StrAppend(&result, "\\\\"); + break; + case '"': + absl::StrAppend(&result, "\\\""); + break; + case '\n': + absl::StrAppend(&result, "\\n"); + break; + case '\r': + absl::StrAppend(&result, "\\r"); + break; + case '\t': + absl::StrAppend(&result, "\\t"); + break; + case '\a': + absl::StrAppend(&result, "\\a"); + break; + case '\b': + absl::StrAppend(&result, "\\b"); + break; + default: + if (isprint(c)) { + result += c; + } else { + absl::StrAppend(&result, "\\x", absl::Hex((unsigned char)c, absl::kZeroPad2)); + } + break; + } + } + absl::StrAppend(&result, "\""); + return result; +} + +std::string MakeMonitorMessage(const ConnectionState& conn_state, + const facade::Connection* connection, CmdArgList args) { + std::string message = CreateMonitorTimestamp(); + + if (conn_state.script_info.has_value()) { + absl::StrAppend(&message, "lua] "); + } else { + absl::StrAppend(&message, connection->RemoteEndpointStr()); + } + if (args.empty()) { + absl::StrAppend(&message, "error - empty cmd list!"); + } else if (auto cmd_name = std::string_view(args[0].data(), args[0].size()); + cmd_name == "AUTH") { // we cannot just send auth details in this case + absl::StrAppend(&message, "\"", cmd_name, "\""); + } else { + message = std::accumulate(args.begin(), args.end(), message, [](auto str, const auto& cmd) { + absl::StrAppend(&str, " ", CmdEntryToMonitorFormat(std::string_view(cmd.data(), cmd.size()))); + return str; + }); + } + return message; +} + +void DispatchMonitorIfNeeded(bool admin_cmd, ConnectionContext* connection, CmdArgList args) { + // We are not sending any admin command in the monitor, and we do not want to + // do any processing if we don't have any waiting connections with monitor + // enabled on them - see https://redis.io/commands/monitor/ + const auto& my_monitors = ServerState::tlocal()->Monitors(); + if (!(my_monitors.Empty() || admin_cmd)) { + // We have connections waiting to get the info on the last command, send it to them + auto monitor_msg = MakeMonitorMessage(connection->conn_state, connection->owner(), args); + // Note that this is accepted by value for lifetime reasons + // we want to have our own copy since we are assuming that + // 1. there will be not to many connections that we in monitor state + // 2. we need to have for each of them each own copy for thread safe reasons + VLOG(1) << "sending command '" << monitor_msg << "' to the clients that registered on it"; + shard_set->pool()->DispatchBrief( + [msg = std::move(monitor_msg)](unsigned idx, util::ProactorBase*) { + ServerState::tlocal()->Monitors().Send(msg); + }); + } +} + class InterpreterReplier : public RedisReplyBuilder { public: InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) { @@ -376,7 +475,7 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i void Service::Shutdown() { VLOG(1) << "Service::Shutdown"; - // We mark that we are shuttind down. After this incoming requests will be + // We mark that we are shutting down. After this incoming requests will be // rejected pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); }); @@ -442,6 +541,11 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) } } + // only reset and quit are allow if this connection is used for monitoring + if (dfly_cntx->monitor && (cmd_name != "RESET" && cmd_name != "QUIT")) { + return (*cntx)->SendError("Replica can't interact with the keyspace"); + } + bool under_script = dfly_cntx->conn_state.script_info.has_value(); if (under_script && (cid->opt_mask() & CO::NOSCRIPT)) { @@ -555,6 +659,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) dfly_cntx->reply_builder()->CloseConnection(); } + DispatchMonitorIfNeeded(cid->opt_mask() & CO::ADMIN, dfly_cntx, args); + end_usec = ProactorBase::GetMonotonicTimeNs(); request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000); @@ -726,6 +832,8 @@ void Service::Quit(CmdArgList args, ConnectionContext* cntx) { SinkReplyBuilder* builder = cntx->reply_builder(); builder->CloseConnection(); + + DeactivateMonitoring(static_cast(cntx)); cntx->owner()->ShutdownSelf(); } @@ -973,7 +1081,8 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& regis // The comparison can still be true even if a key expired due to another one being created. // So we have to check the watched_dirty flag, which is set if a key expired. - return watch_exist_count.load() == exec_info.watched_existed && !exec_info.watched_dirty.load(memory_order_relaxed); + return watch_exist_count.load() == exec_info.watched_existed && + !exec_info.watched_dirty.load(memory_order_relaxed); } // Check if exec_info watches keys on dbs other than db_indx. @@ -1180,6 +1289,13 @@ void Service::PubsubPatterns(ConnectionContext* cntx) { (*cntx)->SendLong(pattern_count); } +void Service::Monitor(CmdArgList args, ConnectionContext* cntx) { + VLOG(1) << "starting monitor on this connection: " << cntx->owner()->GetClientInfo(); + // we are registering the current connection for all threads so they will be aware of + // this connection, to send to it any command + cntx->ChangeMonitor(true /* start */); +} + void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) { if (args.size() < 2) { (*cntx)->SendError(WrongNumArgsError(ArgS(args, 0))); @@ -1273,6 +1389,8 @@ void Service::OnClose(facade::ConnectionContext* cntx) { } } + DeactivateMonitoring(server_cntx); + server_family_.OnClose(server_cntx); } @@ -1316,6 +1434,7 @@ void Service::RegisterCommands() { << CI{"PSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PSubscribe) << CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe) << CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function) + << CI{"MONITOR", CO::ADMIN, 1, 0, 0, 0}.MFUNC(Monitor) << CI{"PUBSUB", CO::LOADING | CO::FAST, -1, 0, 0, 0}.MFUNC(Pubsub); StreamFamily::Register(®istry_); diff --git a/src/server/main_service.h b/src/server/main_service.h index 07a07e8d9f6f..d22582c41fe4 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -102,7 +102,7 @@ class Service : public facade::ServiceInterface { void PSubscribe(CmdArgList args, ConnectionContext* cntx); void PUnsubscribe(CmdArgList args, ConnectionContext* cntx); void Function(CmdArgList args, ConnectionContext* cntx); - + void Monitor(CmdArgList args, ConnectionContext* cntx); void Pubsub(CmdArgList args, ConnectionContext* cntx); void PubsubChannels(std::string_view pattern, ConnectionContext* cntx); void PubsubPatterns(ConnectionContext* cntx); diff --git a/src/server/server_state.cc b/src/server/server_state.cc new file mode 100644 index 000000000000..49beda7fb755 --- /dev/null +++ b/src/server/server_state.cc @@ -0,0 +1,51 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/server_state.h" + +#include "base/logging.h" +#include "server/conn_context.h" + +namespace dfly { + +void MonitorsRepo::Add(ConnectionContext* connection) { + VLOG(1) << "register connection " + << " at address 0x" << std::hex << (const void*)connection << " for thread " + << util::ProactorBase::GetIndex(); + + monitors_.push_back(connection); +} + +void MonitorsRepo::Send(const std::string& msg) { + if (!monitors_.empty()) { + VLOG(1) << "thread " << util::ProactorBase::GetIndex() << " sending monitor message '" << msg + << "' for " << monitors_.size(); + for (auto monitor_conn : monitors_) { + monitor_conn->SendMonitorMsg(msg); + } + } +} + +void MonitorsRepo::Remove(const ConnectionContext* conn) { + auto it = std::find_if(monitors_.begin(), monitors_.end(), + [&conn](const auto& val) { return val == conn; }); + if (it != monitors_.end()) { + VLOG(1) << "removing connection 0x" << std::hex << (const void*)conn << " releasing token"; + monitors_.erase(it); + } else { + VLOG(1) << "no connection 0x" << std::hex << (const void*)conn + << " found in the registered list here"; + } +} + +void MonitorsRepo::NotifyChangeCount(bool added) { + if (added) { + ++global_count_; + } else { + DCHECK(global_count_ > 0); + --global_count_; + } +} + +} // end of namespace dfly diff --git a/src/server/server_state.h b/src/server/server_state.h index fb58a869de09..385fac9e7af9 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -15,10 +15,64 @@ typedef struct mi_heap_s mi_heap_t; namespace dfly { +class ConnectionContext; namespace journal { class Journal; } // namespace journal +// This would be used as a thread local storage of sending +// monitor messages. +// Each thread will have its own list of all the connections that are +// used for monitoring. When a connection is set to monitor it would register +// itself to this list on all i/o threads. When a new command is dispatched, +// and this list is not empty, it would send in the same thread context as then +// thread that registered here the command. +// Note about performance: we are assuming that we would not have many connections +// that are registered here. This is not pub sub where it must be high performance +// and may support many to many with tens or more of connections. It is assumed that +// since monitoring is for debugging only, we would have less than 1 in most cases. +// Also note that we holding this list on the thread level since this is the context +// at which this would run. It also minimized the number of copied for this list. +class MonitorsRepo { + public: + // This function adds a new connection to be monitored. This function only add + // new connection that belong to this thread! Must not be called outside of this + // thread context + void Add(ConnectionContext* info); + + // Note that this is accepted by value for lifetime reasons + // we want to have our own copy since we are assuming that + // 1. there will be not to many connections that we in monitor state + // 2. we need to have for each of them each own copy for thread safe reasons + void Send(const std::string& msg); + + // This function remove a connection what was monitored. This function only removes + // a connection that belong to this thread! Must not be called outside of this + // thread context + void Remove(const ConnectionContext* conn); + + // We have for each thread the total number of monitors in the application. + // So this call is thread safe since we hold a copy of this for each thread. + // If this return true, then we don't need to run the monitor operation at all. + bool Empty() const { + return global_count_ == 0u; + } + + // This function is run on all threads to either increment or decrement the "shared" counter + // of the monitors - it must be called as part of removing a monitor (for example + // when a connection is closed). + void NotifyChangeCount(bool added); + + std::size_t Size() const { + return monitors_.size(); + } + + private: + using MonitorVec = std::vector; + MonitorVec monitors_; // save connections belonging to this thread only! + unsigned int global_count_ = 0; // by global its means that we count the monitor for all threads +}; + // Present in every server thread. This class differs from EngineShard. The latter manages // state around engine shards while the former represents coordinator/connection state. // There may be threads that handle engine shards but not IO, there may be threads that handle IO @@ -94,6 +148,10 @@ class ServerState { // public struct - to allow initialization. journal_ = j; } + constexpr MonitorsRepo& Monitors() { + return monitors_; + } + private: int64_t live_transactions_ = 0; mi_heap_t* data_heap_; @@ -105,6 +163,8 @@ class ServerState { // public struct - to allow initialization. using Counter = util::SlidingCounter<7>; Counter qps_; + MonitorsRepo monitors_; + static thread_local ServerState state_; };