From 639a8ea93fde2c84457e4ab1de57e3c61e1779ad Mon Sep 17 00:00:00 2001 From: joccau Date: Mon, 21 Mar 2022 19:08:42 +0800 Subject: [PATCH 1/2] support checksum in file level for br-stream Signed-off-by: joccau --- Cargo.lock | 1 + .../backup-stream/src/metadata/client.rs | 1 - components/external_storage/Cargo.toml | 1 + .../external_storage/export/src/export.rs | 2 + components/external_storage/src/lib.rs | 43 ++++++++++++++++++- components/sst_importer/src/sst_importer.rs | 4 ++ 6 files changed, 49 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d332f2f85..33d905423f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1590,6 +1590,7 @@ dependencies = [ "lazy_static", "libloading", "matches", + "openssl", "prometheus", "protobuf", "rand 0.8.3", diff --git a/components/backup-stream/src/metadata/client.rs b/components/backup-stream/src/metadata/client.rs index 5cefef8067..218deaabb4 100644 --- a/components/backup-stream/src/metadata/client.rs +++ b/components/backup-stream/src/metadata/client.rs @@ -13,7 +13,6 @@ use kvproto::brpb::StreamBackupTaskInfo; use tikv_util::{defer, time::Instant, warn}; use tokio_stream::StreamExt; - use crate::errors::{Error, Result}; /// Some operations over stream backup metadata key space. diff --git a/components/external_storage/Cargo.toml b/components/external_storage/Cargo.toml index 4568323d8e..6306dab306 100644 --- a/components/external_storage/Cargo.toml +++ b/components/external_storage/Cargo.toml @@ -43,6 +43,7 @@ tokio = { version = "1.5", features = ["time", "fs", "process"] } tokio-util = { version = "0.6", features = ["compat"] } url = "2.0" async-trait = "0.1" +openssl = "0.10" [dev-dependencies] structopt = "0.3" diff --git a/components/external_storage/export/src/export.rs b/components/external_storage/export/src/export.rs index 8605e6ef96..57291c78d0 100644 --- a/components/external_storage/export/src/export.rs +++ b/components/external_storage/export/src/export.rs @@ -325,6 +325,7 @@ impl ExternalStorage for EncryptedExternalStorage { storage_name: &str, restore_name: std::path::PathBuf, expected_length: u64, + expected_sha256: Option>, speed_limiter: &Limiter, file_crypter: Option, ) -> io::Result<()> { @@ -339,6 +340,7 @@ impl ExternalStorage for EncryptedExternalStorage { file_writer, speed_limiter, expected_length, + expected_sha256, min_read_speed, )) } diff --git a/components/external_storage/src/lib.rs b/components/external_storage/src/lib.rs index f9125edd4d..94b4714458 100644 --- a/components/external_storage/src/lib.rs +++ b/components/external_storage/src/lib.rs @@ -20,6 +20,7 @@ use engine_traits::FileEncryptionInfo; use file_system::File; use futures_io::AsyncRead; use futures_util::AsyncReadExt; +use openssl::hash::{Hasher, MessageDigest}; use tikv_util::stream::{block_on_external_io, READ_BUF_SIZE}; use tikv_util::time::{Instant, Limiter}; use tokio::time::timeout; @@ -78,6 +79,7 @@ pub trait ExternalStorage: 'static + Send + Sync { storage_name: &str, restore_name: std::path::PathBuf, expected_length: u64, + expected_sha256: Option>, speed_limiter: &Limiter, file_crypter: Option, ) -> io::Result<()> { @@ -100,6 +102,7 @@ pub trait ExternalStorage: 'static + Send + Sync { output, speed_limiter, expected_length, + expected_sha256, min_read_speed, )) } @@ -143,8 +146,8 @@ impl ExternalStorage for Box { } } -// Wrap the reader with file_crypter -// Return the reader directly if file_crypter is None +/// Wrap the reader with file_crypter. +/// Return the reader directly if file_crypter is None. pub fn encrypt_wrap_reader<'a>( file_crypter: Option, reader: Box, @@ -167,6 +170,7 @@ pub async fn read_external_storage_into_file( output: &mut dyn Write, speed_limiter: &Limiter, expected_length: u64, + expected_sha256: Option>, min_read_speed: usize, ) -> io::Result<()> { let dur = Duration::from_secs((READ_BUF_SIZE / min_read_speed) as u64); @@ -174,6 +178,12 @@ pub async fn read_external_storage_into_file( // do the I/O copy from external_storage to the local file. let mut buffer = vec![0u8; READ_BUF_SIZE]; let mut file_length = 0; + let mut hasher = Hasher::new(MessageDigest::sha256()).map_err(|err| { + io::Error::new( + io::ErrorKind::Other, + format!("openssl hasher failed to init: {}", err), + ) + })?; loop { // separate the speed limiting from actual reading so it won't @@ -186,6 +196,14 @@ pub async fn read_external_storage_into_file( } speed_limiter.consume(bytes_read).await; output.write_all(&buffer[..bytes_read])?; + if expected_sha256.is_some() { + hasher.update(&buffer[..bytes_read]).map_err(|err| { + io::Error::new( + io::ErrorKind::Other, + format!("openssl hasher udpate failed: {}", err), + ) + })?; + } file_length += bytes_read as u64; } @@ -199,5 +217,26 @@ pub async fn read_external_storage_into_file( )); } + if let Some(expected_s) = expected_sha256 { + let cal_sha256 = hasher.finish().map_or_else( + |err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!("openssl hasher finish failed: {}", err), + )) + }, + |bytes| Ok(bytes.to_vec()), + )?; + if !expected_s.eq(&cal_sha256) { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "sha256 not match, expect: {:?}, calculate: {:?}", + expected_s, cal_sha256, + ), + )); + } + } + Ok(()) } diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs index 446c84c7d7..52ab389d64 100644 --- a/components/sst_importer/src/sst_importer.rs +++ b/components/sst_importer/src/sst_importer.rs @@ -212,6 +212,7 @@ impl SSTImporter { src_file_name: &str, dst_file: std::path::PathBuf, backend: &StorageBackend, + expect_256: Option>, file_crypter: Option, speed_limiter: &Limiter, ) -> Result<()> { @@ -235,6 +236,7 @@ impl SSTImporter { src_file_name, dst_file.clone(), file_length, + expect_256, speed_limiter, file_crypter, ); @@ -277,6 +279,7 @@ impl SSTImporter { name, path.temp.clone(), backend, + Some(meta.get_sha256().to_vec()), // don't support encrypt for now. None, speed_limiter, @@ -416,6 +419,7 @@ impl SSTImporter { name, path.temp.clone(), backend, + None, file_crypter, speed_limiter, )?; From 55c743add8fa9567816bd5e98dacf646e580fbb5 Mon Sep 17 00:00:00 2001 From: joccau Date: Wed, 23 Mar 2022 17:19:56 +0800 Subject: [PATCH 2/2] update kvproto package and maintain testcase. Signed-off-by: joccau --- Cargo.lock | 3 ++- components/backup-stream/src/observer.rs | 2 +- components/backup-stream/src/router.rs | 2 +- .../external_storage/export/src/request.rs | 1 + components/pd_client/src/lib.rs | 2 +- components/sst_importer/Cargo.toml | 1 + components/sst_importer/src/sst_importer.rs | 17 ++++++++++++++--- src/import/sst_service.rs | 2 +- 8 files changed, 22 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33d905423f..a012f64681 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2551,7 +2551,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git?branch=br-stream#72f70645bb04ae590d279080a5794eba56a24d54" +source = "git+https://github.com/pingcap/kvproto.git?branch=br-stream#b87187a88066427469e0e5bb82994d5b74c39ed2" dependencies = [ "futures 0.3.15", "grpcio", @@ -5021,6 +5021,7 @@ dependencies = [ "kvproto", "lazy_static", "log_wrappers", + "openssl", "prometheus", "serde", "serde_derive", diff --git a/components/backup-stream/src/observer.rs b/components/backup-stream/src/observer.rs index 41b6c875a3..6387614124 100644 --- a/components/backup-stream/src/observer.rs +++ b/components/backup-stream/src/observer.rs @@ -243,7 +243,7 @@ mod tests { use raft::StateRole; use raftstore::coprocessor::{ Cmd, CmdBatch, CmdObserveInfo, CmdObserver, ObserveHandle, ObserveLevel, ObserverContext, - RegionChangeEvent, RegionChangeObserver, RoleObserver, RoleChange, + RegionChangeEvent, RegionChangeObserver, RoleChange, RoleObserver, }; use tikv_util::worker::dummy_scheduler; diff --git a/components/backup-stream/src/router.rs b/components/backup-stream/src/router.rs index 5196bc53aa..c22d223d73 100644 --- a/components/backup-stream/src/router.rs +++ b/components/backup-stream/src/router.rs @@ -1019,7 +1019,7 @@ impl DataFile { self.set_storage_path(file_key.file_name(self.min_ts, self.max_ts)); let mut meta = DataFileInfo::new(); - meta.set_sha_256( + meta.set_sha256( self.sha256 .finish() .map(|bytes| bytes.to_vec()) diff --git a/components/external_storage/export/src/request.rs b/components/external_storage/export/src/request.rs index cf6029fbff..0634073f98 100644 --- a/components/external_storage/export/src/request.rs +++ b/components/external_storage/export/src/request.rs @@ -69,6 +69,7 @@ pub async fn restore_inner( output, &limiter, expected_length, + None, MINIMUM_READ_SPEED, ) .await; diff --git a/components/pd_client/src/lib.rs b/components/pd_client/src/lib.rs index 2ddaf65e13..5333d08c0e 100644 --- a/components/pd_client/src/lib.rs +++ b/components/pd_client/src/lib.rs @@ -21,8 +21,8 @@ pub use self::util::{merge_bucket_stats, new_bucket_stats}; use std::cmp::Ordering; use std::collections::HashMap; use std::ops::Deref; -use std::time::Duration; use std::sync::Arc; +use std::time::Duration; use futures::future::BoxFuture; use grpcio::ClientSStreamReceiver; diff --git a/components/sst_importer/Cargo.toml b/components/sst_importer/Cargo.toml index 006837cbe4..1e8e3cb770 100644 --- a/components/sst_importer/Cargo.toml +++ b/components/sst_importer/Cargo.toml @@ -39,6 +39,7 @@ tikv_util = { path = "../tikv_util", default-features = false } tokio = { version = "1.5", features = ["time", "rt-multi-thread", "macros"] } txn_types = { path = "../txn_types", default-features = false } uuid = { version = "0.8.1", features = ["serde", "v4"] } +openssl = "0.10" [dev-dependencies] tempfile = "3.0" diff --git a/components/sst_importer/src/sst_importer.rs b/components/sst_importer/src/sst_importer.rs index 52ab389d64..2b4bec4a9f 100644 --- a/components/sst_importer/src/sst_importer.rs +++ b/components/sst_importer/src/sst_importer.rs @@ -212,7 +212,7 @@ impl SSTImporter { src_file_name: &str, dst_file: std::path::PathBuf, backend: &StorageBackend, - expect_256: Option>, + expect_sha256: Option>, file_crypter: Option, speed_limiter: &Limiter, ) -> Result<()> { @@ -236,7 +236,7 @@ impl SSTImporter { src_file_name, dst_file.clone(), file_length, - expect_256, + expect_sha256, speed_limiter, file_crypter, ); @@ -273,13 +273,16 @@ impl SSTImporter { let name = meta.get_name(); let path = self.dir.get_import_path(name)?; let start = Instant::now(); + let sha256 = meta.get_sha256().to_vec(); + let expected_sha256 = if sha256.len() > 0 { Some(sha256) } else { None }; + self.download_file_from_external_storage( // don't check file length after download file for now. meta.get_length(), name, path.temp.clone(), backend, - Some(meta.get_sha256().to_vec()), + expected_sha256, // don't support encrypt for now. None, speed_limiter, @@ -713,6 +716,7 @@ mod tests { SeekKey, SstReader, SstWriter, CF_DEFAULT, DATA_CFS, }; use file_system::File; + use openssl::hash::{Hasher, MessageDigest}; use tempfile::Builder; use test_sst_importer::*; use test_util::new_test_key_manager; @@ -1066,11 +1070,17 @@ mod tests { let mut input = data; let mut output = Vec::new(); let input_len = input.len() as u64; + + let mut hasher = Hasher::new(MessageDigest::sha256()).unwrap(); + hasher.update(data); + let hash256 = hasher.finish().unwrap().to_vec(); + block_on_external_io(external_storage_export::read_external_storage_into_file( &mut input, &mut output, &Limiter::new(f64::INFINITY), input_len, + Some(hash256), 8192, )) .unwrap(); @@ -1088,6 +1098,7 @@ mod tests { &mut output, &Limiter::new(f64::INFINITY), 0, + None, usize::MAX, )) .unwrap_err(); diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index 0e12568527..a2d4c94b3a 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -526,7 +526,7 @@ where import_err .set_message("failed to complete raft command".to_string()); // FIXME: if there are many errors, we may lose some of them here. - import_err + import_err .set_store_error(err.clone()); warn!("failed to apply the file to the store"; "error" => ?err, "file" => %meta.get_name()); resp.set_error(import_err);