Skip to content

Commit

Permalink
on_query_success/failure
Browse files Browse the repository at this point in the history
  • Loading branch information
wprzytula authored and havaker committed Jan 22, 2023
1 parent 81eea08 commit a1c84de
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 13 deletions.
24 changes: 15 additions & 9 deletions scylla/src/transport/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::sync::mpsc;

use super::errors::QueryError;
use crate::cql_to_rust::{FromRow, FromRowError};
use crate::load_balancing::NodeRef;
use crate::{load_balancing, retry_policy};

use crate::frame::types::LegacyConsistency;
Expand Down Expand Up @@ -397,7 +398,7 @@ where
trace!(parent: &span, "Execution started");
// Query pages until an error occurs
let queries_result: Result<PageSendAttemptedProof, QueryError> = self
.query_pages(&connection, current_consistency)
.query_pages(&connection, current_consistency, node)
.instrument(span.clone())
.await;

Expand Down Expand Up @@ -484,6 +485,7 @@ where
&mut self,
connection: &Arc<Connection>,
consistency: Consistency,
node: NodeRef<'_>,
) -> Result<PageSendAttemptedProof, QueryError> {
loop {
self.metrics.inc_total_paged_queries();
Expand All @@ -498,13 +500,14 @@ where
(self.page_query)(connection.clone(), consistency, self.paging_state.clone())
.await?;

let elapsed = query_start.elapsed();
match query_response.response {
Response::Result(result::Result::Rows(mut rows)) => {
let _ = self
.metrics
.log_query_latency(query_start.elapsed().as_millis() as u64);
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
self.log_attempt_success();
self.log_query_success();
self.load_balancer
.on_query_success(&self.statement_info, elapsed, node);

self.paging_state = rows.metadata.paging_state.take();

Expand All @@ -531,14 +534,17 @@ where
}
Response::Error(err) => {
self.metrics.inc_failed_paged_queries();
return Err(err.into());
let err = err.into();
self.load_balancer
.on_query_failure(&self.statement_info, elapsed, node, &err);
return Err(err);
}
_ => {
self.metrics.inc_failed_paged_queries();

return Err(QueryError::ProtocolError(
"Unexpected response to next page query",
));
let err = QueryError::ProtocolError("Unexpected response to next page query");
self.load_balancer
.on_query_failure(&self.statement_info, elapsed, node, &err);
return Err(err);
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions scylla/src/transport/load_balancing/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,23 @@ impl LoadBalancingPolicy for DefaultPolicy {

Box::new(plan)
}

fn on_query_success(
&self,
_statement: &QueryInfo,
_latency: std::time::Duration,
_node: NodeRef<'_>,
) {
}

fn on_query_failure(
&self,
_statement: &QueryInfo,
_latency: std::time::Duration,
_node: NodeRef<'_>,
_error: &scylla_cql::errors::QueryError,
) {
}
}

impl DefaultPolicy {
Expand Down
14 changes: 13 additions & 1 deletion scylla/src/transport/load_balancing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
use super::{cluster::ClusterData, node::NodeRef};
use crate::routing::Token;
use scylla_cql::frame::types;
use scylla_cql::{errors::QueryError, frame::types};

use std::{sync::Arc, time::Duration};

mod default;
mod plan;
Expand Down Expand Up @@ -40,4 +42,14 @@ pub trait LoadBalancingPolicy: Send + Sync + std::fmt::Debug {
statement: &'a QueryInfo,
cluster: &'a ClusterData,
) -> FallbackPlan<'a>;

fn on_query_success(&self, statement: &QueryInfo, latency: Duration, node: NodeRef<'_>);

fn on_query_failure(
&self,
statement: &QueryInfo,
latency: Duration,
node: NodeRef<'_>,
error: &QueryError,
);
}
13 changes: 10 additions & 3 deletions scylla/src/transport/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::frame::types::LegacyConsistency;
use crate::history;
use crate::history::HistoryListener;
use crate::load_balancing;
use crate::load_balancing::QueryInfo;
use crate::retry_policy;
use crate::transport::NodeRef;
use async_trait::async_trait;
Expand Down Expand Up @@ -1420,6 +1421,7 @@ impl Session {
consistency: statement_config.consistency,
retry_session: retry_policy.new_session(),
history_data,
query_info: statement_info.clone(),
},
)
};
Expand Down Expand Up @@ -1453,6 +1455,7 @@ impl Session {
consistency: statement_config.consistency,
retry_session: retry_policy.new_session(),
history_data,
query_info: statement_info.clone(),
},
)
.await
Expand Down Expand Up @@ -1539,13 +1542,14 @@ impl Session {
.instrument(span.clone())
.await;

let elapsed = query_start.elapsed();
last_error = match query_result {
Ok(response) => {
trace!(parent: &span, "Query succeeded");
let _ = self
.metrics
.log_query_latency(query_start.elapsed().as_millis() as u64);
let _ = self.metrics.log_query_latency(elapsed.as_millis() as u64);
context.log_attempt_success(&attempt_id);
self.load_balancer
.on_query_success(&context.query_info, elapsed, node);
return Some(Ok(RunQueryResult::Completed(response)));
}
Err(e) => {
Expand All @@ -1555,6 +1559,8 @@ impl Session {
"Query failed"
);
self.metrics.inc_failed_nonpaged_queries();
self.load_balancer
.on_query_failure(&context.query_info, elapsed, node, &e);
Some(e)
}
};
Expand Down Expand Up @@ -1741,6 +1747,7 @@ struct ExecuteQueryContext<'a> {
consistency: Option<Consistency>,
retry_session: Box<dyn RetrySession>,
history_data: Option<HistoryData<'a>>,
query_info: QueryInfo<'a>,
}

struct HistoryData<'a> {
Expand Down
17 changes: 17 additions & 0 deletions scylla/tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@ impl LoadBalancingPolicy for FixedOrderLoadBalancer {
.sorted_by(|node1, node2| Ord::cmp(&node1.address, &node2.address)),
)
}

fn on_query_success(
&self,
_: &scylla::load_balancing::QueryInfo,
_: std::time::Duration,
_: scylla::load_balancing::NodeRef<'_>,
) {
}

fn on_query_failure(
&self,
_: &scylla::load_balancing::QueryInfo,
_: std::time::Duration,
_: scylla::load_balancing::NodeRef<'_>,
_: &scylla_cql::errors::QueryError,
) {
}
}

pub async fn test_with_3_node_cluster<F, Fut>(
Expand Down

0 comments on commit a1c84de

Please sign in to comment.