
feat(server): Pubsub updates with RCU #980

Merged 9 commits into dragonflydb:main from pubsub-rcu on Mar 26, 2023
Conversation

@dranikpg (Contributor) commented Mar 22, 2023

Implements RCU (read-copy-update) for updating the centralized channel store.

In contrast to the old mechanism of sharding subscriber info across shards, a centralized store avoids a hop for fetching subscribers. In general this only slightly improves latency, but under heavy traffic on a single channel it "spreads" the load: a single shard is no longer the bottleneck, which increases throughput several times over.
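
In broad strokes, the update path follows the classic RCU recipe. Below is a simplified sketch of that recipe, not the PR's actual classes; names like g_current, Subscribers, AddSubscriber and WaitForReaders are illustrative stand-ins:

```cpp
#include <atomic>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Readers dereference a pointer to an immutable snapshot; writers build a new
// snapshot, swap the pointer, wait until no reader can still see the old one,
// and only then delete it.
using ChannelMap = std::unordered_map<std::string, std::vector<int>>;  // channel -> subscriber ids

std::atomic<const ChannelMap*> g_current{new ChannelMap{}};
std::mutex g_update_mu;  // serializes writers; readers never take it

// Publish path: a single atomic load, no hop to a dedicated pubsub shard.
std::vector<int> Subscribers(const std::string& channel) {
  const ChannelMap* snapshot = g_current.load(std::memory_order_acquire);
  auto it = snapshot->find(channel);
  return it == snapshot->end() ? std::vector<int>{} : it->second;
}

// Placeholder for the grace period, e.g. running a no-op task on every reader thread.
void WaitForReaders() {}

// Subscribe path: copy, modify the copy, publish it, then reclaim the old snapshot.
void AddSubscriber(const std::string& channel, int subscriber) {
  std::lock_guard<std::mutex> lk(g_update_mu);
  auto* next = new ChannelMap(*g_current.load(std::memory_order_relaxed));
  (*next)[channel].push_back(subscriber);
  const ChannelMap* old = g_current.exchange(next, std::memory_order_acq_rel);
  WaitForReaders();
  delete old;
}
```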

Benchmarks:

1. Dry run without subscribers

OLD
===================================================================================================
Type          Ops/sec    Avg. Latency     p50 Latency     p99 Latency   p99.9 Latency       KB/sec  
---------------------------------------------------------------------------------------------------
Publishs    955759.86         0.75294         0.71100         1.32700         4.57500     43709.23  
Totals      955759.86         0.75294         0.71100         1.32700         4.57500     43709.23

NEW
===================================================================================================
Type          Ops/sec    Avg. Latency     p50 Latency     p99 Latency   p99.9 Latency       KB/sec  
---------------------------------------------------------------------------------------------------
Publishs   1040942.05         0.69134         0.67900         1.07900         3.75900     47604.77  
Totals     1040942.05         0.69134         0.67900         1.07900         3.75900     47604.77 

2. Run with subscribers

OLD
===================================================================================================
Type          Ops/sec    Avg. Latency     p50 Latency     p99 Latency   p99.9 Latency       KB/sec  
---------------------------------------------------------------------------------------------------
Publishs    399898.33         1.79988         1.37500         7.00700        11.64700     18288.39  
Totals      399898.33         1.79988         1.37500         7.00700        11.64700     18288.39 

NEW
===================================================================================================
Type          Ops/sec    Avg. Latency     p50 Latency     p99 Latency   p99.9 Latency       KB/sec  
---------------------------------------------------------------------------------------------------
Publishs    418730.46         1.71866         1.21500         6.97500        10.81500     19149.67  
Totals      418730.46         1.71866         1.21500         6.97500        10.81500     19149.67

3. Single channel

OLD
===================================================================================================
Type          Ops/sec    Avg. Latency     p50 Latency     p99 Latency   p99.9 Latency       KB/sec  
---------------------------------------------------------------------------------------------------
Publishs     88210.54         9.06565         8.76700        15.10300        22.01500      4048.73  
Totals       88210.54         9.06565         8.76700        15.10300        22.01500      4048.73 

NEW
===================================================================================================
Type          Ops/sec    Avg. Latency     p50 Latency     p99 Latency   p99.9 Latency       KB/sec  
---------------------------------------------------------------------------------------------------
Publishs    236476.81         3.38045         1.13500        27.39100        35.32700     10853.92  
Totals      236476.81         3.38045         1.13500        27.39100        35.32700     10853.92

Signed-off-by: Vladislav Oleshko <[email protected]>
@romange (Collaborator) commented Mar 22, 2023

Please get into the habit of adding additional info to the commit description. This isn't a one-liner PR.

Signed-off-by: Vladislav Oleshko <[email protected]>
Comment on lines 117 to 120
  ChannelMap* channels_;
  ChannelMap* patterns_;
  ControlBlock* control_block_;
};
@dranikpg (Contributor, Author) commented:

That's a lot of indirection, but it's easier to work with this way. To flatten it, we could:

  1. Store the ChannelStore in threads by value (3x8)
  2. Store the ChannelMaps without pointers and move them around with ugly memcpy's
  3. Store a single ChannelStore and make the ChannelMap* pointers atomic. We'd still need to dispatch to the shard_set to allow deletes (as we don't have hazard pointers); see the sketch below.
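
For illustration, option 3 could look roughly like this (hypothetical names, not code from this PR):

```cpp
#include <atomic>

struct ChannelMapStub {};  // stand-in for the real ChannelMap

// A single store whose map pointers are atomic. Readers skip the per-thread
// indirection; writers swap a pointer and hand the old map to the shard_set
// for deferred deletion, since there are no hazard pointers.
class FlatChannelStore {
 public:
  ChannelMapStub* Channels() const {
    return channels_.load(std::memory_order_acquire);
  }

  ChannelMapStub* Patterns() const {
    return patterns_.load(std::memory_order_acquire);
  }

  // Returns the previous map so the caller can dispatch its deletion
  // to all shard threads once they can no longer observe it.
  ChannelMapStub* SwapChannels(ChannelMapStub* next) {
    return channels_.exchange(next, std::memory_order_acq_rel);
  }

 private:
  std::atomic<ChannelMapStub*> channels_{nullptr};
  std::atomic<ChannelMapStub*> patterns_{nullptr};
};
```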

Comment on lines +80 to +86
// Wrapper around atomic pointer that allows copying and moving.
// Made to overcome restrictions of absl::flat_hash_map.
// Copy/Move don't need to be atomic with RCU.
struct UpdatablePointer {
  UpdatablePointer(SubscribeMap* sm) : ptr{sm} {
  }

@dranikpg (Contributor, Author) commented:

That's a tricky part... flat_hash_map really doesn't allow full in-place construction, while the std map does (it has std::piecewise_construct).
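
Based on the snippet above, the completed wrapper might look roughly like this (an illustrative sketch with SubscribeMap stubbed out, not the PR's exact code):

```cpp
#include <atomic>

struct SubscribeMap {};  // stand-in for the real subscriber map

// flat_hash_map requires its mapped type to be copyable/movable, but
// std::atomic is neither, so the wrapper supplies the copy itself. The copy
// only ever happens while the single writer holds the update mutex, so it
// doesn't need to be atomic.
struct UpdatablePointer {
  UpdatablePointer(SubscribeMap* sm) : ptr{sm} {}

  UpdatablePointer(const UpdatablePointer& other)
      : ptr{other.ptr.load(std::memory_order_relaxed)} {}

  SubscribeMap* Get() const { return ptr.load(std::memory_order_acquire); }
  void Set(SubscribeMap* sm) { ptr.store(sm, std::memory_order_release); }

  SubscribeMap* operator->() const { return Get(); }

  std::atomic<SubscribeMap*> ptr;
};
```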

@romange (Collaborator) commented:

We can also add to helio:
https://github.com/romange/beeri/blob/master/base/atomic_wrapper.h
which should solve the problem

Comment on lines 57 to 63
// Centralized controller to prevent overlapping updates.
struct ControlBlock {
  void Destroy();

  ChannelStore* most_recent;
  ::boost::fibers::mutex update_mu;  // locked during updates.
};
@dranikpg (Contributor, Author) commented:

In the future, instead of plain locking, we could accumulate all changes and fulfill all writers' requests with a single operation.

@dranikpg (Contributor, Author) commented:

It's a bit more difficult to synchronize; besides, we'd need to explicitly aggregate changes by key to tell whether we'll modify the map slots at all (a replace and an add on a single value cancel each other out).
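
For illustration, the aggregation step might look something like this (purely hypothetical, nothing here is part of this PR):

```cpp
#include <cstdint>
#include <iterator>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Aggregate queued changes by (channel, subscriber) before applying them in
// one pass. Opposite operations on the same key cancel out, and only the
// surviving net changes tell us which map slots really need a new copy.
struct Change {
  std::string channel;
  uint32_t subscriber_id;
  bool add;  // true = subscribe, false = unsubscribe
};

using Key = std::pair<std::string, uint32_t>;

std::map<Key, int> Aggregate(const std::vector<Change>& pending) {
  std::map<Key, int> net;  // +1 per add, -1 per remove
  for (const Change& c : pending)
    net[{c.channel, c.subscriber_id}] += c.add ? 1 : -1;

  // Keys whose adds and removes cancelled out require no slot update at all.
  for (auto it = net.begin(); it != net.end();)
    it = (it->second == 0) ? net.erase(it) : std::next(it);
  return net;
}
```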

src/server/channel_store.h: outdated (resolved)
src/server/main_service.cc: outdated (resolved)
Comment on lines 188 to 198
  // RCU update existing SubscribeMap entry.
  DCHECK(it->second->size() > 0);
  auto* replacement = new SubscribeMap{*it->second};
  if (add)
    replacement->emplace(cntx_, thread_id_);
  else
    replacement->erase(cntx_);

  freelist_.push_back(it->second.Get());
  it->second.Set(replacement);
}
@dranikpg (Contributor, Author) commented:

This is what RCU without a full copy looks like: only the touched SubscribeMap entry is copied and swapped in, not the whole map.

@dranikpg marked this pull request as ready for review March 24, 2023 17:21
@dranikpg requested a review from romange March 24, 2023 17:21
Comment on lines +1418 to +1419
};
shard_set->pool()->DispatchBrief(std::move(cb));
@dranikpg (Contributor, Author) commented:

TODO: Dispatch only on active threads

src/server/channel_store.h: outdated (resolved)

  // RCU update existing SubscribeMap entry.
  DCHECK(it->second->size() > 0);
  auto* replacement = new SubscribeMap{*it->second};
@romange (Collaborator) commented:

Can Modify run on multiple threads in parallel? I see it under the mutex lock below, so I'm a bit confused: why do you need atomics?

@dranikpg (Contributor, Author) replied Mar 25, 2023:

Because the map slot is read by all the other reader threads; I've added a comment.
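
A minimal sketch of that interaction (illustrative names, not the PR's exact code): the mutex only serializes writers against each other, so the store into the slot still races with loads from reader threads and therefore has to be atomic.

```cpp
#include <atomic>
#include <mutex>

struct SubscribeMap {};  // stand-in for the real type

std::atomic<SubscribeMap*> slot{new SubscribeMap{}};
std::mutex update_mu;  // taken by writers only

// Publish path on any reader thread: lock-free load of the slot.
SubscribeMap* ReadSlot() {
  return slot.load(std::memory_order_acquire);
}

// Modify path: serialized by the mutex, but it still publishes via an atomic
// store; the old pointer would go to a freelist and be deleted only after the
// RCU grace period.
void ReplaceSlot(SubscribeMap* replacement) {
  std::lock_guard<std::mutex> lk(update_mu);
  slot.store(replacement, std::memory_order_release);
}
```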

Signed-off-by: Vladislav Oleshko <[email protected]>
@romange (Collaborator) left a comment:

Looks very good. My only concern is around the Apply() bottleneck.

src/server/channel_store.h: outdated (resolved)

src/server/channel_store.h: outdated (resolved)
src/server/channel_store.cc: outdated (resolved)
src/server/main_service.cc: outdated (resolved)
src/server/main_service.cc: outdated (resolved)
std::pair<ChannelMap*, bool> GetTargetMap();

// Apply modify operation to target map.
void Modify(ChannelMap* target, std::string_view key);
@romange (Collaborator) commented:

same.

src/server/channel_store.h: outdated (resolved)

  // Update control block and unlock it.
  cb.most_recent = replacement;
  cb.update_mu.unlock();
@romange (Collaborator) commented:

Food for thought: since the critical section includes a hop, it sets a low limit on the throughput capacity of this operation. Maybe another way to improve it is to make most_recent atomic as well, move the Await call to after the unlock, and, instead of passing "replacement", read directly from most_recent.
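
A rough sketch of the suggested variant (illustrative names, not code from this PR; the cross-thread hop is stubbed out):

```cpp
#include <atomic>
#include <mutex>

struct ChannelStore {};  // stand-in

struct ControlBlock {
  std::atomic<ChannelStore*> most_recent{nullptr};
  std::mutex update_mu;
};

// Placeholder for dispatching a callback to every proactor thread and waiting.
void AwaitOnAllThreads(void (*fn)(ControlBlock&), ControlBlock& cb) {
  fn(cb);
}

// Publish the replacement and release the mutex before the hop; each thread
// then reads the freshest pointer from most_recent itself, so a later update
// overtaking this one is harmless.
void InstallNewStore(ControlBlock& cb, ChannelStore* replacement) {
  {
    std::lock_guard<std::mutex> lk(cb.update_mu);
    cb.most_recent.store(replacement, std::memory_order_release);
  }
  AwaitOnAllThreads(
      [](ControlBlock& c) {
        // e.g. update the thread-local pointer from c.most_recent here
        (void)c.most_recent.load(std::memory_order_acquire);
      },
      cb);
}
```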

@dranikpg (Contributor, Author) commented:

Good idea. I'm actually not sure how much of an improvement the thread-local pointer gives; we do a lot of atomic ops either way 🤷🏻‍♂️

src/server/channel_store.cc: outdated (resolved)
Signed-off-by: Vladislav Oleshko <[email protected]>
@dranikpg (Contributor, Author) commented:

Fixed.

> My only concern is around the Apply() bottleneck

Yes, I haven't optimized it yet - we should squash parallel updates in the future.

@dranikpg merged commit 139e56b into dragonflydb:main Mar 26, 2023
@dranikpg deleted the pubsub-rcu branch April 1, 2023 16:32
2 participants