-
Notifications
You must be signed in to change notification settings - Fork 68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Core: Add new CommandRequest - Pipeline #2954
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Shoham Elias <[email protected]>
Signed-off-by: Shoham Elias <[email protected]>
Signed-off-by: Shoham Elias <[email protected]>
pipeline: &'a redis::Pipeline, | ||
) -> redis::RedisFuture<'a, Value> { | ||
let command_count = pipeline.cmd_iter().count(); | ||
let _offset = command_count + 1; //TODO: check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resolve TODO
.push((index, inner_index)); | ||
} | ||
|
||
async fn routes_pipeline_commands( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
find a better name and make it shorter
pipeline: &crate::Pipeline, | ||
core: Core<C>, | ||
) -> RedisResult<( | ||
HashMap<String, (Pipeline, C, Vec<(usize, Option<usize>)>)>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
encapsulate in struct
match cluster_routing::RoutingInfo::for_routable(cmd) { | ||
Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) | ||
| None => { | ||
if pipelines_by_connection.is_empty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment
} else { | ||
// since the map is not empty, add the command to a random connection within the map. | ||
let mut rng = rand::thread_rng(); | ||
let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to addresses
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think about way not to clone the addresses
Self::get_connection(route, core, None), | ||
) | ||
.await | ||
if pipeline.is_atomic() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refactor, make it readable (shorter)
) | ||
.await | ||
} else { | ||
let (pipelines_by_connection, response_policies) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment
Self::routes_pipeline_commands(&pipeline, core.clone()) | ||
.await | ||
.map_err(|err| (OperationTarget::FanOut, err))?; | ||
let mut values_and_addresses = vec![Vec::new(); pipeline.len()]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
command return values per pipeline?
for (index, routing_info, response_policy) in response_policies { | ||
#[allow(clippy::type_complexity)] | ||
// Safely access `values_and_addresses` for the current index | ||
let response_receivers: Vec<( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use structs for complex types
|
||
// Collect final responses | ||
for mut value in values_and_addresses.into_iter() { | ||
assert_eq!(value.len(), 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont use asserts in prod code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use index 0 instead of poping and unwrapping
.map_err(|err| (OperationTarget::FanOut, err))?; | ||
|
||
// Update `values_and_addresses` for the current index | ||
values_and_addresses[index] = vec![(aggregated_response, "".to_string())]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use index 0 for storing aggregated_response
glide-core/src/socket_listener.rs
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets try to use pipeline for both transaction and pipeline, differentiating by is_atomic
@@ -501,6 +501,10 @@ message Transaction { | |||
repeated Command commands = 1; | |||
} | |||
|
|||
message Pipeline { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets remove Transaction and use Pipeline + is_atomic flag
Signed-off-by: Shoham Elias <[email protected]>
Issue link
This Pull Request is linked to issue (URL): [REPLACE ME]
Checklist
Before submitting the PR make sure the following are checked: