Skip to content

Commit

Permalink
Some edits as we moved to update the connection map from the tokio ta…
Browse files Browse the repository at this point in the history
…sk itself.

Signed-off-by: GilboaAWS <[email protected]>
  • Loading branch information
GilboaAWS committed Feb 4, 2025
1 parent 883748c commit a8588c1
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ impl Drop for RefreshTaskState {
}
}

// This struct is used to track the status of each address refresh
// This struct is used to track the status of each address refresh state
// TODO move this struct logic into the connection_map itself
#[derive(Default)]
pub(crate) struct RefreshConnectionStates {
// Follow the refresh ops on the connections
Expand Down
23 changes: 1 addition & 22 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ where

if !addrs_to_refresh.is_empty() {
// don't try existing nodes since we know a. it does not exist. b. exist but its connection is closed
Self::refresh_and_update_connections(
Self::trigger_refresh_connection_tasks(
inner.clone(),
addrs_to_refresh,
RefreshConnectionType::AllConnections,
Expand Down Expand Up @@ -1432,13 +1432,6 @@ where
let address_clone = address.clone();
let address_clone_for_task = address.clone();

// let node_option = if check_existing_conn {
// let connections_container = inner.conn_lock.read().expect(MUTEX_READ_ERR);
// connections_container.remove_node(&address)
// } else {
// None
// };

let mut node_option = inner
.conn_lock
.read()
Expand Down Expand Up @@ -1474,20 +1467,6 @@ where
)
.await;

// Maintain the newly refreshed connection separately from the main connection map.
// This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation.
// This approach ensures that all requests within the current batch interact with a consistent connection map,
// preventing potential reordering issues.
//
// By delaying the integration of the refreshed connection:
//
// 1. We maintain consistency throughout the processing of a batch of requests.
// 2. We avoid mid-batch changes to the connection map that could lead to inconsistent routing or ordering of operations.
// 3. We ensure that all requests in a batch see the same cluster topology, reducing the risk of race conditions or unexpected behavior.
//
// This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is
// updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information
// with the requirement for consistent request handling within each processing cycle.
match node_result {
Ok(node) => {
debug!(
Expand Down
2 changes: 1 addition & 1 deletion python/python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ async def glide_client(
protocol: ProtocolVersion,
) -> AsyncGenerator[TGlideClient, None]:
"Get async socket client for tests"
client = await create_client(request, cluster_mode, protocol=protocol)
client = await create_client(request, cluster_mode, protocol=protocol, request_timeout=10000)
yield client
await test_teardown(request, cluster_mode, protocol)
await client.close()
Expand Down

0 comments on commit a8588c1

Please sign in to comment.