Skip to content

Commit

Permalink
Add Session::first_shard_for_statement
Browse files Browse the repository at this point in the history
Resolves scylladb#468

This is a follow-up on scylladb#508 and scylladb#658:
- To minimize CPU usage related to network operations when inserting a very large number of lines, it is relevant to batch.
- To batch in the most efficient manner, these batches have to be shard-aware. Since scylladb#508, `batch` will pick the shard of the first statement to send the query to. However it is left to the user to constitute the batches in such a way that the target shard is the same for all the elements of the batch.
- This was made *possible* by scylladb#658, but it was still very boilerplate-ish. I was waiting for scylladb#612 to be merged (amazing work btw! 😃) to implement a more direct and factored API (as that would use it).
- This new ~`Session::first_shard_for_statement(self, &PreparedStatement, &SerializedValues) -> Option<(Node, Option<Shard>)>` makes shard-aware batching easy on the users, by providing access to the first node and shard of the query plan.
  • Loading branch information
Ten0 committed Jun 3, 2023
1 parent ad0d2f5 commit 738b1e1
Showing 1 changed file with 59 additions and 16 deletions.
75 changes: 59 additions & 16 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::frame::types::LegacyConsistency;
use crate::history;
use crate::history::HistoryListener;
use crate::retry_policy::RetryPolicy;
use crate::routing;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes;
Expand Down Expand Up @@ -898,15 +899,7 @@ impl Session {
.as_ref()
.map(|pk| prepared.get_partitioner_name().hash(pk));

let statement_info = RoutingInfo {
consistency: prepared
.get_consistency()
.unwrap_or(self.default_execution_profile_handle.access().consistency),
serial_consistency: prepared.get_serial_consistency(),
token,
keyspace: prepared.get_keyspace_name(),
is_confirmed_lwt: prepared.is_confirmed_lwt(),
};
let statement_info = self.routing_info(prepared, token);

let span =
RequestSpan::new_prepared(partition_key.as_ref(), token, serialized_values.size());
Expand Down Expand Up @@ -1814,13 +1807,63 @@ impl Session {
prepared: &PreparedStatement,
serialized_values: &SerializedValues,
) -> Result<Option<Token>, QueryError> {
match self.calculate_partition_key(prepared, serialized_values) {
Ok(Some(partition_key)) => {
let partitioner_name = prepared.get_partitioner_name();
Ok(Some(partitioner_name.hash(&partition_key)))
}
Ok(None) => Ok(None),
Err(err) => Err(err),
Ok(self
.calculate_partition_key(prepared, serialized_values)?
.map(|partition_key| prepared.get_partitioner_name().hash(&partition_key)))
}

/// Get the first node/shard that the load balancer would target if running this query
///
/// This may help constituting shard-aware batches
pub fn first_shard_for_statement(
&self,
prepared: &PreparedStatement,
serialized_values: &SerializedValues,
) -> Result<Option<(Arc<Node>, Option<routing::Shard>)>, QueryError> {
let token = match self.calculate_token(prepared, serialized_values)? {
Some(token) => token,
None => return Ok(None),
};
let routing_info = self.routing_info(prepared, Some(token));
let cluster_data = self.cluster.get_data();
let execution_profile = prepared
.config
.execution_profile_handle
.as_ref()
.unwrap_or_else(|| self.get_default_execution_profile_handle())
.access();
let mut query_plan = load_balancing::Plan::new(
&*execution_profile.load_balancing_policy,
&routing_info,
&cluster_data,
);
// We can't return the full iterator here because the iterator borrows from local variables.
// In order to achieve that, two designs would be possible:
// - Construct a self-referential struct and implement iterator on it via e.g. Ouroboros
// - Take a closure as a parameter that will take the local iterator and return anything, and
// this function would return directly what the closure returns
// Most likely though, people would use this for some kind of shard-awareness optimization for batching,
// and are consequently not interested in subsequent nodes.
// Until then, let's just expose this, as it is simpler
Ok(query_plan.next().map(move |node| {
let token = node.sharder().map(|sharder| sharder.shard_of(token));
(node.clone(), token)
}))
}

fn routing_info<'p>(
&self,
prepared: &'p PreparedStatement,
token: Option<Token>,
) -> RoutingInfo<'p> {
RoutingInfo {
consistency: prepared
.get_consistency()
.unwrap_or(self.default_execution_profile_handle.access().consistency),
serial_consistency: prepared.get_serial_consistency(),
token,
keyspace: prepared.get_keyspace_name(),
is_confirmed_lwt: prepared.is_confirmed_lwt(),
}
}

Expand Down

0 comments on commit 738b1e1

Please sign in to comment.