Added retry for initial scanning and some metrics (#47)
* added new metrics and retry for initial scanning

Signed-off-by: Yu Juncen <[email protected]>

* make memory a gauge

Signed-off-by: Yu Juncen <[email protected]>

* fixed task pause, make flush checking more frequent

Signed-off-by: Yu Juncen <[email protected]>

* fix metrics

Signed-off-by: Yu Juncen <[email protected]>

* make clippy happy

Signed-off-by: Yu Juncen <[email protected]>

* don't panic when channel dropped

Signed-off-by: Yu Juncen <[email protected]>
YuJuncen authored Apr 26, 2022
1 parent 72d6f67 commit 35d1dac
Showing 8 changed files with 357 additions and 122 deletions.
210 changes: 148 additions & 62 deletions components/backup-stream/src/endpoint.rs
@@ -15,6 +15,7 @@ use error_code::ErrorCodeExt;
use kvproto::brpb::StreamBackupError;
use kvproto::metapb::Region;
use pd_client::PdClient;
use raft::StateRole;
use raftstore::router::RaftStoreRouter;
use raftstore::store::fsm::ChangeObserver;

@@ -26,6 +27,7 @@ use tokio::runtime::Runtime;
use tokio_stream::StreamExt;
use txn_types::TimeStamp;

use crate::annotate;
use crate::errors::Error;
use crate::event_loader::InitialDataLoader;
use crate::metadata::store::{EtcdStore, MetaStore};
@@ -41,7 +43,7 @@ use raftstore::coprocessor::{CmdBatch, ObserveHandle, RegionInfoProvider};
use tikv::config::BackupStreamConfig;

use tikv_util::worker::{Runnable, Scheduler};
use tikv_util::{debug, error, info};
use tikv_util::{box_err, debug, error, info};
use tikv_util::{warn, HandyRwLock};

use super::metrics::{HANDLE_EVENT_DURATION_HISTOGRAM, HANDLE_KV_HISTOGRAM};
@@ -149,6 +151,7 @@ where
pd_client: Arc<PDC>,
concurrency_manager: ConcurrencyManager,
) -> Endpoint<EtcdStore, R, E, RT, PDC> {
crate::metrics::STREAM_ENABLED.inc();
let pool = create_tokio_runtime(config.num_threads, "backup-stream")
.expect("failed to create tokio runtime for backup stream worker.");
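
This hunk adds two things: a STREAM_ENABLED counter bumped once when the endpoint is constructed, and the endpoint still builds its own tokio runtime via create_tokio_runtime. That helper is not part of this diff; as a rough, hedged sketch of what such a helper usually amounts to (the real implementation lives elsewhere in this file and may differ), with timers enabled because the flush ticker below relies on tokio::time::sleep:

use tokio::runtime::{Builder, Runtime};

// Hedged stand-in for the `create_tokio_runtime(threads, name)` helper used above.
fn create_tokio_runtime(threads: usize, name: &str) -> std::io::Result<Runtime> {
    Builder::new_multi_thread()
        .worker_threads(threads)
        .thread_name(name)
        // Timers must be enabled in this runtime, otherwise the
        // `tokio::time::sleep` in the flush ticker below would panic.
        .enable_time()
        .build()
}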

@@ -220,6 +223,7 @@ where
fn on_fatal_error(&self, task: String, err: Box<Error>) {
// Let's pause the task locally first.
self.on_unregister(&task);
err.report(format_args!("fatal for task {}", err));

let meta_cli = self.get_meta_client();
let store_id = self.store_id;
@@ -249,8 +253,9 @@ where

async fn starts_flush_ticks(router: Router) {
loop {
// wait 1min to trigger tick
tokio::time::sleep(Duration::from_secs(FLUSH_STORAGE_INTERVAL / 5)).await;
// check every 15s.
// TODO: maybe use the global timer handle in `tikv_util::timer` (instead of enabling timing in the current runtime)?
tokio::time::sleep(Duration::from_secs(FLUSH_STORAGE_INTERVAL / 20)).await;
debug!("backup stream trigger flush tick");
router.tick().await;
}
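
This hunk makes the flush check four times more frequent: FLUSH_STORAGE_INTERVAL / 20 instead of / 5, i.e. every 15 s instead of every minute according to the comments (which imply FLUSH_STORAGE_INTERVAL is 300 s). As a hedged aside rather than what the patch does, the same loop can be written with tokio::time::interval, which avoids the drift a sleep-in-a-loop accumulates from the time spent inside tick():

use std::time::Duration;
use tokio::time::{interval, MissedTickBehavior};

// Inferred from the comments above: 300 s / 5 = 60 s, 300 s / 20 = 15 s.
const FLUSH_STORAGE_INTERVAL: u64 = 300;

async fn flush_ticks(mut tick_router: impl FnMut()) {
    let mut ticker = interval(Duration::from_secs(FLUSH_STORAGE_INTERVAL / 20));
    // Don't try to "catch up" with missed ticks if one check runs long.
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        // Note: unlike the sleep loop, the first tick completes immediately.
        ticker.tick().await;
        tick_router(); // stand-in for `router.tick().await`
    }
}
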
@@ -349,6 +354,7 @@ where
return;
}
};
let sched = self.scheduler.clone();

let kvs = ApplyEvents::from_cmd_batch(batch, resolver.value_mut().resolver());
drop(resolver);
@@ -367,14 +373,10 @@
let kv_count = kvs.len();
let total_size = kvs.size();
metrics::HEAP_MEMORY
.with_label_values(&["alloc"])
.inc_by(total_size as f64);
if let Err(err) = router.on_events(kvs).await {
err.report("failed to send event.");
}
.add(total_size as _);
utils::handle_on_event_result(&sched, router.on_events(kvs).await);
metrics::HEAP_MEMORY
.with_label_values(&["free"])
.inc_by(total_size as f64);
.sub(total_size as _);
HANDLE_KV_HISTOGRAM.observe(kv_count as _);
let time_cost = sw.lap().as_secs_f64();
if time_cost > SLOW_EVENT_THRESHOLD {
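
The earlier part of this hunk implements the "make memory a gauge" bullet: HEAP_MEMORY used to be a labeled counter (inc_by under "alloc" and "free"), which forces dashboards to subtract two ever-growing series; as a gauge it is increased before the events are handed to the router and decreased right after, so its live value approximates the in-flight event bytes. A hedged sketch with the prometheus crate — the metric name, help text, and helper below are illustrative, not the crate's real definitions:

use prometheus::{register_gauge, Gauge};

// Illustrative metric; the real HEAP_MEMORY is defined in the crate's metrics module.
fn heap_memory_gauge() -> Gauge {
    register_gauge!(
        "backup_stream_heap_memory_bytes",
        "bytes of KV events currently buffered by the backup stream endpoint"
    )
    .unwrap()
}

fn track_batch(gauge: &Gauge, total_size: usize, send: impl FnOnce()) {
    gauge.add(total_size as f64); // bytes enter the in-flight buffer
    send();                       // hand the events over to the router
    gauge.sub(total_size as f64); // and account for them once the router took ownership
}
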
@@ -393,6 +395,7 @@ where
self.regions.clone(),
self.range_router.clone(),
self.subs.clone(),
self.scheduler.clone(),
)
}

@@ -421,12 +424,6 @@ where
end_key: Vec<u8>,
) -> Result<()> {
let start = Instant::now_coarse();
let mut start_ts = task.info.get_start_ts();
// Should scan from checkpoint_ts rather than start_ts if checkpoint_ts exists in Metadata.
if let Some(cli) = &self.meta_client {
let checkpoint_ts = cli.progress_of_task(task.info.get_name()).await?;
start_ts = start_ts.max(checkpoint_ts);
}
let success = self
.observer
.ranges
@@ -440,17 +437,16 @@
);
}
tokio::task::spawn_blocking(move || {
let range_init_result =
init.initialize_range(start_key.clone(), end_key.clone(), TimeStamp::new(start_ts));
let range_init_result = init.initialize_range(start_key.clone(), end_key.clone());
match range_init_result {
Ok(stat) => {
info!("backup stream success to do initial scanning"; "stat" => ?stat,
Ok(()) => {
info!("backup stream success to initialize";
"start_key" => utils::redact(&start_key),
"end_key" => utils::redact(&end_key),
"take" => ?start.saturating_elapsed(),)
}
Err(e) => {
e.report("backup stream do initial scanning failed");
e.report("backup stream initialize failed");
}
}
});
@@ -526,6 +522,10 @@ where
self.pool.block_on(async move {
router.unregister_task(task).await;
});
// For now, we support only one concurrent task,
// so simply clearing all the info is fine.
self.subs.clear();
self.observer.ranges.wl().clear();
}

/// Try to advance the resolved ts by the PD TSO.
@@ -632,13 +632,12 @@ where

/// Start observing a region.
/// This modifies some internal state and delegates the task to InitialLoader::observe_over.
fn observe_over(&self, region: &Region) -> Result<()> {
fn observe_over(&self, region: &Region, handle: ObserveHandle) -> Result<()> {
let init = self.make_initial_loader();
let handle = ObserveHandle::new();
let region_id = region.get_id();
self.subs.register_region(&region, handle.clone(), None);
self.subs.register_region(region, handle.clone(), None);
init.observe_over_with_retry(region, || {
ChangeObserver::from_cdc(region_id, handle.clone())
ChangeObserver::from_pitr(region_id, handle.clone())
})?;
Ok(())
}
@@ -647,33 +646,31 @@
&self,
region: &Region,
task: String,
handle: ObserveHandle,
) -> Result<()> {
let init = self.make_initial_loader();

let handle = ObserveHandle::new();
let meta_cli = self.meta_client.as_ref().unwrap().clone();
let last_checkpoint = TimeStamp::new(
self.pool
.block_on(meta_cli.global_progress_of_task(&task))?,
);
self.subs
.register_region(&region, handle.clone(), Some(last_checkpoint));
.register_region(region, handle.clone(), Some(last_checkpoint));

let region_id = region.get_id();
let snap = init.observe_over_with_retry(region, move || {
ChangeObserver::from_cdc(region_id, handle.clone())
ChangeObserver::from_pitr(region_id, handle.clone())
})?;
let region = region.clone();

// Note: even if we did the initial scanning, there is still a possibility of losing data
// when next_backup_ts is advanced by a periodic flush before the initial scanning finishes:
// if the server crashes right then, data from this scan that hasn't reached the sink yet
// would be permanently lost.
// Maybe we need to block next_backup_ts from advancing until all initial scanning is done
// (or just for this region, by disabling its resolver)?
self.pool.spawn_blocking(move || {
match init.do_initial_scan(&region, last_checkpoint, snap) {
Ok(stat) => {
info!("initial scanning of leader transforming finished!"; "statistics" => ?stat, "region" => %region.get_id(), "from_ts" => %last_checkpoint);
utils::record_cf_stat("lock", &stat.lock);
utils::record_cf_stat("write", &stat.write);
utils::record_cf_stat("default", &stat.data);
}
Err(err) => err.report(format!("during initial scanning of region {:?}", region)),
}
@@ -694,32 +691,14 @@ where
ObserveOp::Start {
region,
needs_initial_scanning,
} => {
let result = if needs_initial_scanning {
let for_task = self.find_task_by_region(&region).unwrap_or_else(|| {
panic!(
"BUG: the region {:?} is register to no task but being observed",
region
)
});
self.observe_over_with_initial_data_from_checkpoint(&region, for_task)
} else {
self.observe_over(&region)
};
if let Err(err) = result {
err.report(format!(
"during doing initial scanning for region {:?}",
region
));
}
}
} => self.start_observe(region, needs_initial_scanning),
ObserveOp::Stop { ref region } => {
self.subs.deregister_region(region, |_, _| true);
}
ObserveOp::CheckEpochAndStop { ref region } => {
self.subs.deregister_region(region, |old, new| {
raftstore::store::util::compare_region_epoch(
old.get_region_epoch(),
old.meta.get_region_epoch(),
new,
true,
true,
@@ -730,31 +709,133 @@ where
});
}
ObserveOp::RefreshResolver { ref region } => {
let need_refresh_all = !self.subs.try_update_region(&region);
let need_refresh_all = !self.subs.try_update_region(region);

if need_refresh_all {
let canceled = self.subs.deregister_region(region, |_, _| true);
let handle = ObserveHandle::new();
if canceled {
let for_task = self.find_task_by_region(&region).unwrap_or_else(|| {
let for_task = self.find_task_by_region(region).unwrap_or_else(|| {
panic!(
"BUG: the region {:?} is register to no task but being observed",
region
)
});
if let Err(e) =
self.observe_over_with_initial_data_from_checkpoint(&region, for_task)
{
e.report(format!(
"register region {} to raftstore when refreshing",
region.get_id()
));
if let Err(e) = self.observe_over_with_initial_data_from_checkpoint(
region,
for_task,
handle.clone(),
) {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::NotifyFailToStartObserve {
region: region.clone(),
handle,
err: Box::new(e)
})
);
}
}
}
}
ObserveOp::NotifyFailToStartObserve {
region,
handle,
err,
} => {
info!("retry observe region"; "region" => %region.get_id(), "err" => %err);
match self.retry_observe(region, handle) {
Ok(()) => {}
Err(e) => {
try_send!(
self.scheduler,
Task::FatalError(
format!("While retring to observe region, origin error is {}", err),
Box::new(e)
)
);
}
}
}
}
}
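
The failure paths above now go through try_send!(self.scheduler, Task::...) rather than panicking, which is what the "don't panic when channel dropped" bullet refers to: if the worker has already shut down, scheduling simply fails and gets reported. The macro itself is not part of this diff; a hedged stand-in for the behaviour it appears to provide, using a plain std channel:

use std::sync::mpsc::{channel, Sender};

// Stand-in for `Task::ModifyObserve(...)` / `Task::FatalError(...)`.
#[derive(Debug)]
enum Task {
    RetryObserve(u64),
}

// If the worker is gone the channel is closed; log and drop the task instead of
// panicking, mirroring the "don't panic when channel dropped" commit message.
fn try_send(scheduler: &Sender<Task>, task: Task) {
    if let Err(err) = scheduler.send(task) {
        eprintln!("backup stream: failed to schedule task, worker stopped? {err}");
    }
}

fn main() {
    let (tx, rx) = channel();
    drop(rx); // simulate a stopped worker
    try_send(&tx, Task::RetryObserve(42)); // logs a warning instead of panicking
}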

fn start_observe(&self, region: Region, needs_initial_scanning: bool) {
let handle = ObserveHandle::new();
let result = if needs_initial_scanning {
let for_task = self.find_task_by_region(&region).unwrap_or_else(|| {
panic!(
"BUG: the region {:?} is register to no task but being observed (start_key = {}; end_key = {}; task_stat = {:?})",
region, utils::redact(&region.get_start_key()), utils::redact(&region.get_end_key()), self.range_router
)
});
self.observe_over_with_initial_data_from_checkpoint(&region, for_task, handle.clone())
} else {
self.observe_over(&region, handle.clone())
};
if let Err(err) = result {
try_send!(
self.scheduler,
Task::ModifyObserve(ObserveOp::NotifyFailToStartObserve {
region,
handle,
err: Box::new(err)
})
);
}
}

fn retry_observe(&self, region: Region, handle: ObserveHandle) -> Result<()> {
let (tx, rx) = crossbeam::channel::bounded(1);
self.regions
.find_region_by_id(
region.get_id(),
Box::new(move |item| {
tx.send(item)
.expect("BUG: failed to send to newly created channel.");
}),
)
.map_err(|err| {
annotate!(
err,
"failed to send request to region info accessor, server maybe too too too busy. (region id = {})",
region.get_id()
)
})?;
let new_region_info = rx
.recv()
.map_err(|err| annotate!(err, "BUG?: unexpected channel message dropped."))?;
if new_region_info.is_none() {
metrics::SKIP_RETRY
.with_label_values(&["region-absent"])
.inc();
return Ok(());
}
if new_region_info
.as_ref()
.map(|r| r.role != StateRole::Leader)
.unwrap_or(true)
{
metrics::SKIP_RETRY.with_label_values(&["not-leader"]).inc();
return Ok(());
}
let removed = self.subs.deregister_region(&region, |old, _| {
let should_remove = old.handle().id == handle.id;
if !should_remove {
warn!("stale retry command"; "region" => ?region, "handle" => ?handle, "old_handle" => ?old.handle());
}
should_remove
});
if !removed {
metrics::SKIP_RETRY
.with_label_values(&["stale-command"])
.inc();
return Ok(());
}
self.start_observe(region, true);
Ok(())
}
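
retry_observe decides whether a failed observe is still worth retrying: it re-queries the region and skips (bumping the SKIP_RETRY counter) when the region no longer exists, this store is no longer its leader, or the observe handle is stale; only then does it restart the observation with a fresh initial scan. The lookup uses a bounded(1) crossbeam channel as a one-shot reply slot for the callback-based find_region_by_id. A minimal, self-contained sketch of that pattern — the region-info API below is a stand-in, not the real RegionInfoProvider:

use crossbeam::channel;

// Stand-in for the callback-based region lookup used above; the real provider
// replies asynchronously from another thread.
fn find_region_by_id(id: u64, callback: Box<dyn FnOnce(Option<u64>) + Send>) {
    callback(Some(id));
}

fn lookup(id: u64) -> Option<u64> {
    // Capacity 1 is enough: exactly one reply is ever sent, so `send` never blocks.
    let (tx, rx) = channel::bounded(1);
    find_region_by_id(
        id,
        Box::new(move |item| {
            tx.send(item).expect("receiver dropped before the reply arrived");
        }),
    );
    // Block until the callback delivers the reply; a dropped sender means a bug
    // in the provider, which the code above turns into an annotated error.
    rx.recv().ok().flatten()
}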

pub fn run_task(&self, task: Task) {
debug!("run backup stream task"; "task" => ?task);
match task {
@@ -835,6 +916,11 @@ pub enum ObserveOp {
RefreshResolver {
region: Region,
},
NotifyFailToStartObserve {
region: Region,
handle: ObserveHandle,
err: Box<Error>,
},
}

impl fmt::Debug for Task {
8 changes: 8 additions & 0 deletions components/backup-stream/src/errors.rs
@@ -142,6 +142,14 @@ impl Error {
}
}

/// add some context to the error.
pub fn context(self, msg: impl Display) -> Self {
Self::Contextual {
inner_error: Box::new(self),
context: msg.to_string(),
}
}
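
The new context helper wraps any Error in a Contextual variant that carries a human-readable message, so call sites can record what they were doing when the error occurred without defining a new error type. A hedged, self-contained usage sketch; the variants below are simplified stand-ins for the crate's real Error enum:

use std::fmt::Display;

#[derive(Debug)]
enum Error {
    Io(std::io::Error),
    Contextual { inner_error: Box<Error>, context: String },
}

impl Error {
    /// Add some context to the error, mirroring the method in the hunk above.
    fn context(self, msg: impl Display) -> Self {
        Self::Contextual {
            inner_error: Box::new(self),
            context: msg.to_string(),
        }
    }
}

fn load_task_meta() -> Result<(), Error> {
    let not_found = std::io::Error::new(std::io::ErrorKind::NotFound, "meta file missing");
    // Typical call site: annotate the low-level error with what we were doing.
    Err(Error::Io(not_found).context("while loading task metadata"))
}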

fn kind(&self) -> &'static str {
self.error_code().code
}