Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce TableOperator to encasulate operation of tables #808

Merged
merged 11 commits into from
Apr 7, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ workspace = true
async-trait = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
log = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
9 changes: 8 additions & 1 deletion catalog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Common traits and types about catalog (schema)
Expand All @@ -8,6 +8,7 @@ extern crate common_util;
pub mod consts;
pub mod manager;
pub mod schema;
pub mod table_operator;

use std::sync::Arc;

Expand Down Expand Up @@ -48,6 +49,12 @@ pub enum Error {

#[snafu(display("Unsupported method, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
UnSupported { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to operate table, msg:{}, err:{}", msg, source))]
TableOperatorWithCause { msg: String, source: GenericError },

#[snafu(display("Failed to operate table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
TableOperatorNoCause { msg: String, backtrace: Backtrace },
}

define_result!(Error);
Expand Down
116 changes: 94 additions & 22 deletions catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ pub struct CreateTableRequest {
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Schema id
pub schema_id: SchemaId,
/// Table name
pub table_name: String,
/// Table schema
Expand All @@ -208,11 +206,15 @@ pub struct CreateTableRequest {
}

impl CreateTableRequest {
pub fn into_engine_create_request(self, table_id: TableId) -> engine::CreateTableRequest {
pub fn into_engine_create_request(
self,
table_id: TableId,
schema_id: SchemaId,
) -> engine::CreateTableRequest {
engine::CreateTableRequest {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id: self.schema_id,
schema_id,
table_name: self.table_name,
table_id,
table_schema: self.table_schema,
Expand All @@ -236,25 +238,101 @@ pub struct CreateOptions {
pub create_if_not_exists: bool,
}

pub type DropTableRequest = engine::DropTableRequest;
/// Drop table request
#[derive(Debug, Clone)]
pub struct DropTableRequest {
/// Catalog name
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Table name
pub table_name: String,
/// Table engine type
pub engine: String,
}

/// Drop table options.
impl DropTableRequest {
pub fn into_engine_drop_request(self, schema_id: SchemaId) -> engine::DropTableRequest {
engine::DropTableRequest {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id,
table_name: self.table_name,
engine: self.engine,
}
}
}
/// Drop table options
#[derive(Clone)]
pub struct DropOptions {
/// Table engine
pub table_engine: TableEngineRef,
}

pub type OpenTableRequest = engine::OpenTableRequest;
/// Open table request
#[derive(Debug, Clone)]
pub struct OpenTableRequest {
/// Catalog name
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Table name
pub table_name: String,
/// Table id
pub table_id: TableId,
/// Table engine type
pub engine: String,
/// Shard id, shard is the table set about scheduling from nodes
pub shard_id: ShardId,
}

impl OpenTableRequest {
pub fn into_engine_open_request(self, schema_id: SchemaId) -> engine::OpenTableRequest {
engine::OpenTableRequest {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id,
table_name: self.table_name,
table_id: self.table_id,
engine: self.engine,
shard_id: self.shard_id,
}
}
}
/// Open table options.
#[derive(Clone)]
pub struct OpenOptions {
/// Table engine
pub table_engine: TableEngineRef,
}

pub type CloseTableRequest = engine::CloseTableRequest;
/// Close table request
#[derive(Clone, Debug)]
pub struct CloseTableRequest {
/// Catalog name
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Table name
pub table_name: String,
/// Table id
pub table_id: TableId,
/// Table engine type
pub engine: String,
}

impl CloseTableRequest {
pub fn into_engine_close_request(self, schema_id: SchemaId) -> engine::CloseTableRequest {
engine::CloseTableRequest {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id,
table_name: self.table_name,
table_id: self.table_id,
engine: self.engine,
}
}
}

/// Close table options.
#[derive(Clone)]
Expand Down Expand Up @@ -291,32 +369,26 @@ pub trait Schema {
/// Find table by name.
fn table_by_name(&self, name: NameRef) -> Result<Option<TableRef>>;

/// TODO: remove this method afterwards.
/// Create table according to `request`.
async fn create_table(
&self,
request: CreateTableRequest,
opts: CreateOptions,
) -> Result<TableRef>;

/// TODO: remove this method afterwards.
/// Drop table according to `request`.
///
/// Returns true if the table is really dropped.
async fn drop_table(&self, request: DropTableRequest, opts: DropOptions) -> Result<bool>;

/// Open the table according to `request`.
///
/// Return None if table does not exist.
async fn open_table(
&self,
request: OpenTableRequest,
opts: OpenOptions,
) -> Result<Option<TableRef>>;

/// Close the table according to `request`.
///
/// Return false if table does not exist.
async fn close_table(&self, request: CloseTableRequest, opts: CloseOptions) -> Result<()>;

/// All tables
fn all_tables(&self) -> Result<Vec<TableRef>>;

/// Register the opened table into schema.
fn register_table(&self, table: TableRef);

/// Unregister table
fn unregister_table(&self, table_name: &str);
}
Loading