Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: only collecting all tables on demand in influxql planner #854

Merged
35 changes: 2 additions & 33 deletions query_frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

use std::{sync::Arc, time::Instant};

use catalog::manager::ManagerRef;
use ceresdbproto::{prometheus::Expr as PromExpr, storage::WriteTableRequest};
use cluster::config::SchemaConfig;
use common_types::request_id::RequestId;
use common_util::error::{BoxError, GenericError};
use common_util::error::GenericError;
use influxql_parser::statement::Statement as InfluxqlStatement;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use sqlparser::ast::{SetExpr, Statement as SqlStatement, TableFactor};
Expand Down Expand Up @@ -146,39 +145,9 @@ impl<P: MetaProvider> Frontend<P> {
&self,
ctx: &mut Context,
stmt: InfluxqlStatement,
manager: ManagerRef,
) -> Result<Plan> {
let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);
let catalog_name = self.provider.default_catalog_name();
let catalog = manager
.catalog_by_name(catalog_name)
.box_err()
.context(InfluxqlPlanWithCause {
msg: format!("get catalog failed, value:{catalog_name}"),
})?
.context(InfluxqlPlan {
msg: format!("catalog is none, value:{catalog_name}"),
})?;
let schema_name = self.provider.default_schema_name();
let schema = catalog
.schema_by_name(schema_name)
.box_err()
.context(InfluxqlPlanWithCause {
msg: format!("get schema failed, value:{schema_name}"),
})?
.context(InfluxqlPlan {
msg: format!("schema is none, value:{schema_name}"),
})?;
let all_tables = schema
.all_tables()
.box_err()
.context(InfluxqlPlanWithCause {
msg: format!("get all tables failed, catalog:{catalog_name}, schema:{schema_name}"),
})?;

planner
.influxql_stmt_to_plan(stmt, all_tables)
.context(CreatePlan)
planner.influxql_stmt_to_plan(stmt).context(CreatePlan)
}

pub fn write_req_to_plan(
Expand Down
97 changes: 67 additions & 30 deletions query_frontend/src/influxql/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

//! InfluxQL planner

use std::{collections::HashMap, sync::Arc};
use std::{cell::OnceCell, sync::Arc};

use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use common_util::error::BoxError;
use datafusion::{
error::DataFusionError, logical_expr::TableSource, sql::planner::ContextProvider,
Expand All @@ -18,6 +19,7 @@ use influxql_parser::{
statement::Statement as InfluxqlStatement,
};
use influxql_schema::Schema;
use log::error;
use snafu::{ensure, ResultExt};
use table_engine::table::TableRef;

Expand All @@ -33,17 +35,18 @@ pub const CERESDB_MEASUREMENT_COLUMN_NAME: &str = "iox::measurement";
// Port from https://github.com/ceresdb/influxql/blob/36fc4d873e/iox_query_influxql/src/frontend/planner.rs#L28
struct InfluxQLSchemaProvider<'a, P: MetaProvider> {
context_provider: ContextProviderAdapter<'a, P>,
// TODO: avoid load all tables.
// if we can ensure `table_names` is only called once, then load tables lazily is better.
tables: HashMap<String, (Arc<dyn TableSource>, Schema)>,
tables_cache: OnceCell<Vec<TableRef>>,
Rachelint marked this conversation as resolved.
Show resolved Hide resolved
}

impl<'a, P: MetaProvider> SchemaProvider for InfluxQLSchemaProvider<'a, P> {
fn get_table_provider(&self, name: &str) -> datafusion::error::Result<Arc<dyn TableSource>> {
self.tables
.get(name)
.map(|(t, _)| Arc::clone(t))
.ok_or_else(|| DataFusionError::Plan(format!("measurement does not exist: {name}")))
self.context_provider
.get_table_provider(name.into())
.map_err(|e| {
DataFusionError::Plan(format!(
"measurement does not exist, measurement:{name}, source:{e}"
))
})
}

fn get_function_meta(&self, name: &str) -> Option<Arc<datafusion::logical_expr::ScalarUDF>> {
Expand All @@ -58,21 +61,68 @@ impl<'a, P: MetaProvider> SchemaProvider for InfluxQLSchemaProvider<'a, P> {
}

fn table_names(&self) -> Vec<&'_ str> {
self.tables.keys().map(|k| k.as_str()).collect::<Vec<_>>()
let tables = match self
Rachelint marked this conversation as resolved.
Show resolved Hide resolved
.tables_cache
.get_or_try_init(|| self.context_provider.all_tables())
{
Ok(tables) => tables,
Err(e) => {
// Restricted by the external interface of iox, we can just print error log here
// and return empty `Vec`.
error!("Influxql planner failed to get all tables, err:{e}");
return Vec::default();
}
};

tables.iter().map(|t| t.name()).collect()
}

fn table_schema(&self, name: &str) -> Option<Schema> {
self.tables.get(name).map(|(_, s)| s.clone())
let table_source = match self.get_table_provider(name) {
Ok(table) => table,
Err(e) => {
// Restricted by the external interface of iox, we can just print error log here
// and return None.
error!("Influxql planner failed to get table schema, name:{name}, err:{e}");
return None;
}
};

let ceresdb_arrow_schema = table_source.schema();
let influxql_schema = match convert_to_influxql_schema(table_source.schema()) {
Ok(schema) => schema,
Err(e) => {
// Same as above here.
error!("Influxql planner failed to convert schema to influxql schema, schema:{ceresdb_arrow_schema}, err:{e}");
return None;
}
};

Some(influxql_schema)
}

fn table_exists(&self, name: &str) -> bool {
    // Probe the context provider for the table. This trait method cannot
    // surface an error, so a failed lookup is logged and reported as absent.
    let lookup = self.context_provider.table(name.into());
    match lookup {
        Ok(found) => found.is_some(),
        Err(e) => {
            // Restricted by the external interface of iox; log and fall back.
            error!("Influxql planner failed to find table, table_name:{name}, err:{e}");
            false
        }
    }
}
}

/// Influxql logical planner
///
/// NOTICE: planner will be built for each influxql query.
pub(crate) struct Planner<'a, P: MetaProvider> {
schema_provider: InfluxQLSchemaProvider<'a, P>,
}

fn convert_influxql_schema(ceresdb_schema: common_types::schema::Schema) -> Result<Schema> {
let arrow_schema = ceresdb_schema.into_arrow_schema_ref();
ceresdb_schema_to_influxdb(arrow_schema)
fn convert_to_influxql_schema(ceresdb_arrow_schema: ArrowSchemaRef) -> Result<Schema> {
ceresdb_schema_to_influxdb(ceresdb_arrow_schema)
.box_err()
.and_then(|s| Schema::try_from(s).box_err())
.context(BuildPlanWithCause {
Expand All @@ -81,26 +131,13 @@ fn convert_influxql_schema(ceresdb_schema: common_types::schema::Schema) -> Resu
}

impl<'a, P: MetaProvider> Planner<'a, P> {
pub fn try_new(
context_provider: ContextProviderAdapter<'a, P>,
all_tables: Vec<TableRef>,
) -> Result<Self> {
let tables = all_tables
.into_iter()
.map(|t| {
let table_name = t.name().to_string();
let schema = convert_influxql_schema(t.schema())?;
let table_source = context_provider.table_source(t);
Ok((table_name, (table_source, schema)))
})
.collect::<Result<HashMap<_, _>>>()?;

Ok(Self {
pub fn new(context_provider: ContextProviderAdapter<'a, P>) -> Self {
Self {
schema_provider: InfluxQLSchemaProvider {
context_provider,
tables,
tables_cache: OnceCell::new(),
},
})
}
}

/// Build sql logical plan from [InfluxqlStatement].
Expand Down
1 change: 1 addition & 0 deletions query_frontend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
#![feature(once_cell)]

//! SQL frontend
//!
Expand Down
12 changes: 4 additions & 8 deletions query_frontend/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,15 +330,11 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
.context(BuildPromPlanError)
}

pub fn influxql_stmt_to_plan(
&self,
statement: InfluxqlStatement,
all_tables: Vec<TableRef>,
) -> Result<Plan> {
pub fn influxql_stmt_to_plan(&self, statement: InfluxqlStatement) -> Result<Plan> {
let adapter = ContextProviderAdapter::new(self.provider, self.read_parallelism);

crate::influxql::planner::Planner::try_new(adapter, all_tables)
.and_then(|planner| planner.statement_to_plan(statement))
let planner = crate::influxql::planner::Planner::new(adapter);
planner
.statement_to_plan(statement)
.context(BuildInfluxqlPlan)
}

Expand Down
56 changes: 55 additions & 1 deletion query_frontend/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use datafusion::{
sql::planner::ContextProvider,
};
use df_operator::{registry::FunctionRegistry, scalar::ScalarUdf, udaf::AggregateUdf};
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use table_engine::{provider::TableProviderAdapter, table::TableRef};

use crate::container::{TableContainer, TableReference};
Expand All @@ -35,6 +35,9 @@ pub enum Error {
source: catalog::Error,
},

#[snafu(display("Failed to find catalog, name:{}", name))]
CatalogNotFound { name: String },

#[snafu(display("Failed to find schema, name:{}", name))]
SchemaNotFound { name: String },

Expand All @@ -44,6 +47,18 @@ pub enum Error {
source: Box<catalog::schema::Error>,
},

#[snafu(display(
"Failed to get all tables, catalog_name:{}, schema_name:{}, err:{}",
catalog_name,
schema_name,
source
))]
GetAllTables {
catalog_name: String,
schema_name: String,
source: catalog::schema::Error,
},

#[snafu(display("Failed to find udf, err:{}", source))]
FindUdf {
source: df_operator::registry::Error,
Expand Down Expand Up @@ -72,6 +87,13 @@ pub trait MetaProvider {

/// Get udaf by name.
fn aggregate_udf(&self, name: &str) -> Result<Option<AggregateUdf>>;

/// Return all tables.
///
/// Note that it may incur expensive cost.
/// Now it is used in `table_names` method in `SchemaProvider`(introduced by
/// influxql).
fn all_tables(&self) -> Result<Vec<TableRef>>;
Rachelint marked this conversation as resolved.
Show resolved Hide resolved
}

/// We use an adapter instead of using [catalog::Manager] directly, because
Expand Down Expand Up @@ -138,6 +160,34 @@ impl<'a> MetaProvider for CatalogMetaProvider<'a> {
fn aggregate_udf(&self, name: &str) -> Result<Option<AggregateUdf>> {
self.function_registry.find_udaf(name).context(FindUdf)
}

// TODO: after supporting not only default catalog and schema, we should
// refactor the tables collecting procedure.
fn all_tables(&self) -> Result<Vec<TableRef>> {
    // Resolve the default catalog. A lookup failure and a missing catalog
    // are reported through distinct error variants so callers can tell
    // "the manager errored" apart from "no such catalog".
    let catalog = self
        .manager
        .catalog_by_name(self.default_catalog)
        .context(FindCatalog {
            name: self.default_catalog,
        })?
        .context(CatalogNotFound {
            name: self.default_catalog,
        })?;

    // Resolve the default schema inside that catalog, with the same
    // error/absence distinction.
    let schema = catalog
        .schema_by_name(self.default_schema)
        .with_context(|| FindSchema {
            name: self.default_schema,
        })?
        .with_context(|| SchemaNotFound {
            name: self.default_schema,
        })?;

    // Collecting every table can be expensive; this is only invoked on
    // demand (e.g. by the influxql planner's `table_names`).
    schema.all_tables().context(GetAllTables {
        catalog_name: self.default_catalog,
        schema_name: self.default_schema,
    })
}
}

/// An adapter to ContextProvider, not thread safe
Expand Down Expand Up @@ -219,6 +269,10 @@ impl<'a, P: MetaProvider> MetaProvider for ContextProviderAdapter<'a, P> {
fn aggregate_udf(&self, name: &str) -> Result<Option<AggregateUdf>> {
self.meta_provider.aggregate_udf(name)
}

fn all_tables(&self) -> Result<Vec<TableRef>> {
    // Plain pass-through: the adapter adds nothing on top of the wrapped
    // meta provider for this operation.
    let provider = &self.meta_provider;
    provider.all_tables()
}
}

impl<'a, P: MetaProvider> ContextProvider for ContextProviderAdapter<'a, P> {
Expand Down
4 changes: 4 additions & 0 deletions query_frontend/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,8 @@ impl MetaProvider for MockMetaProvider {
fn aggregate_udf(&self, _name: &str) -> crate::provider::Result<Option<AggregateUdf>> {
todo!()
}

fn all_tables(&self) -> crate::provider::Result<Vec<TableRef>> {
// Mock provider: intentionally unimplemented — panics if a test exercises it.
// Implement when a test actually needs table enumeration.
todo!()
}
}
6 changes: 1 addition & 5 deletions server/src/handlers/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,7 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
);

frontend
.influxql_stmt_to_plan(
&mut sql_ctx,
stmts.remove(0),
instance.catalog_manager.clone(),
)
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.context(CreatePlan {
query: &request.query,
})?
Expand Down
6 changes: 1 addition & 5 deletions server/src/proxy/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
);

frontend
.influxql_stmt_to_plan(
&mut sql_ctx,
stmts.remove(0),
self.instance.catalog_manager.clone(),
)
.influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
Expand Down