Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mc-watcher keeps log of attestation verification reports #694

Merged
merged 13 commits into from
Feb 2, 2021
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,30 @@ path = "src/bin/main.rs"

[dependencies]
mc-api = { path = "../api" }
mc-attest-core = { path = "../attest/core" }
mc-common = { path = "../common", features = ["log"] }
mc-connection = { path = "../connection" }
mc-crypto-digestible = { path = "../crypto/digestible" }
mc-crypto-keys = { path = "../crypto/keys" }
mc-ledger-db = { path = "../ledger/db" }
mc-ledger-sync = { path = "../ledger/sync" }
mc-transaction-core = { path = "../transaction/core" }
mc-transaction-core-test-utils = { path = "../transaction/core/test-utils" }
mc-util-from-random = { path = "../util/from-random" }
mc-util-lmdb = { path = "../util/lmdb" }
mc-util-repr-bytes = { path = "../util/repr-bytes" }
mc-util-serial = { path = "../util/serial" }
mc-util-uri = { path = "../util/uri" }
mc-watcher-api = { path = "api" }

failure = "0.1.8"
grpcio = "0.6.0"
hex = "0.4"
lmdb-rkv = "0.14.0"
prost = { version = "0.6.1", default-features = false, features = ["prost-derive"] }
serde = { version = "1.0", default-features = false, features = ["alloc", "derive"] }
structopt = "0.3"
toml = "0.5"
url = "2.1"

[dev-dependencies]
Expand Down
37 changes: 29 additions & 8 deletions watcher/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,26 @@
//! blocks are synced.

use mc_watcher::{
config::WatcherConfig, watcher::Watcher, watcher_db::create_or_open_rw_watcher_db,
config::WatcherConfig, verification_reports_collector::VerificationReportsCollector,
watcher::Watcher, watcher_db::create_or_open_rw_watcher_db,
};

use mc_common::logger::{create_app_logger, o};
use mc_common::logger::{create_app_logger, log, o};
use mc_ledger_sync::ReqwestTransactionsFetcher;
use std::{thread::sleep, time::Duration};
use structopt::StructOpt;

const SYNC_RETRY_INTERVAL: Duration = Duration::from_secs(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this set here as opposed to either from config, or via using something like the retry crate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its there because realistically no-one will ever choose a different value. retry is inappropriate since this is actually a polling interval so the name is misleading. I'll add it to the command line config.


fn main() {
mc_common::setup_panic_handler();
let (logger, _global_logger_guard) = create_app_logger(o!());

let config = WatcherConfig::from_args();
let sources_config = config.sources_config();

let transactions_fetcher =
ReqwestTransactionsFetcher::new(config.tx_source_urls.clone(), logger.clone())
ReqwestTransactionsFetcher::new(sources_config.tx_source_urls(), logger.clone())
.expect("Failed creating ReqwestTransactionsFetcher");

let watcher_db = create_or_open_rw_watcher_db(
Expand All @@ -30,9 +35,25 @@ fn main() {
logger.clone(),
)
.expect("Could not create or open watcher db");
let watcher = Watcher::new(watcher_db, transactions_fetcher, logger);
// For now, ignore origin block, as it does not have a signature.
watcher
.sync_signatures(1, config.max_block_height)
.expect("Could not sync signatures");
let watcher = Watcher::new(watcher_db.clone(), transactions_fetcher, logger.clone());

let _verification_reports_collector = VerificationReportsCollector::new(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now we only collect verification reports inside the mc-watcher binary. mc-mobilecoind remains unchanged (it uses the WatcherDB and WatcherSyncThread to sync timestamps and block signatures but has no need for the verification reports).

watcher_db,
sources_config.tx_source_urls_to_consensus_client_urls(),
SYNC_RETRY_INTERVAL,
logger.clone(),
);

loop {
// For now, ignore origin block, as it does not have a signature.
let syncing_done = watcher
.sync_signatures(1, config.max_block_height)
.expect("Could not sync signatures");
if syncing_done {
log::info!(logger, "sync_signatures indicates we're done");
break;
}

sleep(SYNC_RETRY_INTERVAL);
}
}
117 changes: 111 additions & 6 deletions watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

//! Configuration parameters for the watcher test utility.

use std::path::PathBuf;
use mc_common::HashMap;
use mc_util_uri::ConsensusClientUri;
use serde::{Deserialize, Serialize};
use std::{fs, iter::FromIterator, path::PathBuf};
use structopt::StructOpt;

#[derive(Debug, StructOpt)]
Expand All @@ -16,13 +19,115 @@ pub struct WatcherConfig {
#[structopt(long, default_value = "/tmp/watcher-db", parse(from_os_str))]
pub watcher_db: PathBuf,

/// URLs to use to pull blocks.
///
/// For example: https://s3-us-west-1.amazonaws.com/mobilecoin.chain/node1.master.mobilecoin.com/
#[structopt(long = "tx-source-url", required = true, min_values = 1)]
pub tx_source_urls: Vec<String>,
/// The location of the sources.toml file. This file configures the list of block sources and
/// consensus nodes that are being watched.
#[structopt(long)]
pub sources_path: PathBuf,

/// (Optional) Number of blocks to sync
#[structopt(long)]
pub max_block_height: Option<u64>,
}

impl WatcherConfig {
/// Load the sources configuration file.
pub fn sources_config(&self) -> SourcesConfig {
// Read configuration file.
let data = fs::read_to_string(&self.sources_path)
.unwrap_or_else(|err| panic!("failed reading {:?}: {:?}", self.sources_path, err));

// Parse configuration file.
toml::from_str(&data)
.unwrap_or_else(|err| panic!("failed TOML parsing {:?}: {:?}", self.sources_path, err))
}
}

/// A single watched source configuration.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq, Hash)]
struct SourceConfig {
/// URL to use for pulling blocks.
///
/// For example: https://s3-us-west-1.amazonaws.com/mobilecoin.chain/node1.master.mobilecoin.com/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would maybe put testnet URL as example

tx_source_url: String,

/// (Optional) Consensus node client URL to use for fetching the remote attestation report
/// whenever a block signer change is detected.
consensus_client_url: Option<ConsensusClientUri>,
}

impl SourceConfig {
// Get the tx_source_url and ensure it has a trailing slash.
// This is compatible with the behavior inside ReqwestTransactionsFetcher and ensures
// everywhere we use URLs we always have "slash-terminated" URLs
pub fn tx_source_url(&self) -> String {
let mut url = self.tx_source_url.clone();
if !url.ends_with('/') {
url.push_str("/");
}
url
}
}

/// Sources configuration - this configures which sources are being watched.
#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq, Hash)]
pub struct SourcesConfig {
/// List of sources being watched.
sources: Vec<SourceConfig>,
}

impl SourcesConfig {
/// Returns a list of URLs that can be used to fetch block contents from.
pub fn tx_source_urls(&self) -> Vec<String> {
self.sources
.iter()
.map(|source_config| source_config.tx_source_url())
.collect()
}

/// Returns a map of tx source url -> consensus client url. This is used when we want to try
/// and connect to the consensus block that provided some block from a given URL.
pub fn tx_source_urls_to_consensus_client_urls(&self) -> HashMap<String, ConsensusClientUri> {
HashMap::from_iter(self.sources.iter().filter_map(|source_config| {
source_config
.consensus_client_url
.clone()
.map(|client_url| (source_config.tx_source_url(), client_url))
}))
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;

#[test]
fn sources_config_toml() {
let expected_config = SourcesConfig {
sources: vec![
SourceConfig {
tx_source_url: "https://www.source.com/".to_owned(),
consensus_client_url: None,
},
SourceConfig {
tx_source_url: "https://www.2nd-source.com/".to_owned(),
consensus_client_url: Some(
ConsensusClientUri::from_str("mc://www.x.com:443/").unwrap(),
),
},
],
};

let input_toml: &str = r#"
[[sources]]
tx_source_url = "https://www.source.com/"

[[sources]]
tx_source_url = "https://www.2nd-source.com/"
consensus_client_url = "mc://www.x.com:443/"
"#;
let config: SourcesConfig = toml::from_str(input_toml).expect("failed parsing toml");

assert_eq!(config, expected_config);
}
}
45 changes: 42 additions & 3 deletions watcher/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright (c) 2018-2021 The MobileCoin Foundation

use failure::Fail;
use mc_connection::Error as ConnectionError;
use mc_crypto_keys::KeyError;
use mc_util_lmdb::MetadataStoreError;
use std::string::FromUtf8Error;

/// Watcher Errors
#[derive(Debug, Eq, PartialEq, Copy, Clone, Fail)]
#[derive(Debug, Fail)]
pub enum WatcherError {
#[fail(display = "URL Parse Error: {}", _0)]
URLParse(url::ParseError),
Expand All @@ -14,17 +17,26 @@ pub enum WatcherError {

#[fail(display = "SyncFailed")]
SyncFailed,

#[fail(display = "Node connection error: {}", _0)]
Connection(ConnectionError),
}

impl From<url::ParseError> for WatcherError {
fn from(src: url::ParseError) -> Self {
WatcherError::URLParse(src)
Self::URLParse(src)
}
}

impl From<WatcherDBError> for WatcherError {
fn from(src: WatcherDBError) -> Self {
WatcherError::DB(src)
Self::DB(src)
}
}

impl From<ConnectionError> for WatcherError {
fn from(src: ConnectionError) -> Self {
Self::Connection(src)
}
}

Expand Down Expand Up @@ -54,6 +66,15 @@ pub enum WatcherDBError {

#[fail(display = "Metadata store error: {}", _0)]
MetadataStore(MetadataStoreError),

#[fail(display = "Utf8 error")]
Utf8,

#[fail(display = "URL Parse Error: {}", _0)]
URLParse(url::ParseError),

#[fail(display = "Crypto key error: {}", _0)]
CryptoKey(KeyError),
}

impl From<lmdb::Error> for WatcherDBError {
Expand Down Expand Up @@ -85,3 +106,21 @@ impl From<MetadataStoreError> for WatcherDBError {
Self::MetadataStore(e)
}
}

impl From<FromUtf8Error> for WatcherDBError {
fn from(_src: FromUtf8Error) -> Self {
Self::Utf8
}
}

impl From<url::ParseError> for WatcherDBError {
fn from(src: url::ParseError) -> Self {
Self::URLParse(src)
}
}

impl From<KeyError> for WatcherDBError {
fn from(src: KeyError) -> Self {
Self::CryptoKey(src)
}
}
1 change: 1 addition & 0 deletions watcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#![forbid(unsafe_code)]

pub mod config;
pub mod verification_reports_collector;
pub mod watcher;
pub mod watcher_db;

Expand Down
Loading