Scan on Leader Change #31

Merged 10 commits on Mar 15, 2022

Changes from all commits
26 changes: 21 additions & 5 deletions Cargo.lock

1 change: 1 addition & 0 deletions components/br-stream/Cargo.toml
@@ -15,6 +15,7 @@ test-engines-rocksdb = [

[dependencies]
tokio = { version = "1.5", features = ["rt-multi-thread", "macros", "time", "sync"] }
tokio-util = { version = "0.7", features = ["compat"] }
prometheus = { version = "0.13", default-features = false, features = ["nightly"] }
async-trait = { version = "0.1" }
thiserror = "1"
83 changes: 74 additions & 9 deletions components/br-stream/src/endpoint.rs
@@ -10,11 +10,13 @@ use std::time::Duration;
use dashmap::DashMap;
use engine_traits::KvEngine;

use futures::executor::block_on;
use kvproto::metapb::Region;
use pd_client::PdClient;
use raftstore::router::RaftStoreRouter;
use raftstore::store::fsm::ChangeObserver;
use resolved_ts::Resolver;

use tikv_util::time::Instant;

use crossbeam_channel::tick;
@@ -378,9 +380,13 @@ where
let pd_cli = self.pd_client.clone();
let resolvers = self.resolvers.clone();
self.pool.spawn(async move {
let start = Instant::now_coarse();
// NOTE: Maybe push down the resolve step to the router?
// Or, if there are too many duplicated `Flush` commands, we may do some useless work.
let new_rts = Self::try_resolve(pd_cli.clone(), resolvers).await;
metrics::FLUSH_DURATION
.with_label_values(&["resolve_by_now"])
.observe(start.saturating_elapsed_secs());
if let Some(rts) = router.do_flush(&task, store_id, new_rts).await {
info!("flushing and refreshing checkpoint ts."; "checkpoint_ts" => %rts, "task" => %task);
if rts == 0 {
@@ -426,18 +432,75 @@ where
Ok(())
}

fn observe_over_with_initial_data_from_checkpoint(
&self,
region: &Region,
task: String,
) -> Result<()> {
let init = self.make_initial_loader();
let handle = ObserveHandle::new();
let region_id = region.get_id();
let ob = ChangeObserver::from_cdc(region_id, handle.clone());
let snap = init.observe_over(region, ob)?;
let meta_cli = self.meta_client.as_ref().unwrap().clone();
self.observer.subs.register_region(region_id, handle);
self.resolvers.insert(region.id, Resolver::new(region.id));

let region = region.clone();
// Note: Even if we did the initial scanning, if the next_backup_ts was updated by periodic flushing
// before the initial scanning is done, there is still a possibility of losing data:
// if the server crashes immediately and the data of this scanning hasn't been sent to the sink,
// that data would be permanently lost.
// Maybe we need to block the next_backup_ts from advancing before all initial scanning is done (or just for this region, via disabling the resolver)?
self.pool.spawn_blocking(move || {
let from_ts = match block_on(meta_cli.global_progress_of_task(&task)) {
Ok(ts) => ts,
Err(err) => {
err.report("failed to get global progress of task");
return;
}
};

match init.do_initial_scan(&region, TimeStamp::new(from_ts), snap) {
Ok(stat) => {
info!("initial scanning of leader transforming finished!"; "statistics" => ?stat, "region" => %region.get_id(), "from_ts" => %from_ts);
}
Err(err) => err.report(format!("during initial scanning of region {:?}", region)),
}
});
Ok(())
}

fn find_task_by_region(&self, r: &Region) -> Option<String> {
self.range_router
.find_task_by_range(&r.start_key, &r.end_key)
}

/// Modify observe over some region.
/// This would register the region to the RaftStore.
///
/// > Note: If using this to start observing, this won't trigger an incremental scanning.
/// > When the follower progresses faster than the leader and is then elected,
/// > there is a risk of losing data.
pub fn on_modify_observe(&self, op: ObserveOp) {
info!("br-stream: on_modify_observe"; "op" => ?op);
match op {
ObserveOp::Start { region } => {
if let Err(e) = self.observe_over(&region) {
e.report(format!("register region {} to raftstore", region.get_id()));
ObserveOp::Start {
region,
needs_initial_scanning,
} => {
let result = if needs_initial_scanning {
let for_task = self.find_task_by_region(&region).unwrap_or_else(|| {
panic!(
"BUG: the region {:?} is register to no task but being observed",
region
)
});
self.observe_over_with_initial_data_from_checkpoint(&region, for_task)
} else {
self.observe_over(&region)
};
if let Err(err) = result {
err.report(format!(
"during doing initial scanning for region {:?}",
region
));
}
}
ObserveOp::Stop { region } => {
@@ -507,8 +570,10 @@ pub enum TaskOp {
pub enum ObserveOp {
Start {
region: Region,
// Note: Maybe add the request for initial scanning too.
// needs_initial_scanning: bool
// if `true`, would scan and sink changes from the global checkpoint ts.
// Note: maybe we'd better make it Option<TimeStamp> to make it more generic,
// but that needs the `observer` to know where the checkpoint is, which is a little dirty...
needs_initial_scanning: bool,
},
Stop {
region: Region,
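For readers following the new control flow in `on_modify_observe`: below is a minimal, self-contained Rust sketch of the branch taken when `needs_initial_scanning` is set. The types and helper functions are simplified stand-ins for illustration only, not the actual TiKV/br-stream definitions.

```rust
// Simplified stand-ins; the real types live in br-stream / kvproto.
#[derive(Debug)]
struct Region { id: u64 }

enum ObserveOp {
    // `needs_initial_scanning` tells the endpoint whether to replay history
    // from the task's global checkpoint before streaming new changes.
    Start { region: Region, needs_initial_scanning: bool },
    Stop { region: Region },
}

// In the real endpoint this consults the range router for the task owning the region.
fn find_task_by_region(_r: &Region) -> Option<String> {
    Some("task-1".to_owned())
}

fn observe_over(_r: &Region) -> Result<(), String> {
    Ok(()) // register the observer only; no incremental scan
}

fn observe_over_with_initial_data_from_checkpoint(_r: &Region, _task: String) -> Result<(), String> {
    Ok(()) // register the observer, then scan from the task's global checkpoint
}

fn on_modify_observe(op: ObserveOp) {
    match op {
        ObserveOp::Start { region, needs_initial_scanning } => {
            let result = if needs_initial_scanning {
                // A region observed by no task is a bug: the observer only
                // registers regions overlapping some task's range.
                let task = find_task_by_region(&region).unwrap_or_else(|| {
                    panic!("BUG: region {:?} observed but owned by no task", region)
                });
                observe_over_with_initial_data_from_checkpoint(&region, task)
            } else {
                observe_over(&region)
            };
            if let Err(err) = result {
                eprintln!("failed to start observing region {:?}: {}", region, err);
            }
        }
        ObserveOp::Stop { region } => eprintln!("stop observing region {}", region.id),
    }
}

fn main() {
    on_modify_observe(ObserveOp::Start { region: Region { id: 42 }, needs_initial_scanning: true });
    on_modify_observe(ObserveOp::Stop { region: Region { id: 42 } });
}
```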
23 changes: 17 additions & 6 deletions components/br-stream/src/event_loader.rs
@@ -77,6 +77,8 @@ impl<S: Snapshot> EventLoader<S> {
default: (key, value),
..
} => {
// FIXME: we also need to update the information for the `resolver` in the endpoint,
// otherwise we may advance the resolved ts too far in some conditions?
if !key.is_empty() {
result.push(ApplyEvent {
key,
@@ -178,16 +180,12 @@ where
Ok(snap)
}

/// Initialize the region: register it to the raftstore and the observer.
/// At the same time, perform the initial scanning, (an incremental scanning from `start_ts`)
/// and generate the corresponding ApplyEvent to the sink directly.
pub fn initialize_region(
pub fn do_initial_scan(
&self,
region: &Region,
start_ts: TimeStamp,
cmd: ChangeObserver,
snap: impl Snapshot,
) -> Result<Statistics> {
let snap = self.observe_over(region, cmd)?;
// It is ok to sink more data than needed. So scan to +inf TS for convenience.
let mut event_loader = EventLoader::load_from(snap, start_ts, TimeStamp::max(), region)?;
let mut stats = StatisticsSummary::default();
@@ -210,6 +208,19 @@ where
Ok(stats.stat)
}

/// Initialize the region: register it to the raftstore and the observer.
/// At the same time, perform the initial scanning (an incremental scan from `start_ts`)
/// and generate the corresponding ApplyEvent to the sink directly.
pub fn initialize_region(
&self,
region: &Region,
start_ts: TimeStamp,
cmd: ChangeObserver,
) -> Result<Statistics> {
let snap = self.observe_over(region, cmd)?;
self.do_initial_scan(region, start_ts, snap)
}

/// Initialize a range: it simply scans the regions with the leader role and sends them to [`initialize_region`].
pub fn initialize_range(
&self,
51 changes: 38 additions & 13 deletions components/br-stream/src/metadata/client.rs
@@ -10,9 +10,10 @@ use super::{

use kvproto::brpb::StreamBackupTaskInfo;

use tikv_util::{defer, time::Instant};
use tikv_util::{defer, time::Instant, warn};
use tokio_stream::StreamExt;


use crate::errors::{Error, Result};

/// Some operations over stream backup metadata key space.
@@ -257,7 +258,7 @@ impl<Store: MetaStore> MetadataClient<Store> {
}
let task = self.get_task(task_name).await?;
let timestamp = self.meta_store.snapshot().await?;
let mut items = timestamp
let items = timestamp
.get(Keys::Key(MetaKey::next_backup_ts_of(
task_name,
self.store_id,
@@ -266,18 +267,42 @@
if items.is_empty() {
Ok(task.info.start_ts)
} else {
assert!(items.len() == 1, "{:?}", items);
let next_backup_ts = std::mem::take(&mut items[0].1);
if next_backup_ts.len() != 8 {
return Err(Error::MalformedMetadata(format!(
"the length of next_backup_ts is {} bytes, require 8 bytes",
next_backup_ts.len()
)));
}
let mut buf = [0u8; 8];
buf.copy_from_slice(next_backup_ts.as_slice());
Ok(u64::from_be_bytes(buf))
assert_eq!(items.len(), 1);
Self::parse_ts_from_bytes(items[0].1.as_slice())
}
}

/// get the global progress (the min next_backup_ts among all stores).
pub async fn global_progress_of_task(&self, task_name: &str) -> Result<u64> {
let now = Instant::now();
defer! {
super::metrics::METADATA_OPERATION_LATENCY.with_label_values(&["task_progress_get_global"]).observe(now.saturating_elapsed().as_secs_f64())
}
let task = self.get_task(task_name).await?;
let snap = self.meta_store.snapshot().await?;
let global_ts = snap.get(Keys::Prefix(MetaKey::next_backup_ts(task_name)))
.await?
.iter()
.filter_map(|kv| {
Self::parse_ts_from_bytes(kv.1.as_slice())
.map_err(|err| warn!("br-stream: failed to parse next_backup_ts."; "key" => ?kv.0, "err" => %err))
.ok()
})
.min()
.unwrap_or(task.info.start_ts);
Ok(global_ts)
}

fn parse_ts_from_bytes(next_backup_ts: &[u8]) -> Result<u64> {
if next_backup_ts.len() != 8 {
return Err(Error::MalformedMetadata(format!(
"the length of next_backup_ts is {} bytes, require 8 bytes",
next_backup_ts.len()
)));
}
let mut buf = [0u8; 8];
buf.copy_from_slice(next_backup_ts);
Ok(u64::from_be_bytes(buf))
}

/// insert a task with ranges into the metadata store.
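The refactor above centralizes timestamp decoding in `parse_ts_from_bytes` and derives the task's global progress as the minimum `next_backup_ts` reported by any store, falling back to the task's `start_ts` when no store has reported yet. A small self-contained sketch of that logic, with plain vectors standing in for the metadata snapshot:

```rust
fn parse_ts_from_bytes(next_backup_ts: &[u8]) -> Result<u64, String> {
    // next_backup_ts is stored as a fixed-width big-endian u64 (8 bytes).
    if next_backup_ts.len() != 8 {
        return Err(format!(
            "the length of next_backup_ts is {} bytes, require 8 bytes",
            next_backup_ts.len()
        ));
    }
    let mut buf = [0u8; 8];
    buf.copy_from_slice(next_backup_ts);
    Ok(u64::from_be_bytes(buf))
}

fn global_progress(kvs: &[(Vec<u8>, Vec<u8>)], task_start_ts: u64) -> u64 {
    // The global checkpoint is the slowest store's next_backup_ts;
    // malformed entries are skipped instead of failing the whole query.
    kvs.iter()
        .filter_map(|(_key, value)| parse_ts_from_bytes(value).ok())
        .min()
        .unwrap_or(task_start_ts)
}

fn main() {
    let kvs = vec![
        (b"store/1".to_vec(), 430_000_u64.to_be_bytes().to_vec()),
        (b"store/2".to_vec(), 420_000_u64.to_be_bytes().to_vec()),
        (b"store/3".to_vec(), vec![0xde, 0xad]), // malformed, ignored
    ];
    assert_eq!(global_progress(&kvs, 400_000), 420_000);
    assert_eq!(global_progress(&[], 400_000), 400_000);
}
```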
10 changes: 7 additions & 3 deletions components/br-stream/src/metadata/keys.rs
@@ -99,13 +99,17 @@ impl MetaKey {

/// The key of the next backup ts of some store for the given task.
pub fn next_backup_ts_of(name: &str, store_id: u64) -> Self {
let base = format!("{}{}/{}", PREFIX, PATH_NEXT_BACKUP_TS, name);
let mut buf = bytes::BytesMut::from(base);
buf.put(b'/');
let base = Self::next_backup_ts(name);
let mut buf = bytes::BytesMut::from(base.0);
buf.put_u64_be(store_id);
Self(buf.to_vec())
}

// The prefix for next backup ts.
pub fn next_backup_ts(name: &str) -> Self {
Self(format!("{}{}/{}/", PREFIX, PATH_NEXT_BACKUP_TS, name).into_bytes())
}

/// The key for pausing some task.
pub fn pause_of(name: &str) -> Self {
Self(format!("{}{}/{}", PREFIX, PATH_PAUSE, name).into_bytes())
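The new `next_backup_ts(name)` constructor makes the per-store key simply this prefix plus the big-endian store id, which is what lets `global_progress_of_task` enumerate every store's checkpoint with one prefix scan. A minimal sketch of that relationship; the `PREFIX` and `PATH_NEXT_BACKUP_TS` values here are placeholders, not necessarily the real constants:

```rust
// Placeholder constants; the real values live in keys.rs.
const PREFIX: &str = "/tidb/br-stream";
const PATH_NEXT_BACKUP_TS: &str = "/checkpoint";

fn next_backup_ts(name: &str) -> Vec<u8> {
    format!("{}{}/{}/", PREFIX, PATH_NEXT_BACKUP_TS, name).into_bytes()
}

fn next_backup_ts_of(name: &str, store_id: u64) -> Vec<u8> {
    // Per-store key = task prefix + 8-byte big-endian store id.
    let mut key = next_backup_ts(name);
    key.extend_from_slice(&store_id.to_be_bytes());
    key
}

fn main() {
    let prefix = next_backup_ts("my-task");
    let store_key = next_backup_ts_of("my-task", 3);
    // Every per-store key starts with the task prefix, which is what the
    // prefix scan in `global_progress_of_task` relies on.
    assert!(store_key.starts_with(&prefix));
}
```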
7 changes: 7 additions & 0 deletions components/br-stream/src/metrics.rs
@@ -53,4 +53,11 @@ lazy_static! {
&["task"],
)
.unwrap();
pub static ref FLUSH_DURATION: HistogramVec = register_histogram_vec!(
"tikv_stream_flush_duration_sec",
"The time cost of flushing a task.",
&["stage"],
exponential_buckets(0.001, 2.0, 16).unwrap()
)
.unwrap();
}
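For reference, `exponential_buckets(0.001, 2.0, 16)` yields 16 bucket bounds doubling from 1 ms up to 0.001 * 2^15 ≈ 32.8 s, which bounds the flush durations this histogram can resolve. A tiny sketch reproducing those boundaries:

```rust
fn main() {
    // Reproduce the bucket upper bounds of exponential_buckets(0.001, 2.0, 16):
    // 0.001, 0.002, 0.004, ..., 32.768 seconds.
    let buckets: Vec<f64> = (0..16).map(|i| 0.001 * 2f64.powi(i)).collect();
    assert_eq!(buckets.len(), 16);
    assert!((buckets[0] - 0.001).abs() < 1e-12); // first bucket: 1 ms
    assert!((buckets[15] - 32.768).abs() < 1e-9); // last bucket: ~32.8 s
    println!("{:?}", buckets);
}
```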
5 changes: 3 additions & 2 deletions components/br-stream/src/observer.rs
@@ -57,6 +57,7 @@ impl BackupStreamObserver {
.scheduler
.schedule(Task::ModifyObserve(ObserveOp::Start {
region: region.clone(),
needs_initial_scanning: true,
}))
{
Error::from(err).report(format_args!(
@@ -273,7 +274,7 @@ mod tests {
o.register_region(&r);
let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
let handle = ObserveHandle::new();
if let Task::ModifyObserve(ObserveOp::Start { region }) = task {
if let Task::ModifyObserve(ObserveOp::Start { region, .. }) = task {
o.subs.register_region(region.get_id(), handle.clone())
} else {
panic!("unexpected message received: it is {}", task);
@@ -297,7 +298,7 @@
o.register_region(&r);
let task = rx.recv_timeout(Duration::from_secs(0)).unwrap().unwrap();
let handle = ObserveHandle::new();
if let Task::ModifyObserve(ObserveOp::Start { region }) = task {
if let Task::ModifyObserve(ObserveOp::Start { region, .. }) = task {
o.subs.register_region(region.get_id(), handle.clone());
} else {
panic!("not match, it is {:?}", task);