br: support pause/resume stream task #42

Merged · 3 commits · Apr 15, 2022
190 changes: 139 additions & 51 deletions in components/backup-stream/src/endpoint.rs
@@ -21,6 +21,7 @@ use resolved_ts::Resolver;
use tikv_util::time::Instant;

use tokio::io::Result as TokioResult;
use tokio::runtime::Handle;
use tokio::runtime::Runtime;
use tokio_stream::StreamExt;
use txn_types::TimeStamp;
@@ -101,15 +102,16 @@ where
let scheduler_clone = scheduler.clone();
// TODO build a error handle mechanism #error 2
pool.spawn(async {
-if let Err(err) = Self::starts_watch_tasks(meta_client_clone, scheduler_clone).await
if let Err(err) =
Self::start_and_watch_tasks(meta_client_clone, scheduler_clone).await
{
err.report("failed to start watch tasks");
}
});
pool.spawn(Self::starts_flush_ticks(range_router.clone()));
}

info!("the endpoint of stream backup started"; "path" => %config.temp_path);
info!("the endpoint of backup stream started"; "path" => %config.temp_path);
Endpoint {
config,
meta_client,
@@ -173,11 +175,13 @@ where
let scheduler_clone = scheduler.clone();
// TODO build a error handle mechanism #error 2
pool.spawn(async {
-if let Err(err) = Self::starts_watch_tasks(meta_client_clone, scheduler_clone).await
if let Err(err) =
Self::start_and_watch_tasks(meta_client_clone, scheduler_clone).await
{
err.report("failed to start watch tasks");
}
});

pool.spawn(Self::starts_flush_ticks(range_router.clone()));
}

@@ -218,30 +222,83 @@ where
}

// TODO find a proper way to exit watch tasks
-async fn starts_watch_tasks(
async fn start_and_watch_tasks(
meta_client: MetadataClient<S>,
scheduler: Scheduler<Task>,
) -> Result<()> {
let tasks = meta_client.get_tasks().await?;
for task in tasks.inner {
info!("backup stream watch task"; "task" => ?task);
if task.is_paused {
continue;
}
// move task to schedule
scheduler.schedule(Task::WatchTask(TaskOp::AddTask(task)))?;
}

-let mut watcher = meta_client.events_from(tasks.revision).await?;
let revision = tasks.revision;
let meta_client_clone = meta_client.clone();
let scheduler_clone = scheduler.clone();

Handle::current().spawn(async move {
if let Err(err) =
Self::starts_watch_task(meta_client_clone, scheduler_clone, revision).await
{
err.report("failed to start watch tasks");
}
});

Handle::current().spawn(async move {
if let Err(err) = Self::starts_watch_pause(meta_client, scheduler, revision).await {
err.report("failed to start watch pause");
}
});

Ok(())
}
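
A note for readers of this hunk: `start_and_watch_tasks` first schedules every non-paused task from the initial snapshot, then spawns two independent watch loops on the current Tokio runtime. Both loops resume from the same `tasks.revision`, so metadata events that arrive between `get_tasks` and the watch start are not lost. Below is a minimal sketch of that fan-out pattern, with mpsc channels standing in for the etcd watch streams (all names in the sketch are illustrative, not the real API):

```rust
use tokio::runtime::Handle;
use tokio::sync::mpsc;

async fn start_watchers(
    mut task_events: mpsc::Receiver<String>,
    mut pause_events: mpsc::Receiver<String>,
) {
    // Two independent loops: a stalled pause watcher cannot block task
    // add/remove handling, and vice versa.
    Handle::current().spawn(async move {
        while let Some(ev) = task_events.recv().await {
            println!("task event: {}", ev);
        }
    });
    Handle::current().spawn(async move {
        while let Some(ev) = pause_events.recv().await {
            println!("pause event: {}", ev);
        }
    });
}
```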

async fn starts_watch_task(
meta_client: MetadataClient<S>,
scheduler: Scheduler<Task>,
revision: i64,
) -> Result<()> {
let mut watcher = meta_client.events_from(revision).await?;
loop {
if let Some(event) = watcher.stream.next().await {
info!("backup stream watch event from etcd"; "event" => ?event);
match event {
MetadataEvent::AddTask { task } => {
-let t = meta_client.get_task(&task).await?;
-scheduler.schedule(Task::WatchTask(TaskOp::AddTask(t)))?;
scheduler.schedule(Task::WatchTask(TaskOp::AddTask(task)))?;
}
MetadataEvent::RemoveTask { task } => {
scheduler.schedule(Task::WatchTask(TaskOp::RemoveTask(task)))?;
}
MetadataEvent::Error { err } => err.report("metadata client watch meet error"),
_ => panic!("BUG: invalid event {:?}", event),
}
}
}
}
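
Both watch loops share the same shape: block on `watcher.stream.next().await`, dispatch known events to the scheduler, report watch errors, and treat any other variant as a bug. A self-contained sketch of that shape, using a finite `tokio_stream::iter` in place of the long-lived etcd watch stream:

```rust
use tokio_stream::{self as stream, StreamExt};

async fn consume_watch_events() {
    // A finite stream stands in for the never-ending etcd watch stream.
    let mut events = stream::iter(vec!["AddTask", "RemoveTask"]);
    while let Some(event) = events.next().await {
        match event {
            "AddTask" => println!("schedule TaskOp::AddTask"),
            "RemoveTask" => println!("schedule TaskOp::RemoveTask"),
            other => panic!("BUG: invalid event {:?}", other),
        }
    }
}
```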

async fn starts_watch_pause(
meta_client: MetadataClient<S>,
scheduler: Scheduler<Task>,
revision: i64,
) -> Result<()> {
let mut watcher = meta_client.events_from_pause(revision).await?;
loop {
if let Some(event) = watcher.stream.next().await {
info!("backup stream watch event from etcd"; "event" => ?event);
match event {
MetadataEvent::PauseTask { task } => {
scheduler.schedule(Task::WatchTask(TaskOp::PauseTask(task)))?;
}
MetadataEvent::ResumeTask { task } => {
let task = meta_client.get_task(&task).await?;
scheduler.schedule(Task::WatchTask(TaskOp::ResumeTask(task)))?;
}
MetadataEvent::Error { err } => err.report("metadata client watch meet error"),
_ => panic!("BUG: invalid event {:?}", event),
}
}
}
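
Note the asymmetry in the pause loop: `PauseTask` carries only the task name, since pausing merely tears down observation, while `ResumeTask` refetches the full task so registration restarts from current metadata. A simplified sketch of that mapping (the types and `fetch_task` below are stand-ins, not the real client API):

```rust
struct FullTask {
    name: String,
}

enum PauseEvent {
    Pause(String),
    Resume(String),
}

enum Op {
    PauseTask(String),
    ResumeTask(FullTask),
}

// Stand-in for MetadataClient::get_task.
async fn fetch_task(name: &str) -> FullTask {
    FullTask { name: name.to_owned() }
}

async fn map_event(ev: PauseEvent) -> Op {
    match ev {
        // Tearing down observation needs only the name.
        PauseEvent::Pause(name) => Op::PauseTask(name),
        // Restarting needs the full, current task definition.
        PauseEvent::Resume(name) => Op::ResumeTask(fetch_task(&name).await),
    }
}
```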
@@ -311,7 +368,71 @@ where
TaskOp::RemoveTask(task_name) => {
self.on_unregister(&task_name);
}
TaskOp::PauseTask(task_name) => {
self.on_unregister(&task_name);
}
TaskOp::ResumeTask(task) => {
self.on_register(task);
}
}
}

async fn observe_and_scan_region(
&self,
init: InitialDataLoader<E, R, RT>,
task: &StreamTask,
start_key: Vec<u8>,
end_key: Vec<u8>,
) -> Result<()> {
let start = Instant::now_coarse();
let mut start_ts = task.info.get_start_ts();
let ob = self.observer.clone();
let rs = self.resolvers.clone();

// Should scan from checkpoint_ts rather than start_ts if checkpoint_ts exists in Metadata.
if let Some(cli) = &self.meta_client {
let checkpoint_ts = cli.progress_of_task(task.info.get_name()).await?;
start_ts = start_ts.max(checkpoint_ts);
}

let success = self
.observer
.ranges
.wl()
.add((start_key.clone(), end_key.clone()));
if !success {
warn!("backup stream task ranges overlapped, which hasn't been supported for now";
"task" => ?task,
"start_key" => utils::redact(&start_key),
"end_key" => utils::redact(&end_key),
);
}

tokio::task::spawn_blocking(move || {
let range_init_result = init.initialize_range(
start_key.clone(),
end_key.clone(),
TimeStamp::new(start_ts),
|region_id, handle| {
// Note: maybe we'd better schedule a "register region" here?
ob.subs.register_region(region_id, handle);
rs.insert(region_id, Resolver::new(region_id));
},
);
match range_init_result {
Ok(stat) => {
info!("backup stream do initial scanning successfully"; "stat" => ?stat,
"start_key" => utils::redact(&start_key),
"end_key" => utils::redact(&end_key),
"take" => ?start.saturating_elapsed(),)
}
Err(e) => {
e.report("backup stream failed to initialize regions");
}
}
});

Ok(())
}
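
The checkpoint logic above is what makes resume cheap: if the metadata already records progress for the task, the initial scan starts from `max(start_ts, checkpoint_ts)` instead of rescanning from the task's original start. A tiny, self-contained illustration (the `Option` return is a stand-in; the real `progress_of_task` signature may differ):

```rust
fn scan_start_ts(start_ts: u64, checkpoint_ts: Option<u64>) -> u64 {
    match checkpoint_ts {
        Some(cp) => start_ts.max(cp),
        None => start_ts,
    }
}

fn main() {
    // Fresh task: no checkpoint yet, scan from the configured start.
    assert_eq!(scan_start_ts(100, None), 100);
    // Resumed task: checkpoint has advanced, skip already-backed-up data.
    assert_eq!(scan_start_ts(100, Some(250)), 250);
    // A stale checkpoint never moves the scan backwards.
    assert_eq!(scan_start_ts(300, Some(250)), 300);
}
```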

// register task ranges
@@ -346,53 +467,18 @@ where
.register_task(task.clone(), ranges.clone())
.await
{
err.report(format!("failed to register task {}", task.info.name));
err.report(format!(
"failed to register backup stream task {}",
task.info.name
));
return;
}

for (start_key, end_key) in ranges {
let init = init.clone();
-let start_key = start_key;
-let end_key = end_key;
-let start = Instant::now_coarse();
-let start_ts = task.info.get_start_ts();
-let ob = self.observer.clone();
-let rs = self.resolvers.clone();
-let success = self
-.observer
-.ranges
-.wl()
-.add((start_key.clone(), end_key.clone()));
-if !success {
-warn!("task ranges overlapped, which hasn't been supported for now";
-"task" => ?task,
-"start_key" => utils::redact(&start_key),
-"end_key" => utils::redact(&end_key),
-);
-continue;
-}
-tokio::task::spawn_blocking(move || {
-let range_init_result = init.initialize_range(
-start_key.clone(),
-end_key.clone(),
-TimeStamp::new(start_ts),
-|region_id, handle| {
-// Note: maybe we'd better schedule a "register region" here?
-ob.subs.register_region(region_id, handle);
-rs.insert(region_id, Resolver::new(region_id));
-},
-);
-match range_init_result {
-Ok(stat) => {
-info!("success to do initial scanning"; "stat" => ?stat,
-"start_key" => utils::redact(&start_key),
-"end_key" => utils::redact(&end_key),
-"take" => ?start.saturating_elapsed(),)
-}
-Err(e) => {
-e.report("failed to initialize regions");
-}
-}
-});
self.observe_and_scan_region(init, &task, start_key, end_key)
.await
.unwrap();
}
info!(
"finish register backup stream ranges";
@@ -401,7 +487,7 @@ where
}
Err(e) => {
e.report(format!(
"failed to register task {} to router: ranges not found",
"failed to register backup stream task {} to router: ranges not found",
task.info.get_name()
));
// TODO build a error handle mechanism #error 5
@@ -670,6 +756,8 @@ pub enum Task {
pub enum TaskOp {
AddTask(StreamTask),
RemoveTask(String),
PauseTask(String),
ResumeTask(StreamTask),
}
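
The two new variants mirror the existing add/remove pair: ops that stop observation need only a task name, while ops that start it need the full task definition. A stand-alone sketch of that shape (with `StreamTask` stubbed out for illustration):

```rust
#[derive(Debug)]
struct StreamTask {
    name: String,
}

#[derive(Debug)]
enum TaskOp {
    AddTask(StreamTask),
    RemoveTask(String),
    PauseTask(String),
    ResumeTask(StreamTask),
}

fn describe(op: &TaskOp) -> &'static str {
    match op {
        // Starting (or restarting) observation needs the full task definition.
        TaskOp::AddTask(_) | TaskOp::ResumeTask(_) => "start observation",
        // Stopping observation only needs the task name.
        TaskOp::RemoveTask(_) | TaskOp::PauseTask(_) => "stop observation",
    }
}

fn main() {
    let pause = TaskOp::PauseTask("my-task".to_owned());
    let resume = TaskOp::ResumeTask(StreamTask { name: "my-task".to_owned() });
    println!("{:?} -> {}", pause, describe(&pause));
    println!("{:?} -> {}", resume, describe(&resume));
}
```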

#[derive(Debug)]