Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mc-watcher keeps log of attestation verification reports #694

Merged
merged 13 commits into from
Feb 2, 2021
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,30 @@ path = "src/bin/main.rs"

[dependencies]
mc-api = { path = "../api" }
mc-attest-core = { path = "../attest/core" }
mc-common = { path = "../common", features = ["log"] }
mc-connection = { path = "../connection" }
mc-crypto-digestible = { path = "../crypto/digestible" }
mc-crypto-keys = { path = "../crypto/keys" }
mc-ledger-db = { path = "../ledger/db" }
mc-ledger-sync = { path = "../ledger/sync" }
mc-transaction-core = { path = "../transaction/core" }
mc-transaction-core-test-utils = { path = "../transaction/core/test-utils" }
mc-util-from-random = { path = "../util/from-random" }
mc-util-lmdb = { path = "../util/lmdb" }
mc-util-repr-bytes = { path = "../util/repr-bytes" }
mc-util-serial = { path = "../util/serial" }
mc-util-uri = { path = "../util/uri" }
mc-watcher-api = { path = "api" }

failure = "0.1.8"
grpcio = "0.6.0"
hex = "0.4"
lmdb-rkv = "0.14.0"
prost = { version = "0.6.1", default-features = false, features = ["prost-derive"] }
serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] }
structopt = "0.3"
toml = "0.5"
url = "2.1"

[dev-dependencies]
Expand Down
35 changes: 27 additions & 8 deletions watcher/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,24 @@
//! blocks are synced.

use mc_watcher::{
config::WatcherConfig, watcher::Watcher, watcher_db::create_or_open_rw_watcher_db,
config::WatcherConfig, verification_reports_collector::VerificationReportsCollector,
watcher::Watcher, watcher_db::create_or_open_rw_watcher_db,
};

use mc_common::logger::{create_app_logger, o};
use mc_common::logger::{create_app_logger, log, o};
use mc_ledger_sync::ReqwestTransactionsFetcher;
use std::thread::sleep;
use structopt::StructOpt;

fn main() {
mc_common::setup_panic_handler();
let (logger, _global_logger_guard) = create_app_logger(o!());

let config = WatcherConfig::from_args();
let sources_config = config.sources_config();

let transactions_fetcher =
ReqwestTransactionsFetcher::new(config.tx_source_urls.clone(), logger.clone())
ReqwestTransactionsFetcher::new(sources_config.tx_source_urls(), logger.clone())
.expect("Failed creating ReqwestTransactionsFetcher");

let watcher_db = create_or_open_rw_watcher_db(
Expand All @@ -30,9 +33,25 @@ fn main() {
logger.clone(),
)
.expect("Could not create or open watcher db");
let watcher = Watcher::new(watcher_db, transactions_fetcher, logger);
// For now, ignore origin block, as it does not have a signature.
watcher
.sync_signatures(1, config.max_block_height)
.expect("Could not sync signatures");
let watcher = Watcher::new(watcher_db.clone(), transactions_fetcher, logger.clone());

let _verification_reports_collector = VerificationReportsCollector::new(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now we only collect verification reports inside the mc-watcher binary. mc-mobilecoind remains unchanged (it uses the WatcherDB and WatcherSyncThread to sync timestamps and block signatures but has no need for the verification reports).

watcher_db,
sources_config.tx_source_urls_to_consensus_client_urls(),
config.poll_interval,
logger.clone(),
);

loop {
// For now, ignore origin block, as it does not have a signature.
let syncing_done = watcher
.sync_signatures(1, config.max_block_height)
.expect("Could not sync signatures");
if syncing_done {
log::info!(logger, "sync_signatures indicates we're done");
break;
}

sleep(config.poll_interval);
}
}
125 changes: 119 additions & 6 deletions watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

//! Configuration parameters for the watcher test utility.

use std::path::PathBuf;
use mc_common::HashMap;
use mc_util_uri::ConsensusClientUri;
use serde::{Deserialize, Serialize};
use std::{fs, iter::FromIterator, path::PathBuf, str::FromStr, time::Duration};
use structopt::StructOpt;

#[derive(Debug, StructOpt)]
Expand All @@ -16,13 +19,123 @@ pub struct WatcherConfig {
#[structopt(long, default_value = "/tmp/watcher-db", parse(from_os_str))]
pub watcher_db: PathBuf,

/// URLs to use to pull blocks.
///
/// For example: https://s3-us-west-1.amazonaws.com/mobilecoin.chain/node1.master.mobilecoin.com/
#[structopt(long = "tx-source-url", required = true, min_values = 1)]
pub tx_source_urls: Vec<String>,
/// The location of the sources.toml file. This file configures the list of block sources and
/// consensus nodes that are being watched.
#[structopt(long)]
pub sources_path: PathBuf,

/// (Optional) Number of blocks to sync
#[structopt(long)]
pub max_block_height: Option<u64>,

/// How many seconds to wait between polling.
#[structopt(long, default_value = "1", parse(try_from_str=parse_duration_in_seconds))]
pub poll_interval: Duration,
}

impl WatcherConfig {
/// Load the sources configuration file.
pub fn sources_config(&self) -> SourcesConfig {
// Read configuration file.
let data = fs::read_to_string(&self.sources_path)
.unwrap_or_else(|err| panic!("failed reading {:?}: {:?}", self.sources_path, err));

// Parse configuration file.
toml::from_str(&data)
.unwrap_or_else(|err| panic!("failed TOML parsing {:?}: {:?}", self.sources_path, err))
}
}

/// A single watched source configuration.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq, Hash)]
struct SourceConfig {
/// URL to use for pulling blocks.
///
/// For example: https://s3-us-west-1.amazonaws.com/mobilecoin.chain/node1.test.mobilecoin.com/
tx_source_url: String,

/// (Optional) Consensus node client URL to use for fetching the remote attestation report
/// whenever a block signer change is detected.
consensus_client_url: Option<ConsensusClientUri>,
}

impl SourceConfig {
// Get the tx_source_url and ensure it has a trailing slash.
// This is compatible with the behavior inside ReqwestTransactionsFetcher and ensures
// everywhere we use URLs we always have "slash-terminated" URLs
pub fn tx_source_url(&self) -> String {
let mut url = self.tx_source_url.clone();
if !url.ends_with('/') {
url.push_str("/");
}
url
}
}

/// Sources configuration - this configures which sources are being watched.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq, Hash)]
pub struct SourcesConfig {
/// List of sources being watched.
sources: Vec<SourceConfig>,
}

impl SourcesConfig {
/// Returns a list of URLs that can be used to fetch block contents from.
pub fn tx_source_urls(&self) -> Vec<String> {
self.sources
.iter()
.map(|source_config| source_config.tx_source_url())
.collect()
}

/// Returns a map of tx source url -> consensus client url. This is used when we want to try
/// and connect to the consensus block that provided some block from a given URL.
pub fn tx_source_urls_to_consensus_client_urls(&self) -> HashMap<String, ConsensusClientUri> {
HashMap::from_iter(self.sources.iter().filter_map(|source_config| {
source_config
.consensus_client_url
.clone()
.map(|client_url| (source_config.tx_source_url(), client_url))
}))
}
}

fn parse_duration_in_seconds(src: &str) -> Result<Duration, std::num::ParseIntError> {
Ok(Duration::from_secs(u64::from_str(src)?))
}

#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;

#[test]
fn sources_config_toml() {
let expected_config = SourcesConfig {
sources: vec![
SourceConfig {
tx_source_url: "https://www.source.com/".to_owned(),
consensus_client_url: None,
},
SourceConfig {
tx_source_url: "https://www.2nd-source.com/".to_owned(),
consensus_client_url: Some(
ConsensusClientUri::from_str("mc://www.x.com:443/").unwrap(),
),
},
],
};

let input_toml: &str = r#"
[[sources]]
tx_source_url = "https://www.source.com/"

[[sources]]
tx_source_url = "https://www.2nd-source.com/"
consensus_client_url = "mc://www.x.com:443/"
"#;
let config: SourcesConfig = toml::from_str(input_toml).expect("failed parsing toml");

assert_eq!(config, expected_config);
}
}
45 changes: 42 additions & 3 deletions watcher/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright (c) 2018-2021 The MobileCoin Foundation

use failure::Fail;
use mc_connection::Error as ConnectionError;
use mc_crypto_keys::KeyError;
use mc_util_lmdb::MetadataStoreError;
use std::string::FromUtf8Error;

/// Watcher Errors
#[derive(Debug, Eq, PartialEq, Copy, Clone, Fail)]
#[derive(Debug, Fail)]
pub enum WatcherError {
#[fail(display = "URL Parse Error: {}", _0)]
URLParse(url::ParseError),
Expand All @@ -14,17 +17,26 @@ pub enum WatcherError {

#[fail(display = "SyncFailed")]
SyncFailed,

#[fail(display = "Node connection error: {}", _0)]
Connection(ConnectionError),
}

impl From<url::ParseError> for WatcherError {
fn from(src: url::ParseError) -> Self {
WatcherError::URLParse(src)
Self::URLParse(src)
}
}

impl From<WatcherDBError> for WatcherError {
fn from(src: WatcherDBError) -> Self {
WatcherError::DB(src)
Self::DB(src)
}
}

impl From<ConnectionError> for WatcherError {
fn from(src: ConnectionError) -> Self {
Self::Connection(src)
}
}

Expand Down Expand Up @@ -54,6 +66,15 @@ pub enum WatcherDBError {

#[fail(display = "Metadata store error: {}", _0)]
MetadataStore(MetadataStoreError),

#[fail(display = "Utf8 error")]
Utf8,

#[fail(display = "URL Parse Error: {}", _0)]
URLParse(url::ParseError),

#[fail(display = "Crypto key error: {}", _0)]
CryptoKey(KeyError),
}

impl From<lmdb::Error> for WatcherDBError {
Expand Down Expand Up @@ -85,3 +106,21 @@ impl From<MetadataStoreError> for WatcherDBError {
Self::MetadataStore(e)
}
}

impl From<FromUtf8Error> for WatcherDBError {
fn from(_src: FromUtf8Error) -> Self {
Self::Utf8
}
}

impl From<url::ParseError> for WatcherDBError {
fn from(src: url::ParseError) -> Self {
Self::URLParse(src)
}
}

impl From<KeyError> for WatcherDBError {
fn from(src: KeyError) -> Self {
Self::CryptoKey(src)
}
}
1 change: 1 addition & 0 deletions watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![forbid(unsafe_code)]

pub mod config;
pub mod verification_reports_collector;
pub mod watcher;
pub mod watcher_db;

Expand Down
Loading