Skip to content

Commit

Permalink
feat: only collect all tables on demand in influxql planner (apach…
Browse files Browse the repository at this point in the history
…e#854)

* add `all_tables` interface in `MetaProvider`.

* ensure all tables are only collected when `table_names` of influxql's `SchemaProvider` is called.

* remove some useless comments and params.

* fix style and refactor some error descriptions.

* add the custom impl for `table_exists` method in influxql SchemaProvider.

* return error when schema/catalog not exist in `all_tables` of `MetaProvider`.

* add comments of `all_tables` in `MetaProvider`.

* add comment for influxql planner and remove useless Result wrapping.
  • Loading branch information
Rachelint authored Apr 24, 2023
1 parent 33332f8 commit e1d4bd1
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 82 deletions.
35 changes: 2 additions & 33 deletions query_frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
use std::{sync::Arc, time::Instant};

use catalog::manager::ManagerRef;
use ceresdbproto::{prometheus::Expr as PromExpr, storage::WriteTableRequest};
use cluster::config::SchemaConfig;
use common_types::request_id::RequestId;
use common_util::error::{BoxError, GenericError};
use common_util::error::GenericError;
use influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use sqlparser::ast::{SetExpr, Statement as SqlStatement, TableFactor};
Expand Down Expand Up @@ -146,39 +145,9 @@ impl<P: MetaProvider> Frontend<P> {
&self,
ctx: &mut Context,
stmt: InfluxqlStatement,
manager: ManagerRef,
) -> Result<Plan> {
let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);
let catalog_name = self.provider.default_catalog_name();
let catalog = manager
.catalog_by_name(catalog_name)
.box_err()
.context(InfluxqlPlanWithCause {
msg: format!("get catalog failed, value:{catalog_name}"),
})?
.context(InfluxqlPlan {
msg: format!("catalog is none, value:{catalog_name}"),
})?;
let schema_name = self.provider.default_schema_name();
let schema = catalog
.schema_by_name(schema_name)
.box_err()
.context(InfluxqlPlanWithCause {
msg: format!("get schema failed, value:{schema_name}"),
})?
.context(InfluxqlPlan {
msg: format!("schema is none, value:{schema_name}"),
})?;
let all_tables = schema
.all_tables()
.box_err()
.context(InfluxqlPlanWithCause {
msg: format!("get all tables failed, catalog:{catalog_name}, schema:{schema_name}"),
})?;

planner
.influxql_stmt_to_plan(stmt, all_tables)
.context(CreatePlan)
planner.influxql_stmt_to_plan(stmt).context(CreatePlan)
}

pub fn write_req_to_plan(
Expand Down
97 changes: 67 additions & 30 deletions query_frontend/src/influxql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

//! InfluxQL planner
use std::{collections::HashMap, sync::Arc};
use std::{cell::OnceCell, sync::Arc};

use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use common_util::error::BoxError;
use datafusion::{
error::DataFusionError, logical_expr::TableSource, sql::planner::ContextProvider,
Expand All @@ -18,6 +19,7 @@ use influxql_parser::{
statement::Statement as InfluxqlStatement,
};
use influxql_schema::Schema;
use log::error;
use snafu::{ensure, ResultExt};
use table_engine::table::TableRef;

Expand All @@ -33,17 +35,18 @@ pub const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "iox::measurement";
// Port from https://github.com/ceresdb/influxql/blob/36fc4d873e/iox_query_influxql/src/frontend/planner.rs#L28
struct InfluxQLSchemaProvider<'a, P: MetaProvider> {
context_provider: ContextProviderAdapter<'a, P>,
// TODO: avoid load all tables.
// if we can ensure `table_names` is only called once, then load tables lazily is better.
tables: HashMap<String, (Arc<dyn TableSource>, Schema)>,
tables_cache: OnceCell<Vec<TableRef>>,
}

impl<'a, P: MetaProvider> SchemaProvider for InfluxQLSchemaProvider<'a, P> {
fn get_table_provider(&self, name: &str) -> datafusion::error::Result<Arc<dyn TableSource>> {
self.tables
.get(name)
.map(|(t, _)| Arc::clone(t))
.ok_or_else(|| DataFusionError::Plan(format!("measurement does not exist: {name}")))
self.context_provider
.get_table_provider(name.into())
.map_err(|e| {
DataFusionError::Plan(format!(
"measurement does not exist, measurement:{name}, source:{e}"
))
})
}

fn get_function_meta(&self, name: &str) -> Option<Arc<datafusion::logical_expr::ScalarUDF>> {
Expand All @@ -58,21 +61,68 @@ impl<'a, P: MetaProvider> SchemaProvider for InfluxQLSchemaProvider<'a, P> {
}

fn table_names(&self) -> Vec<&'_ str> {
self.tables.keys().map(|k| k.as_str()).collect::<Vec<_>>()
let tables = match self
.tables_cache
.get_or_try_init(|| self.context_provider.all_tables())
{
Ok(tables) => tables,
Err(e) => {
// Restricted by the external interface of iox, we can just print error log here
// and return empty `Vec`.
error!("Influxql planner failed to get all tables, err:{e}");
return Vec::default();
}
};

tables.iter().map(|t| t.name()).collect()
}

fn table_schema(&self, name: &str) -> Option<Schema> {
self.tables.get(name).map(|(_, s)| s.clone())
let table_source = match self.get_table_provider(name) {
Ok(table) => table,
Err(e) => {
// Restricted by the external interface of iox, we can just print error log here
// and return None.
error!("Influxql planner failed to get table schema, name:{name}, err:{e}");
return None;
}
};

let ceresdb_arrow_schema = table_source.schema();
let influxql_schema = match convert_to_influxql_schema(table_source.schema()) {
Ok(schema) => schema,
Err(e) => {
// Same as above here.
error!("Influxql planner failed to convert schema to influxql schema, schema:{ceresdb_arrow_schema}, err:{e}");
return None;
}
};

Some(influxql_schema)
}

/// Reports whether a table with the given name is known to the meta provider.
///
/// Restricted by the external iox `SchemaProvider` interface, a lookup
/// failure cannot be propagated to the caller; it is logged and reported
/// as "table does not exist" (`false`).
fn table_exists(&self, name: &str) -> bool {
match self.context_provider.table(name.into()) {
Ok(Some(_)) => true,
Ok(None) => false,
Err(e) => {
// Lookup failed: log and fall back to `false`, since this trait
// method has no way to return an error.
error!("Influxql planner failed to find table, table_name:{name}, err:{e}");
false
}
}
}
}

/// Influxql logical planner
///
/// NOTICE: planner will be built for each influxql query.
pub(crate) struct Planner<'a, P: MetaProvider> {
schema_provider: InfluxQLSchemaProvider<'a, P>,
}

fn convert_influxql_schema(ceresdb_schema: common_types::schema::Schema) -> Result<Schema> {
let arrow_schema = ceresdb_schema.into_arrow_schema_ref();
ceresdb_schema_to_influxdb(arrow_schema)
fn convert_to_influxql_schema(ceresdb_arrow_schema: ArrowSchemaRef) -> Result<Schema> {
ceresdb_schema_to_influxdb(ceresdb_arrow_schema)
.box_err()
.and_then(|s| Schema::try_from(s).box_err())
.context(BuildPlanWithCause {
Expand All @@ -81,26 +131,13 @@ fn convert_influxql_schema(ceresdb_schema: common_types::schema::Schema) -> Resu
}

impl<'a, P: MetaProvider> Planner<'a, P> {
pub fn try_new(
context_provider: ContextProviderAdapter<'a, P>,
all_tables: Vec<TableRef>,
) -> Result<Self> {
let tables = all_tables
.into_iter()
.map(|t| {
let table_name = t.name().to_string();
let schema = convert_influxql_schema(t.schema())?;
let table_source = context_provider.table_source(t);
Ok((table_name, (table_source, schema)))
})
.collect::<Result<HashMap<_, _>>>()?;

Ok(Self {
pub fn new(context_provider: ContextProviderAdapter<'a, P>) -> Self {
Self {
schema_provider: InfluxQLSchemaProvider {
context_provider,
tables,
tables_cache: OnceCell::new(),
},
})
}
}

/// Build sql logical plan from [InfluxqlStatement].
Expand Down
1 change: 1 addition & 0 deletions query_frontend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
#![feature(once_cell)]

//! SQL frontend
//!
Expand Down
12 changes: 4 additions & 8 deletions query_frontend/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,15 +330,11 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
.context(BuildPromPlanError)
}

pub fn influxql_stmt_to_plan(
&self,
statement: InfluxqlStatement,
all_tables: Vec<TableRef>,
) -> Result<Plan> {
pub fn influxql_stmt_to_plan(&self, statement: InfluxqlStatement) -> Result<Plan> {
let adapter = ContextProviderAdapter::new(self.provider, self.read_parallelism);

crate::influxql::planner::Planner::try_new(adapter, all_tables)
.and_then(|planner| planner.statement_to_plan(statement))
let planner = crate::influxql::planner::Planner::new(adapter);
planner
.statement_to_plan(statement)
.context(BuildInfluxqlPlan)
}

Expand Down
56 changes: 55 additions & 1 deletion query_frontend/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use datafusion::{
sql::planner::ContextProvider,
};
use df_operator::{registry::FunctionRegistry, scalar::ScalarUdf, udaf::AggregateUdf};
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use table_engine::{provider::TableProviderAdapter, table::TableRef};

use crate::container::{TableContainer, TableReference};
Expand All @@ -35,6 +35,9 @@ pub enum Error {
source: catalog::Error,
},

#[snafu(display("Failed to find catalog, name:{}", name))]
CatalogNotFound { name: String },

#[snafu(display("Failed to find schema, name:{}", name))]
SchemaNotFound { name: String },

Expand All @@ -44,6 +47,18 @@ pub enum Error {
source: Box<catalog::schema::Error>,
},

#[snafu(display(
"Failed to get all tables, catalog_name:{}, schema_name:{}, err:{}",
catalog_name,
schema_name,
source
))]
GetAllTables {
catalog_name: String,
schema_name: String,
source: catalog::schema::Error,
},

#[snafu(display("Failed to find udf, err:{}", source))]
FindUdf {
source: df_operator::registry::Error,
Expand Down Expand Up @@ -72,6 +87,13 @@ pub trait MetaProvider {

/// Get udaf by name.
fn aggregate_udf(&self, name: &str) -> Result<Option<AggregateUdf>>;

/// Return all tables.
///
/// Note that it may incur expensive cost.
/// Now it is used in `table_names` method in `SchemaProvider`(introduced by
/// influxql).
fn all_tables(&self) -> Result<Vec<TableRef>>;
}

/// We use an adapter instead of using [catalog::Manager] directly, because
Expand Down Expand Up @@ -138,6 +160,34 @@ impl<'a> MetaProvider for CatalogMetaProvider<'a> {
fn aggregate_udf(&self, name: &str) -> Result<Option<AggregateUdf>> {
self.function_registry.find_udaf(name).context(FindUdf)
}

/// Collects every table under the default catalog and default schema.
///
/// Returns an error when the default catalog or schema cannot be resolved
/// (lookup failure or absence), or when listing the schema's tables fails.
/// NOTE(review): per the `MetaProvider::all_tables` contract this may be
/// an expensive call — callers should cache the result where possible.
// TODO: after supporting not only default catalog and schema, we should
// refactor the tables collecting procedure.
fn all_tables(&self) -> Result<Vec<TableRef>> {
// Resolve the default catalog; distinguish lookup errors (FindCatalog)
// from a missing catalog (CatalogNotFound).
let catalog = self
.manager
.catalog_by_name(self.default_catalog)
.with_context(|| FindCatalog {
name: self.default_catalog,
})?
.with_context(|| CatalogNotFound {
name: self.default_catalog,
})?;

// Same two-step resolution for the default schema.
let schema = catalog
.schema_by_name(self.default_schema)
.context(FindSchema {
name: self.default_schema,
})?
.with_context(|| SchemaNotFound {
name: self.default_schema,
})?;

// Enumerate all tables in the schema, tagging any failure with the
// catalog/schema pair for diagnosability.
schema.all_tables().with_context(|| GetAllTables {
catalog_name: self.default_catalog,
schema_name: self.default_schema,
})
}
}

/// An adapter to ContextProvider, not thread safe
Expand Down Expand Up @@ -219,6 +269,10 @@ impl<'a, P: MetaProvider> MetaProvider for ContextProviderAdapter<'a, P> {
fn aggregate_udf(&self, name: &str) -> Result<Option<AggregateUdf>> {
self.meta_provider.aggregate_udf(name)
}

/// Delegates to the wrapped `MetaProvider` to enumerate all tables.
fn all_tables(&self) -> Result<Vec<TableRef>> {
self.meta_provider.all_tables()
}
}

impl<'a, P: MetaProvider> ContextProvider for ContextProviderAdapter<'a, P> {
Expand Down
4 changes: 4 additions & 0 deletions query_frontend/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,8 @@ impl MetaProvider for MockMetaProvider {
fn aggregate_udf(&self, _name: &str) -> crate::provider::Result<Option<AggregateUdf>> {
todo!()
}

// Not exercised by the current tests; panics if called.
fn all_tables(&self) -> crate::provider::Result<Vec<TableRef>> {
todo!()
}
}
6 changes: 1 addition & 5 deletions server/src/handlers/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,7 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
);

frontend
.influxql_stmt_to_plan(
&mut sql_ctx,
stmts.remove(0),
instance.catalog_manager.clone(),
)
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
Expand Down
6 changes: 1 addition & 5 deletions server/src/proxy/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
);

frontend
.influxql_stmt_to_plan(
&mut sql_ctx,
stmts.remove(0),
self.instance.catalog_manager.clone(),
)
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
Expand Down

0 comments on commit e1d4bd1

Please sign in to comment.