Skip to content

Commit

Permalink
implement stream restore for tikv side. (#12)
Browse files Browse the repository at this point in the history
* sst-importer: impl apply for br stream restore

Signed-off-by: 3pointer <[email protected]>

* move br-stream::codec outside for sst-importer

Signed-off-by: 3pointer <[email protected]>

* perform rewrite

Signed-off-by: 3pointer <[email protected]>

* make clippy

Signed-off-by: 3pointer <[email protected]>

* add test

Signed-off-by: 3pointer <[email protected]>

* address some somments

Signed-off-by: 3pointer <[email protected]>

* address comment

Signed-off-by: 3pointer <[email protected]>

* address comment

Signed-off-by: 3pointer <[email protected]>

* address comment

Signed-off-by: 3pointer <[email protected]>

* fix valid
  • Loading branch information
3pointer authored Feb 10, 2022
1 parent 6eaa365 commit 2c0c168
Show file tree
Hide file tree
Showing 10 changed files with 432 additions and 118 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 0 additions & 56 deletions components/br-stream/src/codec.rs

This file was deleted.

1 change: 0 additions & 1 deletion components/br-stream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
#![feature(inherent_ascii_escape)]
mod codec;
pub mod config;
mod endpoint;
pub mod errors;
Expand Down
7 changes: 4 additions & 3 deletions components/br-stream/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use std::{

use crate::{
annotate,
codec::Encoder,
endpoint::Task,
errors::Error,
metadata::StreamTask,
Expand Down Expand Up @@ -41,7 +40,9 @@ use slog_global::debug;
use tidb_query_datatype::codec::table::decode_table_id;

use tikv_util::{
box_err, defer, error, info,
box_err,
codec::stream_event::EventEncoder,
defer, info, error,
time::{Instant, Limiter},
warn,
worker::Scheduler,
Expand Down Expand Up @@ -825,7 +826,7 @@ impl DataFile {
async fn on_event(&mut self, mut kv: ApplyEvent) -> Result<usize> {
let now = Instant::now_coarse();
let _entry_size = kv.size();
let encoded = Encoder::encode_event(&kv.key, &kv.value);
let encoded = EventEncoder::encode_event(&kv.key, &kv.value);
let mut size = 0;
for slice in encoded {
let slice = slice.as_ref();
Expand Down
8 changes: 6 additions & 2 deletions components/sst_importer/src/import_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,7 @@ impl ImportDir {
})
}

pub fn join(&self, meta: &SstMeta) -> Result<ImportPath> {
let file_name = sst_meta_to_path(meta)?;
pub fn get_import_path(&self, file_name: &str) -> Result<ImportPath> {
let save_path = self.root_dir.join(&file_name);
let temp_path = self.temp_dir.join(&file_name);
let clone_path = self.clone_dir.join(&file_name);
Expand All @@ -231,6 +230,11 @@ impl ImportDir {
})
}

pub fn join(&self, meta: &SstMeta) -> Result<ImportPath> {
let file_name = sst_meta_to_path(meta)?;
self.get_import_path(file_name.to_str().unwrap())
}

pub fn create(
&self,
meta: &SstMeta,
Expand Down
8 changes: 8 additions & 0 deletions components/sst_importer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,12 @@ lazy_static! {
&["type", "error"]
)
.unwrap();
pub static ref IMPORTER_APPLY_DURATION: HistogramVec = register_histogram_vec!(
"tikv_import_apply_duration",
"Bucketed histogram of importer apply duration",
&["type"],
// Start from 10ms.
exponential_buckets(0.01, 2.0, 20).unwrap()
)
.unwrap();
}
Loading

0 comments on commit 2c0c168

Please sign in to comment.