From dd674844d860b79e04331e053e743117c8ff533a Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 14 Sep 2023 10:53:12 +0800 Subject: [PATCH 1/3] fix: add table status to cancel background jobs --- Cargo.lock | 14 +++++- Cargo.toml | 1 + analytic_engine/Cargo.toml | 1 + analytic_engine/src/compaction/scheduler.rs | 8 +++ analytic_engine/src/instance/close.rs | 4 ++ .../src/instance/flush_compaction.rs | 10 ++++ analytic_engine/src/table/data.rs | 50 +++++++++++++++---- 7 files changed, 77 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa576795d4..a1460bf067 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,6 +93,7 @@ dependencies = [ "arrow 43.0.0", "async-stream", "async-trait", + "atomic_enum", "base64 0.13.1", "bytes_ext", "ceresdbproto", @@ -748,6 +749,17 @@ dependencies = [ "syn 2.0.28", ] +[[package]] +name = "atomic_enum" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6227a8d6fdb862bcb100c4314d0d9579e5cd73fa6df31a2e6f6e1acd3c5f1207" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "atty" version = "0.2.14" @@ -7574,7 +7586,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "rand 0.8.5", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index 1d5f4e98b6..282448f70d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,6 +94,7 @@ analytic_engine = { path = "analytic_engine" } arena = { path = "components/arena" } async-stream = "0.3.4" async-trait = "0.1.72" +atomic_enum = "0.2.0" base64 = "0.13" bytes = "1" bytes_ext = { path = "components/bytes_ext" } diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml index 05616930e3..7210aeb969 100644 --- a/analytic_engine/Cargo.toml +++ b/analytic_engine/Cargo.toml @@ -34,6 +34,7 @@ arena = { workspace = true } arrow = { workspace = true } async-stream = { workspace = true } async-trait = { workspace = true } +atomic_enum = { workspace = true } base64 = { workspace = true } bytes_ext = { workspace = true } ceresdbproto = { workspace = true } diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 88aa53c962..07f1c5ccfb 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -569,6 +569,14 @@ impl ScheduleWorker { async fn handle_table_compaction_request(&self, compact_req: TableCompactionRequest) { let table_data = compact_req.table_data.clone(); + if table_data.is_closed() { + error!( + "Table is already closed, unable to do compaction further, table:{}, table_id:{}", + table_data.name, table_data.id + ); + return; + } + let table_options = table_data.table_options(); let compaction_strategy = table_options.compaction_strategy; let picker = self.picker_manager.get_picker(compaction_strategy); diff --git a/analytic_engine/src/instance/close.rs b/analytic_engine/src/instance/close.rs index 3a5a28a94e..e1af6eeb95 100644 --- a/analytic_engine/src/instance/close.rs +++ b/analytic_engine/src/instance/close.rs @@ -79,6 +79,10 @@ impl Closer { let removed_table = self.space.remove_table(&request.table_name); assert!(removed_table.is_some()); + // Table is already moved out of space, we should close it to stop background + // jobs. + table_data.set_closed(); + info!( "table:{}-{} has been removed from the space_id:{}", table_data.name, table_data.id, self.space.id diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 11fd2d2374..b2eeb1ab09 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -725,6 +725,16 @@ impl SpaceStore { .await?; } + if table_data.is_closed() { + return Other { + msg: format!( + "Table is already closed, unable to do update manifest, table:{}, table_id:{}", + table_data.name, table_data.id + ), + } + .fail(); + } + let edit_req = { let meta_update = MetaUpdate::VersionEdit(edit_meta.clone()); MetaEditRequest { diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index d272cc3034..b92d1e4e77 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -21,7 +21,7 @@ use std::{ fmt::Formatter, num::NonZeroUsize, sync::{ - atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering}, + atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, }, time::Duration, @@ -111,6 +111,29 @@ impl TableShardInfo { } } +/// `atomic_enum` macro will expand method like +/// ``` +/// compare_exchange(..) -> Result +/// ``` +/// The result type is conflict with outer +/// Result, so add this hack +// TODO: fix this in atomic_enum crate. +mod hack { + use atomic_enum::atomic_enum; + + #[atomic_enum] + #[derive(PartialEq)] + pub enum TableStatus { + Ok = 0, + /// No background jobs are allowed if the table is closed. + Closed, + /// No write/alter are allowed if the table is dropped. + Dropped, + } +} + +use self::hack::{AtomicTableStatus, TableStatus}; + /// Data of a table pub struct TableData { /// Id of this table @@ -161,10 +184,8 @@ pub struct TableData { /// Not persist, used to determine if this table should flush. last_flush_time_ms: AtomicU64, - /// Flag denoting whether the table is dropped - /// - /// No write/alter is allowed if the table is dropped. - dropped: AtomicBool, + /// Table Status + status: AtomicTableStatus, /// Manifest updates after last snapshot manifest_updates: AtomicUsize, @@ -192,7 +213,7 @@ impl fmt::Debug for TableData { .field("opts", &self.opts) .field("last_sequence", &self.last_sequence) .field("last_memtable_id", &self.last_memtable_id) - .field("dropped", &self.dropped.load(Ordering::Relaxed)) + .field("status", &self.status.load(Ordering::Relaxed)) .field("shard_info", &self.shard_info) .finish() } @@ -265,7 +286,7 @@ impl TableData { last_memtable_id: AtomicU64::new(0), allocator: IdAllocator::new(0, 0, DEFAULT_ALLOC_STEP), last_flush_time_ms: AtomicU64::new(0), - dropped: AtomicBool::new(false), + status: TableStatus::Ok.into(), metrics, shard_info: TableShardInfo::new(shard_id), serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(table_id)), @@ -310,7 +331,7 @@ impl TableData { last_memtable_id: AtomicU64::new(0), allocator, last_flush_time_ms: AtomicU64::new(0), - dropped: AtomicBool::new(false), + status: TableStatus::Ok.into(), metrics, shard_info: TableShardInfo::new(shard_id), serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(add_meta.table_id)), @@ -382,13 +403,22 @@ impl TableData { #[inline] pub fn is_dropped(&self) -> bool { - self.dropped.load(Ordering::SeqCst) + self.status.load(Ordering::SeqCst) == TableStatus::Dropped } /// Set the table is dropped and forbid any writes/alter on this table. #[inline] pub fn set_dropped(&self) { - self.dropped.store(true, Ordering::SeqCst); + self.status.store(TableStatus::Dropped, Ordering::SeqCst) + } + + #[inline] + pub fn set_closed(&self) { + self.status.store(TableStatus::Closed, Ordering::SeqCst) + } + + pub fn is_closed(&self) -> bool { + self.status.load(Ordering::SeqCst) == TableStatus::Closed } /// Returns total memtable memory usage in bytes. From 0b193c5b910136e64011811a50aa7daff78bc59e Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 14 Sep 2023 12:13:49 +0800 Subject: [PATCH 2/3] fix ci --- analytic_engine/src/table/data.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index b92d1e4e77..0d01c94d3d 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -112,7 +112,7 @@ impl TableShardInfo { } /// `atomic_enum` macro will expand method like -/// ``` +/// ```text /// compare_exchange(..) -> Result /// ``` /// The result type is conflict with outer From 3a5421618023051061daeece3d773e4ddf823172 Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Thu, 14 Sep 2023 13:57:40 +0800 Subject: [PATCH 3/3] add allow compact --- analytic_engine/src/compaction/scheduler.rs | 4 ++-- analytic_engine/src/instance/flush_compaction.rs | 4 ++-- analytic_engine/src/table/data.rs | 11 +++++++---- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 07f1c5ccfb..961da37513 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -569,9 +569,9 @@ impl ScheduleWorker { async fn handle_table_compaction_request(&self, compact_req: TableCompactionRequest) { let table_data = compact_req.table_data.clone(); - if table_data.is_closed() { + if !table_data.allow_compaction() { error!( - "Table is already closed, unable to do compaction further, table:{}, table_id:{}", + "Table status is not ok, unable to compact further, table:{}, table_id:{}", table_data.name, table_data.id ); return; diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index b2eeb1ab09..429aea8ecd 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -725,10 +725,10 @@ impl SpaceStore { .await?; } - if table_data.is_closed() { + if !table_data.allow_compaction() { return Other { msg: format!( - "Table is already closed, unable to do update manifest, table:{}, table_id:{}", + "Table status is not ok, unable to update manifest, table:{}, table_id:{}", table_data.name, table_data.id ), } diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index 0d01c94d3d..e2407e83cb 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -125,9 +125,8 @@ mod hack { #[derive(PartialEq)] pub enum TableStatus { Ok = 0, - /// No background jobs are allowed if the table is closed. Closed, - /// No write/alter are allowed if the table is dropped. + /// No write/alter are allowed after table is dropped. Dropped, } } @@ -417,8 +416,12 @@ impl TableData { self.status.store(TableStatus::Closed, Ordering::SeqCst) } - pub fn is_closed(&self) -> bool { - self.status.load(Ordering::SeqCst) == TableStatus::Closed + #[inline] + pub fn allow_compaction(&self) -> bool { + match self.status.load(Ordering::SeqCst) { + TableStatus::Ok => true, + TableStatus::Closed | TableStatus::Dropped => false, + } } /// Returns total memtable memory usage in bytes.