Skip to content

Commit

Permalink
feat: introduce TableOperator to encasulate operation of tables (#808)
Browse files Browse the repository at this point in the history
* add `register_table` in `Schema`, adapt this change in volatile based schema impl.

* refactor table opertations.

* abort the modification in meta event service.

* add table operator to meta_event_service.

* add table operator to local recovery.

* make clippy happy.

* update integration tests.

* remove useless table operation methods, and schema id in related requests.

* address CR.
  • Loading branch information
Rachelint authored Apr 7, 2023
1 parent d9b7d15 commit f27792c
Show file tree
Hide file tree
Showing 17 changed files with 698 additions and 456 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ workspace = true
async-trait = { workspace = true }
common_types = { workspace = true }
common_util = { workspace = true }
log = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
9 changes: 8 additions & 1 deletion catalog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

//! Common traits and types about catalog (schema)
Expand All @@ -8,6 +8,7 @@ extern crate common_util;
pub mod consts;
pub mod manager;
pub mod schema;
pub mod table_operator;

use std::sync::Arc;

Expand Down Expand Up @@ -48,6 +49,12 @@ pub enum Error {

#[snafu(display("Unsupported method, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
UnSupported { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to operate table, msg:{}, err:{}", msg, source))]
TableOperatorWithCause { msg: String, source: GenericError },

#[snafu(display("Failed to operate table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
TableOperatorNoCause { msg: String, backtrace: Backtrace },
}

define_result!(Error);
Expand Down
116 changes: 94 additions & 22 deletions catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ pub struct CreateTableRequest {
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Schema id
pub schema_id: SchemaId,
/// Table name
pub table_name: String,
/// Table schema
Expand All @@ -208,11 +206,15 @@ pub struct CreateTableRequest {
}

impl CreateTableRequest {
pub fn into_engine_create_request(self, table_id: TableId) -> engine::CreateTableRequest {
pub fn into_engine_create_request(
self,
table_id: TableId,
schema_id: SchemaId,
) -> engine::CreateTableRequest {
engine::CreateTableRequest {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id: self.schema_id,
schema_id,
table_name: self.table_name,
table_id,
table_schema: self.table_schema,
Expand All @@ -236,25 +238,101 @@ pub struct CreateOptions {
pub create_if_not_exists: bool,
}

pub type DropTableRequest = engine::DropTableRequest;
/// Drop table request
#[derive(Debug, Clone)]
pub struct DropTableRequest {
/// Catalog name
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Table name
pub table_name: String,
/// Table engine type
pub engine: String,
}

/// Drop table options.
impl DropTableRequest {
pub fn into_engine_drop_request(self, schema_id: SchemaId) -> engine::DropTableRequest {
engine::DropTableRequest {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id,
table_name: self.table_name,
engine: self.engine,
}
}
}
/// Drop table options
#[derive(Clone)]
pub struct DropOptions {
/// Table engine
pub table_engine: TableEngineRef,
}

pub type OpenTableRequest = engine::OpenTableRequest;
/// Open table request
#[derive(Debug, Clone)]
pub struct OpenTableRequest {
/// Catalog name
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Table name
pub table_name: String,
/// Table id
pub table_id: TableId,
/// Table engine type
pub engine: String,
/// Shard id, shard is the table set about scheduling from nodes
pub shard_id: ShardId,
}

impl OpenTableRequest {
pub fn into_engine_open_request(self, schema_id: SchemaId) -> engine::OpenTableRequest {
engine::OpenTableRequest {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id,
table_name: self.table_name,
table_id: self.table_id,
engine: self.engine,
shard_id: self.shard_id,
}
}
}
/// Open table options.
#[derive(Clone)]
pub struct OpenOptions {
/// Table engine
pub table_engine: TableEngineRef,
}

pub type CloseTableRequest = engine::CloseTableRequest;
/// Close table request
#[derive(Clone, Debug)]
pub struct CloseTableRequest {
/// Catalog name
pub catalog_name: String,
/// Schema name
pub schema_name: String,
/// Table name
pub table_name: String,
/// Table id
pub table_id: TableId,
/// Table engine type
pub engine: String,
}

impl CloseTableRequest {
pub fn into_engine_close_request(self, schema_id: SchemaId) -> engine::CloseTableRequest {
engine::CloseTableRequest {
catalog_name: self.catalog_name,
schema_name: self.schema_name,
schema_id,
table_name: self.table_name,
table_id: self.table_id,
engine: self.engine,
}
}
}

/// Close table options.
#[derive(Clone)]
Expand Down Expand Up @@ -291,32 +369,26 @@ pub trait Schema {
/// Find table by name.
fn table_by_name(&self, name: NameRef) -> Result<Option<TableRef>>;

/// TODO: remove this method afterwards.
/// Create table according to `request`.
async fn create_table(
&self,
request: CreateTableRequest,
opts: CreateOptions,
) -> Result<TableRef>;

/// TODO: remove this method afterwards.
/// Drop table according to `request`.
///
/// Returns true if the table is really dropped.
async fn drop_table(&self, request: DropTableRequest, opts: DropOptions) -> Result<bool>;

/// Open the table according to `request`.
///
/// Return None if table does not exist.
async fn open_table(
&self,
request: OpenTableRequest,
opts: OpenOptions,
) -> Result<Option<TableRef>>;

/// Close the table according to `request`.
///
/// Return false if table does not exist.
async fn close_table(&self, request: CloseTableRequest, opts: CloseOptions) -> Result<()>;

/// All tables
fn all_tables(&self) -> Result<Vec<TableRef>>;

/// Register the opened table into schema.
fn register_table(&self, table: TableRef);

/// Unregister table
fn unregister_table(&self, table_name: &str);
}
Loading

0 comments on commit f27792c

Please sign in to comment.