feat(server): Pubsub updates with RCU #980
Conversation
Signed-off-by: Vladislav Oleshko <[email protected]>
Please adopt a habit of adding additional info to the commit description. It's not a one-liner PR.
Signed-off-by: Vladislav Oleshko <[email protected]>
src/server/channel_store.h
Outdated
ChannelMap* channels_;
ChannelMap* patterns_;
ControlBlock* control_block_;
};
That's a lot of indirection. It's easier to work with this way, but to flatten it we could (a sketch of the last option follows the list):
- Store the ChannelStore in threads by value (3x8 bytes)
- Store the ChannelMaps without pointers and move them around with ugly memcpy's
- Store a single ChannelStore and make the ChannelMap* pointers atomic. We'd still need to dispatch to the shard_set to allow deletes (as we don't have hazard pointers)
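A rough sketch of that last option, with illustrative names only (RetireOnAllShards is a hypothetical stand-in for the shard_set dispatch, and the map payload is simplified):

```cpp
#include <atomic>
#include <string>

#include "absl/container/flat_hash_map.h"

using ChannelMap = absl::flat_hash_map<std::string, int>;  // payload simplified

// Hypothetical stand-in: the real code would hop through the shard_set
// so that no reader still holds the retired map when it is deleted.
void RetireOnAllShards(ChannelMap* old_map) {
  delete old_map;
}

struct ChannelStore {
  std::atomic<ChannelMap*> channels{nullptr};
  std::atomic<ChannelMap*> patterns{nullptr};

  // Swap in a rebuilt map; readers load the pointer lock-free.
  void ReplaceChannels(ChannelMap* fresh) {
    ChannelMap* old = channels.exchange(fresh, std::memory_order_acq_rel);
    RetireOnAllShards(old);
  }
};
```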
// Wrapper around atomic pointer that allows copying and moving.
// Made to overcome restrictions of absl::flat_hash_map.
// Copy/Move don't need to be atomic with RCU.
struct UpdatablePointer {
  UpdatablePointer(SubscribeMap* sm) : ptr{sm} {
  }
That's a tricky part... flat_hash_map really doesn't allow full in-place construction; std does (it has std::piecewise_construct).
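To make the distinction concrete, a small sketch (the Slot type is hypothetical): a value holding an atomic can be neither copied nor moved, so it can be emplaced into a std container but not stored in a flat_hash_map, which moves elements on rehash.

```cpp
#include <atomic>
#include <map>
#include <string>
#include <tuple>
#include <utility>

// Illustrative value type that can be neither copied nor moved because
// it holds an atomic -- the situation UpdatablePointer works around.
struct Slot {
  explicit Slot(void* p) : ptr{p} {}
  std::atomic<void*> ptr;
};

int main() {
  std::map<std::string, Slot> m;
  // std containers can construct the key/value pair fully in place:
  m.emplace(std::piecewise_construct,
            std::forward_as_tuple("chan"),
            std::forward_as_tuple(nullptr));
  // absl::flat_hash_map cannot hold Slot at all, since it moves elements
  // around on rehash -- hence the copyable UpdatablePointer wrapper.
}
```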
We can also add to helio: https://github.com/romange/beeri/blob/master/base/atomic_wrapper.h, which should solve the problem.
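Roughly what such a helper provides (a hedged sketch of the idea, not the actual beeri/helio code):

```cpp
#include <atomic>

// Regains copy/move so the type can live inside containers like
// absl::flat_hash_map. The copy itself is not atomic, which is fine
// under RCU because writers are serialized by the update mutex.
template <typename T>
class atomic_wrapper {
 public:
  atomic_wrapper() = default;
  explicit atomic_wrapper(T v) : value_{v} {}
  atomic_wrapper(const atomic_wrapper& o)
      : value_{o.value_.load(std::memory_order_relaxed)} {}
  atomic_wrapper& operator=(const atomic_wrapper& o) {
    value_.store(o.value_.load(std::memory_order_relaxed),
                 std::memory_order_relaxed);
    return *this;
  }

  T load(std::memory_order mo = std::memory_order_seq_cst) const {
    return value_.load(mo);
  }
  void store(T v, std::memory_order mo = std::memory_order_seq_cst) {
    value_.store(v, mo);
  }

 private:
  std::atomic<T> value_;
};
```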
src/server/channel_store.h
Outdated
// Centralized controller to prevent overlapping updates.
struct ControlBlock {
  void Destroy();

  ChannelStore* most_recent;
  ::boost::fibers::mutex update_mu;  // locked during updates.
};
In the future, instead of plain locking, we can accumulate all changes and fulfill all writers' requests with a single operation.
It's a bit more difficult to synchronize; besides, we need to explicitly aggregate changes by key to tell whether we'll modify the map slots (replace/add on a single value cancel each other out).
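To make the cancellation point concrete, a toy sketch of aggregating queued changes by key (hypothetical helper, not from this PR):

```cpp
#include <string>
#include <utility>
#include <vector>

#include "absl/container/flat_hash_map.h"

// Net out queued subscribe(+1)/unsubscribe(-1) ops per channel. A zero
// net delta means the slot's shape doesn't change, so a batched writer
// could skip copying that slot entirely.
absl::flat_hash_map<std::string, int> AggregateByKey(
    const std::vector<std::pair<std::string, bool>>& ops) {
  absl::flat_hash_map<std::string, int> delta;
  for (const auto& [channel, add] : ops)
    delta[channel] += add ? 1 : -1;
  for (auto it = delta.begin(); it != delta.end();) {
    if (it->second == 0)
      delta.erase(it++);  // add/remove pairs cancelled out
    else
      ++it;
  }
  return delta;
}
```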
src/server/channel_store.cc
Outdated
// RCU update existing SubscribeMap entry.
DCHECK(it->second->size() > 0);
auto* replacement = new SubscribeMap{*it->second};
if (add)
  replacement->emplace(cntx_, thread_id_);
else
  replacement->erase(cntx_);

freelist_.push_back(it->second.Get());
it->second.Set(replacement);
}
This is what RCU without a full copy looks like.
Signed-off-by: Vladislav Oleshko <[email protected]>
Signed-off-by: Vladislav Oleshko <[email protected]>
};
shard_set->pool()->DispatchBrief(std::move(cb));
TODO: Dispatch only on active threads
Signed-off-by: Vladislav Oleshko <[email protected]>
// RCU update existing SubscribeMap entry.
DCHECK(it->second->size() > 0);
auto* replacement = new SubscribeMap{*it->second};
Can Modify run on multiple threads in parallel? I see it under the mutex lock below, so I'm a bit confused: why do you need atomics?
Because the map slot is read by all the other reader threads; added a comment.
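A sketch of the invariant (Get/Set mirror the snippet above; the memory orders are my assumption): Set runs on one writer at a time under update_mu, but Get runs on any reader thread with no lock at all, so the slot must be an atomic pointer to avoid a data race.

```cpp
#include <atomic>

struct SubscribeMap;  // opaque here

struct UpdatablePointer {
  std::atomic<SubscribeMap*> ptr;

  SubscribeMap* Get() const {
    return ptr.load(std::memory_order_acquire);  // lock-free reader side
  }
  void Set(SubscribeMap* next) {
    ptr.store(next, std::memory_order_release);  // serialized writer side
  }
};
```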
Signed-off-by: Vladislav Oleshko <[email protected]>
Looks very good. My only concern is around the Apply() bottleneck.
std::pair<ChannelMap*, bool> GetTargetMap();

// Apply modify operation to target map.
void Modify(ChannelMap* target, std::string_view key);
same.
// Update control block and unlock it.
cb.most_recent = replacement;
cb.update_mu.unlock();
Food for thought: since the critical section includes a hop, it sets a low limit on the throughput capacity of this operation. Maybe another way to improve it is to make most_recent atomic as well, move the Await call to after the unlock, and instead of passing "replacement", read directly from most_recent. A rough sketch of this idea follows.
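A hedged sketch of the suggested restructuring (names follow the snippet above; std::mutex stands in for the fibers mutex, and AwaitThreadsPickUpLatest is a hypothetical stand-in for the cross-thread hop):

```cpp
#include <atomic>
#include <mutex>

struct ChannelStore;  // opaque here

struct ControlBlock {
  std::atomic<ChannelStore*> most_recent;
  std::mutex update_mu;  // the PR uses ::boost::fibers::mutex
};

void AwaitThreadsPickUpLatest(ControlBlock& cb);  // hypothetical hop

// Shrink the critical section: publish the new store and unlock before
// the cross-thread hop, so the hop no longer caps writer throughput.
void PublishUpdate(ControlBlock& cb, ChannelStore* replacement) {
  cb.update_mu.lock();
  // ... build `replacement` from the current most_recent ...
  cb.most_recent.store(replacement, std::memory_order_release);
  cb.update_mu.unlock();
  // Each thread reads cb.most_recent itself instead of being handed
  // `replacement`, so the await can happen outside the lock.
  AwaitThreadsPickUpLatest(cb);
}
```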
Good idea. I'm actually not sure how much of an improvement the thread-local pointer gives; we do a lot of atomic ops either way 🤷🏻♂️
Signed-off-by: Vladislav Oleshko <[email protected]>
Fixed.
Yes, I haven't optimized it yet - we should squash parallel updates in the future.
Implements RCU (read-copy-update) for updating the centralized channel store.
In contrast to the old mechanism of sharding subscriber info across shards, a centralized store avoids a hop when fetching subscribers. In general this only slightly improves latency, but under heavy traffic on a single channel it "spreads" the load: the single shard is no longer a bottleneck, which increases throughput several times over.
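For context, a minimal sketch of the read-copy-update pattern (hypothetical names, simplified payload, not the actual channel_store.cc code): readers grab the current pointer without locking, while a writer builds a modified copy under a mutex and atomically publishes it.

```cpp
#include <atomic>
#include <mutex>
#include <string>

#include "absl/container/flat_hash_map.h"

using ChannelMap = absl::flat_hash_map<std::string, int>;  // payload simplified

std::atomic<ChannelMap*> g_current{new ChannelMap{}};
std::mutex g_update_mu;  // serializes writers only

// Reader: lock-free; always sees a consistent (possibly slightly stale) map.
int Subscribers(const std::string& channel) {
  ChannelMap* map = g_current.load(std::memory_order_acquire);
  auto it = map->find(channel);
  return it == map->end() ? 0 : it->second;
}

// Writer: copy the current map, modify the copy, publish it atomically.
void AddSubscriber(const std::string& channel) {
  std::lock_guard<std::mutex> lk(g_update_mu);
  ChannelMap* old_map = g_current.load(std::memory_order_relaxed);
  auto* replacement = new ChannelMap{*old_map};  // full copy
  (*replacement)[channel]++;
  g_current.store(replacement, std::memory_order_release);
  // The real code retires old_map only after every thread has moved past
  // it (a shard_set dispatch serves as the grace period, since there are
  // no hazard pointers); this sketch simply leaks it for brevity.
}
```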