
logic test
dantengsky committed Feb 13, 2025
1 parent 8745b54 commit 9b5332e
Showing 6 changed files with 196 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/query/ast/src/ast/statements/table.rs
@@ -265,7 +265,9 @@ impl Display for AttachTableStmt {
         )?;

         if let Some(cols) = &self.columns_opt {
+            write!(f, " (")?;
             write_comma_separated_list(f, cols.iter())?;
+            write!(f, ")")?;
         }

         write!(f, " {}", self.uri_location)?;
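For illustration (not part of the commit): the effect of this Display fix is that an explicit column list now renders parenthesized, so the statement round-trips through the parser. A minimal standalone sketch of the formatting rule, with `render_attach` as a hypothetical stand-in for `AttachTableStmt`'s Display impl:

```rust
use std::fmt::Write;

// Sketch of the corrected rendering: wrap an explicit column list in
// parentheses, i.e. "ATTACH TABLE t (c2, c4) '<location>'".
fn render_attach(table: &str, cols: Option<&[&str]>, location: &str) -> String {
    let mut s = format!("ATTACH TABLE {table}");
    if let Some(cols) = cols {
        // corresponds to the added `write!(f, " (")` / `write!(f, ")")` calls
        write!(s, " ({})", cols.join(", ")).unwrap();
    }
    write!(s, " {location}").unwrap();
    s
}

fn main() {
    let rendered = render_attach("attach_base", Some(&["c2", "c4"]), "'s3://bucket/prefix'");
    assert_eq!(rendered, "ATTACH TABLE attach_base (c2, c4) 's3://bucket/prefix'");
}
```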
30 changes: 27 additions & 3 deletions src/query/ee/src/attach_table/handler.rs
@@ -25,6 +25,7 @@ use databend_common_sql::plans::CreateTablePlan;
 use databend_common_storage::check_operator;
 use databend_common_storage::init_operator;
 use databend_common_storages_fuse::io::MetaReaders;
+use databend_common_storages_fuse::FUSE_OPT_KEY_ATTACH_COLUMN_IDS;
 use databend_common_storages_fuse::FUSE_TBL_LAST_SNAPSHOT_HINT;
 use databend_enterprise_attach_table::AttachTableHandler;
 use databend_enterprise_attach_table::AttachTableHandlerWrapper;
@@ -73,13 +74,22 @@ impl AttachTableHandler for RealAttachTableHandler {
             number_of_blocks: Some(snapshot.summary.block_count),
         };
+
+        // `attach_table_schema` is the initial table schema: either
+        // - a clone of the schema of the table being attached, or
+        // - a sub-schema of it, if the columns to include are explicitly
+        //   specified in the "ATTACH TABLE" statement.
         let attach_table_schema = if let Some(attached_columns) = &plan.attached_columns {
             // Columns to include are specified, let's check them
             let schema = &snapshot.schema;
-            let mut fields_to_attach = vec![];
+            let mut fields_to_attach = Vec::with_capacity(attached_columns.len());
+            let mut field_ids_to_include = Vec::with_capacity(attached_columns.len());
             let mut invalid_cols = vec![];
             for field in attached_columns {
                 match schema.field_with_name(&field.name) {
-                    Ok(f) => fields_to_attach.push(f.clone()),
+                    Ok(f) => {
+                        field_ids_to_include.push(f.column_id);
+                        fields_to_attach.push(f.clone())
+                    }
                     Err(_) => invalid_cols.push(field.name.as_str()),
                 }
             }
@@ -89,9 +99,23 @@
                     invalid_cols.join(",")
                 )));
             }
+
+            let new_metadata = if !field_ids_to_include.is_empty() {
+                let ids = field_ids_to_include
+                    .iter()
+                    .map(|id| format!("{id}"))
+                    .collect::<Vec<_>>()
+                    .join(",");
+                let mut v = schema.metadata.clone();
+                v.insert(FUSE_OPT_KEY_ATTACH_COLUMN_IDS.to_owned(), ids);
+                v
+            } else {
+                schema.metadata.clone()
+            };
+
             TableSchema {
                 fields: fields_to_attach,
-                metadata: schema.metadata.clone(),
+                metadata: new_metadata,
                 next_column_id: schema.next_column_id,
             }
         } else {
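A note on the encoding above: the ids of the explicitly attached columns are simply joined into a comma-separated string and stored under the `attach_column_ids` metadata key. A standalone sketch of that step, assuming a plain `BTreeMap` in place of the real `TableSchema` metadata map (the function name is illustrative):

```rust
use std::collections::BTreeMap;

const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";

// Encode the ids of the columns to include as "1,3,..." in the schema
// metadata, mirroring the handler's `new_metadata` computation.
fn encode_attach_column_ids(
    metadata: &BTreeMap<String, String>,
    field_ids_to_include: &[u32],
) -> BTreeMap<String, String> {
    let mut new_metadata = metadata.clone();
    if !field_ids_to_include.is_empty() {
        let ids = field_ids_to_include
            .iter()
            .map(u32::to_string)
            .collect::<Vec<_>>()
            .join(",");
        new_metadata.insert(FUSE_OPT_KEY_ATTACH_COLUMN_IDS.to_owned(), ids);
    }
    new_metadata
}

fn main() {
    let out = encode_attach_column_ids(&BTreeMap::new(), &[1, 3]);
    assert_eq!(out[FUSE_OPT_KEY_ATTACH_COLUMN_IDS].as_str(), "1,3");
}
```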
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/constants.rs
@@ -20,6 +20,8 @@ pub const FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD: &str = "row_avg_depth_threshold"

 pub const FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS: &str = "data_retention_period_in_hours";

+pub const FUSE_OPT_KEY_ATTACH_COLUMN_IDS: &str = "attach_column_ids";
+
 pub const FUSE_TBL_BLOCK_PREFIX: &str = "_b";
 pub const FUSE_TBL_BLOCK_INDEX_PREFIX: &str = "_i";
 pub const FUSE_TBL_XOR_BLOOM_INDEX_PREFIX: &str = "_i_b_v2";
36 changes: 35 additions & 1 deletion src/query/storages/fuse/src/fuse_table.rs
@@ -50,6 +50,7 @@ use databend_common_expression::types::DataType;
 use databend_common_expression::BlockThresholds;
 use databend_common_expression::ColumnId;
 use databend_common_expression::RemoteExpr;
+use databend_common_expression::TableField;
 use databend_common_expression::TableSchema;
 use databend_common_expression::ORIGIN_BLOCK_ID_COL_NAME;
 use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COL_NAME;
@@ -92,6 +93,7 @@ use databend_storages_common_table_meta::table::OPT_KEY_STORAGE_FORMAT;
 use databend_storages_common_table_meta::table::OPT_KEY_TABLE_ATTACHED_DATA_URI;
 use databend_storages_common_table_meta::table::OPT_KEY_TABLE_COMPRESSION;
 use futures_util::TryStreamExt;
+use itertools::Itertools;
 use log::info;
 use log::warn;
 use opendal::Operator;
@@ -115,6 +117,7 @@ use crate::TableStatistics;
 use crate::DEFAULT_BLOCK_PER_SEGMENT;
 use crate::DEFAULT_ROW_PER_PAGE;
 use crate::DEFAULT_ROW_PER_PAGE_FOR_BLOCKING;
+use crate::FUSE_OPT_KEY_ATTACH_COLUMN_IDS;
 use crate::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
 use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use crate::FUSE_OPT_KEY_DATA_RETENTION_PERIOD_IN_HOURS;
@@ -562,6 +565,8 @@ impl FuseTable {
             // If table_info options contains key OPT_KEY_SNAPSHOT_LOCATION_FIXED_FLAG,
             // it means that this table info has been tweaked according to the rules of
             // resolving snapshot location from the hint file, it should not be tweaked again.
+            // Otherwise, inconsistent table snapshots may be used while the table is being
+            // processed in a distributed manner.
             return Ok(());
         }

@@ -584,7 +589,36 @@
             .options_mut()
             .insert(OPT_KEY_SNAPSHOT_LOCATION.to_string(), location);

-        table_info.meta.schema = Arc::new(schema);
+        if let Some(ids) = table_info
+            .schema()
+            .metadata
+            .get(FUSE_OPT_KEY_ATTACH_COLUMN_IDS)
+        {
+            // extract the ids of the columns to include
+            let ids: Vec<ColumnId> = ids
+                .as_str()
+                .split(",")
+                .map(|s| s.parse::<u32>())
+                .try_collect()?;
+
+            // retain only the columns that are still there
+            let fields: Vec<TableField> = ids
+                .iter()
+                .filter_map(|id| schema.field_of_column_id(*id).ok().cloned())
+                .collect();
+
+            if fields.is_empty() {
+                return Err(ErrorCode::StorageOther("no effective columns found"));
+            }
+
+            let mut new_schema = table_info.meta.schema.as_ref().clone();
+            new_schema.metadata = schema.metadata.clone();
+            new_schema.next_column_id = schema.next_column_id();
+            new_schema.fields = fields;
+            table_info.meta.schema = Arc::new(new_schema);
+        } else {
+            table_info.meta.schema = Arc::new(schema);
+        }
     }
 }

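This hunk is the decode side of `attach_column_ids`: parse the stored ids, keep only the columns that still exist in the freshly loaded schema, and fail with "no effective columns found" when none survive. A standalone sketch of that retain step, with hypothetical `(id, name)` pairs standing in for `TableField`:

```rust
// Parse "attach_column_ids" and retain only the columns still present in the
// current schema; error out if every included column has been dropped.
fn retain_attached_columns(
    ids_csv: &str,
    current_fields: &[(u32, String)],
) -> Result<Vec<(u32, String)>, String> {
    let ids: Vec<u32> = ids_csv
        .split(',')
        .map(|s| s.parse::<u32>().map_err(|e| e.to_string()))
        .collect::<Result<_, _>>()?;

    let fields: Vec<(u32, String)> = ids
        .iter()
        .filter_map(|id| current_fields.iter().find(|(fid, _)| fid == id).cloned())
        .collect();

    if fields.is_empty() {
        return Err("no effective columns found".to_string());
    }
    Ok(fields)
}

fn main() {
    // Attached with ids 1 (c2) and 3 (c4); the base table later dropped c4.
    let current = vec![(1, "c2_new".to_string()), (2, "c3".to_string())];
    assert_eq!(
        retain_attached_columns("1,3", &current).unwrap(),
        vec![(1, "c2_new".to_string())]
    );
    // all included columns dropped -> same error the tests below exercise
    assert!(retain_attached_columns("3", &current).is_err());
}
```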
60 changes: 60 additions & 0 deletions tests/suites/5_ee/04_attach_read_only/02_0004_attach_table.result
@@ -44,6 +44,66 @@ Error: APIError: QueryFailed: [3905]Modification not permitted: Table 'table_to'
1
2
<<<<
>>>> create or replace table base(c1 string, c2 string, c3 string, c4 string) 's3://testbucket/admin/data/' connection=(access_key_id ='minioadmin' secret_access_key ='minioadmin' endpoint_url='http://127.0.0.1:9900');
>>>> insert into base values('c1', 'c2', 'c3', 'c4')
1
>>>> drop table if exists attach_base
attaching base table
>>>> select * from attach_base
c2 c4
<<<<
#############################
## access renamed columns ###
#############################
>>>> drop table if exists attach_base
>>>> alter table base RENAME COLUMN c2 to c2_new
>>>> insert into base values('c1', 'c2_new', 'c3', 'c4')
1
select all should work
>>>> select * from attach_base
c2 c4
c2_new c4
<<<<
select c2_new should work
>>>> select c2_new from attach_base
c2
c2_new
<<<<
##################################
## drop column from base table ###
##################################
>>>> alter table base DROP COLUMN c1
>>>> delete from base
2
>>>> insert into base values('c2_new', 'c3', 'c4')
1
select all should work
>>>> select * from attach_base
c2_new c4
<<<<
select c2_new should work
>>>> select c2_new from attach_base
c2_new
<<<<
>>>> alter table base DROP COLUMN c4
>>>> select * from attach_base
c2_new
<<<<
selecting the dropped column will fail
>>>> select c4 from attach_base
Error: APIError: QueryFailed: [1065]error:
--> SQL:1:8
|
1 | select c4 from attach_base
| ^^ column c4 doesn't exist


<<<<
>>>> alter table base DROP COLUMN c2_new
if all the included columns are dropped, select ALL should fail as well
>>>> select * from attach_base
Error: APIError: QueryFailed: [4000]no effective columns found
<<<<
>>>> drop connection my_conn;
>>>> drop table if exists table_from;
>>>> drop table if exists table_from2;
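Why the expected output above behaves this way: the attachment records column ids, not names, so `RENAME COLUMN c2 TO c2_new` is transparent, while dropping an included column silently narrows the projection until nothing is left. A small sketch of that id-based matching (ids and names here are hypothetical):

```rust
fn main() {
    // ids recorded at ATTACH time: c2 -> 1, c4 -> 3
    let attach_ids = [1u32, 3];

    // base table schema after `RENAME COLUMN c2 TO c2_new`
    let base = [(0u32, "c1"), (1, "c2_new"), (2, "c3"), (3, "c4")];
    let visible: Vec<&str> = attach_ids
        .iter()
        .filter_map(|id| base.iter().find(|(fid, _)| fid == id).map(|&(_, n)| n))
        .collect();
    assert_eq!(visible, ["c2_new", "c4"]); // the rename is invisible to the reader

    // after `DROP COLUMN c4` only c2_new survives; dropping it too would
    // leave the projection empty ("no effective columns found")
    let base = [(0u32, "c1"), (1, "c2_new"), (2, "c3")];
    let visible: Vec<&str> = attach_ids
        .iter()
        .filter_map(|id| base.iter().find(|(fid, _)| fid == id).map(|&(_, n)| n))
        .collect();
    assert_eq!(visible, ["c2_new"]);
}
```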
70 changes: 70 additions & 0 deletions tests/suites/5_ee/04_attach_read_only/02_0004_attach_table.sh
@@ -53,6 +53,76 @@ query "select * from table_to order by a;"
comment "select after deletion with self-defined connection"
query "select * from table_to2 order by a;"

##############################
# explicitly include columns #
##############################

stmt "create or replace table base(c1 string, c2 string, c3 string, c4 string) 's3://testbucket/admin/data/' connection=(access_key_id ='minioadmin' secret_access_key ='minioadmin' endpoint_url='${STORAGE_S3_ENDPOINT_URL}');"

stmt "insert into base values('c1', 'c2', 'c3', 'c4')"

# grab the base table's snapshot-location prefix from SHOW CREATE TABLE, so the
# attach statements below can point at the base table's storage directory
base_storage_prefix=$(mysql -uroot -h127.0.0.1 -P3307 -e "set global hide_options_in_show_create_table=0;show create table base" | grep -i snapshot_location | awk -F'SNAPSHOT_LOCATION='"'"'|_ss' '{print $2}')

stmt "drop table if exists attach_base"

echo "attaching base table"
echo "attach table attach_base (c2, c4) 's3://testbucket/admin/data/$base_storage_prefix' connection=(connection_name ='my_conn')" | $BENDSQL_CLIENT_CONNECT

query "select * from attach_base"

# access modified table
echo "#############################"
echo "## access renamed columns ###"
echo "#############################"

stmt "drop table if exists attach_base"
echo "attach table attach_base (c2, c4) 's3://testbucket/admin/data/$base_storage_prefix' connection=(connection_name ='my_conn')" | $BENDSQL_CLIENT_CONNECT

stmt "alter table base RENAME COLUMN c2 to c2_new"
stmt "insert into base values('c1', 'c2_new', 'c3', 'c4')"

echo "select all should work"
query "select * from attach_base"

echo "select c2_new should work"
query "select c2_new from attach_base"

echo "##################################"
echo "## drop column from base table ###"
echo "##################################"

# dropping columns which are NOT included in the attached table should not matter
stmt "alter table base DROP COLUMN c1"
stmt "delete from base"
stmt "insert into base values('c2_new', 'c3', 'c4')"
echo "select all should work"
query "select * from attach_base"

echo "select c2_new should work"
query "select c2_new from attach_base"

# dropping columns which ARE included in the attached table
stmt "alter table base DROP COLUMN c4"

# if the dropped column is accessed, the query will fail

# select ALL will return the remaining columns
query "select * from attach_base"

echo "select the dropped column will fail"
query "select c4 from attach_base"


stmt "alter table base DROP COLUMN c2_new"

echo "if all the include columns are dropped, select ALL should fail as well"
query "select * from attach_base"

stmt "drop connection my_conn;"

stmt "drop table if exists table_from;"
