Skip to content

Commit

Permalink
worker: use rwlock for sibling group
Browse files Browse the repository at this point in the history
Since it is much more common to send messages than to add or
remove ports from a sibling group, using a rwlock is appropriate here.

Refs: #38780 (comment)

PR-URL: #38783
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Joyee Cheung <[email protected]>
  • Loading branch information
addaleax authored and danielleadams committed May 31, 2021
1 parent 3e6b3b2 commit a2da9e2
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 5 deletions.
6 changes: 3 additions & 3 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,7 @@ Maybe<bool> SiblingGroup::Dispatch(
std::shared_ptr<Message> message,
std::string* error) {

Mutex::ScopedLock lock(group_mutex_);
RwLock::ScopedReadLock lock(group_mutex_);

// The source MessagePortData is not part of this group.
if (ports_.find(source) == ports_.end()) {
Expand Down Expand Up @@ -1376,7 +1376,7 @@ void SiblingGroup::Entangle(MessagePortData* port) {
}

void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
Mutex::ScopedLock lock(group_mutex_);
RwLock::ScopedWriteLock lock(group_mutex_);
for (MessagePortData* data : ports) {
ports_.insert(data);
CHECK(!data->group_);
Expand All @@ -1386,7 +1386,7 @@ void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {

void SiblingGroup::Disentangle(MessagePortData* data) {
auto self = shared_from_this(); // Keep alive until end of function.
Mutex::ScopedLock lock(group_mutex_);
RwLock::ScopedWriteLock lock(group_mutex_);
ports_.erase(data);
data->group_.reset();

Expand Down
4 changes: 2 additions & 2 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
size_t size() const { return ports_.size(); }

private:
std::string name_;
const std::string name_;
RwLock group_mutex_; // Protects ports_.
std::set<MessagePortData*> ports_;
Mutex group_mutex_;

static void CheckSiblingGroup(const std::string& name);

Expand Down
76 changes: 76 additions & 0 deletions src/node_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ namespace node {
template <typename Traits> class ConditionVariableBase;
template <typename Traits> class MutexBase;
struct LibuvMutexTraits;
struct LibuvRwlockTraits;

using ConditionVariable = ConditionVariableBase<LibuvMutexTraits>;
using Mutex = MutexBase<LibuvMutexTraits>;
using RwLock = MutexBase<LibuvRwlockTraits>;

template <typename T, typename MutexT = Mutex>
class ExclusiveAccess {
Expand Down Expand Up @@ -70,6 +72,8 @@ class MutexBase {
inline ~MutexBase();
inline void Lock();
inline void Unlock();
inline void RdLock();
inline void RdUnlock();

MutexBase(const MutexBase&) = delete;
MutexBase& operator=(const MutexBase&) = delete;
Expand All @@ -92,6 +96,21 @@ class MutexBase {
const MutexBase& mutex_;
};

class ScopedReadLock {
public:
inline explicit ScopedReadLock(const MutexBase& mutex);
inline ~ScopedReadLock();

ScopedReadLock(const ScopedReadLock&) = delete;
ScopedReadLock& operator=(const ScopedReadLock&) = delete;

private:
template <typename> friend class ConditionVariableBase;
const MutexBase& mutex_;
};

using ScopedWriteLock = ScopedLock;

class ScopedUnlock {
public:
inline explicit ScopedUnlock(const ScopedLock& scoped_lock);
Expand Down Expand Up @@ -167,6 +186,42 @@ struct LibuvMutexTraits {
static inline void mutex_unlock(MutexT* mutex) {
uv_mutex_unlock(mutex);
}

static inline void mutex_rdlock(MutexT* mutex) {
uv_mutex_lock(mutex);
}

static inline void mutex_rdunlock(MutexT* mutex) {
uv_mutex_unlock(mutex);
}
};

struct LibuvRwlockTraits {
using MutexT = uv_rwlock_t;

static inline int mutex_init(MutexT* mutex) {
return uv_rwlock_init(mutex);
}

static inline void mutex_destroy(MutexT* mutex) {
uv_rwlock_destroy(mutex);
}

static inline void mutex_lock(MutexT* mutex) {
uv_rwlock_wrlock(mutex);
}

static inline void mutex_unlock(MutexT* mutex) {
uv_rwlock_wrunlock(mutex);
}

static inline void mutex_rdlock(MutexT* mutex) {
uv_rwlock_rdlock(mutex);
}

static inline void mutex_rdunlock(MutexT* mutex) {
uv_rwlock_rdunlock(mutex);
}
};

template <typename Traits>
Expand Down Expand Up @@ -214,6 +269,16 @@ void MutexBase<Traits>::Unlock() {
Traits::mutex_unlock(&mutex_);
}

template <typename Traits>
void MutexBase<Traits>::RdLock() {
Traits::mutex_rdlock(&mutex_);
}

template <typename Traits>
void MutexBase<Traits>::RdUnlock() {
Traits::mutex_rdunlock(&mutex_);
}

template <typename Traits>
MutexBase<Traits>::ScopedLock::ScopedLock(const MutexBase& mutex)
: mutex_(mutex) {
Expand All @@ -229,6 +294,17 @@ MutexBase<Traits>::ScopedLock::~ScopedLock() {
Traits::mutex_unlock(&mutex_.mutex_);
}

template <typename Traits>
MutexBase<Traits>::ScopedReadLock::ScopedReadLock(const MutexBase& mutex)
: mutex_(mutex) {
Traits::mutex_rdlock(&mutex_.mutex_);
}

template <typename Traits>
MutexBase<Traits>::ScopedReadLock::~ScopedReadLock() {
Traits::mutex_rdunlock(&mutex_.mutex_);
}

template <typename Traits>
MutexBase<Traits>::ScopedUnlock::ScopedUnlock(const ScopedLock& scoped_lock)
: mutex_(scoped_lock.mutex_) {
Expand Down

0 comments on commit a2da9e2

Please sign in to comment.