Skip to content

Commit

Permalink
feat(server): support monitor command - user can set connection to
Browse files Browse the repository at this point in the history
monitor all commands running on all other connections (dragonflydb#344).

Signed-off-by: Boaz Sade <[email protected]>
  • Loading branch information
boazsade committed Oct 24, 2022
1 parent 835f0ff commit 9ee6432
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 35 deletions.
2 changes: 1 addition & 1 deletion docs/api_status.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ with respect to Memcached and Redis APIs.
- [X] ZSCORE
- [ ] Other
- [ ] BGREWRITEAOF
- [ ] MONITOR
- [x] MONITOR
- [ ] RANDOMKEY

### API 2
Expand Down
5 changes: 5 additions & 0 deletions src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class ConnectionContext {
bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber.
bool monitor : 1; // when a monitor command is sent over a given connection, we need to aware of
// it as a state for the connection
protected:
void EnableMonitoring(bool enable) {
force_dispatch = enable; // required to support the monitoring
monitor = enable;
}

private:
Connection* owner_;
Expand Down
58 changes: 50 additions & 8 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ struct Connection::RequestDeleter {
// Please note: The call to the Dtor is mandatory for this!!
// This class contain types that don't have trivial destructed objects
struct Connection::Request {
using MonitorMessage = std::string;

struct PipelineMsg {
absl::FixedArray<MutableSlice, 6> args;

Expand All @@ -122,26 +124,38 @@ struct Connection::Request {
};

private:
using MessagePayload = std::variant<PipelineMsg, PubMsgRecord>;
using MessagePayload = std::variant<PipelineMsg, PubMsgRecord, MonitorMessage>;

Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) {
}

Request(PubMsgRecord msg) : payload(std::move(msg)) {
}

Request(MonitorMessage msg) : payload(std::move(msg)) {
}

Request(const Request&) = delete;

public:
// Overload to create the a new pipeline message
static RequestPtr New(mi_heap_t* heap, RespVec args, size_t capacity);

// overload to create a new pubsub message
// Overload to create a new pubsub message
static RequestPtr New(const PubMessage& pub_msg, fibers_ext::BlockingCounter bc);

// Overload to create a new the monitor message
static RequestPtr New(MonitorMessage msg);

MessagePayload payload;
};

Connection::RequestPtr Connection::Request::New(std::string msg) {
void* ptr = mi_malloc(sizeof(Request));
Request* req = new (ptr) Request(std::move(msg));
return Connection::RequestPtr{req, Connection::RequestDeleter{}};
}

Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, size_t capacity) {
constexpr auto kReqSz = sizeof(Request);
void* ptr = mi_heap_malloc_small(heap, kReqSz);
Expand All @@ -159,6 +173,7 @@ Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, s
pipeline_msg.args[i] = MutableSlice(next, s);
next += s;
}

return Connection::RequestPtr{req, Connection::RequestDeleter{}};
}

Expand Down Expand Up @@ -491,8 +506,6 @@ auto Connection::ParseRedis() -> ParserStatus {
service_->DispatchCommand(cmd_list, cc_.get());
last_interaction_ = time(nullptr);
} else {
VLOG(2) << "Dispatch async";

// Dispatch via queue to speedup input reading.
RequestPtr req = FromArgs(std::move(parse_args_), tlh);

Expand Down Expand Up @@ -655,19 +668,23 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars

struct Connection::DispatchOperations {
DispatchOperations(SinkReplyBuilder* b, Connection* me)
: stats{me->service_->GetThreadLocalConnectionStats()}, builder{b},
empty{me->dispatch_q_.empty()}, self(me) {
: stats{me->service_->GetThreadLocalConnectionStats()}, builder{b}, self(me) {
}

void operator()(PubMsgRecord& msg);
void operator()(Request::PipelineMsg& msg);
void operator()(const Request::MonitorMessage& msg);

ConnectionStats* stats = nullptr;
SinkReplyBuilder* builder = nullptr;
bool empty = false;
Connection* self = nullptr;
};

void Connection::DispatchOperations::operator()(const Request::MonitorMessage& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
rbuilder->SendSimpleString(msg);
}

void Connection::DispatchOperations::operator()(PubMsgRecord& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
++stats->async_writes_cnt;
Expand All @@ -690,7 +707,7 @@ void Connection::DispatchOperations::operator()(PubMsgRecord& msg) {

void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
++stats->pipelined_cmd_cnt;

bool empty = self->dispatch_q_.empty();
builder->SetBatchMode(!empty);
self->cc_->async_dispatch = true;
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
Expand All @@ -703,6 +720,9 @@ struct Connection::DispatchCleanup {
msg.bc.Dec();
}

void operator()(const Connection::Request::MonitorMessage&) const {
}

void operator()(const Connection::Request::PipelineMsg&) const {
}
};
Expand Down Expand Up @@ -769,4 +789,26 @@ void RespToArgList(const RespVec& src, CmdArgVec* dest) {
}
}

void Connection::SendMonitorMsg(std::string monitor_msg) {
DCHECK(cc_);

if (!cc_->conn_closing) {
RequestPtr req = Request::New(std::move(monitor_msg));
dispatch_q_.push_back(std::move(req));
if (dispatch_q_.size() == 1) {
evc_.notify();
}
}
}

std::string Connection::RemoteEndpointStr() const {
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
bool unix_socket = lsb->IsUDS();
std::string connection_str = unix_socket ? "unix:" : std::string{};

auto re = lsb->RemoteEndpoint();
absl::StrAppend(&connection_str, re.address().to_string(), ":", re.port());
return connection_str;
}

} // namespace facade
6 changes: 6 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class Connection : public util::Connection {

virtual void SendMsgVecAsync(const PubMessage& pub_msg, util::fibers_ext::BlockingCounter bc);

// Please note, this accept the message by value, since we really want to
// create a new copy here, so that we would not need to "worry" about memory
// management, we are assuming that we would not have many copy for this, and that
// we would not need in this way to sync on the lifetime of the message
virtual void SendMonitorMsg(std::string monitor_msg);

void SetName(std::string_view name) {
CopyCharBuf(name, sizeof(name_), name_);
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller
cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)

add_library(dragonfly_lib channel_slice.cc command_registry.cc
config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc
config_flags.cc conn_context.cc debugcmd.cc server_state.cc dflycmd.cc
generic_family.cc hset_family.cc json_family.cc
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
Expand Down
5 changes: 2 additions & 3 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ namespace dfly {

using namespace std;

void ConnectionContext::SendMonitorMsg(std::string_view msg,
util::fibers_ext::BlockingCounter borrows) {
void ConnectionContext::SendMonitorMsg(std::string msg) {
CHECK(owner());

owner()->SendMonitorMsg(msg, borrows);
owner()->SendMonitorMsg(std::move(msg));
}

void ConnectionContext::ChangeMonitor(bool start) {
Expand Down
13 changes: 9 additions & 4 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ struct ConnectionState {
ExecInfo(ExecInfo&&) = delete;

// Return true if ExecInfo is active (after MULTI)
bool IsActive() { return state != EXEC_INACTIVE; }
bool IsActive() {
return state != EXEC_INACTIVE;
}

// Resets to blank state after EXEC or DISCARD
void Clear();
Expand Down Expand Up @@ -122,7 +124,11 @@ class ConnectionContext : public facade::ConnectionContext {
return conn_state.db_index;
}

void SendMonitorMsg(std::string_view msg, util::fibers_ext::BlockingCounter borrows);
// Note that this is accepted by value for lifetime reasons
// we want to have our own copy since we are assuming that
// 1. there will be not to many connections that we in monitor state
// 2. we need to have for each of them each own copy for thread safe reasons
void SendMonitorMsg(std::string msg);

void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args);
void ChangePSub(bool to_add, bool to_reply, CmdArgList args);
Expand All @@ -134,8 +140,7 @@ class ConnectionContext : public facade::ConnectionContext {

private:
void SendSubscriptionChangedResponse(std::string_view action,
std::optional<std::string_view> topic,
unsigned count);
std::optional<std::string_view> topic, unsigned count);
};

} // namespace dfly
25 changes: 15 additions & 10 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,16 @@ std::string MonitorTimestamp() {
}

auto CmdEntryToMonitorFormat(std::string_view str) -> std::string {
// This code is based on Redis impl for it at [email protected]
std::string result = absl::StrCat("\"");

for (auto c : str) {
switch (c) {
case '\\':
absl::StrAppend(&result, "\\\\");
break;
case '"':
absl::StrAppend(&result, "\\");
result += c;
absl::StrAppend(&result, "\\\"");
break;
case '\n':
absl::StrAppend(&result, "\\n");
Expand All @@ -116,7 +118,7 @@ auto CmdEntryToMonitorFormat(std::string_view str) -> std::string {
if (isprint(c)) {
result += c;
} else {
absl::StrAppend(&result, "\\x", absl::Hex((unsigned char)c, absl::kSpacePad2));
absl::StrAppend(&result, "\\x", absl::Hex((unsigned char)c, absl::kZeroPad2));
}
break;
}
Expand Down Expand Up @@ -162,15 +164,17 @@ void MonitorCmd(bool admin_cmd, ConnectionContext* connection, CmdArgList args)
// enabled on them - see https://redis.io/commands/monitor/
const auto& my_monitors = ServerState::tlocal()->Monitors();
if (!(my_monitors.Empty() || admin_cmd)) {
util::fibers_ext::BlockingCounter bc(my_monitors.Size());
// We have connections waiting to get the info on the last command, send it to them
auto monitor_msg = MakeMonitorMessage(connection->conn_state, connection->owner(), args);
LOG(WARNING) << "sending command '" << monitor_msg << "' to the clients that registered on it";
shard_set->pool()->Await([&monitor_msg, bc](unsigned idx, util::ProactorBase*) {
ServerState::tlocal()->Monitors().Send(monitor_msg, bc, idx);
// Note that this is accepted by value for lifetime reasons
// we want to have our own copy since we are assuming that
// 1. there will be not to many connections that we in monitor state
// 2. we need to have for each of them each own copy for thread safe reasons
VLOG(1) << "sending command '" << monitor_msg << "' to the clients that registered on it";
shard_set->pool()->Await([monitor_msg](unsigned idx, util::ProactorBase*) {
ServerState::tlocal()->Monitors().Send(monitor_msg, idx);
});
bc.Wait(); // Wait for all the messages to be sent.
// we need to un-borrow what we used, do this here

shard_set->pool()->Await(
[](unsigned idx, util::ProactorBase*) { ServerState::tlocal()->Monitors().Release(idx); });
}
Expand Down Expand Up @@ -1090,7 +1094,8 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& regis

// The comparison can still be true even if a key expired due to another one being created.
// So we have to check the watched_dirty flag, which is set if a key expired.
return watch_exist_count.load() == exec_info.watched_existed && !exec_info.watched_dirty.load(memory_order_relaxed);
return watch_exist_count.load() == exec_info.watched_existed &&
!exec_info.watched_dirty.load(memory_order_relaxed);
}

// Check if exec_info watches keys on dbs other than db_indx.
Expand Down
10 changes: 4 additions & 6 deletions src/server/server_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ MonitorsRepo::MonitorInfo::MonitorInfo(ConnectionContext* conn)
thread_id(util::ProactorBase::GetIndex()) {
}

void MonitorsRepo::MonitorInfo::Send(std::string_view msg, std::uint32_t tid,
util::fibers_ext::BlockingCounter borrows) {
void MonitorsRepo::MonitorInfo::Send(std::string msg, std::uint32_t tid) {
if (tid == thread_id && connection) {
VLOG(1) << "thread " << tid << " sending monitor message '" << msg << "'";
token.Inc();
connection->SendMonitorMsg(msg, borrows);
connection->SendMonitorMsg(msg);
VLOG(1) << "thread " << tid << " successfully finish to send the message";
}
}
Expand All @@ -32,10 +31,9 @@ void MonitorsRepo::Add(const MonitorInfo& info) {
monitors_.push_back(info);
}

void MonitorsRepo::Send(std::string_view msg, util::fibers_ext::BlockingCounter borrows,
std::uint32_t tid) {
void MonitorsRepo::Send(std::string msg, std::uint32_t tid) {
std::for_each(monitors_.begin(), monitors_.end(),
[msg, tid, borrows](auto& monitor_conn) { monitor_conn.Send(msg, tid, borrows); });
[msg, tid](auto& monitor_conn) { monitor_conn.Send(msg, tid); });
}

void MonitorsRepo::Release(std::uint32_t tid) {
Expand Down
12 changes: 10 additions & 2 deletions src/server/server_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,20 @@ class MonitorsRepo {

explicit MonitorInfo(ConnectionContext* conn);

void Send(std::string_view msg, std::uint32_t tid, util::fibers_ext::BlockingCounter borrows);
// Note that this is accepted by value for lifetime reasons
// we want to have our own copy since we are assuming that
// 1. there will be not to many connections that we in monitor state
// 2. we need to have for each of them each own copy for thread safe reasons
void Send(std::string msg, std::uint32_t tid);
};

void Add(const MonitorInfo& info);

void Send(std::string_view msg, util::fibers_ext::BlockingCounter borrows, std::uint32_t tid);
// Note that this is accepted by value for lifetime reasons
// we want to have our own copy since we are assuming that
// 1. there will be not to many connections that we in monitor state
// 2. we need to have for each of them each own copy for thread safe reasons
void Send(std::string msg, std::uint32_t tid);

void Release(std::uint32_t tid);

Expand Down

0 comments on commit 9ee6432

Please sign in to comment.