diff --git a/Cargo.lock b/Cargo.lock index 88243e2f439..01739c6dbe7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2283,7 +2283,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git?branch=br-stream#b74e38458440e5300b489d29ecf63d1ea586a0d2" +source = "git+https://github.com/pingcap/kvproto.git?branch=br-stream#11c47a2c2f8e3c59b4cb12abd479675eaad69f3b" dependencies = [ "futures 0.3.15", "grpcio", diff --git a/components/br-stream/src/codec.rs b/components/br-stream/src/codec.rs deleted file mode 100644 index 59f07b47263..00000000000 --- a/components/br-stream/src/codec.rs +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. -use bytes::{Buf, Bytes}; -use std::io::prelude::*; -use std::io::Cursor; -use tikv_util::Either; - -pub struct Encoder; - -impl Encoder { - // TODO move this function to a indepentent module. - pub fn encode_event<'e>(key: &'e [u8], value: &'e [u8]) -> [impl AsRef<[u8]> + 'e; 4] { - let key_len = (key.len() as u32).to_le_bytes(); - let val_len = (value.len() as u32).to_le_bytes(); - [ - Either::Left(key_len), - Either::Right(key), - Either::Left(val_len), - Either::Right(value), - ] - } - - #[allow(dead_code)] - pub fn decode_event(e: &[u8]) -> (Vec, Vec) { - let mut buf = Cursor::new(Bytes::from(e)); - let len = buf.get_u32_le() as usize; - let mut key = vec![0; len]; - buf.read_exact(key.as_mut_slice()).unwrap(); - let len = buf.get_u32_le() as usize; - let mut val = vec![0; len]; - buf.read_exact(val.as_mut_slice()).unwrap(); - (key, val) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use rand::Rng; - - #[test] - fn test_encode_decode() { - let mut rng = rand::thread_rng(); - for _i in 0..10 { - let key: Vec = (0..100).map(|_| rng.gen_range(0..255)).collect(); - let val: Vec = (0..100).map(|_| rng.gen_range(0..255)).collect(); - let e = Encoder::encode_event(&key, &val); - let mut event = vec![]; - for s in e { - event.extend_from_slice(s.as_ref()); - } - let (decoded_key, decoded_val) = Encoder::decode_event(&event); - assert_eq!(key, decoded_key); - assert_eq!(val, decoded_val); - } - } -} diff --git a/components/br-stream/src/lib.rs b/components/br-stream/src/lib.rs index 017c434a7e9..4471ea256bd 100644 --- a/components/br-stream/src/lib.rs +++ b/components/br-stream/src/lib.rs @@ -1,6 +1,5 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. #![feature(inherent_ascii_escape)] -mod codec; pub mod config; mod endpoint; pub mod errors; diff --git a/components/br-stream/src/router.rs b/components/br-stream/src/router.rs index 939fc49d333..cd1b24f228d 100644 --- a/components/br-stream/src/router.rs +++ b/components/br-stream/src/router.rs @@ -13,7 +13,6 @@ use std::{ use crate::{ annotate, - codec::Encoder, endpoint::Task, errors::Error, metadata::StreamTask, @@ -41,7 +40,9 @@ use slog_global::debug; use tidb_query_datatype::codec::table::decode_table_id; use tikv_util::{ - box_err, defer, error, info, + box_err, + codec::stream_event::EventEncoder, + defer, info, error, time::{Instant, Limiter}, warn, worker::Scheduler, @@ -825,7 +826,7 @@ impl DataFile { async fn on_event(&mut self, mut kv: ApplyEvent) -> Result { let now = Instant::now_coarse(); let _entry_size = kv.size(); - let encoded = Encoder::encode_event(&kv.key, &kv.value); + let encoded = EventEncoder::encode_event(&kv.key, &kv.value); let mut size = 0; for slice in encoded { let slice = slice.as_ref(); diff --git a/components/sst_importer/src/import_file.rs b/components/sst_importer/src/import_file.rs index 2769a7a0eed..240a0315df2 100644 --- a/components/sst_importer/src/import_file.rs +++ b/components/sst_importer/src/import_file.rs @@ -219,8 +219,7 @@ impl ImportDir { }) } - pub fn join(&self, meta: &SstMeta) -> Result { - let file_name = sst_meta_to_path(meta)?; + pub fn get_import_path(&self, file_name: &str) -> Result { let save_path = self.root_dir.join(&file_name); let temp_path = self.temp_dir.join(&file_name); let clone_path = self.clone_dir.join(&file_name); @@ -231,6 +230,11 @@ impl ImportDir { }) } + pub fn join(&self, meta: &SstMeta) -> Result { + let file_name = sst_meta_to_path(meta)?; + self.get_import_path(file_name.to_str().unwrap()) + } + pub fn create( &self, meta: &SstMeta, diff --git a/components/sst_importer/src/metrics.rs b/components/sst_importer/src/metrics.rs index 7c3238cfc53..95768a9abac 100644 --- a/components/sst_importer/src/metrics.rs +++ b/components/sst_importer/src/metrics.rs @@ -83,4 +83,12 @@ lazy_static! { &["type", "error"] ) .unwrap(); + pub static ref IMPORTER_APPLY_DURATION: HistogramVec = register_histogram_vec!( + "tikv_import_apply_duration", + "Bucketed histogram of importer apply duration", + &["type"], + // Start from 10ms. + exponential_buckets(0.01, 2.0, 20).unwrap() + ) + .unwrap(); } diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs index 56097135309..617d78de8f7 100644 --- a/components/sst_importer/src/sst_importer.rs +++ b/components/sst_importer/src/sst_importer.rs @@ -2,6 +2,8 @@ use std::borrow::Cow; use std::collections::HashMap; +use std::fs::File; +use std::io::{prelude::*, BufReader}; use std::ops::Bound; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -19,7 +21,10 @@ use engine_traits::{ }; use file_system::{get_io_rate_limiter, OpenOptions}; use kvproto::kvrpcpb::ApiVersion; -use tikv_util::time::{Instant, Limiter}; +use tikv_util::{ + codec::stream_event::{EventIterator, Iterator as EIterator}, + time::{Instant, Limiter}, +}; use txn_types::{Key, TimeStamp, WriteRef}; use crate::import_file::{ImportDir, ImportFile}; @@ -127,6 +132,41 @@ impl SSTImporter { self.dir.exist(meta).unwrap_or(false) } + // Donwloads and apply a KV file from an external storage. + pub fn apply( + &self, + backend: &StorageBackend, + name: &str, + rewrite_rule: &RewriteRule, + cf: &str, + speed_limiter: Limiter, + engine: E, + ) -> Result> { + debug!("apply start"; + "url" => ?backend, + "name" => name, + "cf" => cf, + "rewrite_rule" => ?rewrite_rule, + ); + match self.do_download_and_apply::( + backend, + name, + rewrite_rule, + cf, + &speed_limiter, + engine, + ) { + Ok(r) => { + info!("apply"; "name" => name, "range" => ?r); + Ok(r) + } + Err(e) => { + error!(%e; "apply failed"; "name" => name,); + Err(e) + } + } + } + // Downloads an SST file from an external storage. // // This method is blocking. It performs the following transformations before @@ -166,7 +206,7 @@ impl SSTImporter { name, rewrite_rule, crypter, - speed_limiter, + &speed_limiter, engine, ) { Ok(r) => { @@ -192,6 +232,168 @@ impl SSTImporter { self.switcher.get_mode() } + fn download_file_from_external_storage( + &self, + file_length: u64, + src_file_name: &str, + dst_file: std::path::PathBuf, + backend: &StorageBackend, + file_crypter: Option, + speed_limiter: &Limiter, + ) -> Result<()> { + let start_read = Instant::now(); + // prepare to download the file from the external_storage + // TODO: pass a config to support hdfs + let ext_storage = external_storage_export::create_storage(backend, Default::default())?; + let url = ext_storage.url()?.to_string(); + + let ext_storage: Box = + if let Some(key_manager) = &self.key_manager { + Box::new(external_storage_export::EncryptedExternalStorage { + key_manager: (*key_manager).clone(), + storage: ext_storage, + }) as _ + } else { + ext_storage as _ + }; + + let result = ext_storage.restore( + src_file_name, + dst_file.clone(), + file_length, + speed_limiter, + file_crypter, + ); + IMPORTER_DOWNLOAD_BYTES.observe(file_length as _); + result.map_err(|e| Error::CannotReadExternalStorage { + url: url.to_string(), + name: src_file_name.to_owned(), + local_path: dst_file.clone(), + err: e, + })?; + + OpenOptions::new() + .append(true) + .open(dst_file)? + .sync_data()?; + + IMPORTER_DOWNLOAD_DURATION + .with_label_values(&["read"]) + .observe(start_read.saturating_elapsed().as_secs_f64()); + + debug!("downloaded file succeed"; + "name" => src_file_name, + "url" => %url, + ); + Ok(()) + } + + fn do_download_and_apply( + &self, + backend: &StorageBackend, + name: &str, + rewrite_rule: &RewriteRule, + cf: &str, + speed_limiter: &Limiter, + engine: E, + ) -> Result> { + let path = self.dir.get_import_path(name)?; + let start = Instant::now(); + self.download_file_from_external_storage( + // don't check file length after download file for now. + 0, + name, + path.temp.clone(), + backend, + // don't support encrypt for now. + None, + speed_limiter, + )?; + info!("download file finished {}", name); + + IMPORTER_APPLY_DURATION + .with_label_values(&["download"]) + .observe(start.saturating_elapsed().as_secs_f64()); + + // iterator `path.temp` file and performs rewrites and apply. + let file = File::open(path.temp)?; + let mut reader = BufReader::new(file); + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer)?; + + let mut event_iter = EventIterator::new(buffer); + + let old_prefix = rewrite_rule.get_old_key_prefix(); + let new_prefix = rewrite_rule.get_new_key_prefix(); + + let perform_rewrite = old_prefix != new_prefix; + + // perform iteration and key rewrite. + let mut key = keys::data_key(new_prefix); + let new_prefix_data_key_len = key.len(); + let mut smallest_key = None; + let mut largest_key = None; + + let start = Instant::now(); + loop { + if !event_iter.valid() { + break; + } + event_iter.next()?; + let iter_key = event_iter.key().to_vec(); + + smallest_key = smallest_key.map_or_else(|| Some(iter_key.clone()), |v: Vec| { + Some(v.min(iter_key.clone())) + }); + + largest_key = largest_key.map_or_else(|| Some(iter_key.clone()), |v: Vec| { + Some(v.max(iter_key.clone())) + }); + + if perform_rewrite { + let old_key = event_iter.key(); + + if !old_key.starts_with(old_prefix) { + return Err(Error::WrongKeyPrefix { + what: "Key in file", + key: old_key.to_vec(), + prefix: old_prefix.to_vec(), + }); + } + key.truncate(new_prefix_data_key_len); + key.extend_from_slice(&old_key[old_prefix.len()..]); + + debug!( + "perform rewrite new key: {:?}, new key prefix: {:?}, old key prefix: {:?}", + log_wrappers::Value::key(keys::origin_key(&key)), + log_wrappers::Value::key(new_prefix), + log_wrappers::Value::key(old_prefix), + ); + } else { + key = keys::data_key(event_iter.key()); + } + let value = Cow::Borrowed(event_iter.value()); + // TODO handle delete cf + engine.put_cf(cf, &key, &value)?; + } + engine.flush_cf(cf, true)?; + let label = if perform_rewrite { "rewrite" } else { "normal" }; + info!("apply file finished {}", name); + IMPORTER_APPLY_DURATION + .with_label_values(&[label]) + .observe(start.saturating_elapsed().as_secs_f64()); + + match (smallest_key, largest_key) { + (Some(sk), Some(lk)) => { + let mut final_range = Range::default(); + final_range.set_start(sk); + final_range.set_end(lk); + Ok(Some(final_range)) + } + _ => Ok(None), + } + } + fn do_download( &self, meta: &SstMeta, @@ -199,73 +401,37 @@ impl SSTImporter { name: &str, rewrite_rule: &RewriteRule, crypter: Option, - speed_limiter: Limiter, + speed_limiter: &Limiter, engine: E, ) -> Result> { let path = self.dir.join(meta)?; - let url = { - let start_read = Instant::now(); - - // prepare to download the file from the external_storage - // TODO: pass a config to support hdfs - let ext_storage = external_storage_export::create_storage(backend, Default::default())?; - let url = ext_storage.url()?.to_string(); - - let ext_storage: Box = - if let Some(key_manager) = &self.key_manager { - Box::new(external_storage_export::EncryptedExternalStorage { - key_manager: (*key_manager).clone(), - storage: ext_storage, - }) as _ - } else { - ext_storage as _ - }; - - let file_crypter = crypter.map(|c| FileEncryptionInfo { - method: encryption_method_to_db_encryption_method(c.cipher_type), - key: c.cipher_key, - iv: meta.cipher_iv.to_owned(), - }); - let result = ext_storage.restore( - name, - path.temp.to_owned(), - meta.length, - &speed_limiter, - file_crypter, - ); - IMPORTER_DOWNLOAD_BYTES.observe(meta.length as _); - result.map_err(|e| Error::CannotReadExternalStorage { - url: url.to_string(), - name: name.to_owned(), - local_path: path.temp.to_owned(), - err: e, - })?; - - OpenOptions::new() - .append(true) - .open(&path.temp)? - .sync_data()?; - - IMPORTER_DOWNLOAD_DURATION - .with_label_values(&["read"]) - .observe(start_read.saturating_elapsed().as_secs_f64()); + let file_crypter = crypter.map(|c| FileEncryptionInfo { + method: encryption_method_to_db_encryption_method(c.cipher_type), + key: c.cipher_key, + iv: meta.cipher_iv.to_owned(), + }); - url - }; + self.download_file_from_external_storage( + meta.length, + name, + path.temp.clone(), + backend, + file_crypter, + speed_limiter, + )?; // now validate the SST file. - let path_str = path.temp.to_str().unwrap(); let env = get_env(self.key_manager.clone(), get_io_rate_limiter())?; // Use abstracted SstReader after Env is abstracted. - let sst_reader = RocksSstReader::open_with_env(path_str, Some(env))?; + let dst_file_name = path.temp.to_str().unwrap(); + let sst_reader = RocksSstReader::open_with_env(dst_file_name, Some(env))?; sst_reader.verify_checksum()?; debug!("downloaded file and verified"; "meta" => ?meta, - "url" => %url, "name" => name, - "path" => path_str, + "path" => dst_file_name, ); // undo key rewrite so we could compare with the keys inside SST diff --git a/components/tikv_util/src/codec/mod.rs b/components/tikv_util/src/codec/mod.rs index c89bfc11554..fa0ec4d7d16 100644 --- a/components/tikv_util/src/codec/mod.rs +++ b/components/tikv_util/src/codec/mod.rs @@ -2,6 +2,7 @@ pub mod bytes; pub mod number; +pub mod stream_event; use std::io::{self, ErrorKind}; diff --git a/components/tikv_util/src/codec/stream_event.rs b/components/tikv_util/src/codec/stream_event.rs new file mode 100644 index 00000000000..e5c9dab2d34 --- /dev/null +++ b/components/tikv_util/src/codec/stream_event.rs @@ -0,0 +1,152 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. +use crate::{codec::Result, Either}; +use bytes::{Buf, Bytes}; +use std::io::prelude::*; +use std::io::Cursor; + +pub trait Iterator { + fn next(&mut self) -> Result<()>; + + fn valid(&self) -> bool; + + fn key(&self) -> &[u8]; + + fn value(&self) -> &[u8]; +} + +pub struct EventIterator { + buf: Cursor>, + index: usize, + len: usize, + key: Vec, + val: Vec, +} + +impl EventIterator { + pub fn new(buf: Vec) -> EventIterator { + let len = buf.len(); + EventIterator { + buf: Cursor::new(buf), + index: 0, + len, + key: vec![], + val: vec![], + } + } +} + +impl Iterator for EventIterator { + fn next(&mut self) -> Result<()> { + if self.valid() { + let len = self.buf.get_u32_le() as usize; + self.index += 4; + self.key.resize(len, 0); + self.buf.read_exact(self.key.as_mut_slice())?; + self.index += len; + + let len = self.buf.get_u32_le() as usize; + self.index += 4; + self.val.resize(len, 0); + self.buf.read_exact(self.val.as_mut_slice())?; + self.index += len; + } + Ok(()) + } + + fn valid(&self) -> bool { + self.index < self.len + } + + fn key(&self) -> &[u8] { + &self.key + } + + fn value(&self) -> &[u8] { + &self.val + } +} + +#[derive(Clone)] +pub struct EventEncoder; + +impl EventEncoder { + pub fn encode_event<'e>(key: &'e [u8], value: &'e [u8]) -> [impl AsRef<[u8]> + 'e; 4] { + let key_len = (key.len() as u32).to_le_bytes(); + let val_len = (value.len() as u32).to_le_bytes(); + [ + Either::Left(key_len), + Either::Right(key), + Either::Left(val_len), + Either::Right(value), + ] + } + + #[allow(dead_code)] + fn decode_event(e: &[u8]) -> (Vec, Vec) { + let mut buf = Cursor::new(Bytes::from(e.to_vec())); + let len = buf.get_u32_le() as usize; + let mut key = vec![0; len]; + buf.read_exact(key.as_mut_slice()).unwrap(); + let len = buf.get_u32_le() as usize; + let mut val = vec![0; len]; + buf.read_exact(val.as_mut_slice()).unwrap(); + (key, val) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::Rng; + + #[test] + fn test_encode_decode() { + let mut rng = rand::thread_rng(); + for _i in 0..10 { + let key: Vec = (0..100).map(|_| rng.gen_range(0..255)).collect(); + let val: Vec = (0..100).map(|_| rng.gen_range(0..255)).collect(); + let e = EventEncoder::encode_event(&key, &val); + let mut event = vec![]; + for s in e { + event.extend_from_slice(s.as_ref()); + } + let (decoded_key, decoded_val) = EventEncoder::decode_event(&event); + assert_eq!(key, decoded_key); + assert_eq!(val, decoded_val); + } + } + + #[test] + fn test_decode_events() { + let mut rng = rand::thread_rng(); + let mut event = vec![]; + let mut keys = vec![]; + let mut vals = vec![]; + let count = 20; + + for _i in 0..count { + let key: Vec = (0..100).map(|_| rng.gen_range(0..255)).collect(); + let val: Vec = (0..100).map(|_| rng.gen_range(0..255)).collect(); + let e = EventEncoder::encode_event(&key, &val); + for s in e { + event.extend_from_slice(s.as_ref()); + } + keys.push(key); + vals.push(val); + } + + let mut iter = EventIterator::new(event); + + let mut index = 0_usize; + loop { + if !iter.valid() { + break; + } + iter.next().unwrap(); + assert_eq!(iter.key(), keys[index]); + assert_eq!(iter.value(), vals[index]); + index += 1; + } + assert_eq!(count, index); + } +} diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index 22b8d8d5898..74fe478f2ab 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -368,6 +368,45 @@ where self.threads.spawn_ok(handle_task); } + // Downloads KV file and performs key-rewrite then apply kv into this tikv store. + fn apply(&mut self, _ctx: RpcContext<'_>, req: ApplyRequest, sink: UnarySink) { + let label = "apply"; + let timer = Instant::now_coarse(); + let importer = Arc::clone(&self.importer); + let engine = self.engine.clone(); + let limiter = self.limiter.clone(); + let start = Instant::now(); + + let handle_task = async move { + // Records how long the apply task waits to be scheduled. + sst_importer::metrics::IMPORTER_APPLY_DURATION + .with_label_values(&["queue"]) + .observe(start.saturating_elapsed().as_secs_f64()); + + let res = importer.apply::( + req.get_storage_backend(), + req.get_name(), + req.get_rewrite_rule(), + req.get_cf(), + limiter, + engine, + ); + let mut resp = ApplyResponse::default(); + match res { + Ok(range) => { + if let Some(r) = range { + resp.set_range(r); + } + } + Err(e) => resp.set_error(e.into()), + } + let resp = Ok(resp); + crate::send_rpc_response!(resp, sink, label, timer); + }; + + self.threads.spawn_ok(handle_task); + } + /// Downloads the file and performs key-rewrite for later ingesting. fn download( &mut self,