Skip to content

Commit

Permalink
Revert old LatencyAware load balancing
Browse files Browse the repository at this point in the history
This reverts commits: 4266ec2, 4708f84, 30dc069, 009f26e.
  • Loading branch information
wprzytula authored and havaker committed Jan 19, 2023
1 parent 494e2c6 commit bc63656
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 90 deletions.
50 changes: 20 additions & 30 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ use tokio::sync::mpsc;

use super::errors::QueryError;
use crate::cql_to_rust::{FromRow, FromRowError};
use crate::{load_balancing, retry_policy, Session};
use crate::{load_balancing, retry_policy};

use crate::frame::types::LegacyConsistency;
use crate::frame::{
response::{
result,
result::{ColumnSpec, Row, Rows},
NonErrorResponse,
Response,
},
value::SerializedValues,
};
Expand All @@ -31,10 +31,10 @@ use crate::routing::Token;
use crate::statement::Consistency;
use crate::statement::{prepared_statement::PreparedStatement, query::Query};
use crate::transport::cluster::ClusterData;
use crate::transport::connection::{Connection, NonErrorQueryResponse, QueryResponse};
use crate::transport::connection::{Connection, QueryResponse};
use crate::transport::load_balancing::LoadBalancingPolicy;
use crate::transport::metrics::Metrics;
use crate::transport::node::{Node, TimestampedAverage};
use crate::transport::node::Node;
use crate::transport::retry_policy::{RetryDecision, RetrySession};
use tracing::{trace, trace_span, warn, Instrument};
use uuid::Uuid;
Expand Down Expand Up @@ -397,7 +397,7 @@ where
trace!(parent: &span, "Execution started");
// Query pages until an error occurs
let queries_result: Result<PageSendAttemptedProof, QueryError> = self
.query_pages(&connection, current_consistency, node)
.query_pages(&connection, current_consistency)
.instrument(span.clone())
.await;

Expand Down Expand Up @@ -484,7 +484,6 @@ where
&mut self,
connection: &Arc<Connection>,
consistency: Consistency,
node: &Node,
) -> Result<PageSendAttemptedProof, QueryError> {
loop {
self.metrics.inc_total_paged_queries();
Expand All @@ -495,33 +494,24 @@ where
"Sending"
);
self.log_attempt_start(connection.get_connect_address());
let query_response =
let query_response: QueryResponse =
(self.page_query)(connection.clone(), consistency, self.paging_state.clone())
.await?
.into_non_error_query_response();

let elapsed = query_start.elapsed();
if Session::should_consider_query_for_latency_measurements(
&*self.load_balancer,
&query_response,
) {
let mut average_latency_guard = node.average_latency.write().unwrap();
*average_latency_guard =
TimestampedAverage::compute_next(*average_latency_guard, elapsed);
}
match query_response {
Ok(NonErrorQueryResponse {
response: NonErrorResponse::Result(result::Result::Rows(mut rows)),
tracing_id,
..
}) => {
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
.await?;

match query_response.response {
Response::Result(result::Result::Rows(mut rows)) => {
let _ = self
.metrics
.log_query_latency(query_start.elapsed().as_millis() as u64);
self.log_attempt_success();
self.log_query_success();

self.paging_state = rows.metadata.paging_state.take();

let received_page = ReceivedPage { rows, tracing_id };
let received_page = ReceivedPage {
rows,
tracing_id: query_response.tracing_id,
};

// Send next page to RowIterator
let (proof, res) = self.sender.send(Ok(received_page)).await;
Expand All @@ -539,11 +529,11 @@ where
self.retry_session.reset();
self.log_query_start();
}
Err(err) => {
Response::Error(err) => {
self.metrics.inc_failed_paged_queries();
return Err(err);
return Err(err.into());
}
Ok(_) => {
_ => {
self.metrics.inc_failed_paged_queries();

return Err(QueryError::ProtocolError(
Expand Down
42 changes: 1 addition & 41 deletions scylla/src/transport/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,10 @@ use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
Arc,
},
time::{Duration, Instant},
};

#[derive(Debug, Clone, Copy)]
pub struct TimestampedAverage {
pub timestamp: Instant,
pub average: Duration,
pub num_measures: usize,
}

impl TimestampedAverage {
pub(crate) fn compute_next(previous: Option<Self>, last_latency: Duration) -> Option<Self> {
let now = Instant::now();
match previous {
prev if last_latency.is_zero() => prev,
None => Some(Self {
num_measures: 1,
average: last_latency,
timestamp: now,
}),
Some(prev_avg) => Some({
let delay = (now - prev_avg.timestamp).as_secs_f64();
let prev_weight = (delay + 1.).ln() / delay;
let last_latency_nanos = last_latency.as_nanos() as f64;
let prev_avg_nanos = prev_avg.average.as_nanos() as f64;
let average = Duration::from_nanos(
((1. - prev_weight) * last_latency_nanos + prev_weight * prev_avg_nanos).round()
as u64,
);
Self {
num_measures: prev_avg.num_measures + 1,
timestamp: now,
average,
}
}),
}
}
}

/// Node represents a cluster node along with it's data and connections
///
/// Note: if a Node changes its address (the optionally translated address),
Expand All @@ -65,8 +28,6 @@ pub struct Node {
pub datacenter: Option<String>,
pub rack: Option<String>,

pub average_latency: RwLock<Option<TimestampedAverage>>,

// If the node is filtered out by the host filter, this will be None
pool: Option<NodeConnectionPool>,

Expand Down Expand Up @@ -101,7 +62,6 @@ impl Node {
rack,
pool,
down_marker: false.into(),
average_latency: RwLock::new(None),
}
}

Expand Down
22 changes: 3 additions & 19 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::history;
use crate::history::HistoryListener;
use crate::load_balancing;
use crate::retry_policy;
use crate::transport::node::TimestampedAverage;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::join_all;
Expand Down Expand Up @@ -1321,14 +1320,6 @@ impl Session {
}
}

pub fn should_consider_query_for_latency_measurements(
_load_balancer: &dyn LoadBalancingPolicy,
_result: &Result<impl AllowedRunQueryResTType, QueryError>,
) -> bool {
// TODO(havaker)
false
}

// This method allows to easily run a query using load balancing, retry policy etc.
// Requires some information about the query and two closures
// First closure is used to choose a connection
Expand Down Expand Up @@ -1547,19 +1538,12 @@ impl Session {
.instrument(span.clone())
.await;

let elapsed = query_start.elapsed();
if Self::should_consider_query_for_latency_measurements(
&*self.load_balancer,
&query_result,
) {
let mut average_latency_guard = node.average_latency.write().unwrap();
*average_latency_guard =
TimestampedAverage::compute_next(*average_latency_guard, elapsed);
}
last_error = match query_result {
Ok(response) => {
trace!(parent: &span, "Query succeeded");
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
let _ = self
.metrics
.log_query_latency(query_start.elapsed().as_millis() as u64);
context.log_attempt_success(&attempt_id);
return Some(Ok(RunQueryResult::Completed(response)));
}
Expand Down

0 comments on commit bc63656

Please sign in to comment.