From a1c84de4b152527ee02ff6325fc7649e7084516a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Przytu=C5=82a?= Date: Tue, 10 Jan 2023 21:11:58 +0100 Subject: [PATCH] on_query_success/failure --- scylla/src/transport/iterator.rs | 24 ++++++++++++------- .../src/transport/load_balancing/default.rs | 17 +++++++++++++ scylla/src/transport/load_balancing/mod.rs | 14 ++++++++++- scylla/src/transport/session.rs | 13 +++++++--- scylla/tests/utils/mod.rs | 17 +++++++++++++ 5 files changed, 72 insertions(+), 13 deletions(-) diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index dd2c163135..59740af884 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -15,6 +15,7 @@ use tokio::sync::mpsc; use super::errors::QueryError; use crate::cql_to_rust::{FromRow, FromRowError}; +use crate::load_balancing::NodeRef; use crate::{load_balancing, retry_policy}; use crate::frame::types::LegacyConsistency; @@ -397,7 +398,7 @@ where trace!(parent: &span, "Execution started"); // Query pages until an error occurs let queries_result: Result = self - .query_pages(&connection, current_consistency) + .query_pages(&connection, current_consistency, node) .instrument(span.clone()) .await; @@ -484,6 +485,7 @@ where &mut self, connection: &Arc, consistency: Consistency, + node: NodeRef<'_>, ) -> Result { loop { self.metrics.inc_total_paged_queries(); @@ -498,13 +500,14 @@ where (self.page_query)(connection.clone(), consistency, self.paging_state.clone()) .await?; + let elapsed = query_start.elapsed(); match query_response.response { Response::Result(result::Result::Rows(mut rows)) => { - let _ = self - .metrics - .log_query_latency(query_start.elapsed().as_millis() as u64); + let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); self.log_attempt_success(); self.log_query_success(); + self.load_balancer + .on_query_success(&self.statement_info, elapsed, node); self.paging_state = rows.metadata.paging_state.take(); @@ -531,14 +534,17 @@ where } Response::Error(err) => { self.metrics.inc_failed_paged_queries(); - return Err(err.into()); + let err = err.into(); + self.load_balancer + .on_query_failure(&self.statement_info, elapsed, node, &err); + return Err(err); } _ => { self.metrics.inc_failed_paged_queries(); - - return Err(QueryError::ProtocolError( - "Unexpected response to next page query", - )); + let err = QueryError::ProtocolError("Unexpected response to next page query"); + self.load_balancer + .on_query_failure(&self.statement_info, elapsed, node, &err); + return Err(err); } } } diff --git a/scylla/src/transport/load_balancing/default.rs b/scylla/src/transport/load_balancing/default.rs index ef7fdf21ca..be1d1c46e0 100644 --- a/scylla/src/transport/load_balancing/default.rs +++ b/scylla/src/transport/load_balancing/default.rs @@ -124,6 +124,23 @@ impl LoadBalancingPolicy for DefaultPolicy { Box::new(plan) } + + fn on_query_success( + &self, + _statement: &QueryInfo, + _latency: std::time::Duration, + _node: NodeRef<'_>, + ) { + } + + fn on_query_failure( + &self, + _statement: &QueryInfo, + _latency: std::time::Duration, + _node: NodeRef<'_>, + _error: &scylla_cql::errors::QueryError, + ) { + } } impl DefaultPolicy { diff --git a/scylla/src/transport/load_balancing/mod.rs b/scylla/src/transport/load_balancing/mod.rs index bdb7fbeb46..c99bb4c720 100644 --- a/scylla/src/transport/load_balancing/mod.rs +++ b/scylla/src/transport/load_balancing/mod.rs @@ -5,7 +5,9 @@ use super::{cluster::ClusterData, node::NodeRef}; use crate::routing::Token; -use scylla_cql::frame::types; +use scylla_cql::{errors::QueryError, frame::types}; + +use std::{sync::Arc, time::Duration}; mod default; mod plan; @@ -40,4 +42,14 @@ pub trait LoadBalancingPolicy: Send + Sync + std::fmt::Debug { statement: &'a QueryInfo, cluster: &'a ClusterData, ) -> FallbackPlan<'a>; + + fn on_query_success(&self, statement: &QueryInfo, latency: Duration, node: NodeRef<'_>); + + fn on_query_failure( + &self, + statement: &QueryInfo, + latency: Duration, + node: NodeRef<'_>, + error: &QueryError, + ); } diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 0305a25041..0fb2b3394e 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -5,6 +5,7 @@ use crate::frame::types::LegacyConsistency; use crate::history; use crate::history::HistoryListener; use crate::load_balancing; +use crate::load_balancing::QueryInfo; use crate::retry_policy; use crate::transport::NodeRef; use async_trait::async_trait; @@ -1420,6 +1421,7 @@ impl Session { consistency: statement_config.consistency, retry_session: retry_policy.new_session(), history_data, + query_info: statement_info.clone(), }, ) }; @@ -1453,6 +1455,7 @@ impl Session { consistency: statement_config.consistency, retry_session: retry_policy.new_session(), history_data, + query_info: statement_info.clone(), }, ) .await @@ -1539,13 +1542,14 @@ impl Session { .instrument(span.clone()) .await; + let elapsed = query_start.elapsed(); last_error = match query_result { Ok(response) => { trace!(parent: &span, "Query succeeded"); - let _ = self - .metrics - .log_query_latency(query_start.elapsed().as_millis() as u64); + let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64); context.log_attempt_success(&attempt_id); + self.load_balancer + .on_query_success(&context.query_info, elapsed, node); return Some(Ok(RunQueryResult::Completed(response))); } Err(e) => { @@ -1555,6 +1559,8 @@ impl Session { "Query failed" ); self.metrics.inc_failed_nonpaged_queries(); + self.load_balancer + .on_query_failure(&context.query_info, elapsed, node, &e); Some(e) } }; @@ -1741,6 +1747,7 @@ struct ExecuteQueryContext<'a> { consistency: Option, retry_session: Box, history_data: Option>, + query_info: QueryInfo<'a>, } struct HistoryData<'a> { diff --git a/scylla/tests/utils/mod.rs b/scylla/tests/utils/mod.rs index 47ff99eac2..2f06343335 100644 --- a/scylla/tests/utils/mod.rs +++ b/scylla/tests/utils/mod.rs @@ -44,6 +44,23 @@ impl LoadBalancingPolicy for FixedOrderLoadBalancer { .sorted_by(|node1, node2| Ord::cmp(&node1.address, &node2.address)), ) } + + fn on_query_success( + &self, + _: &scylla::load_balancing::QueryInfo, + _: std::time::Duration, + _: scylla::load_balancing::NodeRef<'_>, + ) { + } + + fn on_query_failure( + &self, + _: &scylla::load_balancing::QueryInfo, + _: std::time::Duration, + _: scylla::load_balancing::NodeRef<'_>, + _: &scylla_cql::errors::QueryError, + ) { + } } pub async fn test_with_3_node_cluster(