Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine br-stream to backup-stream #34

Merged
merged 2 commits into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 47 additions & 47 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ members = [
"components/error_code",
"components/concurrency_manager",
"components/server",
"components/br-stream",
"components/backup-stream",
"components/file_system",
"components/collections",
"components/coprocessor_plugin_api",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "br-stream"
name = "backup-stream"
version = "0.1.0"
edition = "2018"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use br_stream::{
use backup_stream::{
errors::Result,
metadata::{store::EtcdStore, MetadataClient},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
router: RT,
pd_client: Arc<PDC>,
) -> Endpoint<EtcdStore, R, E, RT, PDC> {
let pool = create_tokio_runtime(config.num_threads, "br-stream")
let pool = create_tokio_runtime(config.num_threads, "backup-stream")
.expect("failed to create tokio runtime for backup stream worker.");

// TODO consider TLS?
Expand All @@ -97,7 +97,7 @@ where
};

let range_router = Router::new(
PathBuf::from(config.streaming_path.clone()),
PathBuf::from(config.temp_path.clone()),
scheduler.clone(),
config.temp_file_size_limit_per_task.0,
);
Expand All @@ -116,7 +116,7 @@ where
pool.spawn(Self::starts_flush_ticks(range_router.clone()));
}

info!("the endpoint of stream backup started"; "path" => %config.streaming_path);
info!("the endpoint of stream backup started"; "path" => %config.temp_path);
Endpoint {
config,
meta_client,
Expand Down Expand Up @@ -395,7 +395,7 @@ where
}
if let Err(err) = pd_cli
.update_service_safe_point(
format!("br-stream-{}-{}", task, store_id),
format!("backup-stream-{}-{}", task, store_id),
TimeStamp::new(rts),
Duration::from_secs(600),
)
Expand Down Expand Up @@ -479,7 +479,7 @@ where
/// Modify observe over some region.
/// This would register the region to the RaftStore.
pub fn on_modify_observe(&self, op: ObserveOp) {
info!("br-stream: on_modify_observe"; "op" => ?op);
info!("backup stream: on_modify_observe"; "op" => ?op);
match op {
ObserveOp::Start {
region,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use engine_traits::{CfName, CF_DEFAULT, CF_LOCK, CF_WRITE};
use external_storage::{BackendConfig, UnpinReader};
use external_storage_export::{create_storage, ExternalStorage};

use futures::io::{AllowStdIo, Cursor};
use futures::io::Cursor;
use kvproto::{
brpb::{DataFileInfo, FileType, Metadata},
raft_cmdpb::CmdType,
Expand Down
2 changes: 1 addition & 1 deletion components/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ nortcheck = ["engine_rocks/nortcheck"]

[dependencies]
backup = { path = "../backup", default-features = false }
br-stream = { path = "../br-stream", default-features = false }
backup-stream = { path = "../backup-stream", default-features = false }
cdc = { path = "../cdc", default-features = false }
chrono = "0.4"
tempfile = "3.0"
Expand Down
10 changes: 5 additions & 5 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::{
u64,
};

use br_stream::config::BackupStreamConfigManager;
use br_stream::observer::BackupStreamObserver;
use backup_stream::config::BackupStreamConfigManager;
use backup_stream::observer::BackupStreamObserver;
use cdc::{CdcConfigManager, MemoryQuota};
use concurrency_manager::ConcurrencyManager;
use encryption_export::{data_key_manager_from_config, DataKeyManager};
Expand Down Expand Up @@ -797,9 +797,9 @@ impl<ER: RaftEngine> TiKVServer<ER> {
);

// Start backup stream
if self.config.backup_stream.enable_streaming {
if self.config.backup_stream.enable {
// Create backup stream.
let mut backup_stream_worker = Box::new(LazyWorker::new("br-stream"));
let mut backup_stream_worker = Box::new(LazyWorker::new("backup-stream"));
let backup_stream_scheduler = backup_stream_worker.scheduler();

// Register backup-stream observer.
Expand All @@ -811,7 +811,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
Box::new(BackupStreamConfigManager(backup_stream_worker.scheduler())),
);

let backup_stream_endpoint = br_stream::Endpoint::new::<String>(
let backup_stream_endpoint = backup_stream::Endpoint::new::<String>(
node.id(),
&self.config.pd.endpoints,
self.config.backup_stream.clone(),
Expand Down
9 changes: 5 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2280,8 +2280,9 @@ impl Default for BackupConfig {
#[serde(rename_all = "kebab-case")]
pub struct BackupStreamConfig {
pub num_threads: usize,
pub enable_streaming: bool,
pub streaming_path: String,
#[online_config(skip)]
pub enable: bool,
pub temp_path: String,
pub temp_file_size_limit_per_task: ReadableSize,
}

Expand All @@ -2300,9 +2301,9 @@ impl Default for BackupStreamConfig {
Self {
// use at most 50% of vCPU by default
num_threads: (cpu_num * 0.5).clamp(1.0, 8.0) as usize,
enable_streaming: false,
enable: false,
// TODO: may be use raft store directory
streaming_path: env::temp_dir().into_os_string().into_string().unwrap(),
temp_path: env::temp_dir().into_os_string().into_string().unwrap(),
temp_file_size_limit_per_task: ReadableSize::mb(128),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/import/sst_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ where
let mut import_err = kvproto::import_sstpb::Error::default();
let err = r.response.get_header().get_error();
import_err
.set_message(format!("failed to complete raft command"));
.set_message("failed to complete raft command".to_string());
// FIXME: if there are many errors, we may lose some of them here.
import_err
.set_store_error(err.clone());
Expand Down