diff --git a/components/backup-stream/src/event_loader.rs b/components/backup-stream/src/event_loader.rs index 103b21df797..3735f785937 100644 --- a/components/backup-stream/src/event_loader.rs +++ b/components/backup-stream/src/event_loader.rs @@ -346,7 +346,7 @@ where .register_region(&r.region, handle.clone(), Some(start_ts)); let region_id = r.region.get_id(); let snap = self.observe_over_with_retry(&r.region, move || { - ChangeObserver::from_cdc(region_id, handle.clone()) + ChangeObserver::from_pitr(region_id, handle.clone()) })?; let stat = self.do_initial_scan(&r.region, start_ts, snap)?; total_stat.add_statistics(&stat); diff --git a/components/backup-stream/src/observer.rs b/components/backup-stream/src/observer.rs index 5fd5aa77baf..6061c91fb20 100644 --- a/components/backup-stream/src/observer.rs +++ b/components/backup-stream/src/observer.rs @@ -252,7 +252,8 @@ mod tests { } // Test events with key in the range can be observed. - let observe_info = CmdObserveInfo::from_handle(handle.clone(), ObserveHandle::new()); + let observe_info = + CmdObserveInfo::from_handle(handle.clone(), ObserveHandle::new(), ObserveHandle::new()); let mut cb = CmdBatch::new(&observe_info, 42); cb.push(&observe_info, 42, Cmd::default()); let mut cmd_batches = vec![cb]; @@ -263,7 +264,11 @@ mod tests { ); // Test event from other region should not be send. - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::new(), + ); let mut cb = CmdBatch::new(&observe_info, 43); cb.push(&observe_info, 43, Cmd::default()); cb.level = ObserveLevel::None; diff --git a/components/cdc/src/observer.rs b/components/cdc/src/observer.rs index 8ed19cf83c7..ca64eef2f93 100644 --- a/components/cdc/src/observer.rs +++ b/components/cdc/src/observer.rs @@ -207,7 +207,11 @@ mod tests { fn test_register_and_deregister() { let (scheduler, mut rx) = tikv_util::worker::dummy_scheduler(); let observer = CdcObserver::new(scheduler); - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::new(), + ); let engine = TestEngineBuilder::new().build().unwrap().get_rocksdb(); let mut cb = CmdBatch::new(&observe_info, 0); @@ -228,6 +232,7 @@ mod tests { // Stop observing cmd observe_info.cdc_id.stop_observing(); + observe_info.pitr_id.stop_observing(); let mut cb = CmdBatch::new(&observe_info, 0); cb.push(&observe_info, 0, Cmd::default()); >::on_flush_applied_cmd_batch( @@ -238,7 +243,7 @@ mod tests { ); match rx.recv_timeout(Duration::from_millis(10)) { Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {} - _ => panic!("unexpected result"), + any => panic!("unexpected result: {:?}", any), }; // Does not send unsubscribed region events. diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index e22291648da..2fb6ee568f4 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -737,7 +737,11 @@ mod tests { host.post_apply_sst_from_snapshot(®ion, "default", ""); assert_all!([&ob.called], &[55]); - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::new(), + ); let mut cb = CmdBatch::new(&observe_info, 0); cb.push(&observe_info, 0, Cmd::default()); host.on_flush_applied_cmd_batch(cb.level, vec![cb], &PanicEngine); diff --git a/components/raftstore/src/coprocessor/mod.rs b/components/raftstore/src/coprocessor/mod.rs index f5ea212d9ea..7871fce0e8b 100644 --- a/components/raftstore/src/coprocessor/mod.rs +++ b/components/raftstore/src/coprocessor/mod.rs @@ -1,6 +1,5 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. -use std::cmp; use std::fmt::{self, Debug, Formatter}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; @@ -247,11 +246,20 @@ impl ObserveHandle { pub struct CmdObserveInfo { pub cdc_id: ObserveHandle, pub rts_id: ObserveHandle, + pub pitr_id: ObserveHandle, } impl CmdObserveInfo { - pub fn from_handle(cdc_id: ObserveHandle, rts_id: ObserveHandle) -> CmdObserveInfo { - CmdObserveInfo { cdc_id, rts_id } + pub fn from_handle( + cdc_id: ObserveHandle, + rts_id: ObserveHandle, + pitr_id: ObserveHandle, + ) -> CmdObserveInfo { + CmdObserveInfo { + cdc_id, + rts_id, + pitr_id, + } } fn observe_level(&self) -> ObserveLevel { @@ -261,13 +269,18 @@ impl CmdObserveInfo { } else { ObserveLevel::None }; + let pitr = if self.pitr_id.is_observing() { + ObserveLevel::All + } else { + ObserveLevel::None + }; let rts = if self.rts_id.is_observing() { // `resolved-ts` observe lock related data ObserveLevel::LockRelated } else { ObserveLevel::None }; - cmp::max(cdc, rts) + cdc.max(rts).max(pitr) } } @@ -276,6 +289,7 @@ impl Debug for CmdObserveInfo { f.debug_struct("CmdObserveInfo") .field("cdc_id", &self.cdc_id.id) .field("rts_id", &self.rts_id.id) + .field("pitr_id", &self.pitr_id.id) .finish() } } @@ -296,6 +310,7 @@ pub struct CmdBatch { pub level: ObserveLevel, pub cdc_id: ObserveID, pub rts_id: ObserveID, + pub pitr_id: ObserveID, pub region_id: u64, pub cmds: Vec, } @@ -306,6 +321,7 @@ impl CmdBatch { level: observe_info.observe_level(), cdc_id: observe_info.cdc_id.id, rts_id: observe_info.rts_id.id, + pitr_id: observe_info.pitr_id.id, region_id, cmds: Vec::new(), } @@ -315,6 +331,7 @@ impl CmdBatch { assert_eq!(region_id, self.region_id); assert_eq!(observe_info.cdc_id.id, self.cdc_id); assert_eq!(observe_info.rts_id.id, self.rts_id); + assert_eq!(observe_info.pitr_id.id, self.pitr_id); self.cmds.push(cmd) } @@ -376,22 +393,47 @@ mod tests { #[test] fn test_observe_level() { // Both cdc and `resolved-ts` are observing - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::new(), + ); assert_eq!(observe_info.observe_level(), ObserveLevel::All); // No observer observe_info.cdc_id.stop_observing(); observe_info.rts_id.stop_observing(); + observe_info.pitr_id.stop_observing(); assert_eq!(observe_info.observe_level(), ObserveLevel::None); // Only cdc observing - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::new(), + ); observe_info.rts_id.stop_observing(); + observe_info.pitr_id.stop_observing(); assert_eq!(observe_info.observe_level(), ObserveLevel::All); // Only `resolved-ts` observing - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::new(), + ); observe_info.cdc_id.stop_observing(); + observe_info.pitr_id.stop_observing(); assert_eq!(observe_info.observe_level(), ObserveLevel::LockRelated); + + // Only `backup-stream(pitr)` observing + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::new(), + ); + observe_info.cdc_id.stop_observing(); + observe_info.rts_id.stop_observing(); + assert_eq!(observe_info.observe_level(), ObserveLevel::All); } } diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index bd7eeeddee3..ef1ba3b4f05 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -3063,26 +3063,47 @@ impl Debug for GenSnapTask { } } +#[derive(Debug)] +enum ObserverType { + Cdc(ObserveHandle), + Rts(ObserveHandle), + Pitr(ObserveHandle), +} + +impl ObserverType { + fn handle(&self) -> &ObserveHandle { + match self { + ObserverType::Cdc(h) => h, + ObserverType::Rts(h) => h, + ObserverType::Pitr(h) => h, + } + } +} + #[derive(Debug)] pub struct ChangeObserver { - cdc_id: Option, - rts_id: Option, + ty: ObserverType, region_id: u64, } impl ChangeObserver { pub fn from_cdc(region_id: u64, id: ObserveHandle) -> Self { Self { - cdc_id: Some(id), - rts_id: None, + ty: ObserverType::Cdc(id), region_id, } } pub fn from_rts(region_id: u64, id: ObserveHandle) -> Self { Self { - cdc_id: None, - rts_id: Some(id), + ty: ObserverType::Rts(id), + region_id, + } + } + + pub fn from_pitr(region_id: u64, id: ObserveHandle) -> Self { + Self { + ty: ObserverType::Pitr(id), region_id, } } @@ -3517,38 +3538,30 @@ where region_epoch: RegionEpoch, cb: Callback, ) { - let ChangeObserver { - cdc_id, - rts_id, - region_id, - } = cmd; - - if let Some(ObserveHandle { id, .. }) = cdc_id { - if self.delegate.observe_info.cdc_id.id > id { - notify_stale_req_with_msg( - self.delegate.term, - format!( - "stale observe id {:?}, current id: {:?}", - id, self.delegate.observe_info.cdc_id.id - ), - cb, - ); - return; - } - } + let ChangeObserver { region_id, ty } = cmd; - if let Some(ObserveHandle { id, .. }) = rts_id { - if self.delegate.observe_info.rts_id.id > id { - notify_stale_req_with_msg( - self.delegate.term, - format!( - "stale observe id {:?}, current id: {:?}", - id, self.delegate.observe_info.rts_id.id - ), - cb, - ); - return; + let is_stale_cmd = match ty { + ObserverType::Cdc(ObserveHandle { id, .. }) => { + self.delegate.observe_info.cdc_id.id > id + } + ObserverType::Rts(ObserveHandle { id, .. }) => { + self.delegate.observe_info.rts_id.id > id } + ObserverType::Pitr(ObserveHandle { id, .. }) => { + self.delegate.observe_info.pitr_id.id > id + } + }; + if is_stale_cmd { + notify_stale_req_with_msg( + self.delegate.term, + format!( + "stale observe id {:?}, current id: {:?}", + ty.handle().id, + self.delegate.observe_info.pitr_id.id + ), + cb, + ); + return; } assert_eq!(self.delegate.region_id(), region_id); @@ -3584,13 +3597,17 @@ where } }; - if let Some(id) = cdc_id { - self.delegate.observe_info.cdc_id = id; - } - if let Some(id) = rts_id { - self.delegate.observe_info.rts_id = id; + match ty { + ObserverType::Cdc(id) => { + self.delegate.observe_info.cdc_id = id; + } + ObserverType::Rts(id) => { + self.delegate.observe_info.rts_id = id; + } + ObserverType::Pitr(id) => { + self.delegate.observe_info.pitr_id = id; + } } - cb.invoke_read(resp); } @@ -5422,11 +5439,7 @@ mod tests { 1, Msg::Change { region_epoch: region_epoch.clone(), - cmd: ChangeObserver { - cdc_id: Some(observe_handle.clone()), - rts_id: Some(observe_handle.clone()), - region_id: 1, - }, + cmd: ChangeObserver::from_cdc(1, observe_handle.clone()), cb: Callback::Read(Box::new(|resp: ReadResponse| { assert!(!resp.response.get_header().has_error()); assert!(resp.snapshot.is_some()); @@ -5441,6 +5454,7 @@ mod tests { let cmd_batch = cmdbatch_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert_eq!(cmd_batch.cdc_id, ObserveHandle::with_id(0).id); assert_eq!(cmd_batch.rts_id, ObserveHandle::with_id(0).id); + assert_eq!(cmd_batch.pitr_id, ObserveHandle::with_id(0).id); let (capture_tx, capture_rx) = mpsc::channel(); let put_entry = EntryBuilder::new(3, 2) @@ -5462,7 +5476,6 @@ mod tests { assert!(!resp.get_header().has_error(), "{:?}", resp); let cmd_batch = cmdbatch_rx.recv_timeout(Duration::from_secs(3)).unwrap(); assert_eq!(cmd_batch.cdc_id, observe_handle.id); - assert_eq!(cmd_batch.rts_id, observe_handle.id); assert_eq!(resp, cmd_batch.into_iter(1).next().unwrap().response); let put_entry1 = EntryBuilder::new(4, 2) @@ -5495,11 +5508,7 @@ mod tests { 2, Msg::Change { region_epoch, - cmd: ChangeObserver { - cdc_id: Some(observe_handle.clone()), - rts_id: Some(observe_handle), - region_id: 2, - }, + cmd: ChangeObserver::from_cdc(2, observe_handle), cb: Callback::Read(Box::new(|resp: ReadResponse<_>| { assert!( resp.response @@ -5671,11 +5680,7 @@ mod tests { 1, Msg::Change { region_epoch: region_epoch.clone(), - cmd: ChangeObserver { - cdc_id: Some(observe_handle.clone()), - rts_id: Some(observe_handle.clone()), - region_id: 1, - }, + cmd: ChangeObserver::from_cdc(1, observe_handle.clone()), cb: Callback::Read(Box::new(|resp: ReadResponse<_>| { assert!(!resp.response.get_header().has_error(), "{:?}", resp); assert!(resp.snapshot.is_some()); @@ -5830,11 +5835,7 @@ mod tests { 1, Msg::Change { region_epoch, - cmd: ChangeObserver { - cdc_id: Some(observe_handle.clone()), - rts_id: Some(observe_handle), - region_id: 1, - }, + cmd: ChangeObserver::from_cdc(1, observe_handle), cb: Callback::Read(Box::new(move |resp: ReadResponse<_>| { assert!( resp.response.get_header().get_error().has_epoch_not_match(), diff --git a/components/resolved_ts/src/observer.rs b/components/resolved_ts/src/observer.rs index 79ef6d03545..afb07ec3e58 100644 --- a/components/resolved_ts/src/observer.rs +++ b/components/resolved_ts/src/observer.rs @@ -191,7 +191,11 @@ mod test { } // Both cdc and resolved-ts worker are observing - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::default(), + ); let mut cb = CmdBatch::new(&observe_info, 0); cb.push(&observe_info, 0, cmd.clone()); observer.on_flush_applied_cmd_batch(cb.level, &mut vec![cb], &engine); @@ -199,7 +203,11 @@ mod test { expect_recv(&mut rx, data.clone()); // Only cdc is observing - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::default(), + ); observe_info.rts_id.stop_observing(); let mut cb = CmdBatch::new(&observe_info, 0); cb.push(&observe_info, 0, cmd.clone()); @@ -207,8 +215,24 @@ mod test { // Still observe all data expect_recv(&mut rx, data.clone()); + // Pitr and rts is observing + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::default(), + ObserveHandle::new(), + ObserveHandle::new(), + ); + let mut cb = CmdBatch::new(&observe_info, 0); + cb.push(&observe_info, 0, cmd.clone()); + observer.on_flush_applied_cmd_batch(cb.level, &mut vec![cb], &engine); + // Still observe all data + expect_recv(&mut rx, data.clone()); + // Only resolved-ts worker is observing - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::default(), + ); observe_info.cdc_id.stop_observing(); let mut cb = CmdBatch::new(&observe_info, 0); cb.push(&observe_info, 0, cmd.clone()); @@ -218,7 +242,11 @@ mod test { expect_recv(&mut rx, data); // Both cdc and resolved-ts worker are not observing - let observe_info = CmdObserveInfo::from_handle(ObserveHandle::new(), ObserveHandle::new()); + let observe_info = CmdObserveInfo::from_handle( + ObserveHandle::new(), + ObserveHandle::new(), + ObserveHandle::default(), + ); observe_info.rts_id.stop_observing(); observe_info.cdc_id.stop_observing(); let mut cb = CmdBatch::new(&observe_info, 0);