Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: support checkSum during stream restore dml kv-events #35

Merged
merged 2 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion components/backup-stream/src/metadata/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use kvproto::brpb::StreamBackupTaskInfo;
use tikv_util::{defer, time::Instant, warn};
use tokio_stream::StreamExt;


use crate::errors::{Error, Result};

/// Some operations over stream backup metadata key space.
Expand Down
2 changes: 1 addition & 1 deletion components/backup-stream/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ mod tests {
use raft::StateRole;
use raftstore::coprocessor::{
Cmd, CmdBatch, CmdObserveInfo, CmdObserver, ObserveHandle, ObserveLevel, ObserverContext,
RegionChangeEvent, RegionChangeObserver, RoleObserver, RoleChange,
RegionChangeEvent, RegionChangeObserver, RoleChange, RoleObserver,
};

use tikv_util::worker::dummy_scheduler;
Expand Down
2 changes: 1 addition & 1 deletion components/backup-stream/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,7 @@ impl DataFile {
self.set_storage_path(file_key.file_name(self.min_ts, self.max_ts));

let mut meta = DataFileInfo::new();
meta.set_sha_256(
meta.set_sha256(
self.sha256
.finish()
.map(|bytes| bytes.to_vec())
Expand Down
1 change: 1 addition & 0 deletions components/external_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ tokio = { version = "1.5", features = ["time", "fs", "process"] }
tokio-util = { version = "0.6", features = ["compat"] }
url = "2.0"
async-trait = "0.1"
openssl = "0.10"

[dev-dependencies]
structopt = "0.3"
Expand Down
2 changes: 2 additions & 0 deletions components/external_storage/export/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ impl ExternalStorage for EncryptedExternalStorage {
storage_name: &str,
restore_name: std::path::PathBuf,
expected_length: u64,
expected_sha256: Option<Vec<u8>>,
speed_limiter: &Limiter,
file_crypter: Option<FileEncryptionInfo>,
) -> io::Result<()> {
Expand All @@ -339,6 +340,7 @@ impl ExternalStorage for EncryptedExternalStorage {
file_writer,
speed_limiter,
expected_length,
expected_sha256,
min_read_speed,
))
}
Expand Down
1 change: 1 addition & 0 deletions components/external_storage/export/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub async fn restore_inner(
output,
&limiter,
expected_length,
None,
MINIMUM_READ_SPEED,
)
.await;
Expand Down
43 changes: 41 additions & 2 deletions components/external_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use engine_traits::FileEncryptionInfo;
use file_system::File;
use futures_io::AsyncRead;
use futures_util::AsyncReadExt;
use openssl::hash::{Hasher, MessageDigest};
use tikv_util::stream::{block_on_external_io, READ_BUF_SIZE};
use tikv_util::time::{Instant, Limiter};
use tokio::time::timeout;
Expand Down Expand Up @@ -78,6 +79,7 @@ pub trait ExternalStorage: 'static + Send + Sync {
storage_name: &str,
restore_name: std::path::PathBuf,
expected_length: u64,
expected_sha256: Option<Vec<u8>>,
speed_limiter: &Limiter,
file_crypter: Option<FileEncryptionInfo>,
) -> io::Result<()> {
Expand All @@ -100,6 +102,7 @@ pub trait ExternalStorage: 'static + Send + Sync {
output,
speed_limiter,
expected_length,
expected_sha256,
min_read_speed,
))
}
Expand Down Expand Up @@ -143,8 +146,8 @@ impl ExternalStorage for Box<dyn ExternalStorage> {
}
}

// Wrap the reader with file_crypter
// Return the reader directly if file_crypter is None
/// Wrap the reader with file_crypter.
/// Return the reader directly if file_crypter is None.
pub fn encrypt_wrap_reader<'a>(
file_crypter: Option<FileEncryptionInfo>,
reader: Box<dyn AsyncRead + Unpin + 'a>,
Expand All @@ -167,13 +170,20 @@ pub async fn read_external_storage_into_file(
output: &mut dyn Write,
speed_limiter: &Limiter,
expected_length: u64,
expected_sha256: Option<Vec<u8>>,
min_read_speed: usize,
) -> io::Result<()> {
let dur = Duration::from_secs((READ_BUF_SIZE / min_read_speed) as u64);

// do the I/O copy from external_storage to the local file.
let mut buffer = vec![0u8; READ_BUF_SIZE];
let mut file_length = 0;
let mut hasher = Hasher::new(MessageDigest::sha256()).map_err(|err| {
io::Error::new(
io::ErrorKind::Other,
format!("openssl hasher failed to init: {}", err),
)
})?;

loop {
// separate the speed limiting from actual reading so it won't
Expand All @@ -186,6 +196,14 @@ pub async fn read_external_storage_into_file(
}
speed_limiter.consume(bytes_read).await;
output.write_all(&buffer[..bytes_read])?;
if expected_sha256.is_some() {
hasher.update(&buffer[..bytes_read]).map_err(|err| {
io::Error::new(
io::ErrorKind::Other,
format!("openssl hasher udpate failed: {}", err),
)
})?;
}
file_length += bytes_read as u64;
}

Expand All @@ -199,5 +217,26 @@ pub async fn read_external_storage_into_file(
));
}

if let Some(expected_s) = expected_sha256 {
let cal_sha256 = hasher.finish().map_or_else(
|err| {
Err(io::Error::new(
io::ErrorKind::Other,
format!("openssl hasher finish failed: {}", err),
))
},
|bytes| Ok(bytes.to_vec()),
)?;
if !expected_s.eq(&cal_sha256) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if !expected_s.eq(&cal_sha256) {
if expected_s != cal_sha256 {

Copy link
Collaborator Author

@joccau joccau Mar 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

!x.eq(&y) equals x != y

return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"sha256 not match, expect: {:?}, calculate: {:?}",
expected_s, cal_sha256,
),
));
}
}

Ok(())
}
2 changes: 1 addition & 1 deletion components/pd_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ pub use self::util::{merge_bucket_stats, new_bucket_stats};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::ops::Deref;
use std::time::Duration;
use std::sync::Arc;
use std::time::Duration;

use futures::future::BoxFuture;
use grpcio::ClientSStreamReceiver;
Expand Down
1 change: 1 addition & 0 deletions components/sst_importer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ tikv_util = { path = "../tikv_util", default-features = false }
tokio = { version = "1.5", features = ["time", "rt-multi-thread", "macros"] }
txn_types = { path = "../txn_types", default-features = false }
uuid = { version = "0.8.1", features = ["serde", "v4"] }
openssl = "0.10"

[dev-dependencies]
tempfile = "3.0"
Expand Down
15 changes: 15 additions & 0 deletions components/sst_importer/src/sst_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ impl SSTImporter {
src_file_name: &str,
dst_file: std::path::PathBuf,
backend: &StorageBackend,
expect_sha256: Option<Vec<u8>>,
file_crypter: Option<FileEncryptionInfo>,
speed_limiter: &Limiter,
) -> Result<()> {
Expand All @@ -235,6 +236,7 @@ impl SSTImporter {
src_file_name,
dst_file.clone(),
file_length,
expect_sha256,
speed_limiter,
file_crypter,
);
Expand Down Expand Up @@ -271,12 +273,16 @@ impl SSTImporter {
let name = meta.get_name();
let path = self.dir.get_import_path(name)?;
let start = Instant::now();
let sha256 = meta.get_sha256().to_vec();
let expected_sha256 = if sha256.len() > 0 { Some(sha256) } else { None };

self.download_file_from_external_storage(
// don't check file length after download file for now.
meta.get_length(),
name,
path.temp.clone(),
backend,
expected_sha256,
// don't support encrypt for now.
None,
speed_limiter,
Expand Down Expand Up @@ -416,6 +422,7 @@ impl SSTImporter {
name,
path.temp.clone(),
backend,
None,
file_crypter,
speed_limiter,
)?;
Expand Down Expand Up @@ -709,6 +716,7 @@ mod tests {
SeekKey, SstReader, SstWriter, CF_DEFAULT, DATA_CFS,
};
use file_system::File;
use openssl::hash::{Hasher, MessageDigest};
use tempfile::Builder;
use test_sst_importer::*;
use test_util::new_test_key_manager;
Expand Down Expand Up @@ -1062,11 +1070,17 @@ mod tests {
let mut input = data;
let mut output = Vec::new();
let input_len = input.len() as u64;

let mut hasher = Hasher::new(MessageDigest::sha256()).unwrap();
hasher.update(data);
let hash256 = hasher.finish().unwrap().to_vec();

block_on_external_io(external_storage_export::read_external_storage_into_file(
&mut input,
&mut output,
&Limiter::new(f64::INFINITY),
input_len,
Some(hash256),
8192,
))
.unwrap();
Expand All @@ -1084,6 +1098,7 @@ mod tests {
&mut output,
&Limiter::new(f64::INFINITY),
0,
None,
usize::MAX,
))
.unwrap_err();
Expand Down
2 changes: 1 addition & 1 deletion src/import/sst_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ where
import_err
.set_message("failed to complete raft command".to_string());
// FIXME: if there are many errors, we may lose some of them here.
import_err
import_err
.set_store_error(err.clone());
warn!("failed to apply the file to the store"; "error" => ?err, "file" => %meta.get_name());
resp.set_error(import_err);
Expand Down