Skip to content

Commit

Permalink
feat(server): monitor command, add command filters
Browse files Browse the repository at this point in the history
feat(server): support for monitor command. add filters and support for auth not showing sensitive data (dragonflydb#344)

Signed-off-by: Boaz Sade <[email protected]>
  • Loading branch information
boazsade committed Oct 20, 2022
1 parent a09f0ca commit 6b75e5d
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 49 deletions.
2 changes: 1 addition & 1 deletion docs/api_status.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ with respect to Memcached and Redis APIs.
- [X] ZSCORE
- [ ] Other
- [ ] BGREWRITEAOF
- [ ] MONITOR
- [x] MONITOR
- [ ] RANDOMKEY

### API 2
Expand Down
6 changes: 3 additions & 3 deletions src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ class ConnectionContext {
bool req_auth : 1;
bool replica_conn : 1;
bool authenticated : 1;
bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber.
bool monitor : 1 = false; // when the connection is monitor do not support most commands other
// than quit and reset
bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber.
bool monitor : 1; // when the connection is monitor do not support most commands other
// than quit and reset
protected:
void EnableMonitoring(bool enable) {
force_dispatch = enable; // required to support the monitoring
Expand Down
52 changes: 26 additions & 26 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
#include <mimalloc.h>

#include <boost/fiber/operations.hpp>
#include <iomanip>

#include "base/flags.h"
#include "base/logging.h"
#include "facade/conn_context.h"
#include "facade/memcache_parser.h"
#include "facade/redis_parser.h"
#include "facade/service_interface.h"
#include "server/common.h"
#include "server/conn_context.h"
#include "util/fiber_sched_algo.h"

Expand All @@ -37,12 +37,6 @@ namespace fibers = boost::fibers;
namespace facade {
namespace {

// visitor helper - for the message dispatching -
// https://en.cppreference.com/w/cpp/utility/variant/visit
// template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
// // explicit deduction guide (not needed as of C++20)
// template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

template <class> inline constexpr bool always_false_v = false;

void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) {
Expand Down Expand Up @@ -101,9 +95,7 @@ std::string MonitorTimestamp() {
timeval tv;

gettimeofday(&tv, nullptr);
std::ostringstream output;
output << tv.tv_sec << "." << std::setw(6) << std::setfill('0') << tv.tv_usec;
return output.str();
return absl::StrCat(tv.tv_sec, ".", tv.tv_usec, absl::kZeroPad6);
}

} // namespace
Expand All @@ -124,9 +116,7 @@ struct Connection::Shutdown {

// Used as custom deleter for Request object
struct Connection::RequestDeleter {
void operator()(Request* req) const {
mi_free(req);
}
void operator()(Request* req) const;
};

struct Connection::Request {
Expand Down Expand Up @@ -154,7 +144,7 @@ struct Connection::Request {
private:
using MessagePayload = std::variant<PipelineMsg, AsyncMsg, MonitorMessage>;

Request(PipelineMsg msg) : payload(std::move(msg)) {
Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) {
}

Request(AsyncMsg msg) : payload(std::move(msg)) {
Expand Down Expand Up @@ -186,7 +176,11 @@ Connection::RequestPtr Connection::Request::New(std::string_view data,

Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, size_t capacity) {
constexpr auto kReqSz = sizeof(Request);
PipelineMsg req{args.size(), capacity};
void* ptr = mi_heap_malloc_small(heap, kReqSz);
Request* msg = new (ptr) Request(args.size(), capacity);
// at this point we know that we have PipelineMsg in Request so next op is safe
// We must construct in place here, since there is a slice that uses memory locations
Request::PipelineMsg& req = std::get<Request::PipelineMsg>(msg->payload);
auto* next = req.storage.data();
for (size_t i = 0; i < args.size(); ++i) {
auto buf = args[i].GetBuf();
Expand All @@ -195,8 +189,6 @@ Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, s
req.args[i] = MutableSlice(next, s);
next += s;
}
void* ptr = mi_heap_malloc_small(heap, kReqSz);
Request* msg = new (ptr) Request(req);
return Connection::RequestPtr{msg, Connection::RequestDeleter{}};
}

Expand All @@ -208,6 +200,11 @@ Connection::RequestPtr Connection::Request::New(const PubMessage& pub_msg,
return Connection::RequestPtr{msg, Connection::RequestDeleter{}};
}

void Connection::RequestDeleter::operator()(Request* req) const {
req->~Request();
mi_free(req);
}

Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service)
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) {
Expand Down Expand Up @@ -703,17 +700,17 @@ struct Connection::DispatchOperations {
empty{me->dispatch_q_.empty()}, self(me) {
}

void operator()(AsyncMsg msg);
void operator()(Request::MonitorMessage msg);
void operator()(Request::PipelineMsg msg);
void operator()(AsyncMsg& msg);
void operator()(Request::MonitorMessage& msg);
void operator()(Request::PipelineMsg& msg);

ConnectionStats* stats = nullptr;
SinkReplyBuilder* builder = nullptr;
bool empty = false;
Connection* self = nullptr;
};

void Connection::DispatchOperations::operator()(AsyncMsg msg) {
void Connection::DispatchOperations::operator()(AsyncMsg& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
++stats->async_writes_cnt;
const PubMessage& pub_msg = msg.pub_msg;
Expand All @@ -733,13 +730,13 @@ void Connection::DispatchOperations::operator()(AsyncMsg msg) {
msg.bc.Dec();
}

void Connection::DispatchOperations::operator()(Request::MonitorMessage msg) {
void Connection::DispatchOperations::operator()(Request::MonitorMessage& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
rbuilder->SendBulkString(msg.msg);
msg.bc.Dec();
}

void Connection::DispatchOperations::operator()(Request::PipelineMsg msg) {
void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
++stats->pipelined_cmd_cnt;

builder->SetBatchMode(!empty);
Expand All @@ -750,13 +747,13 @@ void Connection::DispatchOperations::operator()(Request::PipelineMsg msg) {
}

struct Connection::DispatchCleanup {
void operator()(AsyncMsg msg) const {
void operator()(AsyncMsg& msg) const {
msg.bc.Dec();
}
void operator()(Connection::Request::MonitorMessage msg) const {
void operator()(Connection::Request::MonitorMessage& msg) const {
msg.bc.Dec();
}
void operator()(Connection::Request::PipelineMsg) const {
void operator()(const Connection::Request::PipelineMsg&) const {
}
};

Expand Down Expand Up @@ -843,6 +840,9 @@ std::string Connection::CreateMonitorMessage(const dfly::ConnectionState& conn_s
auto AppendCmd = [&](const CmdArgVec& args) {
if (args.empty()) {
absl::StrAppend(&command_details, "error - empty cmd list!");
} else if (auto cmd_name = std::string_view(args[0].data(), args[0].size());
cmd_name == "AUTH") { // we cannot just send auth details in this case
absl::StrAppend(&command_details, "\"", cmd_name, "\"");
} else {
command_details =
std::accumulate(args.begin(), args.end(), command_details, [](auto str, const auto& cmd) {
Expand Down
16 changes: 0 additions & 16 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,6 @@ void ConnectionContext::SendMonitorMsg(std::string_view msg,
owner()->SendMonitorMsg(msg, borrows);
}

void ConnectionContext::MonitorCmd() {
if (!ServerState::tlocal()->Monitors().Empty()) {
util::fibers_ext::BlockingCounter bc(ServerState::tlocal()->Monitors().Size());
// We have connections waiting to get the info on the last command, send it to them
auto msg = owner()->CreateMonitorMessage(conn_state);
shard_set->pool()->Await([&msg, bc](unsigned idx, util::ProactorBase*) {
ServerState::tlocal()->Monitors().Send(msg, bc, idx);
});
bc.Wait(); // Wait for all the messages to be sent.
// we need to un-borrow what we used, do this here
shard_set->pool()->Await([&msg](unsigned idx, util::ProactorBase*) {
ServerState::tlocal()->Monitors().Release(idx);
});
}
}

void ConnectionContext::ChangeMonitor(bool start) {
// This will either remove or register a new connection
// at the "top level" thread --> ServerState context
Expand Down
2 changes: 0 additions & 2 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ class ConnectionContext : public facade::ConnectionContext {
return conn_state.db_index;
}

void MonitorCmd();

void SendMonitorMsg(std::string_view msg, util::fibers_ext::BlockingCounter borrows);

void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args);
Expand Down
26 changes: 25 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,25 @@ std::optional<VarzFunction> engine_varz;

constexpr size_t kMaxThreadSize = 1024;

void MonitorCmd(bool admin_cmd, ConnectionContext* connection) {
// We are not sending any admin command in the monitor, and we do not want to
// do any processing if we don't have any waiting connections with monitor
// enabled on them - see https://redis.io/commands/monitor/
const auto& my_monitors = ServerState::tlocal()->Monitors();
if (!(my_monitors.Empty() || admin_cmd)) {
util::fibers_ext::BlockingCounter bc(my_monitors.Size());
// We have connections waiting to get the info on the last command, send it to them
auto monitor_msg = connection->owner()->CreateMonitorMessage(connection->conn_state);
shard_set->pool()->Await([&monitor_msg, bc](unsigned idx, util::ProactorBase*) {
ServerState::tlocal()->Monitors().Send(monitor_msg, bc, idx);
});
bc.Wait(); // Wait for all the messages to be sent.
// we need to un-borrow what we used, do this here
shard_set->pool()->Await(
[](unsigned idx, util::ProactorBase*) { ServerState::tlocal()->Monitors().Release(idx); });
}
}

class InterpreterReplier : public RedisReplyBuilder {
public:
InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) {
Expand Down Expand Up @@ -442,13 +461,18 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
}
}

// only reset and quit are allow if this connection is used for monitoring
if (cntx->monitor && (cmd_name != "RESET" && cmd_name != "QUIT")) {
return (*cntx)->SendError("Replica can't interact with the keyspace");
}

bool under_script = dfly_cntx->conn_state.script_info.has_value();

if (under_script && (cid->opt_mask() & CO::NOSCRIPT)) {
return (*cntx)->SendError("This Redis command is not allowed from script");
}

dfly_cntx->MonitorCmd();
MonitorCmd(cid->opt_mask() & CO::ADMIN, dfly_cntx);

bool is_write_cmd = (cid->opt_mask() & CO::WRITE) ||
(under_script && dfly_cntx->conn_state.script_info->is_write);
Expand Down

0 comments on commit 6b75e5d

Please sign in to comment.