diff --git a/docs/api_status.md b/docs/api_status.md index 1a95a73124af..5b0afcaba642 100644 --- a/docs/api_status.md +++ b/docs/api_status.md @@ -104,7 +104,7 @@ with respect to Memcached and Redis APIs. - [X] ZSCORE - [ ] Other - [ ] BGREWRITEAOF - - [ ] MONITOR + - [x] MONITOR - [ ] RANDOMKEY ### API 2 diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index c2bbf95bc745..3f0c9ea5ea3c 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -50,9 +50,9 @@ class ConnectionContext { bool req_auth : 1; bool replica_conn : 1; bool authenticated : 1; - bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber. - bool monitor : 1 = false; // when the connection is monitor do not support most commands other - // than quit and reset + bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber. + bool monitor : 1; // when the connection is monitor do not support most commands other + // than quit and reset protected: void EnableMonitoring(bool enable) { force_dispatch = enable; // required to support the monitoring diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 631f13f7d294..39ac90d576ff 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -9,7 +9,6 @@ #include #include -#include #include "base/flags.h" #include "base/logging.h" @@ -17,6 +16,7 @@ #include "facade/memcache_parser.h" #include "facade/redis_parser.h" #include "facade/service_interface.h" +#include "server/common.h" #include "server/conn_context.h" #include "util/fiber_sched_algo.h" @@ -37,12 +37,6 @@ namespace fibers = boost::fibers; namespace facade { namespace { -// visitor helper - for the message dispatching - -// https://en.cppreference.com/w/cpp/utility/variant/visit -// template struct overloaded : Ts... { using Ts::operator()...; }; -// // explicit deduction guide (not needed as of C++20) -// template overloaded(Ts...) -> overloaded; - template inline constexpr bool always_false_v = false; void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) { @@ -101,9 +95,7 @@ std::string MonitorTimestamp() { timeval tv; gettimeofday(&tv, nullptr); - std::ostringstream output; - output << tv.tv_sec << "." << std::setw(6) << std::setfill('0') << tv.tv_usec; - return output.str(); + return absl::StrCat(tv.tv_sec, ".", tv.tv_usec, absl::kZeroPad6); } } // namespace @@ -124,9 +116,7 @@ struct Connection::Shutdown { // Used as custom deleter for Request object struct Connection::RequestDeleter { - void operator()(Request* req) const { - mi_free(req); - } + void operator()(Request* req) const; }; struct Connection::Request { @@ -154,7 +144,7 @@ struct Connection::Request { private: using MessagePayload = std::variant; - Request(PipelineMsg msg) : payload(std::move(msg)) { + Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) { } Request(AsyncMsg msg) : payload(std::move(msg)) { @@ -186,7 +176,11 @@ Connection::RequestPtr Connection::Request::New(std::string_view data, Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, size_t capacity) { constexpr auto kReqSz = sizeof(Request); - PipelineMsg req{args.size(), capacity}; + void* ptr = mi_heap_malloc_small(heap, kReqSz); + Request* msg = new (ptr) Request(args.size(), capacity); + // at this point we know that we have PipelineMsg in Request so next op is safe + // We must construct in place here, since there is a slice that uses memory locations + Request::PipelineMsg& req = std::get(msg->payload); auto* next = req.storage.data(); for (size_t i = 0; i < args.size(); ++i) { auto buf = args[i].GetBuf(); @@ -195,8 +189,6 @@ Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, s req.args[i] = MutableSlice(next, s); next += s; } - void* ptr = mi_heap_malloc_small(heap, kReqSz); - Request* msg = new (ptr) Request(req); return Connection::RequestPtr{msg, Connection::RequestDeleter{}}; } @@ -208,6 +200,11 @@ Connection::RequestPtr Connection::Request::New(const PubMessage& pub_msg, return Connection::RequestPtr{msg, Connection::RequestDeleter{}}; } +void Connection::RequestDeleter::operator()(Request* req) const { + req->~Request(); + mi_free(req); +} + Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx, ServiceInterface* service) : io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) { @@ -703,9 +700,9 @@ struct Connection::DispatchOperations { empty{me->dispatch_q_.empty()}, self(me) { } - void operator()(AsyncMsg msg); - void operator()(Request::MonitorMessage msg); - void operator()(Request::PipelineMsg msg); + void operator()(AsyncMsg& msg); + void operator()(Request::MonitorMessage& msg); + void operator()(Request::PipelineMsg& msg); ConnectionStats* stats = nullptr; SinkReplyBuilder* builder = nullptr; @@ -713,7 +710,7 @@ struct Connection::DispatchOperations { Connection* self = nullptr; }; -void Connection::DispatchOperations::operator()(AsyncMsg msg) { +void Connection::DispatchOperations::operator()(AsyncMsg& msg) { RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; ++stats->async_writes_cnt; const PubMessage& pub_msg = msg.pub_msg; @@ -733,13 +730,13 @@ void Connection::DispatchOperations::operator()(AsyncMsg msg) { msg.bc.Dec(); } -void Connection::DispatchOperations::operator()(Request::MonitorMessage msg) { +void Connection::DispatchOperations::operator()(Request::MonitorMessage& msg) { RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; rbuilder->SendBulkString(msg.msg); msg.bc.Dec(); } -void Connection::DispatchOperations::operator()(Request::PipelineMsg msg) { +void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) { ++stats->pipelined_cmd_cnt; builder->SetBatchMode(!empty); @@ -750,13 +747,13 @@ void Connection::DispatchOperations::operator()(Request::PipelineMsg msg) { } struct Connection::DispatchCleanup { - void operator()(AsyncMsg msg) const { + void operator()(AsyncMsg& msg) const { msg.bc.Dec(); } - void operator()(Connection::Request::MonitorMessage msg) const { + void operator()(Connection::Request::MonitorMessage& msg) const { msg.bc.Dec(); } - void operator()(Connection::Request::PipelineMsg) const { + void operator()(const Connection::Request::PipelineMsg&) const { } }; @@ -843,6 +840,9 @@ std::string Connection::CreateMonitorMessage(const dfly::ConnectionState& conn_s auto AppendCmd = [&](const CmdArgVec& args) { if (args.empty()) { absl::StrAppend(&command_details, "error - empty cmd list!"); + } else if (auto cmd_name = std::string_view(args[0].data(), args[0].size()); + cmd_name == "AUTH") { // we cannot just send auth details in this case + absl::StrAppend(&command_details, "\"", cmd_name, "\""); } else { command_details = std::accumulate(args.begin(), args.end(), command_details, [](auto str, const auto& cmd) { diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index c30b4bb57479..23ef1f2e0f51 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -21,22 +21,6 @@ void ConnectionContext::SendMonitorMsg(std::string_view msg, owner()->SendMonitorMsg(msg, borrows); } -void ConnectionContext::MonitorCmd() { - if (!ServerState::tlocal()->Monitors().Empty()) { - util::fibers_ext::BlockingCounter bc(ServerState::tlocal()->Monitors().Size()); - // We have connections waiting to get the info on the last command, send it to them - auto msg = owner()->CreateMonitorMessage(conn_state); - shard_set->pool()->Await([&msg, bc](unsigned idx, util::ProactorBase*) { - ServerState::tlocal()->Monitors().Send(msg, bc, idx); - }); - bc.Wait(); // Wait for all the messages to be sent. - // we need to un-borrow what we used, do this here - shard_set->pool()->Await([&msg](unsigned idx, util::ProactorBase*) { - ServerState::tlocal()->Monitors().Release(idx); - }); - } -} - void ConnectionContext::ChangeMonitor(bool start) { // This will either remove or register a new connection // at the "top level" thread --> ServerState context diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 182b105a8cce..b0c5d8a4bd24 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -124,8 +124,6 @@ class ConnectionContext : public facade::ConnectionContext { return conn_state.db_index; } - void MonitorCmd(); - void SendMonitorMsg(std::string_view msg, util::fibers_ext::BlockingCounter borrows); void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 9385c521ae17..698695e01a6c 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -68,6 +68,25 @@ std::optional engine_varz; constexpr size_t kMaxThreadSize = 1024; +void MonitorCmd(bool admin_cmd, ConnectionContext* connection) { + // We are not sending any admin command in the monitor, and we do not want to + // do any processing if we don't have any waiting connections with monitor + // enabled on them - see https://redis.io/commands/monitor/ + const auto& my_monitors = ServerState::tlocal()->Monitors(); + if (!(my_monitors.Empty() || admin_cmd)) { + util::fibers_ext::BlockingCounter bc(my_monitors.Size()); + // We have connections waiting to get the info on the last command, send it to them + auto monitor_msg = connection->owner()->CreateMonitorMessage(connection->conn_state); + shard_set->pool()->Await([&monitor_msg, bc](unsigned idx, util::ProactorBase*) { + ServerState::tlocal()->Monitors().Send(monitor_msg, bc, idx); + }); + bc.Wait(); // Wait for all the messages to be sent. + // we need to un-borrow what we used, do this here + shard_set->pool()->Await( + [](unsigned idx, util::ProactorBase*) { ServerState::tlocal()->Monitors().Release(idx); }); + } +} + class InterpreterReplier : public RedisReplyBuilder { public: InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) { @@ -442,13 +461,18 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) } } + // only reset and quit are allow if this connection is used for monitoring + if (cntx->monitor && (cmd_name != "RESET" && cmd_name != "QUIT")) { + return (*cntx)->SendError("Replica can't interact with the keyspace"); + } + bool under_script = dfly_cntx->conn_state.script_info.has_value(); if (under_script && (cid->opt_mask() & CO::NOSCRIPT)) { return (*cntx)->SendError("This Redis command is not allowed from script"); } - dfly_cntx->MonitorCmd(); + MonitorCmd(cid->opt_mask() & CO::ADMIN, dfly_cntx); bool is_write_cmd = (cid->opt_mask() & CO::WRITE) || (under_script && dfly_cntx->conn_state.script_info->is_write);