transport: load balancing module refactor #612
Conversation
I'm not familiar with the token ring, replica computation, etc., but apart from that, this approach looks good.
pub fn new() -> Self {
    Self {}
}

impl LoadBalancingPolicy for DefaultPolicy {
    fn pick<'a>(&'a self, query: &'a QueryInfo, cluster: &'a ClusterData) -> NodeRef<'a> {
I've got a feeling that the `pick` and `fallback` methods follow a similar pattern (control-flow-wise), and we could try to reduce the complexity by deduplicating them. Basically, you could think of `pick` as a function that returns the first element of `fallback` without having to allocate memory for the `FallbackPlan`. Of course, if we implemented `pick` so that it simply calls `fallback`, the performance benefit would be gone - however, what if we removed the need to allocate the plan by using a callback?

You could implement a function `do_with_plan` (I didn't think too much about the name):
// FnOnce won't be sufficient to express the callback's type, because you don't
// have access to the iterator's type - you will most likely have to use a trait
trait PlanExtractor<'a> {
    type Output;

    fn extract<I>(self, plan: I) -> Self::Output
    where
        I: Iterator<Item = NodeRef<'a>> + Send + Sync + 'a;
}
fn do_with_plan<'a, E>(
    &'a self,
    query: &'a QueryInfo,
    cluster: &'a ClusterData,
    extractor: E,
) -> E::Output
where
    E: PlanExtractor<'a>,
{
    /* snip */
    let plan = maybe_replicas
        .chain(robined_local_nodes)
        .chain(maybe_remote_nodes)
        .unique();

    extractor.extract(plan)
}
I didn't really scrutinize the `pick` and `fallback` methods, so I'm not sure what the performance impact of this approach would be. If computing the plan requires significantly more computation than the current implementation of `pick` (apart from avoiding the allocation), then I guess we can stay with the current approach.

It would be great if we could compare the performance of both approaches. Perhaps a benchmark would be in order? After all, one of the reasons for the refactor was to improve performance.
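To make the suggestion a bit more concrete, here is a minimal sketch (not code from this PR) of an extractor that only takes the first node of the plan, which is roughly what `pick` needs. `FirstNode` is a hypothetical name, and returning an `Option` is a simplification of the `NodeRef<'a>` return type shown in the diff above:

// Hypothetical extractor: takes only the first node of the plan, so the
// FallbackPlan never needs to be allocated. Builds on the PlanExtractor
// trait sketched above.
struct FirstNode;

impl<'a> PlanExtractor<'a> for FirstNode {
    type Output = Option<NodeRef<'a>>;

    fn extract<I>(self, mut plan: I) -> Self::Output
    where
        I: Iterator<Item = NodeRef<'a>> + Send + Sync + 'a,
    {
        plan.next()
    }
}

// `pick` could then delegate to the shared plan-building code:
// self.do_with_plan(query, cluster, FirstNode)

Whether this saves anything in practice depends on how much of the plan computation `pick` can skip, as discussed above.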
Had an initial look; generally it looks nice, although I didn't dig into the juicy details yet.
Looks nice.
We got rid of the problematic policy chaining and the code is much clearer.
I like how simple planning has become; in my implementation I had a bunch of convoluted iterators that were a pain to write and read. This implementation is much more elegant.
The direction seems good, but there are still a few things that need to be solved before proceeding, like LWT routing, latency awareness and proper UP/DOWN event handling.
// Get a list of all local alive nodes, and apply a round robin to it
let local_nodes = self.preffered_node_set(cluster);
let robined_local_nodes = Self::round_robin_nodes(local_nodes, Self::is_alive);
Round robin has the problem of overloads in case of node failure.
If the usual round robin order is A->B->C and A fails, B will take over all of A's requests. It would be better to try nodes in random order, then in case of A's failure the load that used to be handled by A will be shared equally among B and C.
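As a rough illustration only (not code from the PR; the `rand` crate usage and the helper function are assumptions made for this example), randomizing the order instead of rotating it could look like this:

use rand::seq::SliceRandom;
use rand::thread_rng;

// Shuffle the alive nodes instead of rotating a fixed order, so that when one
// node goes down its share of the traffic is spread over all remaining nodes
// rather than landing entirely on its round-robin successor.
fn randomized_order<T>(mut nodes: Vec<T>) -> Vec<T> {
    nodes.shuffle(&mut thread_rng());
    nodes
}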
As we have no quick idea how to mitigate this yet, let's defer the problem to a follow-up.
I'm sad to see that we didn't really take advantage of grouping all default features in one Default Policy. Namely, we still pass the load balancing policy in an Arc, which imposes some overhead. Instead, I would propose:
Rebased on top of main (using @wprzytula's pull request havaker#8)

Removed

Rebased on top of main, merged havaker#6 (with additional

I don't think that the overhead you mentioned is noticeable.
Force-pushed from e7f3d91 to fc471e8
v1:
clippy check is failing. It looks like 9bc33b0 removed a
Please update the PR description. It mentions that some things still need to be done - the PR was marked as ready, so I guess those things are ready?
v2:
`LoadBalancingPolicy::plan` was split into `pick` and `fallback` methods. This allows better optimization of the most common case, where only one node from the load balancing plan is needed. Changes required in query execution code were minimized by providing a lazy chaining iterator, `transport::load_balancing::plan::Plan`. This iterator's first element is the node returned by the `LoadBalancingPolicy::pick` function; the next items come from the `LoadBalancingPolicy::fallback` iterator. The fallback method is called lazily - only when the second or a later element of the `Plan` iterator is needed.
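For readers unfamiliar with the pattern, here is a simplified sketch of such a lazy chaining iterator. The `Node`, `Policy` and `Plan` types below are stand-ins invented for this illustration; they do not mirror the driver's actual definitions:

// Simplified stand-ins for the driver's types (assumptions for this sketch).
struct Node(u32);

trait Policy {
    fn pick<'a>(&'a self) -> Option<&'a Node>;
    fn fallback<'a>(&'a self) -> Box<dyn Iterator<Item = &'a Node> + 'a>;
}

// Lazy plan: the first `next()` call only runs `pick`; `fallback` is computed
// on demand, when a second element is actually requested.
enum Plan<'a, P: Policy> {
    Start(&'a P),
    PickedFirst(&'a P),
    Fallback(Box<dyn Iterator<Item = &'a Node> + 'a>),
    Done,
}

impl<'a, P: Policy> Iterator for Plan<'a, P> {
    type Item = &'a Node;

    fn next(&mut self) -> Option<Self::Item> {
        match std::mem::replace(self, Plan::Done) {
            Plan::Start(policy) => match policy.pick() {
                Some(node) => {
                    *self = Plan::PickedFirst(policy);
                    Some(node)
                }
                // Nothing could be picked cheaply - go straight to the fallback.
                None => {
                    *self = Plan::Fallback(policy.fallback());
                    self.next()
                }
            },
            Plan::PickedFirst(policy) => {
                // Only now, when a second element is requested, build the
                // fallback iterator. (The real implementation also skips the
                // node that `pick` already returned.)
                *self = Plan::Fallback(policy.fallback());
                self.next()
            }
            Plan::Fallback(mut iter) => {
                let item = iter.next();
                *self = Plan::Fallback(iter);
                item
            }
            Plan::Done => None,
        }
    }
}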
Implemented token and datacenter awareness. Added a builder for the default policy (adding new parameters to the default policy won't break the API). The default policy prefers to return nodes in the following order:
- Alive local replicas (if the token is available & token awareness is enabled)
- Alive remote replicas (if datacenter failover is permitted & possible due to consistency constraints)
- Alive local nodes
- Alive remote nodes (if datacenter failover is permitted & possible due to consistency constraints)
- Enabled down nodes

If no preferred datacenter is specified, all nodes are treated as local ones. The `DefaultPolicy::pick` method does not allocate if the replica lists for the given strategy were precomputed.

Co-authored-by: Wojciech Przytuła <[email protected]>
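As a usage illustration only - the builder method names below are assumptions made for the example, not necessarily the exact API introduced by this PR:

// Hypothetical builder calls; names are assumptions for illustration.
let policy = DefaultPolicy::builder()
    .prefer_datacenter("eu-west".to_string()) // nodes in this DC count as local
    .token_aware(true)                        // prefer replicas when the token is known
    .permit_dc_failover(true)                 // allow remote nodes as a last resort
    .build();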
Two methods were added to the LoadBalancingPolicy trait:

fn on_query_success(&self, query: &QueryInfo, latency: Duration, node: NodeRef<'_>);
fn on_query_failure(&self, query: &QueryInfo, latency: Duration, node: NodeRef<'_>, error: &QueryError);

Their addition allows implementing a latency-aware policy.
This commit prepares for a cleaner introduction of latency awareness into DefaultPolicy.
The experimental latency awareness module is added to DefaultPolicy. Its behaviour is based on the previous LatencyAwarePolicy implementation. Notes on the performance of operations involving the pick predicate: the pick predicate is boxed inside DefaultPolicy, and fallback is performed efficiently - the predicate is only borrowed for the eager computation of the fallback iterator.
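A minimal sketch of that design choice, with assumed type and field names rather than the PR's actual code: the predicate is boxed once at construction and only borrowed while building a fallback iterator, so `fallback` does not need to clone or re-box it:

// Stand-in types; the real driver types differ. This only illustrates storing
// a boxed predicate and borrowing it when building a fallback iterator.
struct Node(u32);

struct DefaultPolicySketch {
    // Boxed once, when the policy is constructed.
    pick_predicate: Box<dyn Fn(&Node) -> bool + Send + Sync>,
}

impl DefaultPolicySketch {
    fn fallback<'a>(&'a self, nodes: &'a [Node]) -> impl Iterator<Item = &'a Node> + 'a {
        // Borrow the boxed predicate instead of cloning or re-boxing it.
        let predicate = &self.pick_predicate;
        nodes.iter().filter(move |&node| predicate(node))
    }
}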
v12:
The new version of load balancing does the precomputation of replica sets for each vnode, which could turn out to be quite a bit of work, so I measured how many resources the precomputation consumes compared to the previous version, which didn't do the precomputation (using lbbench).

Cluster: 3 datacenters, 8 nodes in each one.

- A few SimpleStrategy keyspaces, RF <= 8 - Before: 1.8MB of memory used, 420µs to compute ClusterData
- A few big SimpleStrategy keyspaces, RF ~= 20 - Before: 1.9MB of memory used, 558.229µs to compute ClusterData
- A few NetworkTopologyStrategy keyspaces, RF <= 8 - Before: 1.9MB of memory used, 454.11µs to compute ClusterData
- All of the above - Before: 2MB of memory used, 354.14µs to compute ClusterData

We use a bit more memory and compute power, but it's within reasonable limits. One positive thing stemming from the new precomputation is that the number of allocations per request got reduced:
After:

Here are the exact results:
LGTM 🚀
Nice work!
Resolves scylladb#468

This is a follow-up on scylladb#508 and scylladb#658:
- To minimize CPU usage related to network operations when inserting a very large number of lines, it is relevant to batch.
- To batch in the most efficient manner, these batches have to be shard-aware. Since scylladb#508, `batch` will pick the shard of the first statement to send the query to. However, it is left to the user to constitute the batches in such a way that the target shard is the same for all the elements of the batch.
- This was made *possible* by scylladb#658, but it was still very boilerplate-ish. I was waiting for scylladb#612 to be merged (amazing work btw! 😃) to implement a more direct and factored API (as that would use it).
- This new ~`Session::first_shard_for_statement(self, &PreparedStatement, &SerializedValues) -> Option<(Node, Option<Shard>)>` makes shard-aware batching easy on the users, by providing access to the first node and shard of the query plan.
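For context, here is a rough sketch of how a user might group statements per (node, shard) with an API shaped like the proposed method. All names and signatures below are assumptions based on the comment above, not an existing driver API:

use std::collections::HashMap;

// Assumed stand-in types; the comment above sketches the proposed real signature.
type NodeId = String;
type Shard = u32;

struct Statement {
    query: String,
    values: Vec<i64>,
}

// Group statements by their (first node, shard) so that each batch is
// shard-aware: every statement in a batch targets the same shard.
fn group_for_batching(
    statements: Vec<Statement>,
    first_shard_for_statement: impl Fn(&Statement) -> Option<(NodeId, Option<Shard>)>,
) -> HashMap<(NodeId, Option<Shard>), Vec<Statement>> {
    let mut groups: HashMap<(NodeId, Option<Shard>), Vec<Statement>> = HashMap::new();
    for stmt in statements {
        // Statements whose target cannot be determined go into a catch-all group.
        let key = first_shard_for_statement(&stmt)
            .unwrap_or_else(|| ("unknown".to_string(), None));
        groups.entry(key).or_default().push(stmt);
    }
    groups
}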
Description
This is the third attempt to refactor this module ;)

Replica set calculation was moved to a separate module (`transport::locator`). Replica set calculation is done by a module originally written by @cvybhu - `transport::locator::replication_info`. `struct ReplicationData` lives in that module and provides a set of functions that allow calculating `SimpleStrategy` & `NetworkTopologyStrategy` replica lists.

To make load balancing fast, precomputing replica sets is required. This is done by another of @cvybhu's modules - `transport::locator::precomputed_replicas`. `precomputed_replicas::PrecomputedReplicas` is a struct that precomputes replica lists of given strategies and provides O(1) access to the desired replica slices.

`locator::ReplicaLocator` combines the functionality of the previously mentioned modules and provides a unified API for getting replica sets, either precomputed or calculated on the fly. Representing replica sets by a custom `ReplicaSet` type allowed creating an API that supports optionally limiting replicas to a single data center (and allocation-free creation, getting & iteration through precomputed replica sets).

Things left for a follow-up:
The `LoadBalancingPolicy` interface was changed. `plan` was split into `pick` and `fallback` methods. This allows better optimization of the most common case, where only one node from the load balancing plan is needed. Changes required in query execution code were minimized by providing a lazy chaining iterator, `transport::load_balancing::plan::Plan`. This iterator's first element is the node returned by the `LoadBalancingPolicy::pick` function; the next items come from the `LoadBalancingPolicy::fallback` iterator. The fallback method is called lazily - only when the second or a later element of the `Plan` iterator is needed.

The following methods were added:
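fn on_query_success(&self, query: &QueryInfo, latency: Duration, node: NodeRef<'_>);
fn on_query_failure(&self, query: &QueryInfo, latency: Duration, node: NodeRef<'_>, error: &QueryError);

(Signatures reproduced from the commit message earlier in this PR.)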
The motivation for adding them was the ability to contain the logic of a latency-aware policy inside some load balancing policy.
Default policy was created. It supports token and data center awareness, and also has logic to do a data center failover.

Things left for a follow-up:
- Latency aware policy was merged with the default one
- Mechanisms used only by latency aware policy were removed (e.g. `TimestampedAverage` living in `Node`).

Pre-review checklist
- Add `Fixes:` annotations to PR description.

Fixes: #408