transport: cluster: remove unnecessary ClusterData fields
Information provided by `ClusterData::all_nodes` and `ClusterData::datacenters` was redundant - it could be reconstructed using `ReplicaLocator::unique_nodes_in_global_ring` and `ReplicaLocator::unique_nodes_in_datacenter_ring` (`ReplicaLocator` can be accessed via `ClusterData::replica_locator`).
havaker committed Mar 17, 2023
1 parent 188d660 commit 0db57c1
Showing 8 changed files with 73 additions and 36 deletions.
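The migration pattern, as a minimal sketch (the helper function and the "dc1" name are hypothetical, and the import paths are assumptions; `replica_locator()` and the two `ReplicaLocator` accessors are the APIs this commit switches to):

```rust
use std::sync::Arc;

use scylla::transport::{ClusterData, Node};

// Hypothetical helper; `cluster` would come from e.g. Session::get_cluster_data().
fn nodes_overview(cluster: &ClusterData) {
    // Replaces the removed `all_nodes` field.
    let all_nodes: &[Arc<Node>] = cluster.replica_locator().unique_nodes_in_global_ring();
    println!("cluster has {} nodes", all_nodes.len());

    // Replaces lookups in the removed `datacenters` map; yields None for an
    // unknown datacenter name.
    if let Some(dc_nodes) = cluster
        .replica_locator()
        .unique_nodes_in_datacenter_ring("dc1")
    {
        println!("dc1 has {} nodes", dc_nodes.len());
    }
}
```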
10 changes: 5 additions & 5 deletions examples/custom_load_balancing_policy.rs
@@ -18,12 +18,12 @@ impl LoadBalancingPolicy for CustomLoadBalancingPolicy {
         _statement: &Statement,
         cluster: &'a ClusterData,
     ) -> Box<dyn Iterator<Item = Arc<Node>> + Send + Sync + 'a> {
-        let fav_dc_info = cluster
-            .get_datacenters_info()
-            .get(&self.fav_datacenter_name);
+        let fav_dc_nodes = cluster
+            .replica_locator()
+            .unique_nodes_in_datacenter_ring(&self.fav_datacenter_name);

-        match fav_dc_info {
-            Some(info) => Box::new(info.nodes.iter().cloned()),
+        match fav_dc_nodes {
+            Some(nodes) => Box::new(nodes.iter().cloned()),
             // If there is no dc with provided name, fallback to other datacenters
             None => Box::new(cluster.get_nodes_info().iter().cloned()),
         }
29 changes: 18 additions & 11 deletions scylla/src/transport/cluster.rs
@@ -72,8 +72,6 @@ pub struct Datacenter {
 pub struct ClusterData {
     pub(crate) known_peers: HashMap<Uuid, Arc<Node>>, // Invariant: nonempty after Cluster::new()
     pub(crate) keyspaces: HashMap<String, Keyspace>,
-    pub(crate) all_nodes: Vec<Arc<Node>>,
-    pub(crate) datacenters: HashMap<String, Datacenter>,
     pub(crate) locator: ReplicaLocator,
 }

@@ -96,8 +94,6 @@ impl<'a> std::fmt::Debug for ClusterDataNeatDebug<'a> {
                 &RingSizePrinter(cluster_data.locator.ring().len())
             })
             .field("keyspaces", &cluster_data.keyspaces.keys())
-            .field("all_nodes", &cluster_data.all_nodes)
-            .field("datacenters", &cluster_data.datacenters)
             .finish_non_exhaustive()
     }
 }
@@ -285,7 +281,7 @@ impl ClusterData {
     }

     pub(crate) async fn wait_until_all_pools_are_initialized(&self) {
-        for node in self.all_nodes.iter() {
+        for node in self.locator.unique_nodes_in_global_ring().iter() {
             node.wait_until_pool_initialized().await;
         }
     }
@@ -369,8 +365,6 @@ impl ClusterData {
         ClusterData {
             known_peers: new_known_peers,
             keyspaces: metadata.keyspaces,
-            all_nodes,
-            datacenters,
             locator,
         }
     }
@@ -384,13 +378,26 @@ impl ClusterData {

     /// Access datacenter details collected by the driver
     /// Returned `HashMap` is indexed by names of datacenters
-    pub fn get_datacenters_info(&self) -> &HashMap<String, Datacenter> {
-        &self.datacenters
+    pub fn get_datacenters_info(&self) -> HashMap<String, Datacenter> {
+        self.locator
+            .datacenter_names()
+            .iter()
+            .map(|dc_name| {
+                let nodes = self
+                    .locator
+                    .unique_nodes_in_datacenter_ring(dc_name)
+                    .unwrap()
+                    .to_vec();
+                let rack_count = nodes.iter().map(|node| node.rack.as_ref()).unique().count();
+
+                (dc_name.clone(), Datacenter { nodes, rack_count })
+            })
+            .collect()
     }

     /// Access details about nodes known to the driver
-    pub fn get_nodes_info(&self) -> &Vec<Arc<Node>> {
-        &self.all_nodes
+    pub fn get_nodes_info(&self) -> &[Arc<Node>] {
+        self.locator.unique_nodes_in_global_ring()
     }

     /// Compute token of a table partition key
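The two accessors above also change shape: `get_datacenters_info` now returns an owned `HashMap` rebuilt from the `ReplicaLocator` on every call, and `get_nodes_info` returns a slice instead of `&Vec`. A usage sketch (the function is hypothetical; the import path is an assumption):

```rust
use scylla::transport::ClusterData;

// Hypothetical helper showing the new return types.
fn print_topology(cluster: &ClusterData) {
    // The map is recomputed per call now, so bind it once instead of
    // calling get_datacenters_info() in a loop.
    let datacenters = cluster.get_datacenters_info();
    for (dc_name, dc) in &datacenters {
        println!("{}: {} nodes, {} racks", dc_name, dc.nodes.len(), dc.rack_count);
    }

    // &Vec<Arc<Node>> became &[Arc<Node>]; .iter()/.len() call sites still work.
    println!("total nodes: {}", cluster.get_nodes_info().len());
}
```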
14 changes: 9 additions & 5 deletions scylla/src/transport/load_balancing/dc_aware_round_robin.rs
@@ -33,9 +33,8 @@ impl DcAwareRoundRobinPolicy {

     fn retrieve_local_nodes<'a>(&self, cluster: &'a ClusterData) -> &'a [Arc<Node>] {
         cluster
-            .datacenters
-            .get(&self.local_dc)
-            .map(|dc| &dc.nodes)
+            .replica_locator()
+            .unique_nodes_in_datacenter_ring(&self.local_dc)
             .unwrap_or(EMPTY_NODE_LIST)
     }

@@ -47,7 +46,8 @@ impl DcAwareRoundRobinPolicy {
         let local_dc = self.local_dc.clone();

         cluster
-            .all_nodes
+            .replica_locator()
+            .unique_nodes_in_global_ring()
             .iter()
             .cloned()
             .filter(move |node| !DcAwareRoundRobinPolicy::is_local_node(node, &local_dc))
@@ -68,7 +68,11 @@ impl LoadBalancingPolicy for DcAwareRoundRobinPolicy {

         if self.include_remote_nodes {
             let remote_nodes = self.retrieve_remote_nodes(cluster);
-            let remote_nodes_count = cluster.all_nodes.len() - local_nodes.len();
+            let remote_nodes_count = cluster
+                .replica_locator()
+                .unique_nodes_in_global_ring()
+                .len()
+                - local_nodes.len();
             let remote_nodes_rotation = super::compute_rotation(index, remote_nodes_count);
             let rotated_remote_nodes =
                 super::iter_rotated_left(remote_nodes, remote_nodes_rotation);
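The remote-node count above is now derived rather than stored: global unique nodes minus local-dc nodes. A standalone sketch of the resulting plan shape, under the assumption that local-dc nodes lead the plan and both groups are rotated by the same per-request index (types are illustrative, not the driver's):

```rust
/// Illustrative only: nodes are (name, dc) pairs instead of Arc<Node>.
fn dc_aware_plan<'a>(all: &[(&'a str, &'a str)], local_dc: &str, index: usize) -> Vec<&'a str> {
    let (local, remote): (Vec<_>, Vec<_>) =
        all.iter().copied().partition(|(_, dc)| *dc == local_dc);
    // remote count = global count - local count, as in the hunk above.
    debug_assert_eq!(remote.len(), all.len() - local.len());

    let rotate = |group: &[(&'a str, &'a str)]| -> Vec<&'a str> {
        if group.is_empty() {
            return Vec::new();
        }
        let r = index % group.len();
        group[r..].iter().chain(&group[..r]).map(|&(node, _)| node).collect()
    };
    let mut plan = rotate(&local);
    plan.extend(rotate(&remote));
    plan
}

fn main() {
    let nodes = [("n1", "dc1"), ("n2", "dc2"), ("n3", "dc1")];
    // Local dc1 nodes come first, then the remote one.
    assert_eq!(dc_aware_plan(&nodes, "dc1", 0), ["n1", "n3", "n2"]);
}
```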
19 changes: 14 additions & 5 deletions scylla/src/transport/load_balancing/latency_aware.rs
@@ -188,7 +188,12 @@ impl Default for LatencyAwarePolicy {

 impl LoadBalancingPolicy for LatencyAwarePolicy {
     fn plan<'a>(&self, _statement: &Statement, cluster: &'a ClusterData) -> Plan<'a> {
-        self.make_plan(cluster.all_nodes.clone())
+        self.make_plan(
+            cluster
+                .replica_locator()
+                .unique_nodes_in_global_ring()
+                .to_vec(),
+        )
     }

     fn name(&self) -> String {
@@ -200,7 +205,9 @@ impl LoadBalancingPolicy for LatencyAwarePolicy {
     }

     fn update_cluster_data(&self, cluster_data: &ClusterData) {
-        self.refresh_last_min_avg_nodes(&cluster_data.all_nodes);
+        self.refresh_last_min_avg_nodes(
+            cluster_data.replica_locator().unique_nodes_in_global_ring(),
+        );
         self.child_policy.update_cluster_data(cluster_data);
     }
 }
@@ -522,7 +529,7 @@ mod tests {
         );

         // Await last min average updater.
-        policy.refresh_last_min_avg_nodes(&cluster.all_nodes);
+        policy.refresh_last_min_avg_nodes(cluster.replica_locator().unique_nodes_in_global_ring());
         tokio::time::sleep(policy.update_rate).await;

         let plans = (0..16)
@@ -574,7 +581,7 @@ mod tests {
         );

         // Await last min average updater.
-        policy.refresh_last_min_avg_nodes(&cluster.all_nodes);
+        policy.refresh_last_min_avg_nodes(cluster.replica_locator().unique_nodes_in_global_ring());
         tokio::time::sleep(policy.update_rate).await;

         let plans = (0..16)
@@ -634,7 +641,9 @@ mod tests {
         );

         // Await last min average updater.
-        policy.refresh_last_min_avg_nodes(&cluster.all_nodes);
+        policy.refresh_last_min_avg_nodes(
+            cluster.replica_locator().unique_nodes_in_global_ring(),
+        );
         tokio::time::sleep(policy.update_rate).await;

         let plans = (0..16)
11 changes: 9 additions & 2 deletions scylla/src/transport/load_balancing/round_robin.rs
@@ -32,9 +32,16 @@ impl LoadBalancingPolicy for RoundRobinPolicy {
     fn plan<'a>(&self, _statement: &Statement, cluster: &'a ClusterData) -> Plan<'a> {
         let index = self.index.fetch_add(1, ORDER_TYPE);

-        let nodes_count = cluster.all_nodes.len();
+        let nodes_count = cluster
+            .replica_locator()
+            .unique_nodes_in_global_ring()
+            .len();
         let rotation = super::compute_rotation(index, nodes_count);
-        let rotated_nodes = super::slice_rotated_left(&cluster.all_nodes, rotation).cloned();
+        let rotated_nodes = super::slice_rotated_left(
+            cluster.replica_locator().unique_nodes_in_global_ring(),
+            rotation,
+        )
+        .cloned();
         trace!(
             nodes = rotated_nodes
                 .clone()
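`compute_rotation` and `slice_rotated_left` are private helpers, so here is a rough standalone equivalent of the rotation the policy applies to the global unique-node list (an assumption-level sketch, not the driver's code):

```rust
// Rotate a slice left by (index % len) without allocating, then walk it once.
fn rotated<T>(slice: &[T], index: usize) -> impl Iterator<Item = &T> {
    let rotation = if slice.is_empty() { 0 } else { index % slice.len() };
    slice[rotation..].iter().chain(&slice[..rotation])
}

fn main() {
    let nodes = ["a", "b", "c"];
    // Consecutive plans start at "a", "b", "c": classic round-robin.
    for index in 0..3 {
        println!("{:?}", rotated(&nodes, index).collect::<Vec<_>>());
    }
}
```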
20 changes: 15 additions & 5 deletions scylla/src/transport/load_balancing/token_aware.rs
@@ -31,6 +31,20 @@ impl TokenAwarePolicy {
             .collect()
     }

+    fn rack_count_in_dc(dc_name: &str, cluster: &ClusterData) -> Option<usize> {
+        let nodes_in_that_dc = cluster
+            .replica_locator()
+            .unique_nodes_in_datacenter_ring(dc_name)?;
+
+        let count = nodes_in_that_dc
+            .iter()
+            .map(|node| &node.rack)
+            .unique()
+            .count();
+
+        Some(count)
+    }
+
     fn network_topology_strategy_replicas(
         cluster: &ClusterData,
         token: &Token,
@@ -39,11 +53,7 @@
         let mut acceptable_repeats = datacenter_repfactors
             .iter()
             .map(|(dc_name, repfactor)| {
-                let rack_count = cluster
-                    .datacenters
-                    .get(dc_name)
-                    .map(|dc| dc.rack_count)
-                    .unwrap_or(0);
+                let rack_count = Self::rack_count_in_dc(dc_name, cluster).unwrap_or(0);

                 (dc_name.as_str(), repfactor.saturating_sub(rack_count))
             })
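`rack_count_in_dc` above counts distinct racks by deduplicating `node.rack` values. A standalone sketch of just that counting step, assuming racks are `Option<String>` (which the `.as_ref()` / `&node.rack` usage suggests):

```rust
use itertools::Itertools;

// A missing rack (None) still counts as one distinct value, mirroring
// .map(|node| &node.rack).unique().count() in the hunk above.
fn rack_count(racks: &[Option<String>]) -> usize {
    racks.iter().unique().count()
}

fn main() {
    let racks = [
        Some("r1".to_string()),
        Some("r1".to_string()),
        Some("r2".to_string()),
        None,
    ];
    assert_eq!(rack_count(&racks), 3); // "r1", "r2", None
}
```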
2 changes: 1 addition & 1 deletion scylla/src/transport/session.rs
@@ -1356,7 +1356,7 @@ impl Session {
             Some(token) => {
                 TokenAwarePolicy::replicas_for_token(&cluster_data, &token, statement.keyspace)
             }
-            None => cluster_data.all_nodes.clone(),
+            None => cluster_data.get_nodes_info().to_owned(),
         }
     }

4 changes: 2 additions & 2 deletions scylla/tests/utils/mod.rs
@@ -27,8 +27,8 @@ impl LoadBalancingPolicy for FixedOrderLoadBalancer {
         Box::new(
             cluster
                 .get_nodes_info()
-                .clone()
-                .into_iter()
+                .iter()
+                .cloned()
                 .sorted_by(|node1, node2| Ord::cmp(&node1.address, &node2.address)),
         )
     }
