Skip to content

Commit

Permalink
fix(server): Small sub fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Mar 15, 2023
1 parent 4caebf0 commit 6f55fe6
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 28 deletions.
21 changes: 12 additions & 9 deletions src/server/channel_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,36 @@ ChannelStore::Subscriber::Subscriber(uint32_t tid)
}

void ChannelStore::ChannelMap::Add(string_view key, ConnectionContext* me, uint32_t thread_id) {
(*this)[key].emplace(me, thread_id);
auto it = find(key);
if (it == end())
it = emplace(key, make_unique<SubscribeMap>()).first;
it->second->emplace(me, thread_id);
}

void ChannelStore::ChannelMap::Remove(string_view key, ConnectionContext* me) {
if (auto it = find(key); it != end()) {
it->second.erase(me);
if (it->second.empty())
it->second->erase(me);
if (it->second->empty())
erase(it);
}
}

void ChannelStore::AddSubscription(string_view channel, ConnectionContext* me, uint32_t thread_id) {
void ChannelStore::AddSub(string_view channel, ConnectionContext* me, uint32_t thread_id) {
unique_lock lk{lock_};
channels_.Add(channel, me, thread_id);
}

void ChannelStore::AddGlobPattern(string_view pattern, ConnectionContext* me, uint32_t thread_id) {
void ChannelStore::AddPatternSub(string_view pattern, ConnectionContext* me, uint32_t thread_id) {
unique_lock lk{lock_};
patterns_.Add(pattern, me, thread_id);
}

void ChannelStore::RemoveSubscription(string_view channel, ConnectionContext* me) {
void ChannelStore::RemoveSub(string_view channel, ConnectionContext* me) {
unique_lock lk{lock_};
channels_.Remove(channel, me);
}

void ChannelStore::RemoveGlobPattern(string_view pattern, ConnectionContext* me) {
void ChannelStore::RemovePatternSub(string_view pattern, ConnectionContext* me) {
unique_lock lk{lock_};
patterns_.Remove(pattern, me);
}
Expand All @@ -60,12 +63,12 @@ vector<ChannelStore::Subscriber> ChannelStore::FetchSubscribers(string_view chan
vector<Subscriber> res;

if (auto it = channels_.find(channel); it != channels_.end()) {
Fill(it->second, string{}, &res);
Fill(*it->second, string{}, &res);
}

for (const auto& [pat, subs] : patterns_) {
if (stringmatchlen(pat.data(), pat.size(), channel.data(), channel.size(), 0) == 1) {
Fill(subs, pat, &res);
Fill(*subs, pat, &res);
}
}

Expand Down
18 changes: 7 additions & 11 deletions src/server/channel_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,29 @@ class ChannelStore {
}
};

void AddSubscription(std::string_view channel, ConnectionContext* me, uint32_t thread_id);
void RemoveSubscription(std::string_view channel, ConnectionContext* me);
void AddSub(std::string_view channel, ConnectionContext* me, uint32_t thread_id);
void RemoveSub(std::string_view channel, ConnectionContext* me);

void AddGlobPattern(std::string_view pattern, ConnectionContext* me, uint32_t thread_id);
void RemoveGlobPattern(std::string_view pattern, ConnectionContext* me);
void AddPatternSub(std::string_view pattern, ConnectionContext* me, uint32_t thread_id);
void RemovePatternSub(std::string_view pattern, ConnectionContext* me);

std::vector<Subscriber> FetchSubscribers(std::string_view channel);

std::vector<std::string> ListChannels(const std::string_view pattern) const;
size_t PatternCount() const;

private:
using SubscribeMap = absl::flat_hash_map<ConnectionContext*, unsigned>;
using ThreadId = unsigned;
using SubscribeMap = absl::flat_hash_map<ConnectionContext*, ThreadId>;

struct ChannelMap : absl::flat_hash_map<std::string, SubscribeMap> {
struct ChannelMap : absl::flat_hash_map<std::string, std::unique_ptr<SubscribeMap>> {
void Add(std::string_view key, ConnectionContext* me, uint32_t thread_id);
void Remove(std::string_view key, ConnectionContext* me);
};

static void Fill(const SubscribeMap& src, const std::string& pattern,
std::vector<Subscriber>* out);

struct SubInfo {
unsigned thread_id;
ConnectionContext* conn_cntx;
};

mutable folly::RWSpinLock lock_;
ChannelMap channels_;
ChannelMap patterns_;
Expand Down
10 changes: 5 additions & 5 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ vector<unsigned> ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add,
auto& sinfo = *conn->conn_state.subscribe_info.get();
auto& local_store = pattern ? sinfo.patterns : sinfo.channels;

auto sadd = pattern ? &ChannelStore::AddGlobPattern : &ChannelStore::AddSubscription;
auto sremove = pattern ? &ChannelStore::RemoveGlobPattern : &ChannelStore::RemoveSubscription;
auto sadd = pattern ? &ChannelStore::AddPatternSub : &ChannelStore::AddSub;
auto sremove = pattern ? &ChannelStore::RemovePatternSub : &ChannelStore::RemoveSub;

int32_t tid = util::ProactorBase::GetIndex();
DCHECK_GE(tid, 0);
Expand Down Expand Up @@ -113,8 +113,8 @@ void ConnectionContext::ChangeSubscription(ChannelStore* store, bool to_add, boo
}
}

void ConnectionContext::ChangePSub(ChannelStore* store, bool to_add, bool to_reply,
CmdArgList args) {
void ConnectionContext::ChangePSubscription(ChannelStore* store, bool to_add, bool to_reply,
CmdArgList args) {
vector<unsigned> result = ChangeSubscriptions(true, args, to_add, to_reply, this, store);

if (to_reply) {
Expand Down Expand Up @@ -147,7 +147,7 @@ void ConnectionContext::PUnsubscribeAll(ChannelStore* store, bool to_reply) {
StringVec patterns(conn_state.subscribe_info->patterns.begin(),
conn_state.subscribe_info->patterns.end());
CmdArgVec arg_vec(patterns.begin(), patterns.end());
ChangePSub(store, false, to_reply, CmdArgList{arg_vec});
ChangePSubscription(store, false, to_reply, CmdArgList{arg_vec});
}

void ConnectionContext::SendSubscriptionChangedResponse(string_view action,
Expand Down
2 changes: 1 addition & 1 deletion src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class ConnectionContext : public facade::ConnectionContext {
}

void ChangeSubscription(ChannelStore* store, bool to_add, bool to_reply, CmdArgList args);
void ChangePSub(ChannelStore* store, bool to_add, bool to_reply, CmdArgList args);
void ChangePSubscription(ChannelStore* store, bool to_add, bool to_reply, CmdArgList args);
void UnsubscribeAll(ChannelStore* store, bool to_reply);
void PUnsubscribeAll(ChannelStore* store, bool to_reply);
void ChangeMonitor(bool start); // either start or stop monitor on a given connection
Expand Down
4 changes: 2 additions & 2 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1413,7 +1413,7 @@ void Service::Unsubscribe(CmdArgList args, ConnectionContext* cntx) {

void Service::PSubscribe(CmdArgList args, ConnectionContext* cntx) {
args.remove_prefix(1);
cntx->ChangePSub(server_family_.channel_store(), true, true, args);
cntx->ChangePSubscription(server_family_.channel_store(), true, true, args);
}

void Service::PUnsubscribe(CmdArgList args, ConnectionContext* cntx) {
Expand All @@ -1422,7 +1422,7 @@ void Service::PUnsubscribe(CmdArgList args, ConnectionContext* cntx) {
if (args.size() == 0) {
cntx->PUnsubscribeAll(server_family_.channel_store(), true);
} else {
cntx->ChangePSub(server_family_.channel_store(), false, true, args);
cntx->ChangePSubscription(server_family_.channel_store(), false, true, args);
}
}

Expand Down

0 comments on commit 6f55fe6

Please sign in to comment.