Skip to content

Commit

Permalink
feat: support alter source rename (#8778)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Mar 28, 2023
1 parent e98a1a4 commit b9c9d35
Show file tree
Hide file tree
Showing 15 changed files with 180 additions and 4 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ steps:
retry: *auto-retry

- label: "recovery test (deterministic simulation)"
command: "TEST_NUM=8 KILL_RATE=0.5 timeout 14m ci/scripts/deterministic-recovery-test.sh"
command: "TEST_NUM=8 KILL_RATE=0.5 timeout 20m ci/scripts/deterministic-recovery-test.sh"
depends_on: "build-simulation"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
Expand Down
27 changes: 27 additions & 0 deletions e2e_test/ddl/alter_rename_relation.slt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ CREATE SINK sink AS SELECT mv3.v1 AS v1, mv3.v21 AS v2 FROM mv AS mv3 WITH (
connector = 'blackhole'
);

statement ok
CREATE SOURCE src (v INT) WITH (
connector = 'datagen',
fields.v.kind = 'sequence',
fields.v.start = '1',
fields.v.end = '10',
datagen.rows.per.second='15',
datagen.split.num = '1'
) ROW FORMAT JSON;

statement ok
CREATE VIEW v1 AS ( SELECT * FROM t_as WHERE id = 1);

Expand Down Expand Up @@ -123,6 +133,17 @@ SELECT * from v4
1 (1,(1,2)) 1 (1,(1,2))
2 (2,(2,4)) 2 (2,(2,4))

statement ok
CREATE MATERIALIZED VIEW mv4 AS SELECT * FROM src;

statement ok
ALTER SOURCE src RENAME TO src1;

query TT
SHOW CREATE MATERIALIZED VIEW mv4;
----
public.mv4 CREATE MATERIALIZED VIEW mv4 AS SELECT * FROM src1 AS src

statement ok
DROP SINK sink1;

Expand All @@ -138,6 +159,12 @@ DROP VIEW v3;
statement ok
DROP VIEW v2;

statement ok
DROP MATERIALIZED VIEW mv4;

statement ok
DROP SOURCE src1;

statement ok
DROP MATERIALIZED VIEW mv3;

Expand Down
1 change: 1 addition & 0 deletions proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ message AlterRelationNameRequest {
uint32 view_id = 2;
uint32 index_id = 3;
uint32 sink_id = 4;
uint32 source_id = 5;
}
string new_name = 20;
}
Expand Down
5 changes: 5 additions & 0 deletions src/frontend/src/binder/relation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ impl Binder {
Self::resolve_single_name(name.0, "table name")
}

/// return the `source_name`
pub fn resolve_source_name(name: ObjectName) -> Result<String> {
Self::resolve_single_name(name.0, "source name")
}

/// return the `user_name`
pub fn resolve_user_name(name: ObjectName) -> Result<String> {
Self::resolve_single_name(name.0, "user name")
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ pub trait CatalogWriter: Send + Sync {
async fn alter_index_name(&self, index_id: u32, index_name: &str) -> Result<()>;

async fn alter_sink_name(&self, sink_id: u32, sink_name: &str) -> Result<()>;

async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()>;
}

#[derive(Clone)]
Expand Down Expand Up @@ -305,6 +307,14 @@ impl CatalogWriter for CatalogWriterImpl {
.await?;
self.wait_version(version).await
}

async fn alter_source_name(&self, source_id: u32, source_name: &str) -> Result<()> {
let version = self
.meta_client
.alter_relation_name(Relation::SourceId(source_id), source_name)
.await?;
self.wait_version(version).await
}
}

impl CatalogWriterImpl {
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,12 @@ impl SchemaCatalog {
let source = SourceCatalog::from(prost);
let source_ref = Arc::new(source);

// check if source name get updated.
let old_source = self.source_by_id.get(&id).unwrap();
// check if source name get updated.
if old_source.name != name {
self.source_by_name.remove(&old_source.name);
}

self.source_by_name.insert(name, source_ref.clone());
self.source_by_id.insert(id, source_ref);
}
Expand Down
31 changes: 31 additions & 0 deletions src/frontend/src/handler/alter_relation_rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,37 @@ pub async fn handle_rename_sink(
Ok(PgResponse::empty_result(StatementType::ALTER_SINK))
}

pub async fn handle_rename_source(
handler_args: HandlerArgs,
source_name: ObjectName,
new_source_name: ObjectName,
) -> Result<RwPgResponse> {
let session = handler_args.session;
let db_name = session.database();
let (schema_name, real_source_name) =
Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
let new_source_name = Binder::resolve_source_name(new_source_name)?;
let search_path = session.config().get_search_path();
let user_name = &session.auth_context().user_name;

let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let source_id = {
let reader = session.env().catalog_reader().read_guard();
let (source, schema_name) =
reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
session.check_privilege_for_drop_alter(schema_name, &**source)?;
source.id
};

let catalog_writer = session.env().catalog_writer();
catalog_writer
.alter_source_name(source_id, &new_source_name)
.await?;

Ok(PgResponse::empty_result(StatementType::ALTER_SOURCE))
}

#[cfg(test)]
mod tests {

Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ pub async fn handle(
name,
operation: AlterSinkOperation::RenameSink { sink_name },
} => alter_relation_rename::handle_rename_sink(handler_args, name, sink_name).await,
Statement::AlterSource {
name,
operation: AlterSourceOperation::RenameSource { source_name },
} => alter_relation_rename::handle_rename_source(handler_args, name, source_name).await,
Statement::AlterSystem { param, value } => {
alter_system::handle_alter_system(handler_args, param, value).await
}
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,10 @@ impl CatalogWriter for MockCatalogWriter {
async fn alter_sink_name(&self, _sink_id: u32, _sink_name: &str) -> Result<()> {
unreachable!()
}

async fn alter_source_name(&self, _source_id: u32, _source_name: &str) -> Result<()> {
unreachable!()
}
}

impl MockCatalogWriter {
Expand Down
35 changes: 35 additions & 0 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,6 +1139,41 @@ where
Ok(version)
}

pub async fn alter_source_name(
&self,
source_id: SourceId,
source_name: &str,
) -> MetaResult<NotificationVersion> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
database_core.ensure_source_id(source_id)?;

// 1. validate new source name.
let mut source = database_core.sources.get(&source_id).unwrap().clone();
database_core.check_relation_name_duplicated(&(
source.database_id,
source.schema_id,
source_name.to_string(),
))?;

// 2. rename source and its definition.
let old_name = source.name.clone();
source.name = source_name.to_string();

// 3. update, commit and notify all relations that depend on this source.
self.alter_relation_name_refs_inner(
database_core,
source_id,
&old_name,
source_name,
vec![],
vec![],
vec![],
Some(source),
)
.await
}

pub async fn alter_index_name(
&self,
index_id: IndexId,
Expand Down
5 changes: 5 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,11 @@ where
.alter_sink_name(sink_id, new_name)
.await
}
Relation::SourceId(source_id) => {
self.catalog_manager
.alter_source_name(source_id, new_name)
.await
}
}
}
}
17 changes: 17 additions & 0 deletions src/sqlparser/src/ast/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ pub enum AlterSinkOperation {
RenameSink { sink_name: ObjectName },
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "visitor", derive(Visit, VisitMut))]
pub enum AlterSourceOperation {
RenameSource { source_name: ObjectName },
}

impl fmt::Display for AlterTableOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Expand Down Expand Up @@ -184,6 +191,16 @@ impl fmt::Display for AlterSinkOperation {
}
}

impl fmt::Display for AlterSourceOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AlterSourceOperation::RenameSource { source_name } => {
write!(f, "RENAME TO {source_name}")
}
}
}
}

/// An `ALTER COLUMN` (`Statement::AlterTable`) operation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
Expand Down
13 changes: 12 additions & 1 deletion src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ pub use self::query::{
};
pub use self::statement::*;
pub use self::value::{DateTimeField, DollarQuotedString, TrimWhereField, Value};
pub use crate::ast::ddl::{AlterIndexOperation, AlterSinkOperation, AlterViewOperation};
pub use crate::ast::ddl::{
AlterIndexOperation, AlterSinkOperation, AlterSourceOperation, AlterViewOperation,
};
use crate::keywords::Keyword;
use crate::parser::{Parser, ParserError};

Expand Down Expand Up @@ -1011,6 +1013,12 @@ pub enum Statement {
name: ObjectName,
operation: AlterSinkOperation,
},
/// ALTER SOURCE
AlterSource {
/// Source name
name: ObjectName,
operation: AlterSourceOperation,
},
/// DESCRIBE TABLE OR SOURCE
Describe {
/// Table or Source name
Expand Down Expand Up @@ -1415,6 +1423,9 @@ impl fmt::Display for Statement {
Statement::AlterSink { name, operation } => {
write!(f, "ALTER SINK {} {}", name, operation)
}
Statement::AlterSource { name, operation } => {
write!(f, "ALTER SOURCE {} {}", name, operation)
}
Statement::Drop(stmt) => write!(f, "DROP {}", stmt),
Statement::DropFunction {
if_exists,
Expand Down
26 changes: 25 additions & 1 deletion src/sqlparser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2428,12 +2428,17 @@ impl Parser {
self.parse_alter_view(true)
} else if self.parse_keyword(Keyword::SINK) {
self.parse_alter_sink()
} else if self.parse_keyword(Keyword::SOURCE) {
self.parse_alter_source()
} else if self.parse_keyword(Keyword::USER) {
self.parse_alter_user()
} else if self.parse_keyword(Keyword::SYSTEM) {
self.parse_alter_system()
} else {
self.expected("TABLE or USER after ALTER", self.peek_token())
self.expected(
"TABLE, INDEX, MATERIALIZED, VIEW, SINK, SOURCE, USER or SYSTEM after ALTER",
self.peek_token(),
)
}
}

Expand Down Expand Up @@ -2592,6 +2597,25 @@ impl Parser {
})
}

pub fn parse_alter_source(&mut self) -> Result<Statement, ParserError> {
let source_name = self.parse_object_name()?;
let operation = if self.parse_keyword(Keyword::RENAME) {
if self.parse_keyword(Keyword::TO) {
let source_name = self.parse_object_name()?;
AlterSourceOperation::RenameSource { source_name }
} else {
return self.expected("TO after RENAME", self.peek_token());
}
} else {
return self.expected("RENAME after ALTER SOURCE", self.peek_token());
};

Ok(Statement::AlterSource {
name: source_name,
operation,
})
}

pub fn parse_alter_system(&mut self) -> Result<Statement, ParserError> {
self.expect_keyword(Keyword::SET)?;
let param = self.parse_identifier()?;
Expand Down
1 change: 1 addition & 0 deletions src/utils/pgwire/src/pg_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub enum StatementType {
ALTER_TABLE,
ALTER_MATERIALIZED_VIEW,
ALTER_SINK,
ALTER_SOURCE,
ALTER_SYSTEM,
REVOKE_PRIVILEGE,
// Introduce ORDER_BY statement type cuz Calcite unvalidated AST has SqlKind.ORDER_BY. Note
Expand Down

0 comments on commit b9c9d35

Please sign in to comment.