diff --git a/components/backup-stream/src/endpoint.rs b/components/backup-stream/src/endpoint.rs index 366b47c33f..9d9bcf5810 100644 --- a/components/backup-stream/src/endpoint.rs +++ b/components/backup-stream/src/endpoint.rs @@ -15,6 +15,7 @@ use error_code::ErrorCodeExt; use kvproto::brpb::StreamBackupError; use kvproto::metapb::Region; use pd_client::PdClient; +use raft::StateRole; use raftstore::router::RaftStoreRouter; use raftstore::store::fsm::ChangeObserver; @@ -26,6 +27,7 @@ use tokio::runtime::Runtime; use tokio_stream::StreamExt; use txn_types::TimeStamp; +use crate::annotate; use crate::errors::Error; use crate::event_loader::InitialDataLoader; use crate::metadata::store::{EtcdStore, MetaStore}; @@ -41,7 +43,7 @@ use raftstore::coprocessor::{CmdBatch, ObserveHandle, RegionInfoProvider}; use tikv::config::BackupStreamConfig; use tikv_util::worker::{Runnable, Scheduler}; -use tikv_util::{debug, error, info}; +use tikv_util::{box_err, debug, error, info}; use tikv_util::{warn, HandyRwLock}; use super::metrics::{HANDLE_EVENT_DURATION_HISTOGRAM, HANDLE_KV_HISTOGRAM}; @@ -149,6 +151,7 @@ where pd_client: Arc, concurrency_manager: ConcurrencyManager, ) -> Endpoint { + crate::metrics::STREAM_ENABLED.inc(); let pool = create_tokio_runtime(config.num_threads, "backup-stream") .expect("failed to create tokio runtime for backup stream worker."); @@ -220,6 +223,7 @@ where fn on_fatal_error(&self, task: String, err: Box) { // Let's pause the task locally first. self.on_unregister(&task); + err.report(format_args!("fatal for task {}", err)); let meta_cli = self.get_meta_client(); let store_id = self.store_id; @@ -249,8 +253,9 @@ where async fn starts_flush_ticks(router: Router) { loop { - // wait 1min to trigger tick - tokio::time::sleep(Duration::from_secs(FLUSH_STORAGE_INTERVAL / 5)).await; + // check every 15s. + // TODO: maybe use global timer handle in the `tikv_utils::timer` (instead of enabling timing in the current runtime)? + tokio::time::sleep(Duration::from_secs(FLUSH_STORAGE_INTERVAL / 20)).await; debug!("backup stream trigger flush tick"); router.tick().await; } @@ -349,6 +354,7 @@ where return; } }; + let sched = self.scheduler.clone(); let kvs = ApplyEvents::from_cmd_batch(batch, resolver.value_mut().resolver()); drop(resolver); @@ -367,14 +373,10 @@ where let kv_count = kvs.len(); let total_size = kvs.size(); metrics::HEAP_MEMORY - .with_label_values(&["alloc"]) - .inc_by(total_size as f64); - if let Err(err) = router.on_events(kvs).await { - err.report("failed to send event."); - } + .add(total_size as _); + utils::handle_on_event_result(&sched, router.on_events(kvs).await); metrics::HEAP_MEMORY - .with_label_values(&["free"]) - .inc_by(total_size as f64); + .sub(total_size as _); HANDLE_KV_HISTOGRAM.observe(kv_count as _); let time_cost = sw.lap().as_secs_f64(); if time_cost > SLOW_EVENT_THRESHOLD { @@ -393,6 +395,7 @@ where self.regions.clone(), self.range_router.clone(), self.subs.clone(), + self.scheduler.clone(), ) } @@ -421,12 +424,6 @@ where end_key: Vec, ) -> Result<()> { let start = Instant::now_coarse(); - let mut start_ts = task.info.get_start_ts(); - // Should scan from checkpoint_ts rather than start_ts if checkpoint_ts exists in Metadata. 
- if let Some(cli) = &self.meta_client { - let checkpoint_ts = cli.progress_of_task(task.info.get_name()).await?; - start_ts = start_ts.max(checkpoint_ts); - } let success = self .observer .ranges @@ -440,17 +437,16 @@ where ); } tokio::task::spawn_blocking(move || { - let range_init_result = - init.initialize_range(start_key.clone(), end_key.clone(), TimeStamp::new(start_ts)); + let range_init_result = init.initialize_range(start_key.clone(), end_key.clone()); match range_init_result { - Ok(stat) => { - info!("backup stream success to do initial scanning"; "stat" => ?stat, + Ok(()) => { + info!("backup stream success to initialize"; "start_key" => utils::redact(&start_key), "end_key" => utils::redact(&end_key), "take" => ?start.saturating_elapsed(),) } Err(e) => { - e.report("backup stream do initial scanning failed"); + e.report("backup stream initialize failed"); } } }); @@ -526,6 +522,10 @@ where self.pool.block_on(async move { router.unregister_task(task).await; }); + // for now, we support one concurrent task only. + // so simply clear all info would be fine. + self.subs.clear(); + self.observer.ranges.wl().clear(); } /// try advance the resolved ts by the pd tso. @@ -632,13 +632,12 @@ where /// Start observe over some region. /// This would modify some internal state, and delegate the task to InitialLoader::observe_over. - fn observe_over(&self, region: &Region) -> Result<()> { + fn observe_over(&self, region: &Region, handle: ObserveHandle) -> Result<()> { let init = self.make_initial_loader(); - let handle = ObserveHandle::new(); let region_id = region.get_id(); - self.subs.register_region(®ion, handle.clone(), None); + self.subs.register_region(region, handle.clone(), None); init.observe_over_with_retry(region, || { - ChangeObserver::from_cdc(region_id, handle.clone()) + ChangeObserver::from_pitr(region_id, handle.clone()) })?; Ok(()) } @@ -647,33 +646,31 @@ where &self, region: &Region, task: String, + handle: ObserveHandle, ) -> Result<()> { let init = self.make_initial_loader(); - let handle = ObserveHandle::new(); let meta_cli = self.meta_client.as_ref().unwrap().clone(); let last_checkpoint = TimeStamp::new( self.pool .block_on(meta_cli.global_progress_of_task(&task))?, ); self.subs - .register_region(®ion, handle.clone(), Some(last_checkpoint)); + .register_region(region, handle.clone(), Some(last_checkpoint)); let region_id = region.get_id(); let snap = init.observe_over_with_retry(region, move || { - ChangeObserver::from_cdc(region_id, handle.clone()) + ChangeObserver::from_pitr(region_id, handle.clone()) })?; let region = region.clone(); - // Note: Even we did the initial scanning, if the next_backup_ts was updated by periodic flushing, - // before the initial scanning done, there is still possibility of losing data: - // if the server crashes immediately, and data of this scanning hasn't been sent to sink, - // those data would be permanently lost. - // Maybe we need block the next_backup_ts from advancing before all initial scanning done(Or just for the region, via disabling the resolver)? 
self.pool.spawn_blocking(move || { match init.do_initial_scan(®ion, last_checkpoint, snap) { Ok(stat) => { info!("initial scanning of leader transforming finished!"; "statistics" => ?stat, "region" => %region.get_id(), "from_ts" => %last_checkpoint); + utils::record_cf_stat("lock", &stat.lock); + utils::record_cf_stat("write", &stat.write); + utils::record_cf_stat("default", &stat.data); } Err(err) => err.report(format!("during initial scanning of region {:?}", region)), } @@ -694,32 +691,14 @@ where ObserveOp::Start { region, needs_initial_scanning, - } => { - let result = if needs_initial_scanning { - let for_task = self.find_task_by_region(®ion).unwrap_or_else(|| { - panic!( - "BUG: the region {:?} is register to no task but being observed", - region - ) - }); - self.observe_over_with_initial_data_from_checkpoint(®ion, for_task) - } else { - self.observe_over(®ion) - }; - if let Err(err) = result { - err.report(format!( - "during doing initial scanning for region {:?}", - region - )); - } - } + } => self.start_observe(region, needs_initial_scanning), ObserveOp::Stop { ref region } => { self.subs.deregister_region(region, |_, _| true); } ObserveOp::CheckEpochAndStop { ref region } => { self.subs.deregister_region(region, |old, new| { raftstore::store::util::compare_region_epoch( - old.get_region_epoch(), + old.meta.get_region_epoch(), new, true, true, @@ -730,31 +709,133 @@ where }); } ObserveOp::RefreshResolver { ref region } => { - let need_refresh_all = !self.subs.try_update_region(®ion); + let need_refresh_all = !self.subs.try_update_region(region); if need_refresh_all { let canceled = self.subs.deregister_region(region, |_, _| true); + let handle = ObserveHandle::new(); if canceled { - let for_task = self.find_task_by_region(®ion).unwrap_or_else(|| { + let for_task = self.find_task_by_region(region).unwrap_or_else(|| { panic!( "BUG: the region {:?} is register to no task but being observed", region ) }); - if let Err(e) = - self.observe_over_with_initial_data_from_checkpoint(®ion, for_task) - { - e.report(format!( - "register region {} to raftstore when refreshing", - region.get_id() - )); + if let Err(e) = self.observe_over_with_initial_data_from_checkpoint( + region, + for_task, + handle.clone(), + ) { + try_send!( + self.scheduler, + Task::ModifyObserve(ObserveOp::NotifyFailToStartObserve { + region: region.clone(), + handle, + err: Box::new(e) + }) + ); } } } } + ObserveOp::NotifyFailToStartObserve { + region, + handle, + err, + } => { + info!("retry observe region"; "region" => %region.get_id(), "err" => %err); + match self.retry_observe(region, handle) { + Ok(()) => {} + Err(e) => { + try_send!( + self.scheduler, + Task::FatalError( + format!("While retring to observe region, origin error is {}", err), + Box::new(e) + ) + ); + } + } + } } } + fn start_observe(&self, region: Region, needs_initial_scanning: bool) { + let handle = ObserveHandle::new(); + let result = if needs_initial_scanning { + let for_task = self.find_task_by_region(®ion).unwrap_or_else(|| { + panic!( + "BUG: the region {:?} is register to no task but being observed (start_key = {}; end_key = {}; task_stat = {:?})", + region, utils::redact(®ion.get_start_key()), utils::redact(®ion.get_end_key()), self.range_router + ) + }); + self.observe_over_with_initial_data_from_checkpoint(®ion, for_task, handle.clone()) + } else { + self.observe_over(®ion, handle.clone()) + }; + if let Err(err) = result { + try_send!( + self.scheduler, + Task::ModifyObserve(ObserveOp::NotifyFailToStartObserve { + region, + handle, + err: 
Box::new(err) + }) + ); + } + } + + fn retry_observe(&self, region: Region, handle: ObserveHandle) -> Result<()> { + let (tx, rx) = crossbeam::channel::bounded(1); + self.regions + .find_region_by_id( + region.get_id(), + Box::new(move |item| { + tx.send(item) + .expect("BUG: failed to send to newly created channel."); + }), + ) + .map_err(|err| { + annotate!( + err, + "failed to send request to region info accessor, server maybe too too too busy. (region id = {})", + region.get_id() + ) + })?; + let new_region_info = rx + .recv() + .map_err(|err| annotate!(err, "BUG?: unexpected channel message dropped."))?; + if new_region_info.is_none() { + metrics::SKIP_RETRY + .with_label_values(&["region-absent"]) + .inc(); + return Ok(()); + } + if new_region_info + .as_ref() + .map(|r| r.role != StateRole::Leader) + .unwrap_or(true) + { + metrics::SKIP_RETRY.with_label_values(&["not-leader"]).inc(); + return Ok(()); + } + let removed = self.subs.deregister_region(®ion, |old, _| { + let should_remove = old.handle().id == handle.id; + if !should_remove { + warn!("stale retry command"; "region" => ?region, "handle" => ?handle, "old_handle" => ?old.handle()); + } + should_remove + }); + if !removed { + metrics::SKIP_RETRY + .with_label_values(&["stale-command"]) + .inc(); + return Ok(()); + } + self.start_observe(region, true); + Ok(()) + } + pub fn run_task(&self, task: Task) { debug!("run backup stream task"; "task" => ?task); match task { @@ -835,6 +916,11 @@ pub enum ObserveOp { RefreshResolver { region: Region, }, + NotifyFailToStartObserve { + region: Region, + handle: ObserveHandle, + err: Box, + }, } impl fmt::Debug for Task { diff --git a/components/backup-stream/src/errors.rs b/components/backup-stream/src/errors.rs index 5f0530cca4..0e61c580f9 100644 --- a/components/backup-stream/src/errors.rs +++ b/components/backup-stream/src/errors.rs @@ -142,6 +142,14 @@ impl Error { } } + /// add some context to the error. + pub fn context(self, msg: impl Display) -> Self { + Self::Contextual { + inner_error: Box::new(self), + context: msg.to_string(), + } + } + fn kind(&self) -> &'static str { self.error_code().code } diff --git a/components/backup-stream/src/event_loader.rs b/components/backup-stream/src/event_loader.rs index 3735f78593..785fca304a 100644 --- a/components/backup-stream/src/event_loader.rs +++ b/components/backup-stream/src/event_loader.rs @@ -1,12 +1,12 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::marker::PhantomData; +use std::{marker::PhantomData, time::Duration}; use engine_traits::{KvEngine, CF_DEFAULT, CF_WRITE}; use futures::executor::block_on; use raftstore::{ - coprocessor::{ObserveHandle, RegionInfoProvider}, + coprocessor::RegionInfoProvider, router::RaftStoreRouter, store::{fsm::ChangeObserver, Callback, SignificantMsg}, }; @@ -17,15 +17,18 @@ use tikv::storage::{ txn::{EntryBatch, TxnEntry, TxnEntryScanner}, Snapshot, Statistics, }; -use tikv_util::{box_err, time::Instant, warn}; +use tikv_util::{box_err, time::Instant, warn, worker::Scheduler}; use txn_types::{Key, Lock, TimeStamp}; use crate::{ annotate, debug, + endpoint::ObserveOp, errors::{ContextualResultExt, Error, Result}, router::ApplyEvent, subscription_track::{SubscriptionTracer, TwoPhaseResolver}, + try_send, utils::{self, RegionPager}, + Task, }; use crate::{ metrics, @@ -135,6 +138,7 @@ pub struct InitialDataLoader { // method `async (KvEvent) -> Result<()>`? 
sink: Router, tracing: SubscriptionTracer, + scheduler: Scheduler, _engine: PhantomData, } @@ -145,12 +149,19 @@ where R: RegionInfoProvider + Clone + 'static, RT: RaftStoreRouter, { - pub fn new(router: RT, regions: R, sink: Router, tracing: SubscriptionTracer) -> Self { + pub fn new( + router: RT, + regions: R, + sink: Router, + tracing: SubscriptionTracer, + sched: Scheduler, + ) -> Self { Self { router, regions, sink, tracing, + scheduler: sched, _engine: PhantomData, } } @@ -187,6 +198,7 @@ where if !can_retry { break; } + std::thread::sleep(Duration::from_millis(500)); continue; } } @@ -283,11 +295,13 @@ where } stats.add_statistics(&stat); let sink = self.sink.clone(); - metrics::INCREMENTAL_SCAN_SIZE.observe(events.size() as f64); + let event_size = events.size(); + let sched = self.scheduler.clone(); + metrics::INCREMENTAL_SCAN_SIZE.observe(event_size as f64); + metrics::HEAP_MEMORY.add(event_size as _); join_handles.push(tokio::spawn(async move { - if let Err(err) = sink.on_events(events).await { - warn!("failed to send event to sink"; "err" => %err); - } + utils::handle_on_event_result(&sched, sink.on_events(events).await); + metrics::HEAP_MEMORY.sub(event_size as _); })); } } @@ -314,7 +328,10 @@ where warn!("failed to join task."; "err" => %err); } } - if let Err(err) = Self::with_resolver_by(&tr, region_id, |r| Ok(r.phase_one_done())) { + if let Err(err) = Self::with_resolver_by(&tr, region_id, |r| { + r.phase_one_done(); + Ok(()) + }) { err.report(format_args!( "failed to finish phase 1 for region {:?}", region_id @@ -326,14 +343,8 @@ where } /// initialize a range: it simply scan the regions with leader role and send them to [`initialize_region`]. - pub fn initialize_range( - &self, - start_key: Vec, - end_key: Vec, - start_ts: TimeStamp, - ) -> Result { + pub fn initialize_range(&self, start_key: Vec, end_key: Vec) -> Result<()> { let mut pager = RegionPager::scan_from(self.regions.clone(), start_key, end_key); - let mut total_stat = StatisticsSummary::default(); loop { let regions = pager.next_page(8)?; debug!("scanning for entries in region."; "regions" => ?regions); @@ -341,17 +352,19 @@ where break; } for r in regions { - let handle = ObserveHandle::new(); - self.tracing - .register_region(&r.region, handle.clone(), Some(start_ts)); - let region_id = r.region.get_id(); - let snap = self.observe_over_with_retry(&r.region, move || { - ChangeObserver::from_pitr(region_id, handle.clone()) - })?; - let stat = self.do_initial_scan(&r.region, start_ts, snap)?; - total_stat.add_statistics(&stat); + // Note: Even we did the initial scanning, and blocking resolved ts from advancing, + // if the next_backup_ts was updated in some extreme condition, there is still little chance to lost data: + // For example, if a region cannot elect the leader for long time. (say, net work partition) + // At that time, we have nowhere to record the lock status of this region. + try_send!( + self.scheduler, + Task::ModifyObserve(ObserveOp::Start { + region: r.region, + needs_initial_scanning: true + }) + ); } } - Ok(total_stat.stat) + Ok(()) } } diff --git a/components/backup-stream/src/metrics.rs b/components/backup-stream/src/metrics.rs index 29b4e9f2b2..20227e278e 100644 --- a/components/backup-stream/src/metrics.rs +++ b/components/backup-stream/src/metrics.rs @@ -34,10 +34,9 @@ lazy_static! 
{ &["type"] ) .unwrap(); - pub static ref HEAP_MEMORY: CounterVec = register_counter_vec!( + pub static ref HEAP_MEMORY: IntGauge = register_int_gauge!( "tikv_stream_heap_memory", - "The heap memory allocating by stream backup.", - &["type"] + "The heap memory allocating by stream backup." ) .unwrap(); pub static ref ON_EVENT_COST_HISTOGRAM: HistogramVec = register_histogram_vec!( @@ -60,10 +59,39 @@ lazy_static! { exponential_buckets(1.0, 2.0, 16).unwrap() ) .unwrap(); + pub static ref FLUSH_FILE_SIZE: Histogram = register_histogram!( + "tikv_stream_flush_file_size", + "Some statistics of flushing of this run.", + exponential_buckets(1024.0, 2.0, 16).unwrap() + ) + .unwrap(); pub static ref INITIAL_SCAN_DURATION: Histogram = register_histogram!( - "tikv_stream_initial_scan_duration", + "tikv_stream_initial_scan_duration_sec", "The duration of initial scanning.", exponential_buckets(0.001, 2.0, 16).unwrap() ) .unwrap(); + pub static ref SKIP_RETRY: IntCounterVec = register_int_counter_vec!( + "tikv_stream_skip_retry_observe", + "The reason of giving up observing region when meeting error.", + &["reason"], + ) + .unwrap(); + pub static ref INITIAL_SCAN_STAT: IntCounterVec = register_int_counter_vec!( + "tikv_stream_initial_scan_operations", + "The operations over rocksdb during initial scanning.", + &["cf", "op"], + ) + .unwrap(); + pub static ref STREAM_ENABLED: IntCounter = register_int_counter!( + "tikv_stream_enabled", + "When gt 0, this node enabled streaming." + ) + .unwrap(); + pub static ref TRACK_REGION: IntCounterVec = register_int_counter_vec!( + "tikv_stream_observed_region", + "the region being observed by the current store.", + &["type"], + ) + .unwrap(); } diff --git a/components/backup-stream/src/router.rs b/components/backup-stream/src/router.rs index 67879fd340..361a60d74c 100644 --- a/components/backup-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -16,7 +16,7 @@ use std::{ use crate::{ annotate, endpoint::Task, - errors::Error, + errors::{ContextualResultExt, Error}, metadata::StreamTask, metrics::SKIP_KV_COUNTER, subscription_track::TwoPhaseResolver, @@ -434,13 +434,13 @@ impl RouterInner { Ok(()) } - pub async fn on_events(&self, kv: ApplyEvents) -> Result<()> { + pub async fn on_events(&self, kv: ApplyEvents) -> Vec<(String, Result<()>)> { + use futures::FutureExt; let partitioned_events = kv.partition_by_range(&self.ranges.rl()); let tasks = partitioned_events .into_iter() - .map(|(task, events)| self.on_event(task, events)); - futures::future::try_join_all(tasks).await?; - Ok(()) + .map(|(task, events)| self.on_event(task.clone(), events).map(move |r| (task, r))); + futures::future::join_all(tasks).await } /// flush the specified task, once once success, return the min resolved ts of this flush. @@ -497,13 +497,13 @@ impl RouterInner { } /// The handle of a temporary file. -#[derive(Debug, PartialEq, Eq, Clone, Hash)] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] struct TempFileKey { - is_meta: bool, table_id: i64, region_id: u64, cf: CfName, cmd_type: CmdType, + is_meta: bool, } impl TempFileKey { @@ -663,7 +663,8 @@ impl StreamTaskInfo { flushing_files: RwLock::default(), last_flush_time: AtomicPtr::new(Box::into_raw(Box::new(Instant::now()))), // TODO make this config set by config or task? - flush_interval: Duration::from_secs(FLUSH_STORAGE_INTERVAL), + // Keep `0.2 * FLUSH_STORAGE_INTERVAL` for doing flushing. 
+            flush_interval: Duration::from_secs((FLUSH_STORAGE_INTERVAL as f64 * 0.8).round() as _),
            total_size: AtomicUsize::new(0),
            flushing: AtomicBool::new(false),
            flush_fail_count: AtomicUsize::new(0),
@@ -681,10 +682,12 @@ impl StreamTaskInfo {
        let mut w = self.files.write().await;
        // double check before insert. there may be someone already insert that
        // when we are waiting for the write lock.
+        // silence the lint advising us to use the `Entry` API which may introduce copying.
+        #[allow(clippy::map_entry)]
        if !w.contains_key(&key) {
            let path = self.temp_dir.join(key.temp_file_name());
            let val = Mutex::new(DataFile::new(path).await?);
-            w.insert(key.clone(), val);
+            w.insert(key, val);
        }
        let f = w.get(&key).unwrap();
@@ -696,12 +699,14 @@
    /// Append a event to the files. This wouldn't trigger `fsync` syscall.
    /// i.e. No guarantee of persistence.
    pub async fn on_events(&self, kv: ApplyEvents) -> Result<()> {
+        use futures::FutureExt;
        let now = Instant::now_coarse();
-        futures::future::try_join_all(
-            kv.partition_by_table_key()
-                .into_iter()
-                .map(|(key, events)| self.on_events_of_key(key, events)),
-        )
+        futures::future::try_join_all(kv.partition_by_table_key().into_iter().map(
+            |(key, events)| {
+                self.on_events_of_key(key, events)
+                    .map(move |r| r.context(format_args!("when handling the file key {:?}", key)))
+            },
+        ))
        .await?;
        crate::metrics::ON_EVENT_COST_HISTOGRAM
            .with_label_values(&["write_to_tempfile"])
@@ -895,8 +900,11 @@
        // flush log file to storage.
        self.flush_log().await?;
-        let file_len = metadata_info.files.len();
-        let file_size = metadata_info.files.iter().fold(0, |a, d| a + d.length);
+        let file_size_vec = metadata_info
+            .files
+            .iter()
+            .map(|d| d.length)
+            .collect::<Vec<_>>();
        // flush meta file to storage.
        self.flush_meta(metadata_info).await?;
        crate::metrics::FLUSH_DURATION
@@ -908,10 +916,12 @@
        crate::metrics::FLUSH_DURATION
            .with_label_values(&["clear_temp_files"])
            .observe(sw.lap().as_secs_f64());
-
+        file_size_vec
+            .iter()
+            .for_each(|size| crate::metrics::FLUSH_FILE_SIZE.observe(*size as _));
        info!("log backup flush done";
-            "files" => %file_len,
-            "total_size" => %file_size,
+            "files" => %file_size_vec.len(),
+            "total_size" => %file_size_vec.iter().sum::<u64>(),
            "take" => ?begin.saturating_elapsed(),
        );
        Ok(rts)
@@ -1286,6 +1296,14 @@ mod tests {
            .expect("failed to register task")
    }
+    fn check_on_events_result(item: &Vec<(String, Result<()>)>) {
+        for (task, r) in item {
+            if let Err(err) = r {
+                panic!("task {} failed: {}", task, err);
+            }
+        }
+    }
+
    #[tokio::test]
    async fn test_basic_file() -> Result<()> {
        test_util::init_log_for_test();
@@ -1307,7 +1325,7 @@
        region1.put_table(CF_WRITE, 1, b"hello", b"still isn't a write record :3");
        region1.delete_table(CF_DEFAULT, 1, b"hello");
        let events = region1.flush_events();
-        router.on_events(events).await?;
+        check_on_events_result(&router.on_events(events).await);
        tokio::time::sleep(Duration::from_millis(200)).await;
        let end_ts = TimeStamp::physical_now();
@@ -1456,14 +1474,14 @@
            i.storage = Arc::new(ErrorStorage::with_first_time_error(i.storage.clone()))
        })
        .await;
-        router.on_events(build_kv_event(0, 10)).await.unwrap();
+        check_on_events_result(&router.on_events(build_kv_event(0, 10)).await);
        assert!(
            router
                .do_flush("error_prone", 42, TimeStamp::max())
                .await
                .is_none()
        );
-        router.on_events(build_kv_event(10, 10)).await.unwrap();
+        check_on_events_result(&router.on_events(build_kv_event(10, 10)).await);
        let _ = router.do_flush("error_prone", 42, TimeStamp::max()).await;
        let t = router.get_task_info("error_prone").await.unwrap();
        assert_eq!(t.total_size(), 0);
@@ -1511,7 +1529,7 @@
        })
        .await;
        for i in 0..=16 {
-            router.on_events(build_kv_event(i * 10, 10)).await.unwrap();
+            check_on_events_result(&router.on_events(build_kv_event(i * 10, 10)).await);
            assert_eq!(
                router
                    .do_flush("flush_failure", 42, TimeStamp::zero())
diff --git a/components/backup-stream/src/subscription_track.rs b/components/backup-stream/src/subscription_track.rs
index 9999d7d7d1..344fa119ba 100644
--- a/components/backup-stream/src/subscription_track.rs
+++ b/components/backup-stream/src/subscription_track.rs
@@ -3,6 +3,7 @@
use std::sync::Arc;

use crate::debug;
+use crate::metrics::TRACK_REGION;
use crate::utils;
use dashmap::mapref::one::RefMut;
use dashmap::DashMap;
@@ -55,9 +56,22 @@ impl RegionSubscription {
    pub fn resolver(&mut self) -> &mut TwoPhaseResolver {
        &mut self.resolver
    }
+
+    pub fn handle(&self) -> &ObserveHandle {
+        &self.handle
+    }
}

impl SubscriptionTracer {
+    /// clear the current `SubscriptionTracer`.
+    pub fn clear(&self) {
+        self.0.retain(|_, v| {
+            v.stop_observing();
+            TRACK_REGION.with_label_values(&["dec"]).inc();
+            false
+        });
+    }
+
    // Register a region as tracing.
    // The `start_ts` is used to tracking the progress of initial scanning.
// (Note: the `None` case of `start_ts` is for testing / refresh region status when split / merge, @@ -69,10 +83,12 @@ impl SubscriptionTracer { start_ts: Option, ) { info!("start listen stream from store"; "observer" => ?handle, "region_id" => %region.get_id()); + TRACK_REGION.with_label_values(&["inc"]).inc(); if let Some(o) = self.0.insert( region.get_id(), RegionSubscription::new(region.clone(), handle, start_ts), ) { + TRACK_REGION.with_label_values(&["dec"]).inc(); warn!("register region which is already registered"; "region_id" => %region.get_id()); o.stop_observing(); } @@ -110,14 +126,15 @@ impl SubscriptionTracer { pub fn deregister_region( &self, region: &Region, - if_cond: impl FnOnce(&Region, &Region) -> bool, + if_cond: impl FnOnce(&RegionSubscription, &Region) -> bool, ) -> bool { let region_id = region.get_id(); - let remove_result = self.0.remove_if(®ion_id, |_, old_region| { - if_cond(&old_region.meta, region) - }); + let remove_result = self + .0 + .remove_if(®ion_id, |_, old_region| if_cond(old_region, region)); match remove_result { Some(o) => { + TRACK_REGION.with_label_values(&["dec"]).inc(); o.1.stop_observing(); info!("stop listen stream from store"; "observer" => ?o.1, "region_id"=> %region_id); true @@ -173,7 +190,10 @@ impl SubscriptionTracer { exists && still_observing } - pub fn get_subscription_of(&self, region_id: u64) -> Option> { + pub fn get_subscription_of( + &self, + region_id: u64, + ) -> Option> { self.0.get_mut(®ion_id) } } diff --git a/components/backup-stream/src/utils.rs b/components/backup-stream/src/utils.rs index e8bbc233d5..f01f60cef7 100644 --- a/components/backup-stream/src/utils.rs +++ b/components/backup-stream/src/utils.rs @@ -6,7 +6,10 @@ use std::{ time::Duration, }; -use crate::errors::{Error, Result}; +use crate::{ + errors::{Error, Result}, + Task, +}; use engine_traits::{CfName, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE}; use futures::{channel::mpsc, executor::block_on, StreamExt}; @@ -14,7 +17,8 @@ use kvproto::raft_cmdpb::{CmdType, Request}; use raft::StateRole; use raftstore::{coprocessor::RegionInfoProvider, RegionInfo}; -use tikv_util::{box_err, time::Instant, warn, Either}; +use tikv::storage::CfStatistics; +use tikv_util::{box_err, time::Instant, warn, worker::Scheduler, Either}; use tokio::sync::{Mutex, RwLock}; use txn_types::Key; @@ -181,6 +185,11 @@ impl SegmentMap { } impl SegmentMap { + /// Remove all records in the map. + pub fn clear(&mut self) { + self.0.clear(); + } + /// Like `add`, but insert a value associated to the key. pub fn insert(&mut self, (start, end): (K, K), value: V) -> bool { if self.is_overlapping((&start, &end)) { @@ -330,6 +339,58 @@ macro_rules! debug { }; } +macro_rules! 
record_fields { + ($m:expr,$cf:expr,$stat:expr, [ $(.$s:ident),+ ]) => { + { + let m = &$m; + let cf = &$cf; + let stat = &$stat; + $( m.with_label_values(&[cf, stringify!($s)]).inc_by(stat.$s as _) );+ + } + }; +} + +pub fn record_cf_stat(cf_name: &str, stat: &CfStatistics) { + let m = &crate::metrics::INITIAL_SCAN_STAT; + m.with_label_values(&[cf_name, "read_bytes"]) + .inc_by(stat.flow_stats.read_bytes as _); + m.with_label_values(&[cf_name, "read_keys"]) + .inc_by(stat.flow_stats.read_keys as _); + record_fields!( + m, + cf_name, + stat, + [ + .get, + .next, + .prev, + .seek, + .seek_for_prev, + .over_seek_bound, + .next_tombstone, + .prev_tombstone, + .seek_tombstone, + .seek_for_prev_tombstone, + .ttl_tombstone + ] + ); +} + +/// a shortcut for handing the result return from `Router::on_events`, when any faliure, send a fatal error to the `doom_messenger`. +pub fn handle_on_event_result(doom_messenger: &Scheduler, result: Vec<(String, Result<()>)>) { + for (task, res) in result.into_iter() { + if let Err(err) = res { + try_send!( + doom_messenger, + Task::FatalError( + task, + Box::new(err.context("failed to record event to local temporary files")) + ) + ); + } + } +} + #[cfg(test)] mod test { use crate::utils::SegmentMap; diff --git a/components/backup-stream/tests/mod.rs b/components/backup-stream/tests/mod.rs index 831b9f9b00..620de0e4ca 100644 --- a/components/backup-stream/tests/mod.rs +++ b/components/backup-stream/tests/mod.rs @@ -527,6 +527,7 @@ mod test { test_util::init_log_for_test(); let suite = super::Suite::new("fatal_error", 3); suite.must_register_task(1, "test_fatal_error"); + std::thread::sleep(Duration::from_secs(2)); let (victim, endpoint) = suite.endpoints.iter().next().unwrap(); endpoint .scheduler()