diff --git a/Cargo.lock b/Cargo.lock
index b5bc7214f47..93d05ff7653 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -579,6 +579,7 @@ dependencies = [
  "tikv_util",
  "tokio",
  "tokio-stream",
+ "tokio-util 0.7.0",
  "txn_types",
  "uuid",
 ]
@@ -1541,7 +1542,7 @@ dependencies = [
  "tikv_alloc",
  "tikv_util",
  "tokio",
- "tokio-util",
+ "tokio-util 0.6.6",
  "url",
 ]
@@ -1581,7 +1582,7 @@ dependencies = [
  "tempfile",
  "tikv_util",
  "tokio",
- "tokio-util",
+ "tokio-util 0.6.6",
  "url",
 ]
@@ -2115,7 +2116,7 @@ dependencies = [
  "indexmap",
  "slab",
  "tokio",
- "tokio-util",
+ "tokio-util 0.6.6",
  "tracing",
 ]
@@ -6100,6 +6101,21 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-util"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1"
+dependencies = [
+ "bytes 1.1.0",
+ "futures-core",
+ "futures-io",
+ "futures-sink",
+ "log",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "toml"
 version = "0.5.7"
@@ -6132,7 +6148,7 @@ dependencies = [
  "prost-derive 0.8.0",
  "tokio",
  "tokio-stream",
- "tokio-util",
+ "tokio-util 0.6.6",
  "tower",
  "tower-layer",
  "tower-service",
@@ -6166,7 +6182,7 @@ dependencies = [
  "slab",
  "tokio",
  "tokio-stream",
- "tokio-util",
+ "tokio-util 0.6.6",
  "tower-layer",
  "tower-service",
  "tracing",
diff --git a/components/br-stream/Cargo.toml b/components/br-stream/Cargo.toml
index 1d6cb0834cf..1ae692ddd4f 100644
--- a/components/br-stream/Cargo.toml
+++ b/components/br-stream/Cargo.toml
@@ -15,6 +15,7 @@ test-engines-rocksdb = [
 
 [dependencies]
 tokio = { version = "1.5", features = ["rt-multi-thread", "macros", "time", "sync"] }
+tokio-util = { version = "0.7", features = ["compat"] }
 prometheus = { version = "0.13", default-features = false, features = ["nightly"] }
 async-trait = { version = "0.1" }
 thiserror = "1"
diff --git a/components/br-stream/src/endpoint.rs b/components/br-stream/src/endpoint.rs
index f2c1231b8f5..8837a296501 100644
--- a/components/br-stream/src/endpoint.rs
+++ b/components/br-stream/src/endpoint.rs
@@ -10,11 +10,13 @@ use std::time::Duration;
 
 use dashmap::DashMap;
 use engine_traits::KvEngine;
+use futures::executor::block_on;
 use kvproto::metapb::Region;
 use pd_client::PdClient;
 use raftstore::router::RaftStoreRouter;
 use raftstore::store::fsm::ChangeObserver;
 use resolved_ts::Resolver;
+
 use tikv_util::time::Instant;
 
 use crossbeam_channel::tick;
@@ -378,9 +380,13 @@ where
         let pd_cli = self.pd_client.clone();
         let resolvers = self.resolvers.clone();
         self.pool.spawn(async move {
+            let start = Instant::now_coarse();
             // NOTE: Maybe push down the resolve step to the router?
             // Or if there are too many duplicated `Flush` command, we may do some useless works.
             let new_rts = Self::try_resolve(pd_cli.clone(), resolvers).await;
+            metrics::FLUSH_DURATION
+                .with_label_values(&["resolve_by_now"])
+                .observe(start.saturating_elapsed_secs());
             if let Some(rts) = router.do_flush(&task, store_id, new_rts).await {
                 info!("flushing and refreshing checkpoint ts."; "checkpoint_ts" => %rts, "task" => %task);
                 if rts == 0 {
@@ -426,18 +432,75 @@ where
         Ok(())
     }
 
+    fn observe_over_with_initial_data_from_checkpoint(
+        &self,
+        region: &Region,
+        task: String,
+    ) -> Result<()> {
+        let init = self.make_initial_loader();
+        let handle = ObserveHandle::new();
+        let region_id = region.get_id();
+        let ob = ChangeObserver::from_cdc(region_id, handle.clone());
+        let snap = init.observe_over(region, ob)?;
+        let meta_cli = self.meta_client.as_ref().unwrap().clone();
+        self.observer.subs.register_region(region_id, handle);
+        self.resolvers.insert(region.id, Resolver::new(region.id));
+
+        let region = region.clone();
+        // Note: Even if we did the initial scanning, if the next_backup_ts was updated by a periodic flush
+        //       before the initial scanning finished, there is still a possibility of losing data:
+        //       if the server crashes immediately and the data of this scan hasn't been sent to the sink,
+        //       that data would be permanently lost.
+        // Maybe we need to block the next_backup_ts from advancing before all initial scanning is done (or just for this region, via disabling the resolver)?
+        self.pool.spawn_blocking(move || {
+            let from_ts = match block_on(meta_cli.global_progress_of_task(&task)) {
+                Ok(ts) => ts,
+                Err(err) => {
+                    err.report("failed to get global progress of task");
+                    return;
+                }
+            };
+
+            match init.do_initial_scan(&region, TimeStamp::new(from_ts), snap) {
+                Ok(stat) => {
+                    info!("initial scanning of leader transforming finished!"; "statistics" => ?stat, "region" => %region.get_id(), "from_ts" => %from_ts);
+                }
+                Err(err) => err.report(format!("during initial scanning of region {:?}", region)),
+            }
+        });
+        Ok(())
+    }
+
+    fn find_task_by_region(&self, r: &Region) -> Option<String> {
+        self.range_router
+            .find_task_by_range(&r.start_key, &r.end_key)
+    }
+
     /// Modify observe over some region.
     /// This would register the region to the RaftStore.
-    ///
-    /// > Note: If using this to start observe, this won't trigger a incremental scanning.
-    /// > When the follower progress faster than leader and then be elected,
-    /// > there is a risk of losing data.
     pub fn on_modify_observe(&self, op: ObserveOp) {
         info!("br-stream: on_modify_observe"; "op" => ?op);
         match op {
-            ObserveOp::Start { region } => {
-                if let Err(e) = self.observe_over(&region) {
-                    e.report(format!("register region {} to raftstore", region.get_id()));
+            ObserveOp::Start {
+                region,
+                needs_initial_scanning,
+            } => {
+                let result = if needs_initial_scanning {
+                    let for_task = self.find_task_by_region(&region).unwrap_or_else(|| {
+                        panic!(
+                            "BUG: the region {:?} is registered to no task but being observed",
+                            region
+                        )
+                    });
+                    self.observe_over_with_initial_data_from_checkpoint(&region, for_task)
+                } else {
+                    self.observe_over(&region)
+                };
+                if let Err(err) = result {
+                    err.report(format!(
+                        "during initial scanning for region {:?}",
+                        region
+                    ));
+                }
             }
             ObserveOp::Stop { region } => {
@@ -507,8 +570,10 @@ pub enum TaskOp {
 pub enum ObserveOp {
     Start {
         region: Region,
-        // Note: Maybe add the request for initial scanning too.
-        // needs_initial_scanning: bool
+        // If `true`, scan and sink changes from the global checkpoint ts.
+        // Note: maybe we'd better make it an `Option` to make it more generic,
+        //       but that needs the `observer` to know where the checkpoint is, which is a little dirty...
+        needs_initial_scanning: bool,
     },
     Stop {
         region: Region,
diff --git a/components/br-stream/src/event_loader.rs b/components/br-stream/src/event_loader.rs
index 89cb4ad74a5..8b54bc5d7b4 100644
--- a/components/br-stream/src/event_loader.rs
+++ b/components/br-stream/src/event_loader.rs
@@ -77,6 +77,8 @@ impl EventLoader {
                     default: (key, value),
                     ..
                 } => {
+                    // FIXME: we also need to update the information for the `resolver` in the endpoint,
+                    //        otherwise we may advance the resolved ts too far in some conditions?
                     if !key.is_empty() {
                         result.push(ApplyEvent {
                             key,
@@ -178,16 +180,12 @@ where
         Ok(snap)
     }
 
-    /// Initialize the region: register it to the raftstore and the observer.
-    /// At the same time, perform the initial scanning, (an incremental scanning from `start_ts`)
-    /// and generate the corresponding ApplyEvent to the sink directly.
-    pub fn initialize_region(
+    pub fn do_initial_scan(
         &self,
         region: &Region,
         start_ts: TimeStamp,
-        cmd: ChangeObserver,
+        snap: impl Snapshot,
     ) -> Result<Statistics> {
-        let snap = self.observe_over(region, cmd)?;
         // It is ok to sink more data than needed. So scan to +inf TS for convenance.
         let mut event_loader = EventLoader::load_from(snap, start_ts, TimeStamp::max(), region)?;
         let mut stats = StatisticsSummary::default();
@@ -210,6 +208,19 @@ where
         Ok(stats.stat)
     }
 
+    /// Initialize the region: register it to the raftstore and the observer.
+    /// At the same time, perform the initial scanning (an incremental scanning from `start_ts`)
+    /// and generate the corresponding ApplyEvent to the sink directly.
+    pub fn initialize_region(
+        &self,
+        region: &Region,
+        start_ts: TimeStamp,
+        cmd: ChangeObserver,
+    ) -> Result<Statistics> {
+        let snap = self.observe_over(region, cmd)?;
+        self.do_initial_scan(region, start_ts, snap)
+    }
+
     /// initialize a range: it simply scan the regions with leader role and send them to [`initialize_region`].
     pub fn initialize_range(
         &self,
diff --git a/components/br-stream/src/metadata/client.rs b/components/br-stream/src/metadata/client.rs
index 438226c4fc7..5cefef8067a 100644
--- a/components/br-stream/src/metadata/client.rs
+++ b/components/br-stream/src/metadata/client.rs
@@ -10,9 +10,10 @@ use super::{
 
 use kvproto::brpb::StreamBackupTaskInfo;
 
-use tikv_util::{defer, time::Instant};
+use tikv_util::{defer, time::Instant, warn};
 use tokio_stream::StreamExt;
+
 use crate::errors::{Error, Result};
 
 /// Some operations over stream backup metadata key space.
@@ -257,7 +258,7 @@ impl MetadataClient {
         }
         let task = self.get_task(task_name).await?;
         let timestamp = self.meta_store.snapshot().await?;
-        let mut items = timestamp
+        let items = timestamp
             .get(Keys::Key(MetaKey::next_backup_ts_of(
                 task_name,
                 self.store_id,
             )))
             .await?;
         if items.is_empty() {
             Ok(task.info.start_ts)
         } else {
-            assert!(items.len() == 1, "{:?}", items);
-            let next_backup_ts = std::mem::take(&mut items[0].1);
-            if next_backup_ts.len() != 8 {
-                return Err(Error::MalformedMetadata(format!(
-                    "the length of next_backup_ts is {} bytes, require 8 bytes",
-                    next_backup_ts.len()
-                )));
-            }
-            let mut buf = [0u8; 8];
-            buf.copy_from_slice(next_backup_ts.as_slice());
-            Ok(u64::from_be_bytes(buf))
+            assert_eq!(items.len(), 1);
+            Self::parse_ts_from_bytes(items[0].1.as_slice())
+        }
+    }
+
+    /// Get the global progress (the min next_backup_ts among all stores).
+    pub async fn global_progress_of_task(&self, task_name: &str) -> Result<u64> {
+        let now = Instant::now();
+        defer! {
+            super::metrics::METADATA_OPERATION_LATENCY.with_label_values(&["task_progress_get_global"]).observe(now.saturating_elapsed().as_secs_f64())
+        }
+        let task = self.get_task(task_name).await?;
+        let snap = self.meta_store.snapshot().await?;
+        let global_ts = snap.get(Keys::Prefix(MetaKey::next_backup_ts(task_name)))
+            .await?
+            .iter()
+            .filter_map(|kv| {
+                Self::parse_ts_from_bytes(kv.1.as_slice())
+                    .map_err(|err| warn!("br-stream: failed to parse next_backup_ts."; "key" => ?kv.0, "err" => %err))
+                    .ok()
+            })
+            .min()
+            .unwrap_or(task.info.start_ts);
+        Ok(global_ts)
+    }
+
+    fn parse_ts_from_bytes(next_backup_ts: &[u8]) -> Result<u64> {
+        if next_backup_ts.len() != 8 {
+            return Err(Error::MalformedMetadata(format!(
+                "the length of next_backup_ts is {} bytes, require 8 bytes",
+                next_backup_ts.len()
+            )));
         }
+        let mut buf = [0u8; 8];
+        buf.copy_from_slice(next_backup_ts);
+        Ok(u64::from_be_bytes(buf))
     }
 
     /// insert a task with ranges into the metadata store.
diff --git a/components/br-stream/src/metadata/keys.rs b/components/br-stream/src/metadata/keys.rs
index d98d5feafdf..3d9e613efda 100644
--- a/components/br-stream/src/metadata/keys.rs
+++ b/components/br-stream/src/metadata/keys.rs
@@ -99,13 +99,17 @@ impl MetaKey {
     /// The key of next backup ts of some region in some store.
     pub fn next_backup_ts_of(name: &str, store_id: u64) -> Self {
-        let base = format!("{}{}/{}", PREFIX, PATH_NEXT_BACKUP_TS, name);
-        let mut buf = bytes::BytesMut::from(base);
-        buf.put(b'/');
+        let base = Self::next_backup_ts(name);
+        let mut buf = bytes::BytesMut::from(base.0);
         buf.put_u64_be(store_id);
         Self(buf.to_vec())
     }
 
+    /// The prefix for next backup ts.
+    pub fn next_backup_ts(name: &str) -> Self {
+        Self(format!("{}{}/{}/", PREFIX, PATH_NEXT_BACKUP_TS, name).into_bytes())
+    }
+
     /// The key for pausing some task.
     pub fn pause_of(name: &str) -> Self {
         Self(format!("{}{}/{}", PREFIX, PATH_PAUSE, name).into_bytes())
diff --git a/components/br-stream/src/metrics.rs b/components/br-stream/src/metrics.rs
index de917ada8e1..6ca1e593778 100644
--- a/components/br-stream/src/metrics.rs
+++ b/components/br-stream/src/metrics.rs
@@ -53,4 +53,11 @@ lazy_static! {
         &["task"],
     )
     .unwrap();
+    pub static ref FLUSH_DURATION: HistogramVec = register_histogram_vec!(
+        "tikv_stream_flush_duration_sec",
+        "The time cost of flushing a task.",
+        &["stage"],
+        exponential_buckets(0.001, 2.0, 16).unwrap()
+    )
+    .unwrap();
 }
diff --git a/components/br-stream/src/observer.rs b/components/br-stream/src/observer.rs
index 4dd26b03430..8b290a14422 100644
--- a/components/br-stream/src/observer.rs
+++ b/components/br-stream/src/observer.rs
@@ -57,6 +57,7 @@ impl BackupStreamObserver {
             .scheduler
             .schedule(Task::ModifyObserve(ObserveOp::Start {
                 region: region.clone(),
+                needs_initial_scanning: true,
             })) {
             Error::from(err).report(format_args!(
@@ -273,7 +274,7 @@ mod tests {
         o.register_region(&r);
         let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
         let handle = ObserveHandle::new();
-        if let Task::ModifyObserve(ObserveOp::Start { region }) = task {
+        if let Task::ModifyObserve(ObserveOp::Start { region, .. }) = task {
             o.subs.register_region(region.get_id(), handle.clone())
         } else {
             panic!("unexpected message received: it is {}", task);
@@ -297,7 +298,7 @@ mod tests {
         o.register_region(&r);
         let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
         let handle = ObserveHandle::new();
-        if let Task::ModifyObserve(ObserveOp::Start { region }) = task {
+        if let Task::ModifyObserve(ObserveOp::Start { region, .. }) = task {
             o.subs.register_region(region.get_id(), handle.clone());
         } else {
             panic!("not match, it is {:?}", task);
diff --git a/components/br-stream/src/router.rs b/components/br-stream/src/router.rs
index 78f740cf43d..f1e30b768d7 100644
--- a/components/br-stream/src/router.rs
+++ b/components/br-stream/src/router.rs
@@ -18,7 +18,7 @@ use crate::{
     errors::Error,
     metadata::StreamTask,
     metrics::SKIP_KV_COUNTER,
-    utils::{self, SegmentMap, SlotMap},
+    utils::{self, SegmentMap, SlotMap, StopWatch},
 };
 
 use super::errors::Result;
@@ -49,9 +49,10 @@ use tikv_util::{
     worker::Scheduler,
     Either, HandyRwLock,
 };
+use tokio::fs::{remove_file, File};
 use tokio::io::{AsyncWriteExt, BufWriter};
 use tokio::sync::Mutex;
-use tokio::{fs::remove_file, fs::File};
+use tokio_util::compat::TokioAsyncReadCompatExt;
 use txn_types::{Key, Lock, TimeStamp};
 
 pub const FLUSH_STORAGE_INTERVAL: u64 = 300;
@@ -305,6 +306,18 @@ impl RouterInner {
         }
     }
 
+    /// Find the task for a region. If `end_key` is empty, search from `start_key` to +inf.
+    /// It simply searches for one possible overlapping range and takes its task.
+    /// FIXME: If a region crosses many tasks, this can only find one of them.
+    pub fn find_task_by_range(&self, start_key: &[u8], mut end_key: &[u8]) -> Option<String> {
+        let r = self.ranges.rl();
+        if end_key.is_empty() {
+            end_key = &[0xffu8; 32];
+        }
+        r.find_overlapping((start_key, end_key))
+            .map(|x| x.2.clone())
+    }
+
     /// Register some ranges associated to some task.
     /// Because the observer interface yields encoded data key, the key should be ENCODED DATA KEY too.
     /// (i.e. encoded by `Key::from_raw(key).into_encoded()`, [`utils::wrap_key`] could be a shortcut.).
@@ -558,7 +571,7 @@ impl TempFileKey {
 pub struct StreamTaskInfo {
     task: StreamTask,
     /// support external storage. eg local/s3.
-    storage: Box<dyn ExternalStorage>,
+    storage: Arc<dyn ExternalStorage>,
     /// The parent directory of temporary files.
     temp_dir: PathBuf,
     /// The temporary file index. Both meta (m prefixed keys) and data (t prefixed keys).
@@ -597,7 +610,7 @@ impl StreamTaskInfo {
     /// Create a new temporary file set at the `temp_dir`.
     pub async fn new(temp_dir: PathBuf, task: StreamTask) -> Result<Self> {
         tokio::fs::create_dir_all(&temp_dir).await?;
-        let storage = create_storage(task.info.get_storage(), BackendConfig::default())?;
+        let storage = Arc::from(create_storage(
+            task.info.get_storage(),
+            BackendConfig::default(),
+        )?);
         Ok(Self {
             task,
             storage,
@@ -729,35 +745,46 @@ impl StreamTaskInfo {
         }
     }
 
-    pub async fn flush_log(&self) -> Result<()> {
-        // if failed to write storage, we should retry write flushing_files.
-        for (_, v) in self.flushing_files.write().await.iter() {
-            let data_file = v.lock().await;
-            // to do: limiter to storage
-            let limiter = Limiter::builder(std::f64::INFINITY).build();
-            let reader = std::fs::File::open(data_file.local_path.clone()).unwrap();
-            let reader = UnpinReader(Box::new(limiter.limit(AllowStdIo::new(reader))));
-            let filepath = &data_file.storage_path;
-
-            let ret = self.storage.write(filepath, reader, 1024).await;
-            match ret {
-                Ok(_) => {
-                    debug!(
-                        "backup stream flush success";
-                        "tmp file" => ?data_file.local_path,
-                        "storage file" => ?filepath,
-                    );
-                }
-                Err(e) => {
-                    warn!("backup stream flush failed";
-                        "file" => ?data_file.local_path,
-                        "err" => ?e,
-                    );
-                    return Err(Error::Io(e));
-                }
+    async fn flush_log_file_to(
+        storage: Arc<dyn ExternalStorage>,
+        file: &Mutex<DataFile>,
+    ) -> Result<()> {
+        let data_file = file.lock().await;
+        // TODO: limiter to storage
+        let limiter = Limiter::builder(std::f64::INFINITY).build();
+        let reader = File::open(data_file.local_path.clone()).await?;
+        let stat = reader.metadata().await?;
+        let reader = UnpinReader(Box::new(limiter.limit(reader.compat())));
+        let filepath = &data_file.storage_path;
+
+        let ret = storage.write(filepath, reader, stat.len().max(4096)).await;
+        match ret {
+            Ok(_) => {
+                debug!(
+                    "backup stream flush success";
+                    "tmp file" => ?data_file.local_path,
+                    "storage file" => ?filepath,
+                );
+            }
+            Err(e) => {
+                warn!("backup stream flush failed";
+                    "file" => ?data_file.local_path,
+                    "err" => ?e,
+                );
+                return Err(Error::Io(e));
             }
         }
+        Ok(())
+    }
+    pub async fn flush_log(&self) -> Result<()> {
+        // If writing to storage failed, we should retry writing the flushing_files.
+        let storage = self.storage.clone();
+        let files = self.flushing_files.write().await;
+        let futs = files
+            .values()
+            .map(|v| Self::flush_log_file_to(storage.clone(), v));
+        futures::future::try_join_all(futs).await?;
         Ok(())
     }
 
@@ -789,6 +816,7 @@ impl StreamTaskInfo {
         if !self.is_flushing() {
             return Ok(None);
         }
+        let mut sw = StopWatch::new();
 
         // generate meta data and prepare to flush to storage
         let mut metadata_info = self
@@ -800,6 +828,9 @@
             .min_resolved_ts
             .max(Some(resolved_ts_provided.into_inner()));
         let rts = metadata_info.min_resolved_ts;
+        crate::metrics::FLUSH_DURATION
+            .with_label_values(&["generate_metadata"])
+            .observe(sw.lap().as_secs_f64());
 
         // There is no file to flush, don't write the meta file.
         if metadata_info.files.is_empty() {
@@ -811,9 +842,15 @@
         // flush meta file to storage.
         self.flush_meta(metadata_info).await?;
+        crate::metrics::FLUSH_DURATION
+            .with_label_values(&["save_files"])
+            .observe(sw.lap().as_secs_f64());
 
         // clear flushing files
         self.clear_flushing_files().await;
+        crate::metrics::FLUSH_DURATION
+            .with_label_values(&["clear_temp_files"])
+            .observe(sw.lap().as_secs_f64());
         Ok(rts)
     }
 }
diff --git a/components/br-stream/src/utils.rs b/components/br-stream/src/utils.rs
index c20c9c04f51..5d38e6557e3 100644
--- a/components/br-stream/src/utils.rs
+++ b/components/br-stream/src/utils.rs
@@ -228,8 +228,7 @@ impl SegmentMap {
         self.get_by_point(point).map(|(k, v, _)| (k, v))
     }
 
-    /// Check whether the range is overlapping with any range in the segment tree.
-    pub fn is_overlapping<R>(&self, range: (&R, &R)) -> bool
+    pub fn find_overlapping<R>(&self, range: (&R, &R)) -> Option<(&K, &K, &V)>
     where
         K: Borrow<R>,
        R: Ord + ?Sized,
     {
@@ -240,7 +239,9 @@
         // |------+-s----+----e----|
         // Firstly, we check whether the start point is in some range.
         // if true, it must be overlapping.
-        let overlap_with_start = self.get_interval_by_point(range.0).is_some();
+        if let Some(overlap_with_start) = self.get_by_point(range.0) {
+            return Some(overlap_with_start);
+        }
         // |--s----+-----+----e----|
         // Otherwise, the possibility of being overlapping would be there are some sub range
         // of the queried range...
@@ -257,9 +258,17 @@
             .filter(|(start, end)| {
                 <K as Borrow<R>>::borrow(&end.range_end) > range.1
                     || <K as Borrow<R>>::borrow(start) > range.0
-            })
-            .is_some();
-        overlap_with_start || covered_by_the_range
+            });
+        covered_by_the_range.map(|(k, v)| (k, &v.range_end, &v.item))
+    }
+
+    /// Check whether the range is overlapping with any range in the segment tree.
+    pub fn is_overlapping<R>(&self, range: (&R, &R)) -> bool
+    where
+        K: Borrow<R>,
+        R: Ord + ?Sized,
+    {
+        self.find_overlapping(range).is_some()
     }
 
     pub fn get_inner(&mut self) -> &mut BTreeMap<K, SegmentValue<K, V>> {