diff --git a/Cargo.lock b/Cargo.lock index ee42aa9e78c..d91f58b4391 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,6 +433,51 @@ dependencies = [ "yatp", ] +[[package]] +name = "backup-stream" +version = "0.1.0" +dependencies = [ + "async-trait", + "bytes 0.4.12", + "crossbeam", + "crossbeam-channel", + "dashmap 5.0.0", + "engine_panic", + "engine_rocks", + "engine_traits", + "etcd-client", + "external_storage", + "external_storage_export", + "file_system", + "futures 0.3.15", + "hex 0.4.2", + "kvproto", + "lazy_static", + "log_wrappers", + "online_config", + "openssl", + "pd_client", + "prometheus", + "protobuf", + "raft", + "raftstore", + "rand 0.8.3", + "regex", + "resolved_ts", + "slog", + "slog-global", + "thiserror", + "tidb_query_datatype", + "tikv", + "tikv_alloc", + "tikv_util", + "tokio", + "tokio-stream", + "tokio-util 0.7.0", + "txn_types", + "uuid", +] + [[package]] name = "base64" version = "0.13.0" @@ -541,51 +586,6 @@ dependencies = [ "cmake", ] -[[package]] -name = "br-stream" -version = "0.1.0" -dependencies = [ - "async-trait", - "bytes 0.4.12", - "crossbeam", - "crossbeam-channel", - "dashmap 5.0.0", - "engine_panic", - "engine_rocks", - "engine_traits", - "etcd-client", - "external_storage", - "external_storage_export", - "file_system", - "futures 0.3.15", - "hex 0.4.2", - "kvproto", - "lazy_static", - "log_wrappers", - "online_config", - "openssl", - "pd_client", - "prometheus", - "protobuf", - "raft", - "raftstore", - "rand 0.8.3", - "regex", - "resolved_ts", - "slog", - "slog-global", - "thiserror", - "tidb_query_datatype", - "tikv", - "tikv_alloc", - "tikv_util", - "tokio", - "tokio-stream", - "tokio-util 0.7.0", - "txn_types", - "uuid", -] - [[package]] name = "bstr" version = "0.2.8" @@ -2474,7 +2474,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git?branch=br-stream#13a8f820a4e09e1fe471847ca9606e2f378c40a4" +source = "git+https://github.com/pingcap/kvproto.git?branch=br-stream#4f14da8e3dd1269765f82f6c2259458df9804ff0" dependencies = [ "futures 0.3.15", "grpcio", @@ -4751,7 +4751,7 @@ name = "server" version = "0.0.1" dependencies = [ "backup", - "br-stream", + "backup-stream", "cdc", "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index 7a35748e134..7c849efac66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -256,7 +256,7 @@ members = [ "components/error_code", "components/concurrency_manager", "components/server", - "components/br-stream", + "components/backup-stream", "components/file_system", "components/collections", "components/coprocessor_plugin_api", diff --git a/components/br-stream/Cargo.toml b/components/backup-stream/Cargo.toml similarity index 98% rename from components/br-stream/Cargo.toml rename to components/backup-stream/Cargo.toml index 1ae692ddd4f..889614d6da8 100644 --- a/components/br-stream/Cargo.toml +++ b/components/backup-stream/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "br-stream" +name = "backup-stream" version = "0.1.0" edition = "2018" diff --git a/components/br-stream/bin/playground.rs b/components/backup-stream/bin/playground.rs similarity index 98% rename from components/br-stream/bin/playground.rs rename to components/backup-stream/bin/playground.rs index 1ebbee6e6e0..0c00579cc46 100644 --- a/components/br-stream/bin/playground.rs +++ b/components/backup-stream/bin/playground.rs @@ -1,5 +1,5 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. -use br_stream::{ +use backup_stream::{ errors::Result, metadata::{store::EtcdStore, MetadataClient}, }; diff --git a/components/br-stream/src/config.rs b/components/backup-stream/src/config.rs similarity index 100% rename from components/br-stream/src/config.rs rename to components/backup-stream/src/config.rs diff --git a/components/br-stream/src/endpoint.rs b/components/backup-stream/src/endpoint.rs similarity index 98% rename from components/br-stream/src/endpoint.rs rename to components/backup-stream/src/endpoint.rs index 8837a296501..6262330ebf8 100644 --- a/components/br-stream/src/endpoint.rs +++ b/components/backup-stream/src/endpoint.rs @@ -81,7 +81,7 @@ where router: RT, pd_client: Arc, ) -> Endpoint { - let pool = create_tokio_runtime(config.num_threads, "br-stream") + let pool = create_tokio_runtime(config.num_threads, "backup-stream") .expect("failed to create tokio runtime for backup stream worker."); // TODO consider TLS? @@ -97,7 +97,7 @@ where }; let range_router = Router::new( - PathBuf::from(config.streaming_path.clone()), + PathBuf::from(config.temp_path.clone()), scheduler.clone(), config.temp_file_size_limit_per_task.0, ); @@ -116,7 +116,7 @@ where pool.spawn(Self::starts_flush_ticks(range_router.clone())); } - info!("the endpoint of stream backup started"; "path" => %config.streaming_path); + info!("the endpoint of stream backup started"; "path" => %config.temp_path); Endpoint { config, meta_client, @@ -395,7 +395,7 @@ where } if let Err(err) = pd_cli .update_service_safe_point( - format!("br-stream-{}-{}", task, store_id), + format!("backup-stream-{}-{}", task, store_id), TimeStamp::new(rts), Duration::from_secs(600), ) @@ -479,7 +479,7 @@ where /// Modify observe over some region. /// This would register the region to the RaftStore. pub fn on_modify_observe(&self, op: ObserveOp) { - info!("br-stream: on_modify_observe"; "op" => ?op); + info!("backup stream: on_modify_observe"; "op" => ?op); match op { ObserveOp::Start { region, diff --git a/components/br-stream/src/errors.rs b/components/backup-stream/src/errors.rs similarity index 100% rename from components/br-stream/src/errors.rs rename to components/backup-stream/src/errors.rs diff --git a/components/br-stream/src/event_loader.rs b/components/backup-stream/src/event_loader.rs similarity index 100% rename from components/br-stream/src/event_loader.rs rename to components/backup-stream/src/event_loader.rs diff --git a/components/br-stream/src/lib.rs b/components/backup-stream/src/lib.rs similarity index 100% rename from components/br-stream/src/lib.rs rename to components/backup-stream/src/lib.rs diff --git a/components/br-stream/src/metadata/client.rs b/components/backup-stream/src/metadata/client.rs similarity index 100% rename from components/br-stream/src/metadata/client.rs rename to components/backup-stream/src/metadata/client.rs diff --git a/components/br-stream/src/metadata/keys.rs b/components/backup-stream/src/metadata/keys.rs similarity index 100% rename from components/br-stream/src/metadata/keys.rs rename to components/backup-stream/src/metadata/keys.rs diff --git a/components/br-stream/src/metadata/metrics.rs b/components/backup-stream/src/metadata/metrics.rs similarity index 100% rename from components/br-stream/src/metadata/metrics.rs rename to components/backup-stream/src/metadata/metrics.rs diff --git a/components/br-stream/src/metadata/mod.rs b/components/backup-stream/src/metadata/mod.rs similarity index 100% rename from components/br-stream/src/metadata/mod.rs rename to components/backup-stream/src/metadata/mod.rs diff --git a/components/br-stream/src/metadata/store/etcd.rs b/components/backup-stream/src/metadata/store/etcd.rs similarity index 100% rename from components/br-stream/src/metadata/store/etcd.rs rename to components/backup-stream/src/metadata/store/etcd.rs diff --git a/components/br-stream/src/metadata/store/mod.rs b/components/backup-stream/src/metadata/store/mod.rs similarity index 100% rename from components/br-stream/src/metadata/store/mod.rs rename to components/backup-stream/src/metadata/store/mod.rs diff --git a/components/br-stream/src/metadata/store/slash_etc.rs b/components/backup-stream/src/metadata/store/slash_etc.rs similarity index 100% rename from components/br-stream/src/metadata/store/slash_etc.rs rename to components/backup-stream/src/metadata/store/slash_etc.rs diff --git a/components/br-stream/src/metadata/test.rs b/components/backup-stream/src/metadata/test.rs similarity index 100% rename from components/br-stream/src/metadata/test.rs rename to components/backup-stream/src/metadata/test.rs diff --git a/components/br-stream/src/metrics.rs b/components/backup-stream/src/metrics.rs similarity index 100% rename from components/br-stream/src/metrics.rs rename to components/backup-stream/src/metrics.rs diff --git a/components/br-stream/src/observer.rs b/components/backup-stream/src/observer.rs similarity index 100% rename from components/br-stream/src/observer.rs rename to components/backup-stream/src/observer.rs diff --git a/components/br-stream/src/router.rs b/components/backup-stream/src/router.rs similarity index 99% rename from components/br-stream/src/router.rs rename to components/backup-stream/src/router.rs index f1e30b768d7..bc49cc14242 100644 --- a/components/br-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -28,7 +28,7 @@ use engine_traits::{CfName, CF_DEFAULT, CF_LOCK, CF_WRITE}; use external_storage::{BackendConfig, UnpinReader}; use external_storage_export::{create_storage, ExternalStorage}; -use futures::io::{AllowStdIo, Cursor}; +use futures::io::Cursor; use kvproto::{ brpb::{DataFileInfo, FileType, Metadata}, raft_cmdpb::CmdType, diff --git a/components/br-stream/src/utils.rs b/components/backup-stream/src/utils.rs similarity index 100% rename from components/br-stream/src/utils.rs rename to components/backup-stream/src/utils.rs diff --git a/components/server/Cargo.toml b/components/server/Cargo.toml index ac641d5bd4c..09e128c667d 100644 --- a/components/server/Cargo.toml +++ b/components/server/Cargo.toml @@ -28,7 +28,7 @@ nortcheck = ["engine_rocks/nortcheck"] [dependencies] backup = { path = "../backup", default-features = false } -br-stream = { path = "../br-stream", default-features = false } +backup-stream = { path = "../backup-stream", default-features = false } cdc = { path = "../cdc", default-features = false } chrono = "0.4" tempfile = "3.0" diff --git a/components/server/src/server.rs b/components/server/src/server.rs index de3bcc4b5e0..f689e0a42a8 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -25,8 +25,8 @@ use std::{ u64, }; -use br_stream::config::BackupStreamConfigManager; -use br_stream::observer::BackupStreamObserver; +use backup_stream::config::BackupStreamConfigManager; +use backup_stream::observer::BackupStreamObserver; use cdc::{CdcConfigManager, MemoryQuota}; use concurrency_manager::ConcurrencyManager; use encryption_export::{data_key_manager_from_config, DataKeyManager}; @@ -797,9 +797,9 @@ impl TiKVServer { ); // Start backup stream - if self.config.backup_stream.enable_streaming { + if self.config.backup_stream.enable { // Create backup stream. - let mut backup_stream_worker = Box::new(LazyWorker::new("br-stream")); + let mut backup_stream_worker = Box::new(LazyWorker::new("backup-stream")); let backup_stream_scheduler = backup_stream_worker.scheduler(); // Register backup-stream observer. @@ -811,7 +811,7 @@ impl TiKVServer { Box::new(BackupStreamConfigManager(backup_stream_worker.scheduler())), ); - let backup_stream_endpoint = br_stream::Endpoint::new::( + let backup_stream_endpoint = backup_stream::Endpoint::new::( node.id(), &self.config.pd.endpoints, self.config.backup_stream.clone(), diff --git a/src/config.rs b/src/config.rs index 7eb6baa20d6..de441199f6c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2280,8 +2280,9 @@ impl Default for BackupConfig { #[serde(rename_all = "kebab-case")] pub struct BackupStreamConfig { pub num_threads: usize, - pub enable_streaming: bool, - pub streaming_path: String, + #[online_config(skip)] + pub enable: bool, + pub temp_path: String, pub temp_file_size_limit_per_task: ReadableSize, } @@ -2300,9 +2301,9 @@ impl Default for BackupStreamConfig { Self { // use at most 50% of vCPU by default num_threads: (cpu_num * 0.5).clamp(1.0, 8.0) as usize, - enable_streaming: false, + enable: false, // TODO: may be use raft store directory - streaming_path: env::temp_dir().into_os_string().into_string().unwrap(), + temp_path: env::temp_dir().into_os_string().into_string().unwrap(), temp_file_size_limit_per_task: ReadableSize::mb(128), } } diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index bcaf4448176..307a0d6e303 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -522,7 +522,7 @@ where let mut import_err = kvproto::import_sstpb::Error::default(); let err = r.response.get_header().get_error(); import_err - .set_message(format!("failed to complete raft command")); + .set_message("failed to complete raft command".to_string()); // FIXME: if there are many errors, we may lose some of them here. import_err .set_store_error(err.clone());