Skip to content

Commit

Permalink
refactor: proxy write and route (#889)
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 authored May 12, 2023
1 parent 04c297b commit def0f96
Show file tree
Hide file tree
Showing 17 changed files with 1,072 additions and 1,168 deletions.
15 changes: 2 additions & 13 deletions proxy/src/grpc/route.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,13 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use ceresdbproto::storage::{RouteRequest, RouteResponse};
use common_util::error::BoxError;
use http::StatusCode;
use query_engine::executor::Executor as QueryExecutor;
use snafu::ResultExt;

use crate::{error, error::ErrWithCause, Context, Proxy};
use crate::{error, Context, Proxy};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_route(&self, _ctx: Context, req: RouteRequest) -> RouteResponse {
let routes = self
.router
.route(req)
.await
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "fail to route",
});
let routes = self.route(req).await;

let mut resp = RouteResponse::default();
match routes {
Expand Down
313 changes: 7 additions & 306 deletions proxy/src/grpc/write.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,9 @@
// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{cmp::max, collections::HashMap, time::Instant};

use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, RouteRequest, WriteRequest, WriteResponse,
};
use common_types::request_id::RequestId;
use common_util::error::BoxError;
use futures::{future::try_join_all, FutureExt};
use http::StatusCode;
use interpreters::interpreter::Output;
use log::debug;
use ceresdbproto::storage::{WriteRequest, WriteResponse};
use query_engine::executor::Executor as QueryExecutor;
use query_frontend::plan::{InsertPlan, Plan};
use router::endpoint::Endpoint;
use snafu::{OptionExt, ResultExt};
use tonic::transport::Channel;

use crate::{
error,
error::{build_ok_header, ErrNoCause, ErrWithCause, InternalNoCause, Result},
execute_plan,
forward::{ForwardResult, ForwarderRef},
instance::InstanceRef,
Context, Proxy,
};

#[derive(Debug)]
pub struct WriteContext {
pub request_id: RequestId,
pub deadline: Option<Instant>,
pub catalog: String,
pub schema: String,
pub auto_create_table: bool,
}

impl WriteContext {
pub fn new(
request_id: RequestId,
deadline: Option<Instant>,
catalog: String,
schema: String,
) -> Self {
let auto_create_table = true;
Self {
request_id,
deadline,
catalog,
schema,
auto_create_table,
}
}
}
use crate::{error, error::build_ok_header, Context, Proxy};

impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_write(&self, ctx: Context, req: WriteRequest) -> WriteResponse {
Expand All @@ -64,262 +16,11 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
..Default::default()
}
}
Ok(v) => v,
}
}

/// Handle one gRPC write request end to end: split it into the part served by
/// this node and the parts owned by remote endpoints, execute all parts
/// concurrently, and sum the per-part `success` counters into one response.
///
/// Errors:
/// - `BAD_REQUEST` when the request carries no context.
/// - `INTERNAL_SERVER_ERROR` when joining the spawned write tasks fails.
/// - Any error returned by an individual local/remote write is propagated
///   via the `resp?` in the accumulation loop below.
async fn handle_write_internal(
&self,
ctx: Context,
req: WriteRequest,
) -> Result<WriteResponse> {
// Cloned up front only so the database name is still available for the
// debug log after `req` is consumed by `split_write_request`.
let write_context = req.context.clone().context(ErrNoCause {
msg: "Missing context",
code: StatusCode::BAD_REQUEST,
})?;

let (write_request_to_local, write_requests_to_forward) =
self.split_write_request(req).await?;

// One slot per remote endpoint plus (at most) one local write.
let mut futures = Vec::with_capacity(write_requests_to_forward.len() + 1);

// Write to remote.
// Each remote write runs on the io runtime so slow peers do not block
// the current task.
for (endpoint, table_write_request) in write_requests_to_forward {
let forwarder = self.forwarder.clone();
let write_handle = self.engine_runtimes.io_runtime.spawn(async move {
Self::write_to_remote(forwarder, endpoint, table_write_request).await
});

futures.push(write_handle.boxed());
}

// Write to local.
// Skipped entirely when every table routed to a remote endpoint.
if !write_request_to_local.table_requests.is_empty() {
let local_handle =
async move { Ok(self.write_to_local(ctx, write_request_to_local).await) };
futures.push(local_handle.boxed());
}

// try_join_all fails fast on the first task-join error (e.g. a panicked
// spawned task); individual write errors are surfaced later via `resp?`.
let resps = try_join_all(futures)
.await
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to join task",
})?;

debug!(
"Grpc handle write finished, schema:{}, resps:{:?}",
write_context.database, resps
);

// Aggregate: total success count is the sum over all partial responses.
let mut success = 0;
for resp in resps {
success += resp?.success;
}

Ok(WriteResponse {
success,
header: Some(build_ok_header()),
..Default::default()
})
}

async fn write_to_remote(
forwarder: ForwarderRef,
endpoint: Endpoint,
table_write_request: WriteRequest,
) -> Result<WriteResponse> {
let do_write = |mut client: StorageServiceClient<Channel>,
request: tonic::Request<WriteRequest>,
_: &Endpoint| {
let write = async move {
client
.write(request)
.await
.map(|resp| resp.into_inner())
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Forwarded write request failed",
})
}
.boxed();

Box::new(write) as _
};

let forward_result = forwarder
.forward_with_endpoint(endpoint, tonic::Request::new(table_write_request), do_write)
.await;
let forward_res = forward_result
.map_err(|e| {
error!("Failed to forward sql req but the error is ignored, err:{e}");
e
})
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Local response is not expected",
})?;

match forward_res {
ForwardResult::Forwarded(resp) => resp,
ForwardResult::Local => InternalNoCause {
msg: "Local response is not expected".to_string(),
}
.fail(),
}
}

/// Execute the part of a write request that this node serves locally:
/// resolve schema config, convert the table requests into insert plans, run
/// each plan, and report the total number of inserted rows.
///
/// Errors:
/// - `BAD_REQUEST` when the request carries no context.
/// - `INTERNAL_SERVER_ERROR` when the schema config cannot be fetched.
/// - Plan building / execution errors are propagated via `?`.
async fn write_to_local(&self, ctx: Context, req: WriteRequest) -> Result<WriteResponse> {
let request_id = RequestId::next_id();
let begin_instant = Instant::now();
// Absolute deadline derived from the caller-supplied timeout, if any.
let deadline = ctx.timeout.map(|t| begin_instant + t);
let catalog = self.instance.catalog_manager.default_catalog_name();
let req_ctx = req.context.context(ErrNoCause {
msg: "Missing context",
code: StatusCode::BAD_REQUEST,
})?;
let schema = req_ctx.database;
let schema_config = self
.schema_config_provider
.schema_config(&schema)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Fail to fetch schema config, schema_name:{schema}"),
})?;

// Only the first table is logged in full to keep the line bounded.
debug!(
"Grpc handle write begin, catalog:{catalog}, schema:{schema}, request_id:{request_id}, first_table:{:?}, num_tables:{}",
req.table_requests
.first()
.map(|m| (&m.table, &m.tag_names, &m.field_names)),
req.table_requests.len(),
);

let write_context = WriteContext {
request_id,
deadline,
catalog: catalog.to_string(),
schema: schema.clone(),
// Table auto-creation follows the proxy-wide setting.
auto_create_table: self.auto_create_table,
};

// One insert plan per table request (presumably — TODO confirm against
// write_request_to_insert_plan).
let plan_vec = self
.write_request_to_insert_plan(req.table_requests, schema_config, write_context)
.await?;

// Execute plans sequentially, summing affected row counts; the first
// failing plan aborts the whole write.
let mut success = 0;
for insert_plan in plan_vec {
success += execute_insert_plan(
request_id,
catalog,
&schema,
self.instance.clone(),
insert_plan,
deadline,
)
.await?;
}

Ok(WriteResponse {
// NOTE(review): usize -> u32 cast silently truncates above u32::MAX
// rows; acceptable in practice but worth confirming.
success: success as u32,
header: Some(build_ok_header()),
..Default::default()
})
}

/// Partition a write request into (tables served locally, tables to forward
/// grouped by remote endpoint), using the router to decide ownership.
///
/// NOTE(review): this block is reconstructed from a rendered diff; lines of
/// an unrelated addition hunk appear interleaved near the end (see the
/// flagged `Ok(v) => WriteResponse { ... }` arm below) and must be removed
/// against the real source before this compiles.
async fn split_write_request(
&self,
req: WriteRequest,
) -> Result<(WriteRequest, HashMap<Endpoint, WriteRequest>)> {
// Split write request into multiple requests, each request contains table
// belong to one remote engine.
let tables = req
.table_requests
.iter()
.map(|table_request| table_request.table.clone())
.collect();

// TODO: Make the router can accept an iterator over the tables to avoid the
// memory allocation here.
let route_data = self
.router
.route(RouteRequest {
context: req.context.clone(),
tables,
})
.await?;
// Keep only routes that resolve to a non-local endpoint: table -> endpoint.
let forwarded_table_routes = route_data
.into_iter()
.filter_map(|router| {
router
.endpoint
.map(|endpoint| (router.table, endpoint.into()))
})
.filter(|router| !self.forwarder.is_local_endpoint(&router.1))
.collect::<HashMap<_, _>>();

// No table need to be forwarded.
// Fast path: everything is local, hand the original request back untouched.
if forwarded_table_routes.is_empty() {
return Ok((req, HashMap::default()));
}

let mut table_requests_to_local = WriteRequest {
// NOTE(review): both lengths are usize, so the subtraction can
// underflow BEFORE max() applies — max(_, 0) is a no-op on unsigned
// values. Safe only if table names are unique; verify.
table_requests: Vec::with_capacity(max(
req.table_requests.len() - forwarded_table_routes.len(),
0,
)),
context: req.context.clone(),
};

let mut table_requests_to_forward = HashMap::with_capacity(forwarded_table_routes.len());

let write_context = req.context;
// Distribute each table request to its endpoint bucket, or to the local
// request when no forwarded route exists for the table.
for table_request in req.table_requests {
let route = forwarded_table_routes.get(&table_request.table);
match route {
Some(endpoint) => {
let table_requests = table_requests_to_forward
.entry(endpoint.clone())
.or_insert_with(|| WriteRequest {
context: write_context.clone(),
table_requests: Vec::new(),
});
table_requests.table_requests.push(table_request);
}
_ => {
table_requests_to_local.table_requests.push(table_request);
}
}
// NOTE(review): the following 6 lines are mis-merged diff content from
// `handle_write`'s match arm — they do not belong to this function.
Ok(v) => WriteResponse {
header: Some(build_ok_header()),
success: v.success,
failed: v.failed,
},
}
Ok((table_requests_to_local, table_requests_to_forward))
}
}

/// Execute a single insert plan through the query engine and return the
/// number of affected rows.
///
/// Errors:
/// - Propagates any failure from `execute_plan`.
/// - `BAD_REQUEST` when the engine yields record output instead of an
///   affected-row count, which an insert must never produce.
pub async fn execute_insert_plan<Q: QueryExecutor + 'static>(
    request_id: RequestId,
    catalog: &str,
    schema: &str,
    instance: InstanceRef<Q>,
    insert_plan: InsertPlan,
    deadline: Option<Instant>,
) -> Result<usize> {
    // Log before the plan is consumed below.
    debug!(
        "Grpc handle write table begin, table:{}, row_num:{}",
        insert_plan.table.name(),
        insert_plan.rows.num_rows()
    );

    let exec_output = execute_plan(
        request_id,
        catalog,
        schema,
        instance,
        Plan::Insert(insert_plan),
        deadline,
    )
    .await?;

    match exec_output {
        Output::AffectedRows(affected) => Ok(affected),
        Output::Records(_) => ErrNoCause {
            code: StatusCode::BAD_REQUEST,
            msg: "Invalid output type, expect AffectedRows, found Records",
        }
        .fail(),
    }
}
6 changes: 0 additions & 6 deletions proxy/src/handlers/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,6 @@ pub enum Error {
source: tokio::time::error::Elapsed,
backtrace: Backtrace,
},

#[snafu(display("Route handler failed, table:{:?}, source:{}", table, source))]
RouteHandler {
table: String,
source: router::Error,
},
}

define_result!(Error);
Expand Down
1 change: 0 additions & 1 deletion proxy/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
pub mod admin;
pub(crate) mod error;
pub mod route;

mod prelude {
pub use catalog::manager::Manager as CatalogManager;
Expand Down
Loading

0 comments on commit def0f96

Please sign in to comment.