Refactor load balancing module #408

Closed
havaker opened this issue Feb 28, 2022 · 7 comments · Fixed by #612

@havaker
Contributor

havaker commented Feb 28, 2022

What does the load balancing module do?

The load balancing module in our driver is responsible for creating lists of nodes. We call these lists plans (query plans), because they define the order in which nodes will be contacted during query execution. A load balancing plan is calculated for every executed query.

Design goals

One of the main design decisions was to encapsulate load balancing behavior into composable policies. Composability in this case means the ability to combine the behavior of different policies.

Policies we wanted to be able to implement were:

  • Round robin
  • Datacenter-aware round robin
  • Token-aware [Datacenter-aware] round robin

In addition to the above-mentioned, we wanted to support the creation of custom policies.

Current implementation overview

Plan representation

Right now, we are representing plans as:

pub type Plan<'a> = Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a>;

By representing plans as iterators, there is no arbitrary limit to their length. The iterator does not have to keep the entire represented plan within itself (as opposed to e.g. using pub type Plan<'a> = Vec<Arc<Node>>). Elements of the plan can be calculated in a lazy way - only when they are definitely needed.
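To make the laziness concrete, here is a minimal sketch comparing the iterator representation with an eager Vec. The `Node` struct and the helper names `lazy_plan` and `eager_plan` are assumptions made for illustration; they are not part of the driver.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the driver's node type.
pub struct Node {
    pub id: u32,
}

pub type Plan<'a> = Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a>;

/// Lazy plan: an `Arc` is cloned only when the caller asks for the next node.
pub fn lazy_plan(nodes: &[Arc<Node>]) -> Plan<'_> {
    Box::new(nodes.iter().cloned())
}

/// Eager alternative: every `Arc` is cloned up front, even if only
/// the first node ends up being contacted.
pub fn eager_plan(nodes: &[Arc<Node>]) -> Vec<Arc<Node>> {
    nodes.to_vec()
}
```

If a query succeeds on the first node, the lazy plan has cloned a single Arc, while the eager plan has already cloned one per node.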

The disadvantages of such representation appear when trying to compose policies. More on that later.

Load balancing policy trait

All load balancing policies implement the following trait:

/// Policy that decides which nodes to contact for each query
pub trait LoadBalancingPolicy: Send + Sync {
    /// It is used for each query to find which nodes to query first
    fn plan<'a>(&self, statement: &Statement, cluster: &'a ClusterData) -> Plan<'a>;
}

All load balancing policies have a plan() method. It is called every time a query is executed to calculate a query plan. A policy gets info about the query's statement from statement: &Statement and uses cluster: &ClusterData to get info about the cluster's topology and other metadata.

Example policy

Round robin is one of the simplest policies one can implement. Let's see how it fits the interfaces defined above.

/// A Round-robin load balancing policy.
pub struct RoundRobinPolicy {
    index: AtomicUsize,
}

impl LoadBalancingPolicy for RoundRobinPolicy {
    fn plan<'a>(&self, _statement: &Statement, cluster: &'a ClusterData) -> Plan<'a> {
        let index = self.index.fetch_add(1, ORDER_TYPE);

        let nodes_count = cluster.all_nodes.len();
        let rotation = super::compute_rotation(index, nodes_count);
        let rotated_nodes = super::slice_rotated_left(&cluster.all_nodes, rotation).cloned();

        Box::new(rotated_nodes)
    }
}

What is good about this implementation is that it does not copy any slices, and scales really well with the node count (O(1)).
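The two helpers used above are not shown in this issue. The following is one way they could look, assuming that `compute_rotation` reduces the counter modulo the node count and that `slice_rotated_left` chains the two halves of the slice; the bodies are a sketch, not the driver's actual code.

```rust
/// Maps an ever-growing counter onto a valid rotation.
/// Sequences of length 0 or 1 always get rotation 0.
pub fn compute_rotation(round_robin_index: usize, sequence_length: usize) -> usize {
    if sequence_length > 1 {
        round_robin_index % sequence_length
    } else {
        0
    }
}

/// Iterates over `slice` as if it had been rotated left by `mid`,
/// without copying or mutating the slice. Panics if `mid > slice.len()`.
pub fn slice_rotated_left<'a, T>(
    slice: &'a [T],
    mid: usize,
) -> impl Iterator<Item = &'a T> + Clone + 'a {
    let (head, tail) = slice.split_at(mid);
    tail.iter().chain(head.iter())
}
```

Because the rotated view is just two slice iterators chained together, building it costs the same regardless of how many nodes the cluster has.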

Composition

One of the most important features of this driver is token-aware load balancing. This type of load balancing requires calculating replica groups for each query. These groups are then returned as plans.

It is often desired to combine the previously mentioned policies - token-aware and round-robin - to distribute the load between replicas. How can this be done using our previously defined interfaces? What we would like to do is to have a TokenAwarePolicy and a RoundRobinPolicy. Then we would like to have a way to pipe the plan produced by the token-aware policy to the round-robin one. There is no way for a LoadBalancingPolicy to receive a plan produced by some other policy. So how does it work now?

Extra trait

In order for one policy to process plans constructed by the previous one, a new trait was introduced.

pub trait ChildLoadBalancingPolicy: LoadBalancingPolicy {
    fn apply_child_policy(
        &self,
        plan: Vec<Arc<Node>>,
    ) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync>;
}

It's called a child policy because it allows further processing of plans made by its parent. Here is an example implementation for the round robin policy:

impl ChildLoadBalancingPolicy for RoundRobinPolicy {
    fn apply_child_policy(
        &self,
        mut plan: Vec<Arc<Node>>,
    ) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync> {
        let index = self.index.fetch_add(1, ORDER_TYPE);

        let len = plan.len(); // borrow checker forces making such a variable

        plan.rotate_left(super::compute_rotation(index, len));
        Box::new(plan.into_iter())
    }
}

Composition example

/// A wrapper load balancing policy that adds token awareness to a child policy.
pub struct TokenAwarePolicy {
    child_policy: Box<dyn ChildLoadBalancingPolicy>,
}

impl LoadBalancingPolicy for TokenAwarePolicy {
    fn plan<'a>(
        &self,
        statement: &Statement,
        cluster: &'a ClusterData,
    ) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a> {
        match statement.token {
            Some(token) => {
                let keyspace = statement.keyspace.and_then(|k| cluster.keyspaces.get(k));
                let replicas = some_function_that_calculates_replica_list(keyspace);

                self.child_policy.apply_child_policy(replicas)
            }
            // fallback to child policy
            None => {
                self.child_policy.plan(statement, cluster)
            }
        }
    }
}

This example shows a policy that can add token-awareness to another policy (TokenAwarePolicy::child_policy). In its plan method, it generates a replica list and forwards this list to a child policy (if a given statement has a token).
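The parent/child flow can be tried out in isolation with a reduced model. In the sketch below, `Node`, `ChildPolicy`, `RoundRobin`, `TokenAware` and `plan_for_replicas` are simplified stand-ins invented for illustration; replica calculation is skipped and the replica list is passed in directly.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for the driver's node type.
pub struct Node {
    pub id: u32,
}

pub trait ChildPolicy: Send + Sync {
    fn apply_child_policy(
        &self,
        plan: Vec<Arc<Node>>,
    ) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync>;
}

pub struct RoundRobin {
    index: AtomicUsize,
}

impl RoundRobin {
    pub fn new() -> Self {
        Self { index: AtomicUsize::new(0) }
    }
}

impl ChildPolicy for RoundRobin {
    fn apply_child_policy(
        &self,
        mut plan: Vec<Arc<Node>>,
    ) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync> {
        let index = self.index.fetch_add(1, Ordering::Relaxed);
        let len = plan.len();
        if len > 1 {
            // Rotating in place is cheap for a handful of replicas.
            plan.rotate_left(index % len);
        }
        Box::new(plan.into_iter())
    }
}

pub struct TokenAware {
    pub child: Box<dyn ChildPolicy>,
}

impl TokenAware {
    /// Pipes an already computed replica list through the child policy
    /// and returns the resulting order of node ids.
    pub fn plan_for_replicas(&self, replicas: &[Arc<Node>]) -> Vec<u32> {
        self.child
            .apply_child_policy(replicas.to_vec())
            .map(|node| node.id)
            .collect()
    }
}
```

Successive calls with the same replica list start from a different replica each time, which is the load distribution the composition is meant to achieve. Note the `replicas.to_vec()` call: it is the per-query Arc cloning that the child interface requires.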

Current shortcomings

Replica sets calculation

Inaccessibility

The token-aware load balancing module contains all the code for replica set calculation. It is not accessible from the outside, so writing a custom token-aware policy requires duplicating this code.

Volatility

Replica sets are calculated when executing each query; they are not cached anywhere. This can cause performance problems in big clusters that use NetworkTopologyStrategy, for which replica sets are expensive to calculate.

Composability

Cost

In the current implementation, composability is implemented using the ChildLoadBalancingPolicy trait. Its method apply_child_policy has a signature that requires passing a plan as a Vec<Arc<Node>>. This forces cloning Arcs every time a plan is calculated, which can cause scalability issues. The decision to design the child policy interface in such a way was made after analyzing the default policy stack: token-aware round robin. The token-aware policy produced plans consisting of a few replicas, which had to be rotated by the round robin. Rotation is not a good transformation to do on iterators. To reduce its cost, it was decided (by me (; ) that the child policy plan argument would be a Vec.

Unintuitive

If a user wants to write a custom policy FilteringPolicy and use RoundRobinPolicy combined with it, there are two ways to do it.

  1. FilteringPolicy can implement LoadBalancingPolicy, use RoundRobinPolicy as a child policy (as TokenAwarePolicy does) and feed the round robin with filtered nodes
  2. FilteringPolicy can implement a LoadBalancingPolicy and not use RoundRobinPolicy as a child policy (invoke RoundRobinPolicy::plan() and wrap the resulting plan in Iterator::filter)

The first option causes a lot of Arc<Node> copying, and can become slow if the cluster is big and the filtered node count is low.
The second option does not have this trap (because filtering is lazy and does not cause traversal of the whole iterator). It is only possible because filtering can be applied after round robin, though.
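The second option could be sketched as follows. The `Node` struct with a `datacenter` field, the `round_robin_plan` stand-in for RoundRobinPolicy::plan() and the `filtered_plan` function are all assumptions made for this example.

```rust
use std::sync::Arc;

// Hypothetical stand-in for the driver's node type.
pub struct Node {
    pub id: u32,
    pub datacenter: &'static str,
}

pub type Plan<'a> = Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a>;

/// Stand-in for `RoundRobinPolicy::plan()`: a lazy, rotated view of all nodes.
pub fn round_robin_plan(nodes: &[Arc<Node>], rotation: usize) -> Plan<'_> {
    // `max(1)` avoids a division by zero for an empty node list.
    let (head, tail) = nodes.split_at(rotation % nodes.len().max(1));
    Box::new(tail.iter().chain(head.iter()).cloned())
}

/// Option 2: wrap the round-robin plan in `Iterator::filter`.
/// Nodes are inspected only as far as the consumer advances the plan.
pub fn filtered_plan<'a>(
    nodes: &'a [Arc<Node>],
    rotation: usize,
    allowed_dc: &'a str,
) -> Plan<'a> {
    Box::new(
        round_robin_plan(nodes, rotation).filter(move |node| node.datacenter == allowed_dc),
    )
}
```

In this sketch a rejected node still has its Arc cloned and dropped once, but only if the consumer advances the plan past it; nothing is collected up front.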

State

Policies cannot react to any cluster events (node up/down/removed/added). Because of this, they cannot build their own cluster view, and have to rely only on the ClusterData view provided in the plan method. Reliance on ClusterData leads to the addition of fields that only specific policies use.

When writing DcAwareRoundRobin I wanted to speed up lookups of local nodes, so I've added a per-dc view of nodes to ClusterData. I then used this view in DcAwareRoundRobin::plan. If I were to write a DcAwareRoundRobin as a custom policy, without modifying anything in the driver, I would be at a disadvantage - there would be no way to return local nodes quickly without that per-dc view in a ClusterData.
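A per-dc view of this kind can be pictured as a map from datacenter name to its nodes, built once per topology change. The sketch below uses hypothetical names (`Node`, `build_per_dc_view`, `local_nodes`) and does not mirror the actual ClusterData fields.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for the driver's node type.
pub struct Node {
    pub id: u32,
    pub datacenter: String,
}

/// Groups nodes by datacenter. Meant to run when the topology changes,
/// not on every query.
pub fn build_per_dc_view(all_nodes: &[Arc<Node>]) -> HashMap<String, Vec<Arc<Node>>> {
    let mut view: HashMap<String, Vec<Arc<Node>>> = HashMap::new();
    for node in all_nodes {
        view.entry(node.datacenter.clone())
            .or_default()
            .push(Arc::clone(node));
    }
    view
}

/// Looks up the local nodes with a single hash lookup instead of
/// scanning and filtering the full node list on every query.
pub fn local_nodes<'a>(
    view: &'a HashMap<String, Vec<Arc<Node>>>,
    local_dc: &str,
) -> &'a [Arc<Node>] {
    view.get(local_dc).map(Vec::as_slice).unwrap_or(&[])
}
```

A custom policy that cannot observe topology changes has no good moment to build such a view, which is the disadvantage described above.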

Proposed solution

Public replica sets calculation

Move replica sets calculation to another, public module.

Conduct benchmarks to see whether introducing caching is worthwhile.

Remove composability

Most users will end up with a token-aware [dc-aware] round robin load balancer. It makes sense to provide them with a high-quality implementation of such a policy, one that doesn't have to make any compromises imposed by composability. We wouldn't be the first to make such a decision - the DataStax Java driver has already done so.

If we were to implement replica sets calculation caching, our default policy could even produce plans without any Arc<_> copying.
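As a rough illustration of how cached replica sets would avoid Arc copying, the sketch below precomputes one replica list per ring token and hands out borrowed slices. `ReplicaCache` and `replicas_for` are hypothetical names, and the ring lookup is a simplification that ignores keyspaces and replication strategies.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Hypothetical stand-in for the driver's node type.
pub struct Node {
    pub id: u32,
}

/// Replica lists computed once per topology change, keyed by ring token.
pub struct ReplicaCache {
    ring: BTreeMap<i64, Vec<Arc<Node>>>,
}

impl ReplicaCache {
    pub fn new(ring: BTreeMap<i64, Vec<Arc<Node>>>) -> Self {
        Self { ring }
    }

    /// Returns the replicas for `token`: the first ring entry at or after
    /// the token, wrapping around to the smallest entry. The result is a
    /// borrowed slice, so no `Arc` is cloned.
    pub fn replicas_for(&self, token: i64) -> &[Arc<Node>] {
        self.ring
            .range(token..)
            .next()
            .or_else(|| self.ring.iter().next())
            .map(|(_, replicas)| replicas.as_slice())
            .unwrap_or(&[])
    }
}
```

A plan built on top of such a slice can iterate over references and clone an Arc only for the node that is actually contacted.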

React to events

Add a mechanism for load balancing policies to react to cluster events, e.g. add methods like on_up(&self, node: Arc<Node>) to the LoadBalancingPolicy trait.
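One possible shape for such hooks is a set of trait methods with empty default bodies, so that existing policies keep compiling. Only on_up comes from the proposal above; the `ClusterEventListener` trait name, the on_down method and the `LiveNodeTracker` type are assumptions made for this sketch.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the driver's node type.
pub struct Node {
    pub id: u32,
}

/// Event hooks with no-op defaults: policies override only what they need.
pub trait ClusterEventListener: Send + Sync {
    fn on_up(&self, _node: Arc<Node>) {}
    fn on_down(&self, _node: Arc<Node>) {}
}

/// Example policy state: a private view of the nodes currently up.
pub struct LiveNodeTracker {
    live: Mutex<HashSet<u32>>,
}

impl LiveNodeTracker {
    pub fn new() -> Self {
        Self { live: Mutex::new(HashSet::new()) }
    }

    pub fn is_live(&self, id: u32) -> bool {
        self.live.lock().unwrap().contains(&id)
    }
}

impl ClusterEventListener for LiveNodeTracker {
    fn on_up(&self, node: Arc<Node>) {
        self.live.lock().unwrap().insert(node.id);
    }

    fn on_down(&self, node: Arc<Node>) {
        self.live.lock().unwrap().remove(&node.id);
    }
}
```

The hooks take &self, so any state they update needs interior mutability; a Mutex is used here for simplicity.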

Restricting access to ClusterData and removing unnecessary fields would be good too.

@psarna
Contributor

psarna commented Feb 28, 2022

I'm convinced - composability is indeed pretty much only needed for token awareness, and we can afford dropping it in favor of specialized implementations. @piodul?

@nemosupremo I'd appreciate your opinion as a user, since I remember that you implemented your own load balancing policy. The kind of change described here will most likely break backward compatibility, but I'd assume it would not require many changes to your already coded policies, rather some interface updates.

@ultrabug I'd also appreciate your opinion as a user (and a humble laureate of multiple scylla user awards)

In fact, all user opinions would be very much welcome.

@ultrabug
Contributor

First of all thank you and bravo for this clear explanation @havaker , I'm in awe!

Remove composability

About composability: I'm all in favor of having the best implementations available straight from the driver. So I would indeed also drop it. The most advanced users who implement their own policies (which I'm not part of) should also be provided with a comprehensive documentation guide.

Restricting access to ClusterData and removing unnecessary fields would be good too.

When things go wrong, most database drivers are quite poor at exposing the context of their errors. I find that this discussion has the potential to allow exposing more about the actual plan and cluster topology as seen by the driver and I would definitely love to see that happening. Things like:

  • what was the query, prepared or not? token?
  • what was the cluster topology seen at that time. up/down nodes
  • what were the replicas considered? and the one chosen?
  • shard awareness information too for ScyllaDB nodes

React to events

Yes please, please please 👍

Backoff and retry strategies

Do we want to add backoff and retry strategies to this conversation or shall we have another one?

@psarna
Contributor

psarna commented Feb 28, 2022

@ultrabug feel free to state your thoughts/proposals in a separate issue. Then, when we change retry/backoff strategies, that issue can be marked as resolved.

@piodul
Collaborator

piodul commented Feb 28, 2022

I agree with the proposal, this is a good direction.

I must admit that I always found the composition aspect of the retry policies a bit confusing, so it's good that we are getting rid of it. If anybody wants to include the token- or dc-awareness aspect in their custom policy, then we can consider making appropriate helpers public - however, if we do that they become a part of the public interface and changing that interface will be painful after we reach 1.0, so it's not a straightforward decision.

Add a mechanism for load balancing policies to react to cluster events. E.g. add methods like on_up(&self, node: Arc<Node>) to LoadBalancingPolicy trait.

Just want to point out one thing here - we should not rely solely on the cluster events. The data fetched during topology refresh should serve as a source of truth and we should call the on_add, on_remove etc. according to the diff with the previous version of ClusterData.
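The diff-based approach could look roughly like this. Node identity is reduced to a numeric id, and `TopologyChange` and `diff_topology` are hypothetical names; a real implementation would compare two ClusterData snapshots and call the corresponding hooks.

```rust
use std::collections::BTreeSet;

#[derive(Debug, PartialEq, Eq)]
pub enum TopologyChange {
    Added(u32),
    Removed(u32),
}

/// Compares two snapshots of node ids and lists what changed.
/// Each entry would translate into an `on_add` / `on_remove` call.
pub fn diff_topology(previous: &BTreeSet<u32>, current: &BTreeSet<u32>) -> Vec<TopologyChange> {
    let added = current
        .difference(previous)
        .map(|id| TopologyChange::Added(*id));
    let removed = previous
        .difference(current)
        .map(|id| TopologyChange::Removed(*id));
    added.chain(removed).collect()
}
```

Deriving the calls from the refreshed snapshot means a missed or duplicated cluster event cannot leave a policy's view permanently out of sync.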

@nemosupremo
Contributor

  1. I also agree, composability is overrated. In my experience, writing custom policies isn't very common and token-aware load balancing is the right decision 99% of the time. When I do need to, it's probably because I have a use case where token-aware isn't the right thing. (I've only ever had to write a custom policy twice, and both times, almost 5 years apart, were because I wanted to only read from a specific local node for analytics purposes).
  2. I'm not sure how valid the concern of cloning the Arc is here. If this only happens when you execute a query, which takes, let's say, 500us in the best case, adding 1,000 clones would only add 2us (https://gist.github.com/andrewcsmith/6ade174b36e20698ffb60f682f30c1fc). There are times when a clone might hurt, but this doesn't seem like one of them (you guys are probably closer to database internals, so I could be wrong here).
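For anyone who wants to check the clone cost on their own machine, a rough measurement could be written like this. `Node` and `time_arc_clones` are hypothetical, and this is a naive timing loop rather than a proper benchmark, so the numbers should be read as an order-of-magnitude hint only.

```rust
use std::sync::Arc;
use std::time::{Duration, Instant};

// Hypothetical stand-in for the driver's node type.
pub struct Node {
    pub id: u32,
}

/// Clones every `Arc` in `plan` into a fresh `Vec`, `rounds` times,
/// and returns the total elapsed time.
pub fn time_arc_clones(plan: &[Arc<Node>], rounds: usize) -> Duration {
    let start = Instant::now();
    for _ in 0..rounds {
        let copy: Vec<Arc<Node>> = plan.to_vec();
        // Keeps the optimizer from discarding the copy.
        std::hint::black_box(&copy);
    }
    start.elapsed()
}
```

Dividing the result by `rounds` gives the approximate cost of copying one plan, which includes one Vec allocation in addition to the reference count updates.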

@psarna
Contributor

psarna commented Jun 14, 2022

For reference: I ran a very simple experiment which counts how many allocations were performed (https://github.com/psarna/trace_alloc) on one of our examples, parallel-prepared.rs, and the current load balancing implementation indeed contributes quite a lot to the total number of allocations. While the original code, with token-aware load balancing enabled, needs ~10 mallocs per request, when you hardcode the code to not be token-aware, the number drops to ~6 per request, which is a 40% improvement.

@mmatczuk

FYI it's possible to reduce the number of allocations here to zero: scylladb/scylla-go-driver#255
