diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs index 3d61efce29..f5c481ea2c 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs @@ -23,6 +23,8 @@ //! ``` mod connections_container; +mod pipeline_routing; + mod connections_logic; /// Exposed only for testing. pub mod testing { @@ -41,6 +43,10 @@ use crate::{ FromRedisValue, InfoDict, }; use dashmap::DashMap; +use pipeline_routing::{ + collect_pipeline_requests, map_pipeline_to_nodes, process_pipeline_responses, + route_for_pipeline, PipelineResponses, +}; use std::{ collections::{HashMap, HashSet}, fmt, io, mem, @@ -285,6 +291,7 @@ where offset, count, route: route.into(), + sub_pipeline: false, }, sender, }) @@ -606,6 +613,7 @@ enum CmdArg { offset: usize, count: usize, route: InternalSingleNodeRouting, + sub_pipeline: bool, }, ClusterScan { // struct containing the arguments for the cluster scan command - scan state cursor, match pattern, count and object type. @@ -621,44 +629,6 @@ enum Operation { UpdateConnectionPassword(Option), } -fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> { - fn route_for_command(cmd: &Cmd) -> Option { - match cluster_routing::RoutingInfo::for_routable(cmd) { - Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None, - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::SpecificNode(route), - )) => Some(route), - Some(cluster_routing::RoutingInfo::SingleNode( - SingleNodeRoutingInfo::RandomPrimary, - )) => Some(Route::new_random_primary()), - Some(cluster_routing::RoutingInfo::MultiNode(_)) => None, - Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { - .. - })) => None, - None => None, - } - } - - // Find first specific slot and send to it. There's no need to check If later commands - // should be routed to a different slot, since the server will return an error indicating this. - pipeline.cmd_iter().map(route_for_command).try_fold( - None, - |chosen_route, next_cmd_route| match (chosen_route, next_cmd_route) { - (None, _) => Ok(next_cmd_route), - (_, None) => Ok(chosen_route), - (Some(chosen_route), Some(next_cmd_route)) => { - if chosen_route.slot() != next_cmd_route.slot() { - Err((ErrorKind::CrossSlot, "Received crossed slots in pipeline").into()) - } else if chosen_route.slot_addr() == SlotAddr::ReplicaOptional { - Ok(Some(next_cmd_route)) - } else { - Ok(Some(chosen_route)) - } - } - }, - ) -} - fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> { Box::pin(tokio::time::sleep(duration)) } @@ -2147,15 +2117,88 @@ where offset, count, route, + sub_pipeline, } => { - Self::try_pipeline_request( - pipeline, - offset, - count, - Self::get_connection(route, core, None), - ) - .await + if pipeline.is_atomic() || sub_pipeline { + // If the pipeline is atomic (i.e., a transaction) or if the pipeline is already splitted into sub-pipelines, we can send it as is, with no need to split it into sub-pipelines. + Self::try_pipeline_request( + pipeline, + offset, + count, + Self::get_connection(route, core, None), + ) + .await + } else { + // The pipeline is not atomic and not already splitted, we need to split it into sub-pipelines and send them separately. + + // Distribute pipeline commands across cluster nodes based on routing information. + // Returns: + // - pipelines_by_connection: Map of node addresses to their pipeline contexts + // - response_policies: List of response aggregation policies for multi-node operations + let (pipelines_by_connection, response_policies) = + map_pipeline_to_nodes(&pipeline, core.clone()) + .await + .map_err(|err| (OperationTarget::FanOut, err))?; + + // Initialize `PipelineResponses` to store responses for each pipeline command. + // This will be used to store the responses from the different sub-pipelines to the pipeline commands. + // A command can have one or more responses (e.g MultiNode commands). + // Each entry in `PipelineResponses` corresponds to a command in the original pipeline and contains + // a vector of tuples where each tuple holds a response to the command and the address of the node that provided it. + let mut pipeline_responses: PipelineResponses = + vec![Vec::new(); pipeline.len()]; + + let mut final_responses: Vec = Vec::with_capacity(pipeline.len()); + + // Processes the sub-pipelines to generate pending requests for execution on specific nodes. + // Each pending request encapsulates all the necessary details for executing commands on a node. + // + // Returns: + // - `receivers`: A vector of `oneshot::Receiver` instances, enabling asynchronous retrieval + // of the results from the execution of each sub-pipeline. + // - `pending_requests`: A vector of `PendingRequest` objects, each representing a scheduled command + // for execution on a node. + // - `addresses_and_indices`: A vector of tuples where each tuple contains a node address and a list + // of command indices for each sub-pipeline, allowing the results to be mapped back to their original command within the original pipeline. + let (receivers, pending_requests, addresses_and_indices) = + collect_pipeline_requests(pipelines_by_connection); + + // Add the pending requests to the pending_requests queue + core.pending_requests + .lock() + .unwrap() + .extend(pending_requests.into_iter()); + + // Wait for all receivers to complete and collect the responses + let responses: Vec<_> = futures::future::join_all(receivers.into_iter()) + .await + .into_iter() + .collect(); + + // Process the responses and update the pipeline_responses + process_pipeline_responses( + &mut pipeline_responses, + responses, + addresses_and_indices, + )?; + + // Process response policies after all tasks are complete + Self::aggregate_pipeline_multi_node_commands( + &mut pipeline_responses, + response_policies, + ) + .await?; + + // Collect final responses + for mut value in pipeline_responses.into_iter() { + // unwrap() is safe here because we know that the vector is not empty + final_responses.push(value.pop().unwrap().0); + } + + Ok(Response::Multiple(final_responses)) + } } + CmdArg::ClusterScan { cluster_scan_args, .. } => { @@ -2180,6 +2223,81 @@ where } } + /// Aggregates pipeline responses for multi-node commands and updates the `pipeline_responses` vector. + /// + /// Pipeline commands with multi-node routing info, will be splitted into multiple pipelines, therefore, after executing each pipeline and storing the results in `pipeline_responses`, + /// the multi-node commands will contain more than one response (one for each sub-pipeline that contained the command). This responses must be aggregated into a single response, based on the proper response policy. + /// + /// This function processes the provided `response_policies`, which contain information about how responses from multiple nodes should be aggregated. + /// For each policy: + /// - It collects the multiple responses and their source node addresses from the corresponding entry in `pipeline_responses`. + /// - Uses the routing information and optional response policy to aggregate the responses into a single result. + /// + /// The aggregated result replaces the existing entries in `pipeline_responses` for the given command index, changing the multiple responses to the command into a single aggregated response. + /// + /// After the execution of this function, all entries in `pipeline_responses` will contain a single response for each command. + /// + /// # Arguments + /// * `pipeline_responses` - A mutable reference to a vector of vectors, where each inner vector contains tuples of responses and their corresponding node addresses. + /// * `response_policies` - A vector of tuples, each containing: + /// - The index of the command in the pipeline that has a multi-node routing info. + /// - The routing information for the command. + /// - An optional response policy that dictates how the responses should be aggregated. + /// + /// # Returns + /// * `Result<(), (OperationTarget, RedisError)>` - Returns `Ok(())` if the aggregation is successful, or an error tuple containing the operation target and the Redis error if it fails. + /// + /// # Example + /// Suppose we have a pipeline with multiple commands that were split and executed on different nodes. + /// This function will aggregate the responses for commands that were split across multiple nodes. + /// + /// ```rust,compile_fail + /// // Example pipeline with commands that might be split across nodes + /// + /// let mut pipeline_responses = vec![ + /// vec![(Value::Int(1), "node1".to_string()), (Value::Int(2), "node2".to_string()), (Value::Int(3), "node3".to_string())], // represents `DBSIZE` + /// vec![(Value::Int(3), "node3".to_string())], + /// vec![(Value::SimpleString("PONG".to_string()), "node1".to_string()), (Value::SimpleString("PONG".to_string()), "node2".to_string()), (Value::SimpleString("PONG".to_string()), "node3".to_string())], // represents `PING` + /// ]; + /// let response_policies = vec![ + /// (0, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::Aggregate(AggregateOp::Sum))), + /// (2, MultipleNodeRoutingInfo::AllNodes, Some(ResponsePolicy::AllSucceeded)), + /// ]; + /// + /// // Aggregating the responses + /// aggregate_pipeline_multi_node_commands(&mut pipeline_responses, response_policies).await.unwrap(); + /// + /// // After aggregation, pipeline_responses will be updated with aggregated results + /// assert_eq!(pipeline_responses[0], vec![(Value::Int(6), "".to_string())]); + /// assert_eq!(pipeline_responses[1], vec![(Value::Int(3), "node3".to_string())]); + /// assert_eq!(pipeline_responses[2], vec![(Value::SimpleString("PONG".to_string()), "".to_string())]); + /// ``` + /// + /// This function is essential for handling multi-node commands in a Redis cluster, ensuring that responses from different nodes are correctly aggregated and processed. + async fn aggregate_pipeline_multi_node_commands( + pipeline_responses: &mut PipelineResponses, + response_policies: Vec<(usize, MultipleNodeRoutingInfo, Option)>, + ) -> Result<(), (OperationTarget, RedisError)> { + for (index, routing_info, response_policy) in response_policies { + let response_receivers = pipeline_responses[index] + .iter() + .map(|(value, address)| { + let (sender, receiver) = oneshot::channel(); + let _ = sender.send(Ok(Response::Single(value.clone()))); + (Some(address.clone()), receiver) + }) + .collect(); + + let aggregated_response = + Self::aggregate_results(response_receivers, &routing_info, response_policy) + .await + .map_err(|err| (OperationTarget::FanOut, err))?; + + pipeline_responses[index] = vec![(aggregated_response, "".to_string())]; + } + Ok(()) + } + async fn get_connection( routing: InternalSingleNodeRouting, core: Core, @@ -2821,7 +2939,7 @@ impl Connect for MultiplexedConnection { #[cfg(test)] mod pipeline_routing_tests { - use super::route_for_pipeline; + use super::pipeline_routing::route_for_pipeline; use crate::{ cluster_routing::{Route, SlotAddr}, cmd, @@ -2830,6 +2948,7 @@ mod pipeline_routing_tests { #[test] fn test_first_route_is_found() { let mut pipeline = crate::Pipeline::new(); + pipeline.atomic(); pipeline .add_command(cmd("FLUSHALL")) // route to all masters @@ -2845,7 +2964,7 @@ mod pipeline_routing_tests { #[test] fn test_return_none_if_no_route_is_found() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .add_command(cmd("FLUSHALL")) // route to all masters .add_command(cmd("EVAL")); // route randomly @@ -2856,7 +2975,7 @@ mod pipeline_routing_tests { #[test] fn test_prefer_primary_route_over_replica() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .get("foo") // route to replica of slot 12182 .add_command(cmd("FLUSHALL")) // route to all masters @@ -2873,7 +2992,7 @@ mod pipeline_routing_tests { #[test] fn test_raise_cross_slot_error_on_conflicting_slots() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .add_command(cmd("FLUSHALL")) // route to all masters .set("baz", "bar") // route to slot 4813 @@ -2888,7 +3007,7 @@ mod pipeline_routing_tests { #[test] fn unkeyed_commands_dont_affect_route() { let mut pipeline = crate::Pipeline::new(); - + pipeline.atomic(); pipeline .set("{foo}bar", "baz") // route to primary of slot 12182 .cmd("CONFIG").arg("GET").arg("timeout") // unkeyed command diff --git a/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs new file mode 100644 index 0000000000..a864e34316 --- /dev/null +++ b/glide-core/redis-rs/redis/src/cluster_async/pipeline_routing.rs @@ -0,0 +1,458 @@ +use crate::aio::ConnectionLike; +use crate::cluster_async::ClusterConnInner; +use crate::cluster_async::Connect; +use crate::cluster_routing::{ + command_for_multi_slot_indices, MultipleNodeRoutingInfo, ResponsePolicy, SingleNodeRoutingInfo, +}; +use crate::{cluster_routing, RedisResult, Value}; +use crate::{cluster_routing::Route, Cmd, ErrorKind, RedisError}; +use std::collections::HashMap; +use std::sync::Arc; + +use crate::cluster_async::MUTEX_READ_ERR; +use crate::Pipeline; +use futures::FutureExt; +use rand::prelude::IteratorRandom; +use tokio::sync::oneshot; +use tokio::sync::oneshot::error::RecvError; + +use super::CmdArg; +use super::PendingRequest; +use super::RequestInfo; +use super::{Core, InternalSingleNodeRouting, OperationTarget, Response}; + +/// Represents a pipeline command execution context for a specific node +#[derive(Default)] +pub struct NodePipelineContext { + /// The pipeline of commands to be executed + pub pipeline: Pipeline, + /// The connection to the node + pub connection: C, + /// Vector of (command_index, inner_index) pairs tracking command order + /// command_index: Position in the original pipeline + /// inner_index: Optional sub-index for multi-node operations (e.g. MSET) + pub command_indices: Vec<(usize, Option)>, +} + +/// Maps node addresses to their pipeline execution contexts +pub type NodePipelineMap = HashMap>; + +impl NodePipelineContext { + fn new(connection: C) -> Self { + Self { + pipeline: Pipeline::new(), + connection, + command_indices: Vec::new(), + } + } + + // Adds a command to the pipeline and records its index + fn add_command(&mut self, cmd: Cmd, index: usize, inner_index: Option) { + self.pipeline.add_command(cmd); + self.command_indices.push((index, inner_index)); + } +} + +/// `NodeResponse` represents a response from a node along with its source node address. +type NodeResponse = (Value, String); +/// `PipelineResponses` represents the responses for each pipeline command. +/// The outer `Vec` represents the pipeline commands, and each inner `Vec` contains (response, address) pairs. +/// Since some commands can be executed across multiple nodes (e.g., multi-node commands), a single command +/// might produce multiple responses, each from a different node. By storing the responses with their +/// respective node addresses, we ensure that we have all the information needed to aggregate the results later. +pub type PipelineResponses = Vec>; + +/// `AddressAndIndices` represents the address of a node and the indices of commands associated with that node. +type AddressAndIndices = Vec<(String, Vec<(usize, Option)>)>; + +/// Adds a command to the pipeline map for a specific node address. +pub fn add_command_to_node_pipeline_map( + pipeline_map: &mut NodePipelineMap, + address: String, + connection: C, + cmd: Cmd, + index: usize, + inner_index: Option, +) { + pipeline_map + .entry(address) + .or_insert_with(|| NodePipelineContext::new(connection)) + .add_command(cmd, index, inner_index); +} + +/// Adds a command to a random existing node pipeline in the pipeline map +pub fn add_command_to_random_existing_node( + pipeline_map: &mut NodePipelineMap, + cmd: Cmd, + index: usize, +) -> RedisResult<()> { + let mut rng = rand::thread_rng(); + if let Some(node_context) = pipeline_map.values_mut().choose(&mut rng) { + node_context.add_command(cmd, index, None); + Ok(()) + } else { + Err(RedisError::from((ErrorKind::IoError, "No nodes available"))) + } +} + +/// Maps the commands in a pipeline to the appropriate nodes based on their routing information. +/// +/// This function processes each command in the given pipeline, determines its routing information, +/// and organizes it into a map of node pipelines. It handles both single-node and multi-node routing +/// strategies and ensures that the commands are distributed accordingly. +/// +/// It also collects response policies for multi-node routing and returns them along with the pipeline map. +/// This is to ensure we can aggregate responses from properly from the different nodes. +/// +/// # Arguments +/// +/// * `pipeline` - A reference to the pipeline containing the commands to route. +/// * `core` - The core object that provides access to connection locks and other resources. +/// +/// # Returns +/// +/// A `RedisResult` containing a tuple: +/// +/// - A `NodePipelineMap` where commands are grouped by their corresponding nodes (as pipelines). +/// - A `Vec<(usize, MultipleNodeRoutingInfo, Option)>` containing the routing information +/// and response policies for multi-node commands, along with the index of the command in the pipeline, for aggregating the responses later. +pub async fn map_pipeline_to_nodes( + pipeline: &crate::Pipeline, + core: Core, +) -> RedisResult<( + NodePipelineMap, + Vec<(usize, MultipleNodeRoutingInfo, Option)>, +)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + let mut pipelines_by_connection = NodePipelineMap::new(); + let mut response_policies = Vec::new(); + + for (index, cmd) in pipeline.cmd_iter().enumerate() { + match cluster_routing::RoutingInfo::for_routable(cmd).unwrap_or( + cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random), + ) { + cluster_routing::RoutingInfo::SingleNode(route) => { + handle_pipeline_single_node_routing( + &mut pipelines_by_connection, + cmd.clone(), + route.into(), + core.clone(), + index, + ) + .await + .map_err(|(_target, err)| err)?; + } + + cluster_routing::RoutingInfo::MultiNode((multi_node_routing, response_policy)) => { + //save the routing info and response policy, so we will be able to aggregate the results later + response_policies.push((index, multi_node_routing.clone(), response_policy)); + match multi_node_routing { + MultipleNodeRoutingInfo::AllNodes | MultipleNodeRoutingInfo::AllMasters => { + let connections: Vec<_> = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + if matches!(multi_node_routing, MultipleNodeRoutingInfo::AllNodes) { + lock.all_node_connections().collect() + } else { + lock.all_primary_connections().collect() + } + }; + for (inner_index, (address, conn)) in connections.into_iter().enumerate() { + add_command_to_node_pipeline_map( + &mut pipelines_by_connection, + address, + conn.await, + cmd.clone(), + index, + Some(inner_index), + ); + } + } + MultipleNodeRoutingInfo::MultiSlot((slots, _)) => { + handle_pipeline_multi_slot_routing( + &mut pipelines_by_connection, + core.clone(), + cmd, + index, + slots, + ) + .await; + } + } + } + } + } + Ok((pipelines_by_connection, response_policies)) +} + +/// Handles pipeline commands that require single-node routing. +/// +/// This function processes commands with `SingleNode` routing information and determines +/// the appropriate handling based on the routing type. +/// +/// ### Parameters: +/// - `pipeline_map`: A mutable reference to the `NodePipelineMap`, representing the pipelines grouped by nodes. +/// - `cmd`: The command to process and add to the appropriate node pipeline. +/// - `routing`: The single-node routing information, which determines how the command is routed. +/// - `core`: The core object responsible for managing connections and routing logic. +/// - `index`: The position of the command in the overall pipeline. +pub async fn handle_pipeline_single_node_routing( + pipeline_map: &mut NodePipelineMap, + cmd: Cmd, + routing: InternalSingleNodeRouting, + core: Core, + index: usize, +) -> Result<(), (OperationTarget, RedisError)> +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + if matches!(routing, InternalSingleNodeRouting::Random) && !pipeline_map.is_empty() { + // The routing info is to a random node, and we already have sub-pipelines within our pipelines map, so just add it to a random sub-pipeline + add_command_to_random_existing_node(pipeline_map, cmd, index) + .map_err(|err| (OperationTarget::NotFound, err))?; + Ok(()) + } else { + let (address, conn) = + ClusterConnInner::get_connection(routing, core, Some(Arc::new(cmd.clone()))) + .await + .map_err(|err| (OperationTarget::NotFound, err))?; + add_command_to_node_pipeline_map(pipeline_map, address, conn, cmd, index, None); + Ok(()) + } +} + +/// Handles multi-slot commands within a pipeline. +/// +/// This function processes commands with routing information indicating multiple slots +/// (e.g., `MSET` or `MGET`), splits them into sub-commands based on their target slots, +/// and assigns these sub-commands to the appropriate pipelines for the corresponding nodes. +/// +/// ### Parameters: +/// - `pipelines_by_connection`: A mutable map of node pipelines where the commands will be added. +/// - `core`: The core structure that provides access to connection management. +/// - `cmd`: The original multi-slot command that needs to be split. +/// - `index`: The index of the original command within the pipeline. +/// - `slots`: A vector containing routing information. Each entry includes: +/// - `Route`: The specific route for the slot. +/// - `Vec`: Indices of the keys within the command that map to this slot. +pub async fn handle_pipeline_multi_slot_routing( + pipelines_by_connection: &mut NodePipelineMap, + core: Core, + cmd: &Cmd, + index: usize, + slots: Vec<(Route, Vec)>, +) where + C: Clone, +{ + // inner_index is used to keep track of the index of the sub-command inside cmd + for (inner_index, (route, indices)) in slots.iter().enumerate() { + let conn = { + let lock = core.conn_lock.read().expect(MUTEX_READ_ERR); + lock.connection_for_route(route) + }; + if let Some((address, conn)) = conn { + // create the sub-command for the slot + let new_cmd = command_for_multi_slot_indices(cmd, indices.iter()); + add_command_to_node_pipeline_map( + pipelines_by_connection, + address, + conn.await, + new_cmd, + index, + Some(inner_index), + ); + } + } +} + +/// Creates `PendingRequest` objects for each pipeline in the provided pipeline map. +/// +/// This function processes the given map of node pipelines and prepares each sub-pipeline for execution +/// by creating a `PendingRequest` containing all necessary details for execution. +/// Additionally, it sets up communication channels to asynchronously receive the results of each sub-pipeline's execution. +/// +/// Returns a tuple containing: +/// - **receivers**: A vector of `oneshot::Receiver` objects to receive the responses of the sub-pipeline executions. +/// - **pending_requests**: A vector of `PendingRequest` objects, each representing a pipeline scheduled for execution on a node. +/// - **addresses_and_indices**: A vector of tuples containing node addresses and their associated command indices for each sub-pipeline, +/// allowing the results to be mapped back to their original command within the original pipeline. +#[allow(clippy::type_complexity)] +pub fn collect_pipeline_requests( + pipelines_by_connection: NodePipelineMap, +) -> ( + Vec>>, + Vec>, + Vec<(String, Vec<(usize, Option)>)>, +) +where + C: Clone + ConnectionLike + Connect + Send + Sync + 'static, +{ + let mut receivers = Vec::new(); + let mut pending_requests = Vec::new(); + let mut addresses_and_indices = Vec::new(); + + for (address, context) in pipelines_by_connection { + // Create a channel to receive the pipeline execution results + let (sender, receiver) = oneshot::channel(); + // Add the receiver to the list of receivers + receivers.push(receiver); + pending_requests.push(PendingRequest { + retry: 0, + sender, + info: RequestInfo { + cmd: CmdArg::Pipeline { + count: context.pipeline.len(), + pipeline: context.pipeline.into(), + offset: 0, + route: InternalSingleNodeRouting::Connection { + address: address.clone(), + conn: async { context.connection }.boxed().shared(), + }, + // mark it as a sub-pipeline mode + sub_pipeline: true, + }, + }, + }); + // Record the node address and its associated command indices for result mapping + addresses_and_indices.push((address, context.command_indices)); + } + + (receivers, pending_requests, addresses_and_indices) +} + +/// Adds the result of a pipeline command to the `pipeline_responses` collection. +/// +/// This function updates the `pipeline_responses` vector at the given `index` and optionally at the +/// `inner_index` if provided. If `inner_index` is `Some`, it ensures that the vector at that index is large enough +/// to hold the value and address at the specified position, resizing it if necessary. If `inner_index` is `None`, +/// the value and address are simply appended to the vector. +/// +/// # Parameters +/// - `pipeline_responses`: A mutable reference to a vector of vectors that stores the results of pipeline commands. +/// - `index`: The index in `pipeline_responses` where the result should be stored. +/// - `inner_index`: An optional index within the vector at `index`, used to store the result at a specific position. +/// - `value`: The result value to store. +/// - `address`: The address associated with the result. +pub fn add_pipeline_result( + pipeline_responses: &mut PipelineResponses, + index: usize, + inner_index: Option, + value: Value, + address: String, +) { + match inner_index { + Some(inner_index) => { + // Ensure the vector at the given index is large enough to hold the value and address at the specified position + if pipeline_responses[index].len() <= inner_index { + pipeline_responses[index].resize(inner_index + 1, (Value::Nil, "".to_string())); + } + pipeline_responses[index][inner_index] = (value, address); + } + None => pipeline_responses[index].push((value, address)), + } +} + +/// Processes the responses of pipeline commands and updates the given `pipeline_responses` +/// with the corresponding results. +/// +/// The function iterates over the responses along with the `addresses_and_indices` list, +/// ensuring that each response is added to its appropriate position in `pipeline_responses` along with the associated address. +/// If any response indicates an error, the function terminates early and returns the first encountered error. +/// +/// # Parameters +/// +/// - `pipeline_responses`: A vec that holds the original pipeline commands responses. +/// - `responses`: A list of responses corresponding to each sub-pipeline. +/// - `addresses_and_indices`: A list of (address, indices) pairs indicating where each response should be placed. +/// +/// # Returns +/// +/// - `Ok(())` if all responses are processed successfully. +/// - `Err((OperationTarget, RedisError))` if a node-level or reception error occurs. +pub fn process_pipeline_responses( + pipeline_responses: &mut PipelineResponses, + responses: Vec, RecvError>>, + addresses_and_indices: AddressAndIndices, +) -> Result<(), (OperationTarget, RedisError)> { + for ((address, command_indices), response_result) in + addresses_and_indices.into_iter().zip(responses) + { + match response_result { + Ok(Ok(Response::Multiple(values))) => { + // Add each response to the pipeline_responses vector at the appropriate index + for ((index, inner_index), value) in command_indices.into_iter().zip(values) { + add_pipeline_result( + pipeline_responses, + index, + inner_index, + value, + address.clone(), + ); + } + } + Ok(Err(err)) => { + return Err((OperationTarget::Node { address }, err)); + } + _ => { + return Err(( + OperationTarget::Node { address }, + RedisError::from((ErrorKind::ResponseError, "Failed to receive response")), + )); + } + } + } + Ok(()) +} + +/// This function returns the route for a given pipeline. +/// The function goes over the commands in the pipeline, checks that all key-based commands are routed to the same slot, +/// and returns the route for that specific node. +/// If the pipeline contains no key-based commands, the function returns None. +/// For non-atomic pipelines, the function will return None, regardless of the commands in it. +pub fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> { + fn route_for_command(cmd: &Cmd) -> Option { + match cluster_routing::RoutingInfo::for_routable(cmd) { + Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => None, + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(route), + )) => Some(route), + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary, + )) => Some(Route::new_random_primary()), + Some(cluster_routing::RoutingInfo::MultiNode(_)) => None, + Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { + .. + })) => None, + None => None, + } + } + + if pipeline.is_atomic() { + // Find first specific slot and send to it. There's no need to check If later commands + // should be routed to a different slot, since the server will return an error indicating this. + pipeline + .cmd_iter() + .map(route_for_command) + .try_fold(None, |chosen_route, next_cmd_route| { + match (chosen_route, next_cmd_route) { + (None, _) => Ok(next_cmd_route), + (_, None) => Ok(chosen_route), + (Some(chosen_route), Some(next_cmd_route)) => { + if chosen_route.slot() != next_cmd_route.slot() { + Err(( + ErrorKind::CrossSlot, + "Received crossed slots in transaction", + ) + .into()) + } else { + Ok(Some(chosen_route)) + } + } + } + }) + } else { + // Pipeline is not atomic, so we can have commands with different slots. + Ok(None) + } +} diff --git a/glide-core/redis-rs/redis/src/pipeline.rs b/glide-core/redis-rs/redis/src/pipeline.rs index babb57a1ff..813961156a 100644 --- a/glide-core/redis-rs/redis/src/pipeline.rs +++ b/glide-core/redis-rs/redis/src/pipeline.rs @@ -203,6 +203,24 @@ impl Pipeline { pub fn execute(&self, con: &mut dyn ConnectionLike) { self.query::<()>(con).unwrap(); } + + /// Returns whether the pipeline is in transaction mode (atomic). + /// + /// When in transaction mode, all commands in the pipeline are executed + /// as a single atomic operation. + pub fn is_atomic(&self) -> bool { + self.transaction_mode + } + + /// Returns the number of commands in the pipeline. + pub fn len(&self) -> usize { + self.commands.len() + } + + /// Returns `true` if the pipeline contains no commands. + pub fn is_empty(&self) -> bool { + self.commands.is_empty() + } } fn encode_pipeline(cmds: &[Cmd], atomic: bool) -> Vec { diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 005a38a9ca..cc3b01413a 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -365,10 +365,10 @@ impl Client { .into()); } }; - Self::convert_transaction_values_to_expected_types(pipeline, values, command_count) + Self::convert_pipeline_values_to_expected_types(pipeline, values, command_count) } - fn convert_transaction_values_to_expected_types( + fn convert_pipeline_values_to_expected_types( pipeline: &redis::Pipeline, values: Vec, command_count: usize, @@ -413,6 +413,29 @@ impl Client { .boxed() } + pub fn send_pipeline<'a>( + &'a mut self, + pipeline: &'a redis::Pipeline, + ) -> redis::RedisFuture<'a, Value> { + let command_count = pipeline.cmd_iter().count(); + let _offset = command_count + 1; //TODO: check + + run_with_timeout(Some(self.request_timeout), async move { + let values = match self.internal_client { + ClientWrapper::Standalone(ref mut client) => { + client.send_pipeline(pipeline, 0, command_count).await + } + + ClientWrapper::Cluster { ref mut client } => { + client.req_packed_commands(pipeline, 0, command_count).await + } + }?; + + Self::convert_pipeline_values_to_expected_types(pipeline, values, command_count) + }) + .boxed() + } + pub async fn invoke_script<'a>( &'a mut self, hash: &'a str, diff --git a/glide-core/src/protobuf/command_request.proto b/glide-core/src/protobuf/command_request.proto index d7c693cfd6..8b7887bc2d 100644 --- a/glide-core/src/protobuf/command_request.proto +++ b/glide-core/src/protobuf/command_request.proto @@ -501,6 +501,10 @@ message Transaction { repeated Command commands = 1; } +message Pipeline { + repeated Command commands = 1; +} + message ClusterScan { string cursor = 1; optional bytes match_pattern = 2; @@ -524,6 +528,7 @@ message CommandRequest { ScriptInvocationPointers script_invocation_pointers = 5; ClusterScan cluster_scan = 6; UpdateConnectionPassword update_connection_password = 7; + Pipeline pipeline = 8; } - Routes route = 8; + Routes route = 9; } diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index 0b034e48c3..b63a0968fc 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -4,7 +4,8 @@ use super::rotating_buffer::RotatingBuffer; use crate::client::Client; use crate::cluster_scan_container::get_cluster_scan_cursor; use crate::command_request::{ - command, command_request, ClusterScan, Command, CommandRequest, Routes, SlotTypes, Transaction, + command, command_request, ClusterScan, Command, CommandRequest, Pipeline, Routes, SlotTypes, + Transaction, }; use crate::connection_request::ConnectionRequest; use crate::errors::{error_message, error_type, RequestErrorType}; @@ -388,6 +389,18 @@ async fn send_transaction( .map_err(|err| err.into()) } +async fn send_pipeline(request: Pipeline, client: &mut Client) -> ClientUsageResult { + let mut pipeline = redis::Pipeline::with_capacity(request.commands.capacity()); + for command in request.commands { + pipeline.add_command(get_redis_command(&command)?); + } + + client + .send_pipeline(&pipeline) + .await + .map_err(|err| err.into()) +} + fn get_slot_addr(slot_type: &protobuf::EnumOrUnknown) -> ClientUsageResult { slot_type .enum_value() @@ -491,6 +504,9 @@ fn handle_request(request: CommandRequest, mut client: Client, writer: Rc Err(e), } } + command_request::Command::Pipeline(pipeline) => { + send_pipeline(pipeline, &mut client).await + } command_request::Command::ScriptInvocation(script) => { match get_route(request.route.0, None) { Ok(routes) => { diff --git a/glide-core/tests/test_socket_listener.rs b/glide-core/tests/test_socket_listener.rs index 5921236e36..6a897e1cae 100644 --- a/glide-core/tests/test_socket_listener.rs +++ b/glide-core/tests/test_socket_listener.rs @@ -21,7 +21,7 @@ mod socket_listener { use crate::utilities::mocks::{Mock, ServerMock}; use super::*; - use command_request::{CommandRequest, RequestType}; + use command_request::{CommandRequest, Pipeline, RequestType}; use glide_core::command_request::command::{Args, ArgsArray}; use glide_core::command_request::{Command, Transaction}; use glide_core::response::{response, ConstantResponse, Response}; @@ -309,6 +309,28 @@ mod socket_listener { write_request(buffer, socket, request); } + fn write_pipeline_request( + buffer: &mut Vec, + socket: &mut UnixStream, + callback_index: u32, + commands_components: Vec, + ) { + let mut request = CommandRequest::new(); + request.callback_idx = callback_index; + let mut pipeline = Pipeline::new(); + pipeline.commands.reserve(commands_components.len()); + + for components in commands_components { + pipeline.commands.push(get_command(components)); + } + + request.command = Some(command_request::command_request::Command::Pipeline( + pipeline, + )); + + write_request(buffer, socket, request); + } + fn write_get( buffer: &mut Vec, socket: &mut UnixStream, @@ -1211,6 +1233,128 @@ mod socket_listener { ); } + #[rstest] + #[serial_test::serial] + #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] + fn test_send_pipeline_and_get_array_of_results( + #[values(RedisType::Cluster, RedisType::Standalone)] use_cluster: RedisType, + ) { + let test_basics = setup_test_basics(Tls::NoTls, TestServer::Shared, use_cluster); + let mut socket = test_basics + .socket + .try_clone() + .expect("Failed to clone socket"); + + const CALLBACK_INDEX: u32 = 0; + let key = generate_random_string(KEY_LENGTH); + let key2 = generate_random_string(KEY_LENGTH); + + let commands = vec![ + CommandComponents { + args: vec![key.clone().into(), "bar".to_string().into()], + args_pointer: true, + request_type: RequestType::Set.into(), + }, + CommandComponents { + args: vec!["GET".to_string().into(), key.clone().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.clone().into(), key2.into()], + args_pointer: false, + request_type: RequestType::MGet.into(), + }, + CommandComponents { + args: vec!["FLUSHALL".to_string().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.into()], + args_pointer: false, + request_type: RequestType::Get.into(), + }, + CommandComponents { + args: vec!["HELLO".into()], + args_pointer: false, + request_type: RequestType::Ping.into(), + }, + ]; + let mut buffer = Vec::with_capacity(200); + write_pipeline_request(&mut buffer, &mut socket, CALLBACK_INDEX, commands); + + assert_value_response( + &mut buffer, + Some(&mut socket), + CALLBACK_INDEX, + Value::Array(vec![ + Value::Okay, + Value::BulkString(vec![b'b', b'a', b'r']), + Value::Array(vec![Value::BulkString(vec![b'b', b'a', b'r']), Value::Nil]), + Value::Okay, + Value::Nil, + Value::BulkString(vec![b'H', b'E', b'L', b'L', b'O']), + ]), + ); + } + + #[rstest] + #[serial_test::serial] + #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)] + fn test_send_pipeline_and_get_error( + #[values(RedisType::Cluster, RedisType::Standalone)] use_cluster: RedisType, + ) { + let mut test_basics = setup_test_basics(Tls::NoTls, TestServer::Shared, use_cluster); + let mut socket = test_basics + .socket + .try_clone() + .expect("Failed to clone socket"); + + const CALLBACK_INDEX: u32 = 0; + let key = generate_random_string(KEY_LENGTH); + let commands = vec![ + CommandComponents { + args: vec![key.clone().into(), "bar".to_string().into()], + args_pointer: true, + request_type: RequestType::Set.into(), + }, + CommandComponents { + args: vec!["GET".to_string().into(), key.clone().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.clone().into(), "random_key".into()], + args_pointer: false, + request_type: RequestType::MGet.into(), + }, + CommandComponents { + args: vec![key.clone().into()], + args_pointer: false, + request_type: RequestType::LLen.into(), + }, + CommandComponents { + args: vec!["FLUSHALL".to_string().into()], + args_pointer: false, + request_type: RequestType::CustomCommand.into(), + }, + CommandComponents { + args: vec![key.into()], + args_pointer: false, + request_type: RequestType::Get.into(), + }, + ]; + let mut buffer = Vec::with_capacity(200); + write_pipeline_request(&mut buffer, &mut socket, CALLBACK_INDEX, commands); + + assert_error_response( + &mut buffer, + &mut test_basics.socket, + CALLBACK_INDEX, + ResponseType::RequestError, + ); + } #[rstest] #[serial_test::serial] #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]