Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt error code for endpoint::Error and implement the contextual error #41

Merged
merged 2 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions components/backup-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@ name = "backup-stream"
version = "0.1.0"
edition = "2018"

[[bin]]
name = "playground"
path = "bin/playground.rs"

[features]
default = ["test-engines-rocksdb"]
test-engines-rocksdb = [
Expand Down Expand Up @@ -56,6 +52,7 @@ online_config = { path = "../online_config" }
resolved_ts = { path = "../resolved_ts"}
pd_client = {path = "../pd_client"}
concurrency_manager = { path = "../concurrency_manager"}
error_code = {path = "../error_code"}

[dev-dependencies]
rand = "0.8.0"
Expand Down
40 changes: 0 additions & 40 deletions components/backup-stream/bin/playground.rs

This file was deleted.

12 changes: 12 additions & 0 deletions components/backup-stream/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ where
RT: RaftStoreRouter<E> + 'static,
PDC: PdClient + 'static,
{
fn on_fatal_error(&self, _task: String, _err: Box<Error>) {
// This is a stub.
// TODO: implement the feature of reporting fatal error to the meta storage,
// and pause the task then.
}

async fn starts_flush_ticks(router: Router) {
loop {
// wait 1min to trigger tick
Expand Down Expand Up @@ -664,6 +670,8 @@ pub enum Task {
ModifyObserve(ObserveOp),
/// Convert status of some task into `flushing` and do flush then.
ForceFlush(String),
/// FatalError pauses the task and set the error.
FatalError(String, Box<Error>),
}

#[derive(Debug)]
Expand Down Expand Up @@ -701,6 +709,9 @@ impl fmt::Debug for Task {
Self::Flush(arg0) => f.debug_tuple("Flush").field(arg0).finish(),
Self::ModifyObserve(op) => f.debug_tuple("ModifyObserve").field(op).finish(),
Self::ForceFlush(arg0) => f.debug_tuple("ForceFlush").field(arg0).finish(),
Self::FatalError(task, err) => {
f.debug_tuple("FatalError").field(task).field(err).finish()
}
}
}
}
Expand Down Expand Up @@ -729,6 +740,7 @@ where
Task::Flush(task) => self.on_flush(task, self.store_id),
Task::ModifyObserve(op) => self.on_modify_observe(op),
Task::ForceFlush(task) => self.on_force_flush(task, self.store_id),
Task::FatalError(task, err) => self.on_fatal_error(task, err),
_ => (),
}
}
Expand Down
229 changes: 202 additions & 27 deletions components/backup-stream/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use error_code::ErrorCodeExt;
use etcd_client::Error as EtcdError;
use kvproto::errorpb::Error as StoreError;
use pd_client::Error as PdError;
use protobuf::ProtobufError;
use slog_global::error;
use std::error::Error as StdError;
use std::fmt::Display;
use std::io::Error as IoError;
use std::result::Result as StdResult;
use thiserror::Error as ThisError;
use tikv::storage::txn::Error as TxnError;
use tikv_util::error;
use tikv_util::worker::ScheduleError;

use crate::endpoint::Task;
#[cfg(not(test))]
use crate::metrics;

#[derive(ThisError, Debug)]
Expand All @@ -33,12 +35,81 @@ pub enum Error {
Sched(#[from] ScheduleError<Task>),
#[error("PD client meet error: {0}")]
Pd(#[from] PdError),
#[error("Error During requesting raftstore: {0:?}")]
RaftStore(StoreError),
#[error("{context}: {inner_error}")]
Contextual {
context: String,
inner_error: Box<Self>,
},
#[error("Other Error: {0}")]
Other(#[from] Box<dyn StdError + Send + Sync + 'static>),
}

impl ErrorCodeExt for Error {
fn error_code(&self) -> error_code::ErrorCode {
use error_code::backup_stream::*;
match self {
Error::Etcd(_) => ETCD,
Error::Protobuf(_) => PROTO,
Error::NoSuchTask { .. } => NO_SUCH_TASK,
Error::MalformedMetadata(_) => MALFORMED_META,
Error::Io(_) => IO,
Error::Txn(_) => TXN,
Error::Sched(_) => SCHED,
Error::Pd(_) => PD,
Error::RaftStore(_) => RAFTSTORE,
Error::Contextual { inner_error, .. } => inner_error.error_code(),
Error::Other(_) => OTHER,
}
}
}

impl<'a> ErrorCodeExt for &'a Error {
fn error_code(&self) -> error_code::ErrorCode {
Error::error_code(*self)
}
}

pub type Result<T> = StdResult<T, Error>;

impl From<StoreError> for Error {
fn from(e: StoreError) -> Self {
Self::RaftStore(e)
}
}

pub trait ContextualResultExt<T>
where
Self: Sized,
{
/// attache a context to the error.
fn context(self, context: impl ToString) -> Result<T>;

fn context_with(self, context: impl Fn() -> String) -> Result<T>;
}

impl<T, E> ContextualResultExt<T> for StdResult<T, E>
where
E: Into<Error>,
{
#[inline(always)]
fn context(self, context: impl ToString) -> Result<T> {
self.map_err(|err| Error::Contextual {
context: context.to_string(),
inner_error: Box::new(err.into()),
})
}

#[inline(always)]
fn context_with(self, context: impl Fn() -> String) -> Result<T> {
self.map_err(|err| Error::Contextual {
context: context(),
inner_error: Box::new(err.into()),
})
}
}

/// Like `errors.Annotate` in Go.
/// Wrap an unknown error with [`Error::Other`].
#[macro_export(crate)]
Expand All @@ -53,33 +124,137 @@ macro_rules! annotate {

impl Error {
pub fn report(&self, context: impl Display) {
#[cfg(test)]
println!(
"backup stream meet error (context = {}, err = {})",
context, self
);
#[cfg(not(test))]
{
// TODO: adapt the error_code module, use `tikv_util::error!` to replace this.
error!("backup stream meet error"; "context" => %context, "err" => %self);
metrics::STREAM_ERROR
.with_label_values(&[self.kind()])
.inc()
}
error!(%self; "backup stream meet error"; "context" => %context,);
metrics::STREAM_ERROR
.with_label_values(&[self.kind()])
.inc()
}

#[cfg(not(test))]
fn kind(&self) -> &'static str {
match self {
Error::Etcd(_) => "etcd",
Error::Protobuf(_) => "protobuf",
Error::NoSuchTask { .. } => "no_such_task",
Error::MalformedMetadata(_) => "malformed_metadata",
Error::Io(_) => "io",
Error::Txn(_) => "transaction",
Error::Other(_) => "unknown",
Error::Sched(_) => "schedule",
Error::Pd(_) => "pd",
}
self.error_code().code
}
}

#[cfg(test)]
mod test {
extern crate test;

use super::{ContextualResultExt, Error, Result};
use error_code::ErrorCodeExt;
use std::io::{self, ErrorKind};

#[test]
fn test_contextual_error() {
let err = Error::Io(io::Error::new(
ErrorKind::Other,
"the absence of error messages, is also a kind of error message",
));
let result: Result<()> = Err(err);
let result = result.context(format_args!(
"a cat named {} cut off the power wire",
"neko"
));

let err = result.unwrap_err();
assert_eq!(
err.to_string(),
"a cat named neko cut off the power wire: I/O Error: the absence of error messages, is also a kind of error message"
);

assert_eq!(err.error_code(), error_code::backup_stream::IO,);
}

// Bench: Pod at Intel(R) Xeon(R) Gold 6240 CPU @ 2.60GHz
// With CPU Claim = 16 cores.

#[bench]
// 2,685 ns/iter (+/- 194)
fn contextual_add_format_strings_directly(b: &mut test::Bencher) {
b.iter(|| {
let err = Error::Io(io::Error::new(
ErrorKind::Other,
"basement, it is the fundamental basement.",
));
let result: Result<()> = Err(err);
let lucky_number = rand::random::<u8>();
let result = result.context(format!("lucky: the number is {}", lucky_number));
assert_eq!(
result.unwrap_err().to_string(),
format!(
"lucky: the number is {}: I/O Error: basement, it is the fundamental basement.",
lucky_number
)
)
})
}

#[bench]
// 1,922 ns/iter (+/- 273)
fn contextual_add_format_strings(b: &mut test::Bencher) {
b.iter(|| {
let err = Error::Io(io::Error::new(
ErrorKind::Other,
"basement, it is the fundamental basement.",
));
let result: Result<()> = Err(err);
let lucky_number = rand::random::<u8>();
let result = result.context(format_args!("lucky: the number is {}", lucky_number));
assert_eq!(
result.unwrap_err().to_string(),
format!(
"lucky: the number is {}: I/O Error: basement, it is the fundamental basement.",
lucky_number
)
)
})
}

#[bench]
// 1,988 ns/iter (+/- 89)
fn contextual_add_closure(b: &mut test::Bencher) {
b.iter(|| {
let err = Error::Io(io::Error::new(
ErrorKind::Other,
"basement, it is the fundamental basement.",
));
let result: Result<()> = Err(err);
let lucky_number = rand::random::<u8>();
let result = result.context_with(|| format!("lucky: the number is {}", lucky_number));
assert_eq!(
result.unwrap_err().to_string(),
format!(
"lucky: the number is {}: I/O Error: basement, it is the fundamental basement.",
lucky_number
)
)
})
}

#[bench]
// 773 ns/iter (+/- 8)
fn baseline(b: &mut test::Bencher) {
b.iter(|| {
let err = Error::Io(io::Error::new(
ErrorKind::Other,
"basement, it is the fundamental basement.",
));
let result: Result<()> = Err(err);
let _lucky_number = rand::random::<u8>();
assert_eq!(
result.unwrap_err().to_string(),
"I/O Error: basement, it is the fundamental basement.".to_string(),
)
})
}

#[bench]
// 3 ns/iter (+/- 0)
fn contextual_ok(b: &mut test::Bencher) {
b.iter(|| {
let result: Result<()> = Ok(());
let lucky_number = rand::random::<u8>();
let result = result.context_with(|| format!("lucky: the number is {}", lucky_number));
assert!(result.is_ok());
})
}
}
Loading