diff --git a/Cargo.lock b/Cargo.lock index 0482563a0b..da4ac8f23d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5678,7 +5678,6 @@ dependencies = [ "generic_error", "log", "macros", - "proxy", "router", "runtime", "serde", diff --git a/remote_engine_client/Cargo.toml b/remote_engine_client/Cargo.toml index c52847571b..839e330900 100644 --- a/remote_engine_client/Cargo.toml +++ b/remote_engine_client/Cargo.toml @@ -33,7 +33,6 @@ futures = { workspace = true } generic_error = { workspace = true } log = { workspace = true } macros = { workspace = true } -proxy = { workspace = true } router = { workspace = true } runtime = { workspace = true } serde = { workspace = true } diff --git a/remote_engine_client/src/lib.rs b/remote_engine_client/src/lib.rs index 52e576886f..ead791b708 100644 --- a/remote_engine_client/src/lib.rs +++ b/remote_engine_client/src/lib.rs @@ -33,17 +33,13 @@ use common_types::{record_batch::RecordBatch, schema::RecordSchema}; pub use config::Config; use futures::{Stream, StreamExt}; use generic_error::BoxError; -use proxy::hotspot::{HotspotRecorder, Message}; use router::RouterRef; use runtime::Runtime; use snafu::ResultExt; use table_engine::{ remote::{ self, - model::{ - GetTableInfoRequest, ReadRequest, TableIdentifier, TableInfo, WriteBatchResult, - WriteRequest, - }, + model::{GetTableInfoRequest, ReadRequest, TableInfo, WriteBatchResult, WriteRequest}, RemoteEngine, }, stream::{self, ErrWithSource, RecordBatchStream, SendableRecordBatchStream}, @@ -122,55 +118,19 @@ pub mod error { pub struct RemoteEngineImpl { client: Client, - hotspot_recorder: Arc, } impl RemoteEngineImpl { - pub fn new( - config: Config, - router: RouterRef, - worker_runtime: Arc, - hotspot_recorder: Arc, - ) -> Self { + pub fn new(config: Config, router: RouterRef, worker_runtime: Arc) -> Self { let client = Client::new(config, router, worker_runtime); - Self { - client, - hotspot_recorder, - } - } - - fn format_hot_key(table: &TableIdentifier) -> String { - format!("{}/{}", table.schema, table.table) - } - - async fn record_write(&self, request: &WriteRequest) { - let hot_key = Self::format_hot_key(&request.table); - let row_count = request.write_request.row_group.num_rows(); - let field_count = row_count * request.write_request.row_group.schema().num_columns(); - self.hotspot_recorder - .send_msg_or_log( - "inc_write_reqs", - Message::Write { - key: hot_key, - row_count, - field_count, - }, - ) - .await; + Self { client } } } #[async_trait] impl RemoteEngine for RemoteEngineImpl { async fn read(&self, request: ReadRequest) -> remote::Result { - self.hotspot_recorder - .send_msg_or_log( - "inc_query_reqs", - Message::Query(Self::format_hot_key(&request.table)), - ) - .await; - let client_read_stream = self .client .read(request) @@ -181,8 +141,6 @@ impl RemoteEngine for RemoteEngineImpl { } async fn write(&self, request: WriteRequest) -> remote::Result { - self.record_write(&request).await; - self.client .write(request) .await @@ -194,10 +152,6 @@ impl RemoteEngine for RemoteEngineImpl { &self, requests: Vec, ) -> remote::Result> { - for req in &requests { - self.record_write(req).await; - } - self.client .write_batch(requests) .await diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index d15c252c6c..0fe5a9e558 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -35,6 +35,7 @@ use log::{info, warn}; use macros::define_result; use proxy::{ forward, + hotspot::HotspotRecorder, instance::InstanceRef, schema_config_provider::{self}, Proxy, @@ -106,6 +107,9 @@ pub enum Error { #[snafu(display("Missing proxy.\nBacktrace:\n{}", backtrace))] MissingProxy { backtrace: Backtrace }, + #[snafu(display("Missing HotspotRecorder.\nBacktrace:\n{}", backtrace))] + MissingHotspotRecorder { backtrace: Backtrace }, + #[snafu(display("Catalog name is not utf8.\nBacktrace:\n{}", backtrace))] ParseCatalogName { source: std::string::FromUtf8Error, @@ -213,6 +217,7 @@ pub struct Builder { opened_wals: Option, proxy: Option>>, request_notifiers: Option>>>, + hotspot_recorder: Option>, } impl Builder { @@ -226,6 +231,7 @@ impl Builder { opened_wals: None, proxy: None, request_notifiers: None, + hotspot_recorder: None, } } @@ -265,6 +271,11 @@ impl Builder { self } + pub fn hotspot_recorder(mut self, hotspot_recorder: Arc) -> Self { + self.hotspot_recorder = Some(hotspot_recorder); + self + } + pub fn request_notifiers(mut self, enable_query_dedup: bool) -> Self { if enable_query_dedup { self.request_notifiers = Some(Arc::new(RequestNotifiers::default())); @@ -279,6 +290,7 @@ impl Builder { let instance = self.instance.context(MissingInstance)?; let opened_wals = self.opened_wals.context(MissingWals)?; let proxy = self.proxy.context(MissingProxy)?; + let hotspot_recorder = self.hotspot_recorder.context(MissingHotspotRecorder)?; let meta_rpc_server = self.cluster.map(|v| { let builder = meta_event_service::Builder { @@ -295,6 +307,7 @@ impl Builder { instance, runtimes: runtimes.clone(), request_notifiers: self.request_notifiers, + hotspot_recorder, }; RemoteEngineServiceServer::new(service) }; diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index fce915555c..69a4861d6a 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -31,7 +31,10 @@ use common_types::record_batch::RecordBatch; use futures::stream::{self, BoxStream, FuturesUnordered, StreamExt}; use generic_error::BoxError; use log::{error, info}; -use proxy::instance::InstanceRef; +use proxy::{ + hotspot::{HotspotRecorder, Message}, + instance::InstanceRef, +}; use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use snafu::{OptionExt, ResultExt}; use table_engine::{ @@ -107,6 +110,7 @@ pub struct RemoteEngineServiceImpl, pub runtimes: Arc, pub request_notifiers: Option>>>, + pub hotspot_recorder: Arc, } impl RemoteEngineServiceImpl { @@ -177,7 +181,7 @@ impl RemoteEngineServiceImpl RemoteEngineServiceImpl RemoteEngineServiceImpl HandlerContext { HandlerContext { catalog_manager: self.instance.catalog_manager.clone(), + hotspot_recorder: self.hotspot_recorder.clone(), } } } @@ -412,6 +418,7 @@ impl RemoteEngineServiceImpl, } #[async_trait] @@ -424,6 +431,15 @@ impl RemoteEngineService &self, request: Request, ) -> std::result::Result, Status> { + if let Some(table) = &request.get_ref().table { + self.hotspot_recorder + .send_msg_or_log( + "inc_query_reqs", + Message::Query(format_hot_key(&table.schema, &table.table)), + ) + .await + } + REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC.stream_query.inc(); let result = match self.request_notifiers.clone() { Some(request_notifiers) => { @@ -564,6 +580,29 @@ async fn handle_stream_read( } } +fn format_hot_key(schema: &str, table: &str) -> String { + format!("{schema}/{table}") +} + +async fn record_write( + hotspot_recorder: &Arc, + request: &table_engine::remote::model::WriteRequest, +) { + let hot_key = format_hot_key(&request.table.schema, &request.table.table); + let row_count = request.write_request.row_group.num_rows(); + let field_count = row_count * request.write_request.row_group.schema().num_columns(); + hotspot_recorder + .send_msg_or_log( + "inc_write_reqs", + Message::Write { + key: hot_key, + row_count, + field_count, + }, + ) + .await; +} + async fn handle_write(ctx: HandlerContext, request: WriteRequest) -> Result { let write_request: table_engine::remote::model::WriteRequest = request.try_into().box_err().context(ErrWithCause { @@ -571,6 +610,11 @@ async fn handle_write(ctx: HandlerContext, request: WriteRequest) -> Result Builder { self.remote_engine_client_config.clone(), router.clone(), engine_runtimes.io_runtime.clone(), - hotspot_recorder.clone(), )); let partition_table_engine = Arc::new(PartitionTableEngine::new(remote_engine_ref.clone())); @@ -414,7 +413,7 @@ impl Builder { self.server_config.resp_compress_min_length.as_byte() as usize, self.server_config.auto_create_table, provider.clone(), - hotspot_recorder, + hotspot_recorder.clone(), engine_runtimes.clone(), self.cluster.is_some(), )); @@ -459,6 +458,7 @@ impl Builder { .opened_wals(opened_wals) .timeout(self.server_config.timeout.map(|v| v.0)) .proxy(proxy) + .hotspot_recorder(hotspot_recorder) .request_notifiers(self.server_config.enable_query_dedup) .build() .context(BuildGrpcService)?;