Core: Add new CommandRequest - Pipeline #2954

Open · wants to merge 5 commits into base: main

Conversation

shohamazon (Collaborator)

Issue link

This Pull Request is linked to issue (URL): [REPLACE ME]

Checklist

Before submitting the PR, make sure the following are checked:

  • This Pull Request is related to one issue.
  • Commit message has a detailed description of what changed and why.
  • Tests are added or updated.
  • CHANGELOG.md and documentation files are updated.
  • Destination branch is correct - main or release
  • Create merge commit if merging release branch into main, squash otherwise.

shohamazon and others added 4 commits January 15, 2025 14:04
@ikolomi self-requested a review January 16, 2025 10:53
@shohamazon added the Rust core (redis-rs/glide-core matter) and Core changes (significant changes that should trigger full matrix tests) labels Jan 16, 2025
@shohamazon marked this pull request as ready for review January 16, 2025 10:55
@shohamazon requested a review from a team as a code owner January 16, 2025 10:55
pipeline: &'a redis::Pipeline,
) -> redis::RedisFuture<'a, Value> {
let command_count = pipeline.cmd_iter().count();
let _offset = command_count + 1; //TODO: check

Reviewer (Collaborator): resolve TODO

.push((index, inner_index));
}

async fn routes_pipeline_commands(

Reviewer (Collaborator): find a better name and make it shorter

pipeline: &crate::Pipeline,
core: Core<C>,
) -> RedisResult<(
HashMap<String, (Pipeline, C, Vec<(usize, Option<usize>)>)>,

Reviewer (Collaborator): encapsulate in struct
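
A minimal sketch of what such an encapsulation could look like (the struct and field names are hypothetical, not part of this PR; the exact `Pipeline` type path is not visible in this hunk):

// Hypothetical struct bundling the per-node sub-pipeline, its connection,
// and the (index, inner_index) positions of each command in the caller's pipeline.
struct NodePipelineContext<C> {
    pipeline: Pipeline,
    connection: C,
    command_indices: Vec<(usize, Option<usize>)>,
}

// The routed map would then read: HashMap<String, NodePipelineContext<C>>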

match cluster_routing::RoutingInfo::for_routable(cmd) {
Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
| None => {
if pipelines_by_connection.is_empty() {

Reviewer (Collaborator): add a comment explaining this branch

} else {
// since the map is not empty, add the command to a random connection within the map.
let mut rng = rand::thread_rng();
let keys: Vec<_> = pipelines_by_connection.keys().cloned().collect();

Reviewer (Collaborator): rename to addresses


Reviewer (Collaborator): think about a way not to clone the addresses
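
One way to avoid cloning every key (a sketch only; it assumes the rand crate's IteratorRandom trait is available, which is reasonable given thread_rng is already used here):

use rand::seq::IteratorRandom;

let mut rng = rand::thread_rng();
// Choose one existing address and clone only that key, instead of
// collecting and cloning the whole key set first.
if let Some(address) = pipelines_by_connection.keys().choose(&mut rng).cloned() {
    // ... append the command to the pipeline stored under `address` ...
}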

Self::get_connection(route, core, None),
)
.await
if pipeline.is_atomic() {

Reviewer (Collaborator): refactor, make it readable (shorter)

)
.await
} else {
let (pipelines_by_connection, response_policies) =

Reviewer (Collaborator): add a comment explaining this

Self::routes_pipeline_commands(&pipeline, core.clone())
.await
.map_err(|err| (OperationTarget::FanOut, err))?;
let mut values_and_addresses = vec![Vec::new(); pipeline.len()];

Reviewer (Collaborator): command return values per pipeline?

for (index, routing_info, response_policy) in response_policies {
#[allow(clippy::type_complexity)]
// Safely access `values_and_addresses` for the current index
let response_receivers: Vec<(

Reviewer (Collaborator): use structs for complex types
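
The element type of response_receivers is cut off in this view, so no concrete struct is sketched here, but the same approach as the NodePipelineContext sketch above (a named struct instead of a nested tuple) would also let the #[allow(clippy::type_complexity)] attribute be dropped.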


// Collect final responses
for mut value in values_and_addresses.into_iter() {
assert_eq!(value.len(), 1);

Reviewer (Collaborator): don't use asserts in prod code


Reviewer (Collaborator): use index 0 instead of popping and unwrapping
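
Both this comment and the assert comment above could be addressed together along these lines (a sketch; it assumes the enclosing function propagates errors as (OperationTarget, RedisError) pairs, as the map_err calls in this code suggest, and the error message is a placeholder):

for mut value in values_and_addresses.into_iter() {
    // Turn the length invariant into a proper error instead of an assert.
    if value.len() != 1 {
        return Err((
            OperationTarget::FanOut,
            RedisError::from((ErrorKind::ClientError, "expected exactly one response per command")),
        ));
    }
    // Take element 0 directly rather than pop() + unwrap().
    let (response, _address) = value.swap_remove(0);
    // ... push `response` into the final results ...
}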

.map_err(|err| (OperationTarget::FanOut, err))?;

// Update `values_and_addresses` for the current index
values_and_addresses[index] = vec![(aggregated_response, "".to_string())];

Reviewer (Collaborator): use index 0 for storing aggregated_response
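
Reading this literally, the suggestion is probably to write into the existing first slot rather than allocating a fresh Vec (a guess at the intent, and it assumes the slot already holds at least one gathered response at this point):

// Overwrite the first gathered entry with the aggregated response.
values_and_addresses[index][0] = (aggregated_response, String::new());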


Reviewer (Collaborator): let's try to use Pipeline for both transaction and pipeline, differentiating by is_atomic
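
A rough sketch of how a single code path could handle both (hypothetical helper, not code from this PR; redis::Pipeline::atomic is the existing redis-rs way to wrap a pipeline in MULTI/EXEC):

// Hypothetical: one builder for both cases, differentiated by a flag.
fn build_pipeline(commands: &[redis::Cmd], is_atomic: bool) -> redis::Pipeline {
    let mut pipeline = redis::Pipeline::with_capacity(commands.len());
    if is_atomic {
        // Wraps the queued commands in MULTI/EXEC.
        pipeline.atomic();
    }
    for cmd in commands {
        pipeline.add_command(cmd.clone());
    }
    pipeline
}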

@@ -501,6 +501,10 @@ message Transaction {
repeated Command commands = 1;
}

message Pipeline {

Reviewer (Collaborator): let's remove Transaction and use Pipeline + an is_atomic flag

Signed-off-by: Shoham Elias <[email protected]>
Labels
Core changes (significant changes that should trigger full matrix tests), Rust core (redis-rs/glide-core matter)
Projects: None yet
2 participants