Skip to content

Commit

Permalink
refactor: find new columns to improve write performance
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed May 23, 2023
1 parent dcae0e0 commit 8c2f601
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 41 deletions.
131 changes: 92 additions & 39 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use bytes::Bytes;
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, value, RouteRequest, WriteRequest,
storage_service_client::StorageServiceClient, value, RouteRequest, Value, WriteRequest,
WriteResponse as WriteResponsePB, WriteSeriesEntry, WriteTableRequest,
};
use cluster::config::SchemaConfig;
Expand All @@ -34,7 +34,7 @@ use query_engine::executor::Executor as QueryExecutor;
use query_frontend::{
frontend::{Context as FrontendContext, Frontend},
plan::{AlterTableOperation, AlterTablePlan, InsertPlan, Plan},
planner::build_schema_from_write_table_request,
planner::{build_column_schema, try_get_data_type_from_value},
provider::CatalogMetaProvider,
};
use router::endpoint::Endpoint;
Expand All @@ -43,7 +43,7 @@ use table_engine::table::TableRef;
use tonic::transport::Channel;

use crate::{
error::{ErrNoCause, ErrWithCause, InternalNoCause, Result},
error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result},
forward::{ForwardResult, ForwarderRef},
Context, Proxy,
};
Expand Down Expand Up @@ -320,7 +320,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
.map(|endpoint| (router.table, endpoint.into()))
})
.filter(|router| !self.forwarder.is_local_endpoint(&router.1))
.collect::<HashMap<_, _>>();
.collect::<BTreeMap<_, _>>();

// No table need to be forwarded.
if forwarded_table_routes.is_empty() {
Expand Down Expand Up @@ -477,14 +477,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
code: StatusCode::BAD_REQUEST,
})?;
let schema = req_ctx.database;
let schema_config = self
.schema_config_provider
.schema_config(&schema)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Fail to fetch schema config, schema_name:{schema}"),
})?;

debug!(
"Local write begin, catalog:{catalog}, schema:{schema}, request_id:{request_id}, first_table:{:?}, num_tables:{}",
Expand All @@ -503,7 +495,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
};

let plan_vec = self
.write_request_to_insert_plan(req.table_requests, schema_config, write_context)
.write_request_to_insert_plan(req.table_requests, write_context)
.await?;

let mut success = 0;
Expand All @@ -522,7 +514,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
async fn write_request_to_insert_plan(
&self,
table_requests: Vec<WriteTableRequest>,
schema_config: Option<&SchemaConfig>,
write_context: WriteContext,
) -> Result<Vec<InsertPlan>> {
let mut plan_vec = Vec::with_capacity(table_requests.len());
Expand All @@ -534,7 +525,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
deadline,
auto_create_table,
} = write_context;
let schema_config = schema_config.cloned().unwrap_or_default();
for write_table_req in table_requests {
let table_name = &write_table_req.table;
self.maybe_open_partition_table_if_not_exist(&catalog, &schema, table_name)
Expand All @@ -555,7 +545,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
// * Currently, the decision to add columns is made at the request level, not at
// the row level, so the cost is relatively small.
let table_schema = table.schema();
let columns = find_new_columns(&table_schema, &schema_config, &write_table_req)?;
let columns = find_new_columns(&table_schema, &write_table_req)?;
if !columns.is_empty() {
self.execute_add_columns_plan(
request_id,
Expand Down Expand Up @@ -668,32 +658,95 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {

fn find_new_columns(
schema: &Schema,
schema_config: &SchemaConfig,
write_req: &WriteTableRequest,
write_table_req: &WriteTableRequest,
) -> Result<Vec<ColumnSchema>> {
let new_schema = build_schema_from_write_table_request(schema_config, write_req)
let WriteTableRequest {
table,
field_names,
tag_names,
entries: write_entries,
} = write_table_req;

let mut columns: BTreeMap<_, ColumnSchema> = BTreeMap::new();
for write_entry in write_entries {
// parse tags
for tag in &write_entry.tags {
let name_index = tag.name_index as usize;
ensure!(
name_index < tag_names.len(),
InternalNoCause {
msg: format!(
"Tag {tag:?} is not found in tag_names:{tag_names:?}, table:{table}",
),
}
);

let tag_name = &tag_names[name_index];

build_column(&mut columns, schema, tag_name, &tag.value, true)?;
}

// parse fields
for field_group in &write_entry.field_groups {
for field in &field_group.fields {
let field_index = field.name_index as usize;
ensure!(
field_index < field_names.len(),
InternalNoCause {
msg: format!(
"Field {field:?} is not found in field_names:{field_names:?}, table:{table}",
),
}
);
if (field.name_index as usize) < field_names.len() {
let field_name = &field_names[field.name_index as usize];
build_column(&mut columns, schema, field_name, &field.value, false)?;
}
}
}
}

Ok(columns.into_iter().map(|v| v.1).collect())
}

fn build_column<'a>(
columns: &mut BTreeMap<&'a str, ColumnSchema>,
schema: &Schema,
name: &'a str,
value: &Option<Value>,
is_tag: bool,
) -> Result<()> {
// Skip adding columns, the following cases:
// 1. Field already exists.
// 2. The new column has been added.
if schema.index_of(name).is_some() || columns.get(name).is_some() {
return Ok(());
}

let column_value = value
.as_ref()
.with_context(|| InternalNoCause {
msg: format!("Field value is needed, field:{name}"),
})?
.value
.as_ref()
.with_context(|| InternalNoCause {
msg: format!("Field value type is not supported, field:{name}"),
})?;

let data_type = try_get_data_type_from_value(column_value)
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Build schema from write table request failed",
.context(Internal {
msg: "Failed to get data type",
})?;

let columns = new_schema.columns();
let old_columns = schema.columns();

// find new columns:
// 1. timestamp column can't be a new column;
// 2. column not in old schema is a new column.
let new_columns = columns
.iter()
.enumerate()
.filter(|(idx, column)| {
*idx != new_schema.timestamp_index()
&& !old_columns.iter().any(|c| c.name == column.name)
})
.map(|(_, column)| column.clone())
.collect();
Ok(new_columns)
let column_schema = build_column_schema(name, data_type, is_tag)
.box_err()
.context(Internal {
msg: "Failed to build column schema",
})?;
columns.insert(name, column_schema);
Ok(())
}

fn write_table_request_to_insert_plan(
Expand Down Expand Up @@ -802,7 +855,7 @@ fn write_entry_to_rows(
}

// Fill fields.
let mut field_name_index: HashMap<String, usize> = HashMap::new();
let mut field_name_index: BTreeMap<String, usize> = BTreeMap::new();
for (i, field_group) in write_series_entry.field_groups.into_iter().enumerate() {
// timestamp
let timestamp_index_in_schema = schema.timestamp_index();
Expand Down
5 changes: 3 additions & 2 deletions query_frontend/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
}
}

fn build_column_schema(
pub fn build_column_schema(
column_name: &str,
data_type: DatumKind,
is_tag: bool,
Expand Down Expand Up @@ -537,7 +537,7 @@ fn ensure_data_type_compatible(
Ok(())
}

fn try_get_data_type_from_value(value: &PbValue) -> Result<DatumKind> {
pub fn try_get_data_type_from_value(value: &PbValue) -> Result<DatumKind> {
match value {
PbValue::Float64Value(_) => Ok(DatumKind::Double),
PbValue::StringValue(_) => Ok(DatumKind::String),
Expand All @@ -555,6 +555,7 @@ fn try_get_data_type_from_value(value: &PbValue) -> Result<DatumKind> {
PbValue::VarbinaryValue(_) => Ok(DatumKind::Varbinary),
}
}

/// A planner wraps the datafusion's logical planner, and delegate sql like
/// select/explain to datafusion's planner.
pub(crate) struct PlannerDelegate<'a, P: MetaProvider> {
Expand Down

0 comments on commit 8c2f601

Please sign in to comment.