fix: record remote engine requests (#1140)
## Rationale
#1127 added request recording for the remote engine, but the recording was done in the client; it should be done on the server.

## Detailed Changes
- Remove request recording from the remote engine client and add it to the remote engine gRPC server (see the sketch below).

## Test Plan
Manually
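
The gist of the change, as a hedged sketch (not code from this commit — `HotspotRecorder`, `Message`, `send_msg_or_log`, and the `"<schema>/<table>"` key format are taken from the diff below; the free-standing function and its name are illustrative): the gRPC remote engine service now feeds the shared recorder before handling a request, instead of the client doing it.

```rust
use std::sync::Arc;

use proxy::hotspot::{HotspotRecorder, Message};

// Illustrative only: record a remote write on the server side before the
// table engine executes it, keyed by "<schema>/<table>" as in the diff below.
async fn record_remote_write(
    hotspot_recorder: &Arc<HotspotRecorder>,
    schema: &str,
    table: &str,
    row_count: usize,
    field_count: usize,
) {
    hotspot_recorder
        .send_msg_or_log(
            "inc_write_reqs",
            Message::Write {
                key: format!("{schema}/{table}"),
                row_count,
                field_count,
            },
        )
        .await;
}
```

Reads are recorded analogously with the `inc_query_reqs` label and `Message::Query`, as in the stream read handler below.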
jiacai2050 authored Aug 8, 2023
1 parent dce9f92 commit b69f82d
Showing 6 changed files with 64 additions and 55 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion remote_engine_client/Cargo.toml
@@ -33,7 +33,6 @@ futures = { workspace = true }
generic_error = { workspace = true }
log = { workspace = true }
macros = { workspace = true }
proxy = { workspace = true }
router = { workspace = true }
runtime = { workspace = true }
serde = { workspace = true }
52 changes: 3 additions & 49 deletions remote_engine_client/src/lib.rs
@@ -33,17 +33,13 @@ use common_types::{record_batch::RecordBatch, schema::RecordSchema};
pub use config::Config;
use futures::{Stream, StreamExt};
use generic_error::BoxError;
use proxy::hotspot::{HotspotRecorder, Message};
use router::RouterRef;
use runtime::Runtime;
use snafu::ResultExt;
use table_engine::{
remote::{
self,
model::{
GetTableInfoRequest, ReadRequest, TableIdentifier, TableInfo, WriteBatchResult,
WriteRequest,
},
model::{GetTableInfoRequest, ReadRequest, TableInfo, WriteBatchResult, WriteRequest},
RemoteEngine,
},
stream::{self, ErrWithSource, RecordBatchStream, SendableRecordBatchStream},
@@ -122,55 +118,19 @@ pub mod error {

pub struct RemoteEngineImpl {
client: Client,
hotspot_recorder: Arc<HotspotRecorder>,
}

impl RemoteEngineImpl {
pub fn new(
config: Config,
router: RouterRef,
worker_runtime: Arc<Runtime>,
hotspot_recorder: Arc<HotspotRecorder>,
) -> Self {
pub fn new(config: Config, router: RouterRef, worker_runtime: Arc<Runtime>) -> Self {
let client = Client::new(config, router, worker_runtime);

Self {
client,
hotspot_recorder,
}
}

fn format_hot_key(table: &TableIdentifier) -> String {
format!("{}/{}", table.schema, table.table)
}

async fn record_write(&self, request: &WriteRequest) {
let hot_key = Self::format_hot_key(&request.table);
let row_count = request.write_request.row_group.num_rows();
let field_count = row_count * request.write_request.row_group.schema().num_columns();
self.hotspot_recorder
.send_msg_or_log(
"inc_write_reqs",
Message::Write {
key: hot_key,
row_count,
field_count,
},
)
.await;
Self { client }
}
}

#[async_trait]
impl RemoteEngine for RemoteEngineImpl {
async fn read(&self, request: ReadRequest) -> remote::Result<SendableRecordBatchStream> {
self.hotspot_recorder
.send_msg_or_log(
"inc_query_reqs",
Message::Query(Self::format_hot_key(&request.table)),
)
.await;

let client_read_stream = self
.client
.read(request)
@@ -181,8 +141,6 @@ impl RemoteEngine for RemoteEngineImpl {
}

async fn write(&self, request: WriteRequest) -> remote::Result<usize> {
self.record_write(&request).await;

self.client
.write(request)
.await
@@ -194,10 +152,6 @@
&self,
requests: Vec<WriteRequest>,
) -> remote::Result<Vec<WriteBatchResult>> {
for req in &requests {
self.record_write(req).await;
}

self.client
.write_batch(requests)
.await
13 changes: 13 additions & 0 deletions server/src/grpc/mod.rs
@@ -35,6 +35,7 @@ use log::{info, warn};
use macros::define_result;
use proxy::{
forward,
hotspot::HotspotRecorder,
instance::InstanceRef,
schema_config_provider::{self},
Proxy,
@@ -106,6 +107,9 @@
#[snafu(display("Missing proxy.\nBacktrace:\n{}", backtrace))]
MissingProxy { backtrace: Backtrace },

#[snafu(display("Missing HotspotRecorder.\nBacktrace:\n{}", backtrace))]
MissingHotspotRecorder { backtrace: Backtrace },

#[snafu(display("Catalog name is not utf8.\nBacktrace:\n{}", backtrace))]
ParseCatalogName {
source: std::string::FromUtf8Error,
@@ -213,6 +217,7 @@ pub struct Builder<Q, P> {
opened_wals: Option<OpenedWals>,
proxy: Option<Arc<Proxy<Q, P>>>,
request_notifiers: Option<Arc<RequestNotifiers<StreamReadReqKey, error::Result<RecordBatch>>>>,
hotspot_recorder: Option<Arc<HotspotRecorder>>,
}

impl<Q, P> Builder<Q, P> {
@@ -226,6 +231,7 @@
opened_wals: None,
proxy: None,
request_notifiers: None,
hotspot_recorder: None,
}
}

@@ -265,6 +271,11 @@ impl<Q, P> Builder<Q, P> {
self
}

pub fn hotspot_recorder(mut self, hotspot_recorder: Arc<HotspotRecorder>) -> Self {
self.hotspot_recorder = Some(hotspot_recorder);
self
}

pub fn request_notifiers(mut self, enable_query_dedup: bool) -> Self {
if enable_query_dedup {
self.request_notifiers = Some(Arc::new(RequestNotifiers::default()));
@@ -279,6 +290,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Builder<Q, P> {
let instance = self.instance.context(MissingInstance)?;
let opened_wals = self.opened_wals.context(MissingWals)?;
let proxy = self.proxy.context(MissingProxy)?;
let hotspot_recorder = self.hotspot_recorder.context(MissingHotspotRecorder)?;

let meta_rpc_server = self.cluster.map(|v| {
let builder = meta_event_service::Builder {
@@ -295,6 +307,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Builder<Q, P> {
instance,
runtimes: runtimes.clone(),
request_notifiers: self.request_notifiers,
hotspot_recorder,
};
RemoteEngineServiceServer::new(service)
};
48 changes: 46 additions & 2 deletions server/src/grpc/remote_engine_service/mod.rs
@@ -31,7 +31,10 @@ use common_types::record_batch::RecordBatch;
use futures::stream::{self, BoxStream, FuturesUnordered, StreamExt};
use generic_error::BoxError;
use log::{error, info};
use proxy::instance::InstanceRef;
use proxy::{
hotspot::{HotspotRecorder, Message},
instance::InstanceRef,
};
use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner};
use snafu::{OptionExt, ResultExt};
use table_engine::{
@@ -107,6 +110,7 @@ pub struct RemoteEngineServiceImpl<Q: QueryExecutor + 'static, P: PhysicalPlanne
pub instance: InstanceRef<Q, P>,
pub runtimes: Arc<EngineRuntimes>,
pub request_notifiers: Option<Arc<RequestNotifiers<StreamReadReqKey, Result<RecordBatch>>>>,
pub hotspot_recorder: Arc<HotspotRecorder>,
}

impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> RemoteEngineServiceImpl<Q, P> {
@@ -177,7 +181,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> RemoteEngineServiceImpl<Q,
})?;

let request_key = StreamReadReqKey::new(
table.table,
table.table.clone(),
read_request.predicate.clone(),
read_request.projected_schema.projection(),
);
@@ -196,6 +200,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> RemoteEngineServiceImpl<Q,
REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.stream_read
.observe(instant.saturating_elapsed().as_secs_f64());

return Ok(ReceiverStream::new(rx));
}
};
@@ -404,6 +409,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> RemoteEngineServiceImpl<Q,
fn handler_ctx(&self) -> HandlerContext {
HandlerContext {
catalog_manager: self.instance.catalog_manager.clone(),
hotspot_recorder: self.hotspot_recorder.clone(),
}
}
}
@@ -412,6 +418,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> RemoteEngineServiceImpl<Q,
#[derive(Clone)]
struct HandlerContext {
catalog_manager: ManagerRef,
hotspot_recorder: Arc<HotspotRecorder>,
}

#[async_trait]
@@ -424,6 +431,15 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> RemoteEngineService
&self,
request: Request<ReadRequest>,
) -> std::result::Result<Response<Self::ReadStream>, Status> {
if let Some(table) = &request.get_ref().table {
self.hotspot_recorder
.send_msg_or_log(
"inc_query_reqs",
Message::Query(format_hot_key(&table.schema, &table.table)),
)
.await
}

REMOTE_ENGINE_GRPC_HANDLER_COUNTER_VEC.stream_query.inc();
let result = match self.request_notifiers.clone() {
Some(request_notifiers) => {
@@ -564,13 +580,41 @@ async fn handle_stream_read(
}
}

fn format_hot_key(schema: &str, table: &str) -> String {
format!("{schema}/{table}")
}

async fn record_write(
hotspot_recorder: &Arc<HotspotRecorder>,
request: &table_engine::remote::model::WriteRequest,
) {
let hot_key = format_hot_key(&request.table.schema, &request.table.table);
let row_count = request.write_request.row_group.num_rows();
let field_count = row_count * request.write_request.row_group.schema().num_columns();
hotspot_recorder
.send_msg_or_log(
"inc_write_reqs",
Message::Write {
key: hot_key,
row_count,
field_count,
},
)
.await;
}

async fn handle_write(ctx: HandlerContext, request: WriteRequest) -> Result<WriteResponse> {
let write_request: table_engine::remote::model::WriteRequest =
request.try_into().box_err().context(ErrWithCause {
code: StatusCode::BadRequest,
msg: "fail to convert write request",
})?;

    // In theory we should record the write request at the beginning of the
    // server's handler, but the payload is encoded, so we cannot record it
    // until the payload has been decoded here.
record_write(&ctx.hotspot_recorder, &write_request).await;

let num_rows = write_request.write_request.row_group.num_rows();
let table = find_table_by_identifier(&ctx, &write_request.table)?;

4 changes: 2 additions & 2 deletions server/src/server.rs
@@ -369,7 +369,6 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Builder<Q, P> {
self.remote_engine_client_config.clone(),
router.clone(),
engine_runtimes.io_runtime.clone(),
hotspot_recorder.clone(),
));

let partition_table_engine = Arc::new(PartitionTableEngine::new(remote_engine_ref.clone()));
@@ -414,7 +413,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Builder<Q, P> {
self.server_config.resp_compress_min_length.as_byte() as usize,
self.server_config.auto_create_table,
provider.clone(),
hotspot_recorder,
hotspot_recorder.clone(),
engine_runtimes.clone(),
self.cluster.is_some(),
));
@@ -459,6 +458,7 @@ impl<Q: QueryExecutor + 'static, P: PhysicalPlanner> Builder<Q, P> {
.opened_wals(opened_wals)
.timeout(self.server_config.timeout.map(|v| v.0))
.proxy(proxy)
.hotspot_recorder(hotspot_recorder)
.request_notifiers(self.server_config.enable_query_dedup)
.build()
.context(BuildGrpcService)?;
