Skip to content

Commit

Permalink
refactor: add request id in context (#1153)
Browse files Browse the repository at this point in the history
## Rationale
Currently, when a query fails, the error log doesn't contain the request id,
which makes it very hard to tell how long the failed query took.

## Detailed Changes
- Add `request_id` to the context, and print the context when a query fails.
- Remove `enable_partition_table_access` and `runtime` from the context to
avoid unnecessary clones.
- Pass parameters by reference instead of moving them where the move is unnecessary.
  • Loading branch information
jiacai2050 authored Aug 16, 2023
1 parent b59e07e commit 62b061c
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 200 deletions.
14 changes: 5 additions & 9 deletions proxy/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use std::time::Duration;

use common_types::request_id::RequestId;
use macros::define_result;
use snafu::{ensure, Backtrace, Snafu};

Expand All @@ -42,15 +43,16 @@ define_result!(Error);
/// Context for a request, which may contain
/// 1. Request context and options
/// 2. Info from http headers
#[derive(Debug)]
pub struct RequestContext {
/// Catalog of the request
pub catalog: String,
/// Schema of request
pub schema: String,
/// Enable partition table_access flag
pub enable_partition_table_access: bool,
/// Request timeout
pub timeout: Option<Duration>,
/// Request id
pub request_id: RequestId,
}

impl RequestContext {
Expand All @@ -63,7 +65,6 @@ impl RequestContext {
pub struct Builder {
catalog: String,
schema: String,
enable_partition_table_access: bool,
timeout: Option<Duration>,
}

Expand All @@ -78,11 +79,6 @@ impl Builder {
self
}

pub fn enable_partition_table_access(mut self, enable_partition_table_access: bool) -> Self {
self.enable_partition_table_access = enable_partition_table_access;
self
}

pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.timeout = timeout;
self
Expand All @@ -95,8 +91,8 @@ impl Builder {
Ok(RequestContext {
catalog: self.catalog,
schema: self.schema,
enable_partition_table_access: self.enable_partition_table_access,
timeout: self.timeout,
request_id: RequestId::next_id(),
})
}
}
3 changes: 1 addition & 2 deletions proxy/src/grpc/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use ceresdbproto::{
use common_types::{
datum::DatumKind,
record_batch::RecordBatch,
request_id::RequestId,
schema::{RecordSchema, TSID_COLUMN},
};
use generic_error::BoxError;
Expand Down Expand Up @@ -76,7 +75,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
ctx: Context,
req: PrometheusQueryRequest,
) -> Result<PrometheusQueryResponse> {
let request_id = RequestId::next_id();
let request_id = ctx.request_id;
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);
let req_ctx = req.context.context(ErrNoCause {
Expand Down
99 changes: 43 additions & 56 deletions proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ use log::{error, warn};
use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};
use router::endpoint::Endpoint;
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{transport::Channel, IntoRequest};

use crate::{
Expand All @@ -45,8 +43,6 @@ use crate::{
Context, Proxy,
};

const STREAM_QUERY_CHANNEL_LEN: usize = 20;

impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
pub async fn handle_sql_query(&self, ctx: Context, req: SqlQueryRequest) -> SqlQueryResponse {
// Incoming query maybe larger than query_failed + query_succeeded for some
Expand All @@ -55,10 +51,11 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
GRPC_HANDLER_COUNTER_VEC.incoming_query.inc();

self.hotspot_recorder.inc_sql_query_reqs(&req).await;
match self.handle_sql_query_internal(ctx, req).await {
match self.handle_sql_query_internal(&ctx, &req).await {
Err(e) => {
error!("Failed to handle sql query, ctx:{ctx:?}, err:{e}");

GRPC_HANDLER_COUNTER_VEC.query_failed.inc();
error!("Failed to handle sql query, err:{e}");
SqlQueryResponse {
header: Some(error::build_err_header(e)),
..Default::default()
Expand All @@ -73,8 +70,8 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

async fn handle_sql_query_internal(
&self,
ctx: Context,
req: SqlQueryRequest,
ctx: &Context,
req: &SqlQueryRequest,
) -> Result<SqlQueryResponse> {
if req.context.is_none() {
return ErrNoCause {
Expand All @@ -86,7 +83,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

let req_context = req.context.as_ref().unwrap();
let schema = &req_context.database;
match self.handle_sql(ctx, schema, &req.sql).await? {
match self.handle_sql(ctx, schema, &req.sql, false).await? {
SqlResponse::Forwarded(resp) => Ok(resp),
SqlResponse::Local(output) => convert_output(&output, self.resp_compress_min_length),
}
Expand All @@ -99,7 +96,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
) -> BoxStream<'static, SqlQueryResponse> {
GRPC_HANDLER_COUNTER_VEC.stream_query.inc();
self.hotspot_recorder.inc_sql_query_reqs(&req).await;
match self.clone().handle_stream_query_internal(ctx, req).await {
match self.clone().handle_stream_query_internal(&ctx, &req).await {
Err(e) => stream::once(async {
error!("Failed to handle stream sql query, err:{e}");
GRPC_HANDLER_COUNTER_VEC.stream_query_failed.inc();
Expand All @@ -118,8 +115,8 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {

async fn handle_stream_query_internal(
self: Arc<Self>,
ctx: Context,
req: SqlQueryRequest,
ctx: &Context,
req: &SqlQueryRequest,
) -> Result<BoxStream<'static, SqlQueryResponse>> {
if req.context.is_none() {
return ErrNoCause {
Expand All @@ -130,66 +127,56 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}

let req_context = req.context.as_ref().unwrap();
let schema = req_context.database.clone();
let req = match self
.clone()
.maybe_forward_stream_sql_query(ctx.clone(), &req)
.await
{
let schema = &req_context.database;
let req = match self.clone().maybe_forward_stream_sql_query(ctx, req).await {
Some(resp) => match resp {
ForwardResult::Forwarded(resp) => return resp,
ForwardResult::Local => req,
},
None => req,
};

let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN);
let runtime = ctx.runtime.clone();
let resp_compress_min_length = self.resp_compress_min_length;
let output = self
.as_ref()
.fetch_sql_query_output(ctx, &schema, &req.sql)
.fetch_sql_query_output(ctx, schema, &req.sql, false)
.await?;
runtime.spawn(async move {
match output {
Output::AffectedRows(rows) => {
let resp =
QueryResponseBuilder::with_ok_header().build_with_affected_rows(rows);
if tx.send(resp).await.is_err() {
error!("Failed to send affected rows resp in stream sql query");
}
GRPC_HANDLER_COUNTER_VEC
.query_affected_row
.inc_by(rows as u64);
}
Output::Records(batches) => {
let mut num_rows = 0;
for batch in &batches {
let resp = {
let mut writer = QueryResponseWriter::new(resp_compress_min_length);
writer.write(batch)?;
writer.finish()
}?;

if tx.send(resp).await.is_err() {
error!("Failed to send record batches resp in stream sql query");
break;
}
num_rows += batch.num_rows();
}
GRPC_HANDLER_COUNTER_VEC
.query_succeeded_row
.inc_by(num_rows as u64);

match output {
Output::AffectedRows(rows) => {
GRPC_HANDLER_COUNTER_VEC
.query_affected_row
.inc_by(rows as u64);

let resp = QueryResponseBuilder::with_ok_header().build_with_affected_rows(rows);

Ok(Box::pin(stream::once(async { resp })))
}
Output::Records(batches) => {
let mut num_rows = 0;
let mut results = Vec::with_capacity(batches.len());
for batch in &batches {
let resp = {
let mut writer = QueryResponseWriter::new(resp_compress_min_length);
writer.write(batch)?;
writer.finish()
}?;
results.push(resp);
num_rows += batch.num_rows();
}

GRPC_HANDLER_COUNTER_VEC
.query_succeeded_row
.inc_by(num_rows as u64);

Ok(Box::pin(stream::iter(results)))
}
Ok::<(), Error>(())
});
Ok(ReceiverStream::new(rx).boxed())
}
}

async fn maybe_forward_stream_sql_query(
self: Arc<Self>,
ctx: Context,
ctx: &Context,
req: &SqlQueryRequest,
) -> Option<ForwardResult<BoxStream<'static, SqlQueryResponse>, Error>> {
if req.tables.len() != 1 {
Expand All @@ -203,7 +190,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
schema: req_ctx.database.clone(),
table: req.tables[0].clone(),
req: req.clone().into_request(),
forwarded_from: ctx.forwarded_from,
forwarded_from: ctx.forwarded_from.clone(),
};
let do_query = |mut client: StorageServiceClient<Channel>,
request: tonic::Request<SqlQueryRequest>,
Expand Down
23 changes: 8 additions & 15 deletions proxy/src/http/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ use ceresdbproto::storage::{
};
use common_types::{
datum::DatumKind,
request_id::RequestId,
schema::{RecordSchema, TSID_COLUMN},
};
use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use log::{debug, error};
use log::{error, info};
use prom_remote_api::types::{
Label, LabelMatcher, Query, QueryResult, RemoteStorage, Sample, TimeSeries, WriteRequest,
};
Expand Down Expand Up @@ -85,12 +84,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}),
table_requests: write_table_requests,
};
let ctx = ProxyContext {
runtime: self.engine_runtimes.write_runtime.clone(),
timeout: ctx.timeout,
enable_partition_table_access: false,
forwarded_from: None,
};
let ctx = ProxyContext::new(ctx.timeout, None);

match self.handle_write_internal(ctx, table_request).await {
Ok(result) => {
Expand Down Expand Up @@ -127,15 +121,14 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
metric: String,
query: Query,
) -> Result<QueryResult> {
// Open partition table if needed.
self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &metric)
.await?;

let request_id = RequestId::next_id();
let request_id = ctx.request_id;
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);
info!("Handle prom remote query begin, ctx:{ctx:?}, metric:{metric}, request:{query:?}");

debug!("Query handler try to process request, request_id:{request_id}, request:{query:?}");
// Open partition table if needed.
self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &metric)
.await?;

let provider = CatalogMetaProvider {
manager: self.instance.catalog_manager.clone(),
Expand Down Expand Up @@ -171,7 +164,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
.await?;

let cost = begin_instant.saturating_elapsed().as_millis();
debug!("Query handler finished, request_id:{request_id}, cost:{cost}ms, query:{query:?}");
info!("Handle prom remote query successfully, ctx:{ctx:?}, cost:{cost}ms");

convert_query_result(metric, timestamp_col_name, field_col_name, output)
}
Expand Down
17 changes: 10 additions & 7 deletions proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_types::{
use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
use log::error;
use query_engine::{
executor::{Executor as QueryExecutor, RecordBatchVec},
physical_planner::PhysicalPlanner,
Expand All @@ -49,14 +50,16 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
ctx: &RequestContext,
req: Request,
) -> Result<Output> {
let context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.read_runtime.clone(),
enable_partition_table_access: true,
forwarded_from: None,
};
let schema = &ctx.schema;
let ctx = Context::new(ctx.timeout, None);

match self.handle_sql(context, &ctx.schema, &req.query).await? {
match self
.handle_sql(&ctx, schema, &req.query, true)
.await
.map_err(|e| {
error!("Handle sql query failed, ctx:{ctx:?}, req:{req:?}, err:{e}");
e
})? {
SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp),
SqlResponse::Local(output) => Ok(output),
}
Expand Down
10 changes: 2 additions & 8 deletions proxy/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::time::Instant;
use ceresdbproto::storage::{
RequestContext as GrpcRequestContext, WriteRequest as GrpcWriteRequest,
};
use common_types::request_id::RequestId;
use generic_error::BoxError;
use http::StatusCode;
use interpreters::interpreter::Output;
Expand Down Expand Up @@ -80,12 +79,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
}),
table_requests: write_table_requests,
};
let proxy_context = Context {
timeout: ctx.timeout,
runtime: self.engine_runtimes.write_runtime.clone(),
enable_partition_table_access: false,
forwarded_from: None,
};
let proxy_context = Context::new(ctx.timeout, None);

match self
.handle_write_internal(proxy_context, table_request)
Expand Down Expand Up @@ -126,7 +120,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Proxy<Q, P> {
ctx: RequestContext,
req: InfluxqlRequest,
) -> Result<Output> {
let request_id = RequestId::next_id();
let request_id = ctx.request_id;
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);

Expand Down
Loading

0 comments on commit 62b061c

Please sign in to comment.