diff --git a/Cargo.lock b/Cargo.lock index 7813b5513a0..6fd85f33ac9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1337,6 +1337,7 @@ dependencies = [ "clap", "clap_utils", "eth2_network_config", + "tempfile", ] [[package]] @@ -1672,6 +1673,7 @@ dependencies = [ "sensitive_url", "serde", "serde_json", + "slashing_protection", "store", "types", ] @@ -4853,6 +4855,7 @@ dependencies = [ "rand_chacha 0.2.2", "rand_core 0.5.1", "rand_hc 0.2.0", + "rand_pcg", ] [[package]] @@ -4923,6 +4926,15 @@ dependencies = [ "rand_core 0.6.3", ] +[[package]] +name = "rand_pcg" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16abd0c1b639e9eb4d7c50c0b8100b0d0f849be2349829c740fe8e6eb4816429" +dependencies = [ + "rand_core 0.5.1", +] + [[package]] name = "rand_xorshift" version = "0.2.0" @@ -6727,6 +6739,7 @@ dependencies = [ "lighthouse_metrics", "lighthouse_version", "lockfile", + "logging", "monitoring_api", "parking_lot", "rand 0.7.3", diff --git a/common/account_utils/src/lib.rs b/common/account_utils/src/lib.rs index dc79a1f2035..89de3803856 100644 --- a/common/account_utils/src/lib.rs +++ b/common/account_utils/src/lib.rs @@ -85,15 +85,23 @@ pub fn write_file_via_temporary( Ok(()) } -/// Generates a random alphanumeric password of length `DEFAULT_PASSWORD_LEN`. +/// Generates a random alphanumeric password of length `DEFAULT_PASSWORD_LEN` as `PlainText`. pub fn random_password() -> PlainText { + random_password_raw_string().into_bytes().into() +} + +/// Generates a random alphanumeric password of length `DEFAULT_PASSWORD_LEN` as `ZeroizeString`. +pub fn random_password_string() -> ZeroizeString { + random_password_raw_string().into() +} + +/// Common implementation for `random_password` and `random_password_string`. 
+fn random_password_raw_string() -> String { rand::thread_rng() .sample_iter(&Alphanumeric) .take(DEFAULT_PASSWORD_LEN) .map(char::from) - .collect::() - .into_bytes() - .into() + .collect() } /// Remove any number of newline or carriage returns from the end of a vector of bytes. diff --git a/common/account_utils/src/validator_definitions.rs b/common/account_utils/src/validator_definitions.rs index 418c0fb3c68..d66683bee01 100644 --- a/common/account_utils/src/validator_definitions.rs +++ b/common/account_utils/src/validator_definitions.rs @@ -46,9 +46,6 @@ pub enum Error { } /// Defines how the validator client should attempt to sign messages for this validator. -/// -/// Presently there is only a single variant, however we expect more variants to arise (e.g., -/// remote signing). #[derive(Clone, PartialEq, Serialize, Deserialize)] #[serde(tag = "type")] pub enum SigningDefinition { @@ -78,6 +75,12 @@ pub enum SigningDefinition { }, } +impl SigningDefinition { + pub fn is_local_keystore(&self) -> bool { + matches!(self, SigningDefinition::LocalKeystore { .. }) + } +} + /// A validator that may be initialized by this validator client. /// /// Presently there is only a single variant, however we expect more variants to arise (e.g., @@ -293,6 +296,11 @@ impl ValidatorDefinitions { Ok(()) } + /// Retain only the definitions matching the given predicate. + pub fn retain(&mut self, f: impl FnMut(&ValidatorDefinition) -> bool) { + self.0.retain(f); + } + /// Adds a new `ValidatorDefinition` to `self`. 
pub fn push(&mut self, def: ValidatorDefinition) { self.0.push(def) diff --git a/common/eth2/Cargo.toml b/common/eth2/Cargo.toml index b22cfc28f1a..07b6c71b780 100644 --- a/common/eth2/Cargo.toml +++ b/common/eth2/Cargo.toml @@ -25,6 +25,7 @@ eth2_ssz_derive = "0.3.0" futures-util = "0.3.8" futures = "0.3.8" store = { path = "../../beacon_node/store", optional = true } +slashing_protection = { path = "../../validator_client/slashing_protection", optional = true } [target.'cfg(target_os = "linux")'.dependencies] # TODO: update psutil once fix is merged: https://github.com/rust-psutil/rust-psutil/pull/93 @@ -34,4 +35,4 @@ procinfo = { version = "0.4.2", optional = true } [features] default = ["lighthouse"] -lighthouse = ["proto_array", "psutil", "procinfo", "store"] +lighthouse = ["proto_array", "psutil", "procinfo", "store", "slashing_protection"] diff --git a/common/eth2/src/lighthouse_vc/http_client.rs b/common/eth2/src/lighthouse_vc/http_client.rs index cd640e6158a..55b8217d076 100644 --- a/common/eth2/src/lighthouse_vc/http_client.rs +++ b/common/eth2/src/lighthouse_vc/http_client.rs @@ -200,6 +200,24 @@ impl ValidatorClientHttpClient { Ok(()) } + /// Perform a HTTP DELETE request. + async fn delete( + &self, + url: U, + body: &T, + ) -> Result { + let response = self + .client + .delete(url) + .headers(self.headers()?) + .json(body) + .send() + .await + .map_err(Error::Reqwest)?; + let response = ok_or_error(response).await?; + self.signed_json(response).await + } + /// `GET lighthouse/version` pub async fn get_lighthouse_version(&self) -> Result, Error> { let mut path = self.server.full.clone(); @@ -345,6 +363,40 @@ impl ValidatorClientHttpClient { self.patch(path, &ValidatorPatchRequest { enabled }).await } + + fn make_keystores_url(&self) -> Result { + let mut url = self.server.full.clone(); + url.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? 
+ .push("eth") + .push("v1") + .push("keystores"); + Ok(url) + } + + /// `GET eth/v1/keystores` + pub async fn get_keystores(&self) -> Result { + let url = self.make_keystores_url()?; + self.get(url).await + } + + /// `POST eth/v1/keystores` + pub async fn post_keystores( + &self, + req: &ImportKeystoresRequest, + ) -> Result { + let url = self.make_keystores_url()?; + self.post(url, req).await + } + + /// `DELETE eth/v1/keystores` + pub async fn delete_keystores( + &self, + req: &DeleteKeystoresRequest, + ) -> Result { + let url = self.make_keystores_url()?; + self.delete(url, req).await + } } /// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an diff --git a/common/eth2/src/lighthouse_vc/mod.rs b/common/eth2/src/lighthouse_vc/mod.rs index b7de7c71520..81b4fca283a 100644 --- a/common/eth2/src/lighthouse_vc/mod.rs +++ b/common/eth2/src/lighthouse_vc/mod.rs @@ -1,4 +1,5 @@ pub mod http_client; +pub mod std_types; pub mod types; /// The number of bytes in the secp256k1 public key used as the authorization token for the VC API. 
diff --git a/common/eth2/src/lighthouse_vc/std_types.rs b/common/eth2/src/lighthouse_vc/std_types.rs new file mode 100644 index 00000000000..26423545e10 --- /dev/null +++ b/common/eth2/src/lighthouse_vc/std_types.rs @@ -0,0 +1,78 @@ +use account_utils::ZeroizeString; +use eth2_keystore::Keystore; +use serde::{Deserialize, Serialize}; +use slashing_protection::interchange::Interchange; +use types::PublicKeyBytes; + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +pub struct ListKeystoresResponse { + pub keystores: Vec, +} + +#[derive(Debug, Deserialize, Serialize, PartialEq)] +pub struct SingleKeystoreResponse { + pub validating_pubkey: PublicKeyBytes, + pub derivation_path: Option, +} + +#[derive(Deserialize, Serialize)] +pub struct ImportKeystoresRequest { + pub keystores: Vec, + pub keystores_password: ZeroizeString, + pub slashing_protection: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct ImportKeystoresResponse { + pub statuses: Vec>, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Status { + pub status: T, + #[serde(skip_serializing_if = "Option::is_none")] + pub message: Option, +} + +impl Status { + pub fn ok(status: T) -> Self { + Self { + status, + message: None, + } + } + + pub fn error(status: T, message: String) -> Self { + Self { + status, + message: Some(message), + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum ImportKeystoreStatus { + Imported, + Duplicate, + Error, +} + +#[derive(Deserialize, Serialize)] +pub struct DeleteKeystoresRequest { + pub pubkeys: Vec, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct DeleteKeystoresResponse { + pub statuses: Vec>, + pub slashing_protection: Interchange, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub enum DeleteKeystoreStatus { + Deleted, + NotFound, + Error, +} diff --git 
a/common/eth2/src/lighthouse_vc/types.rs b/common/eth2/src/lighthouse_vc/types.rs index 9e311c9d6ba..25b30505380 100644 --- a/common/eth2/src/lighthouse_vc/types.rs +++ b/common/eth2/src/lighthouse_vc/types.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use std::path::PathBuf; pub use crate::lighthouse::Health; +pub use crate::lighthouse_vc::std_types::*; pub use crate::types::{GenericResponse, VersionData}; pub use types::*; diff --git a/common/validator_dir/src/builder.rs b/common/validator_dir/src/builder.rs index 4d6de051636..861a6afe962 100644 --- a/common/validator_dir/src/builder.rs +++ b/common/validator_dir/src/builder.rs @@ -134,15 +134,18 @@ impl<'a> Builder<'a> { self } + /// Return the path to the validator dir to be built, i.e. `base_dir/pubkey`. + pub fn get_dir_path(base_validators_dir: &Path, voting_keystore: &Keystore) -> PathBuf { + base_validators_dir.join(format!("0x{}", voting_keystore.pubkey())) + } + /// Consumes `self`, returning a `ValidatorDir` if no error is encountered. 
pub fn build(self) -> Result { let (voting_keystore, voting_password) = self .voting_keystore .ok_or(Error::UninitializedVotingKeystore)?; - let dir = self - .base_validators_dir - .join(format!("0x{}", voting_keystore.pubkey())); + let dir = Self::get_dir_path(&self.base_validators_dir, &voting_keystore); if dir.exists() { return Err(Error::DirectoryAlreadyExists(dir)); diff --git a/validator_client/Cargo.toml b/validator_client/Cargo.toml index d1b212cc736..26ebf66d0a8 100644 --- a/validator_client/Cargo.toml +++ b/validator_client/Cargo.toml @@ -10,6 +10,7 @@ path = "src/lib.rs" [dev-dependencies] tokio = { version = "1.10.0", features = ["time", "rt-multi-thread", "macros"] } +logging = { path = "../common/logging" } [dependencies] tree_hash = "0.4.0" @@ -48,7 +49,7 @@ hyper = "0.14.4" eth2_serde_utils = "0.1.0" libsecp256k1 = "0.6.0" ring = "0.16.19" -rand = "0.7.3" +rand = { version = "0.7.3", features = ["small_rng"] } lighthouse_metrics = { path = "../common/lighthouse_metrics" } lazy_static = "1.4.0" itertools = "0.10.0" diff --git a/validator_client/slashing_protection/Cargo.toml b/validator_client/slashing_protection/Cargo.toml index 7f30170de9c..4462ddf427f 100644 --- a/validator_client/slashing_protection/Cargo.toml +++ b/validator_client/slashing_protection/Cargo.toml @@ -3,6 +3,11 @@ name = "slashing_protection" version = "0.1.0" authors = ["Michael Sproul ", "pscott "] edition = "2018" +autotests = false + +[[test]] +name = "slashing_protection_tests" +path = "tests/main.rs" [dependencies] tempfile = "3.1.0" diff --git a/validator_client/slashing_protection/migration-tests/v0_no_enabled_column.sqlite b/validator_client/slashing_protection/migration-tests/v0_no_enabled_column.sqlite new file mode 100644 index 00000000000..5a95fe36e6e Binary files /dev/null and b/validator_client/slashing_protection/migration-tests/v0_no_enabled_column.sqlite differ diff --git a/validator_client/slashing_protection/src/lib.rs 
b/validator_client/slashing_protection/src/lib.rs index 858acbfe9b0..1610b523720 100644 --- a/validator_client/slashing_protection/src/lib.rs +++ b/validator_client/slashing_protection/src/lib.rs @@ -30,6 +30,7 @@ pub const SLASHING_PROTECTION_FILENAME: &str = "slashing_protection.sqlite"; #[derive(PartialEq, Debug)] pub enum NotSafe { UnregisteredValidator(PublicKeyBytes), + DisabledValidator(PublicKeyBytes), InvalidBlock(InvalidBlock), InvalidAttestation(InvalidAttestation), PermissionsError, diff --git a/validator_client/slashing_protection/src/registration_tests.rs b/validator_client/slashing_protection/src/registration_tests.rs index 40a3d6ee71e..a9b3ae3f8ff 100644 --- a/validator_client/slashing_protection/src/registration_tests.rs +++ b/validator_client/slashing_protection/src/registration_tests.rs @@ -30,3 +30,5 @@ fn double_register_validators() { assert_eq!(slashing_db.num_validator_rows().unwrap(), num_validators); assert_eq!(validator_ids, get_validator_ids()); } + +// FIXME(sproul): test de-registration/re-registration with `enabled`=false/true/etc diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index 725aa6057dd..fb918445272 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -28,6 +28,9 @@ pub const CONNECTION_TIMEOUT: Duration = Duration::from_millis(100); /// Supported version of the interchange format. pub const SUPPORTED_INTERCHANGE_FORMAT_VERSION: u64 = 5; +/// Column ID of the `validators.enabled` column. 
+pub const VALIDATORS_ENABLED_CID: i64 = 2; + #[derive(Debug, Clone)] pub struct SlashingDatabase { conn_pool: Pool, @@ -55,7 +58,7 @@ impl SlashingDatabase { restrict_file_permissions(path).map_err(|_| NotSafe::PermissionsError)?; let conn_pool = Self::open_conn_pool(path)?; - let conn = conn_pool.get()?; + let mut conn = conn_pool.get()?; conn.execute( "CREATE TABLE validators ( @@ -88,13 +91,55 @@ impl SlashingDatabase { params![], )?; + // The tables created above are for the v0 schema. We immediately update them + // to the latest schema without dropping the connection. + let txn = conn.transaction()?; + Self::apply_schema_migrations(&txn)?; + txn.commit()?; + Ok(Self { conn_pool }) } /// Open an existing `SlashingDatabase` from disk. + /// + /// This will automatically check for and apply the latest schema migrations. pub fn open(path: &Path) -> Result { let conn_pool = Self::open_conn_pool(path)?; - Ok(Self { conn_pool }) + let db = Self { conn_pool }; + db.with_transaction(|txn| Self::apply_schema_migrations(txn))?; + Ok(db) + } + + fn apply_schema_migrations(txn: &Transaction) -> Result<(), NotSafe> { + // Add the `enabled` column to the `validators` table if it does not already exist. + let enabled_col_exists = txn + .query_row( + "SELECT cid, name FROM pragma_table_info('validators') WHERE name = 'enabled'", + params![], + |row| Ok((row.get(0)?, row.get(1)?)), + ) + .optional()? + .map(|(cid, name): (i64, String)| { + // Check that the enabled column is in the correct position with the right name. + // This is a defensive check that shouldn't do anything in practice unless the + // slashing DB has been manually edited. + if cid == VALIDATORS_ENABLED_CID && name == "enabled" { + Ok(()) + } else { + Err(NotSafe::ConsistencyError) + } + }) + .transpose()? 
+ .is_some(); + + if !enabled_col_exists { + txn.execute( + "ALTER TABLE validators ADD COLUMN enabled BOOL NOT NULL DEFAULT TRUE", + params![], + )?; + } + + Ok(()) } /// Open a new connection pool with all of the necessary settings and tweaks. @@ -166,15 +211,37 @@ impl SlashingDatabase { public_keys: impl Iterator, txn: &Transaction, ) -> Result<(), NotSafe> { - let mut stmt = txn.prepare("INSERT INTO validators (public_key) VALUES (?1)")?; + let mut stmt = + txn.prepare("INSERT INTO validators (public_key, enabled) VALUES (?1, TRUE)")?; for pubkey in public_keys { - if self.get_validator_id_opt(txn, pubkey)?.is_none() { - stmt.execute([pubkey.as_hex_string()])?; + match self.get_validator_id_with_status(txn, pubkey)? { + None => { + stmt.execute([pubkey.as_hex_string()])?; + } + Some((validator_id, false)) => { + self.update_validator_status(txn, validator_id, true)?; + } + Some((_, true)) => { + // Validator already registered and enabled. + } } } Ok(()) } + pub fn update_validator_status( + &self, + txn: &Transaction, + validator_id: i64, + status: bool, + ) -> Result<(), NotSafe> { + txn.execute( + "UPDATE validators SET enabled = ? WHERE id = ?", + params![status, validator_id], + )?; + Ok(()) + } + /// Check that all of the given validators are registered. pub fn check_validator_registrations<'a>( &self, @@ -203,7 +270,7 @@ impl SlashingDatabase { .collect() } - /// Get the database-internal ID for a validator. + /// Get the database-internal ID for an enabled validator. /// /// This is NOT the same as a validator index, and depends on the ordering that validators /// are registered with the slashing protection database (and may vary between machines). @@ -218,21 +285,38 @@ impl SlashingDatabase { txn: &Transaction, public_key: &PublicKeyBytes, ) -> Result { - self.get_validator_id_opt(txn, public_key)? - .ok_or_else(|| NotSafe::UnregisteredValidator(*public_key)) + let (validator_id, enabled) = self + .get_validator_id_with_status(txn, public_key)? 
+ .ok_or_else(|| NotSafe::UnregisteredValidator(*public_key))?; + if enabled { + Ok(validator_id) + } else { + Err(NotSafe::DisabledValidator(*public_key)) + } } - /// Optional version of `get_validator_id`. - fn get_validator_id_opt( + /// Get validator ID regardless of whether or not it is enabled. + pub fn get_validator_id_ignoring_status( &self, txn: &Transaction, public_key: &PublicKeyBytes, - ) -> Result, NotSafe> { + ) -> Result { + let (validator_id, _) = self + .get_validator_id_with_status(txn, public_key)? + .ok_or_else(|| NotSafe::UnregisteredValidator(*public_key))?; + Ok(validator_id) + } + + pub fn get_validator_id_with_status( + &self, + txn: &Transaction, + public_key: &PublicKeyBytes, + ) -> Result, NotSafe> { Ok(txn .query_row( - "SELECT id FROM validators WHERE public_key = ?1", + "SELECT id, enabled FROM validators WHERE public_key = ?1", params![&public_key.as_hex_string()], - |row| row.get(0), + |row| Ok((row.get(0)?, row.get(1)?)), ) .optional()?) } @@ -734,13 +818,21 @@ impl SlashingDatabase { ) -> Result { let mut conn = self.conn_pool.get()?; let txn = &conn.transaction()?; + self.export_interchange_info_in_txn(genesis_validators_root, selected_pubkeys, txn) + } + pub fn export_interchange_info_in_txn( + &self, + genesis_validators_root: Hash256, + selected_pubkeys: Option<&[PublicKeyBytes]>, + txn: &Transaction, + ) -> Result { // Determine the validator IDs and public keys to export data for. let to_export = if let Some(selected_pubkeys) = selected_pubkeys { selected_pubkeys .iter() .map(|pubkey| { - let id = self.get_validator_id_in_txn(txn, pubkey)?; + let id = self.get_validator_id_ignoring_status(txn, pubkey)?; Ok((id, *pubkey)) }) .collect::>()? 
@@ -1084,7 +1176,6 @@ impl From for InterchangeError { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::pubkey; use tempfile::tempdir; #[test] @@ -1101,8 +1192,7 @@ mod tests { let file = dir.path().join("db.sqlite"); let _db1 = SlashingDatabase::create(&file).unwrap(); - let db2 = SlashingDatabase::open(&file).unwrap(); - db2.register_validator(pubkey(0)).unwrap_err(); + SlashingDatabase::open(&file).unwrap_err(); } // Attempting to create the same database twice should error. @@ -1147,9 +1237,12 @@ mod tests { fn test_transaction_failure() { let dir = tempdir().unwrap(); let file = dir.path().join("db.sqlite"); - let _db1 = SlashingDatabase::create(&file).unwrap(); + let db = SlashingDatabase::create(&file).unwrap(); - let db2 = SlashingDatabase::open(&file).unwrap(); - db2.test_transaction().unwrap_err(); + db.with_transaction(|_| { + db.test_transaction().unwrap_err(); + Ok::<(), NotSafe>(()) + }) + .unwrap(); } } diff --git a/validator_client/slashing_protection/tests/main.rs b/validator_client/slashing_protection/tests/main.rs new file mode 100644 index 00000000000..5b66bd87e61 --- /dev/null +++ b/validator_client/slashing_protection/tests/main.rs @@ -0,0 +1,2 @@ +mod interop; +mod migration; diff --git a/validator_client/slashing_protection/tests/migration.rs b/validator_client/slashing_protection/tests/migration.rs new file mode 100644 index 00000000000..cd3561f2114 --- /dev/null +++ b/validator_client/slashing_protection/tests/migration.rs @@ -0,0 +1,68 @@ +//! Tests for upgrading a previous version of the database to the latest schema. +use slashing_protection::{NotSafe, SlashingDatabase}; +use std::collections::HashMap; +use std::fs; +use std::path::{Path, PathBuf}; +use tempfile::tempdir; +use types::Hash256; + +fn test_data_dir() -> PathBuf { + Path::new(&std::env::var("CARGO_MANIFEST_DIR").unwrap()).join("migration-tests") +} + +/// Copy `filename` from the test data dir to the temporary `dest` for testing. 
+fn make_copy(filename: &str, dest: &Path) -> PathBuf { + let source_file = test_data_dir().join(filename); + let dest_file = dest.join(filename); + fs::copy(source_file, &dest_file).unwrap(); + dest_file +} + +#[test] +fn add_enabled_column() { + let tmp = tempdir().unwrap(); + + let path = make_copy("v0_no_enabled_column.sqlite", tmp.path()); + let num_expected_validators = 5; + + // Database should open without errors, indicating successful application of migrations. + // The input file has no `enabled` column, which should get added when opening it here. + let db = SlashingDatabase::open(&path).unwrap(); + + // Check that exporting an interchange file lists all the validators. + let interchange = db.export_all_interchange_info(Hash256::zero()).unwrap(); + assert_eq!(interchange.data.len(), num_expected_validators); + + db.with_transaction(|txn| { + // Check that all the validators are enabled and unique. + let uniq_validator_ids = interchange + .data + .iter() + .map(|data| { + let (validator_id, enabled) = db + .get_validator_id_with_status(txn, &data.pubkey) + .unwrap() + .unwrap(); + assert!(enabled); + (validator_id, data.pubkey) + }) + .collect::>(); + + assert_eq!(uniq_validator_ids.len(), num_expected_validators); + + // Check that we can disable them all. + for (&validator_id, pubkey) in &uniq_validator_ids { + db.update_validator_status(txn, validator_id, false) + .unwrap(); + let (loaded_id, enabled) = db + .get_validator_id_with_status(txn, pubkey) + .unwrap() + .unwrap(); + assert_eq!(validator_id, loaded_id); + assert!(!enabled); + } + + Ok::<_, NotSafe>(()) + }) + .unwrap(); +} diff --git a/validator_client/src/http_api/keystores.rs b/validator_client/src/http_api/keystores.rs new file mode 100644 index 00000000000..0cacd81ca9d --- /dev/null +++ b/validator_client/src/http_api/keystores.rs @@ -0,0 +1,234 @@ +//! Implementation of the standard keystore management API. 
+use crate::{signing_method::SigningMethod, InitializedValidators, ValidatorStore}; +use account_utils::ZeroizeString; +use eth2::lighthouse_vc::std_types::{ + DeleteKeystoreStatus, DeleteKeystoresRequest, DeleteKeystoresResponse, ImportKeystoreStatus, + ImportKeystoresRequest, ImportKeystoresResponse, ListKeystoresResponse, SingleKeystoreResponse, + Status, +}; +use eth2_keystore::Keystore; +use slog::{info, warn, Logger}; +use slot_clock::SlotClock; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Weak; +use tokio::runtime::Runtime; +use types::{EthSpec, PublicKeyBytes}; +use validator_dir::Builder as ValidatorDirBuilder; +use warp::Rejection; +use warp_utils::reject::{custom_bad_request, custom_server_error}; + +pub fn list( + validator_store: Arc>, +) -> ListKeystoresResponse { + let initialized_validators_rwlock = validator_store.initialized_validators(); + let initialized_validators = initialized_validators_rwlock.read(); + + let keystores = initialized_validators + .validator_definitions() + .iter() + .filter(|def| def.enabled && def.signing_definition.is_local_keystore()) + .map(|def| { + let validating_pubkey = def.voting_public_key.compress(); + + let derivation_path = initialized_validators + .signing_method(&validating_pubkey) + .and_then(|signing_method| match *signing_method { + SigningMethod::LocalKeystore { + ref voting_keystore, + .. + } => voting_keystore.path(), + SigningMethod::Web3Signer { .. } => None, + }); + + SingleKeystoreResponse { + validating_pubkey, + derivation_path, + } + }) + .collect::>(); + + ListKeystoresResponse { keystores } +} + +pub fn import( + request: ImportKeystoresRequest, + validator_dir: PathBuf, + validator_store: Arc>, + runtime: Weak, + log: Logger, +) -> Result { + info!( + log, + "Importing keystores via standard HTTP API"; + "count" => request.keystores.len(), + ); + + // Import slashing protection data before keystores, so that new keystores don't start signing + // without it. 
+ if let Some(slashing_protection) = request.slashing_protection { + // Warn for missing slashing protection. + for keystore in &request.keystores { + if let Some(public_key) = keystore.public_key() { + let pubkey_bytes = public_key.compress(); + if !slashing_protection + .data + .iter() + .any(|data| data.pubkey == pubkey_bytes) + { + warn!( + log, + "Slashing protection data not provided"; + "public_key" => ?public_key, + ); + } + } + } + + validator_store + .import_slashing_protection(slashing_protection) + .map_err(|e| { + custom_bad_request(format!("error importing slashing protection: {:?}", e)) + })? + } else { + warn!(log, "No slashing protection data provided with keystores"); + } + + // Import each keystore. Some keystores may fail to be imported, so we record a status for each. + let mut statuses = Vec::with_capacity(request.keystores.len()); + + for keystore in request.keystores { + let pubkey_str = keystore.pubkey().to_string(); + + let status = if let Some(runtime) = runtime.upgrade() { + match import_single_keystore( + keystore, + request.keystores_password.clone(), + validator_dir.clone(), + &validator_store, + runtime, + ) { + Ok(status) => Status::ok(status), + Err(e) => { + warn!( + log, + "Error importing keystore, skipped"; + "pubkey" => pubkey_str, + "error" => ?e, + ); + Status::error(ImportKeystoreStatus::Error, e) + } + } + } else { + Status::error( + ImportKeystoreStatus::Error, + "validator client shutdown".into(), + ) + }; + statuses.push(status); + } + + Ok(ImportKeystoresResponse { statuses }) +} + +fn import_single_keystore( + keystore: Keystore, + password: ZeroizeString, + validator_dir_path: PathBuf, + validator_store: &ValidatorStore, + runtime: Arc, +) -> Result { + // Check if the validator key already exists. 
+ let pubkey = keystore + .public_key() + .ok_or_else(|| format!("invalid pubkey: {}", keystore.pubkey()))?; + if validator_store + .initialized_validators() + .read() + .is_enabled(&pubkey) + .unwrap_or(false) + { + return Ok(ImportKeystoreStatus::Duplicate); + } + + let validator_dir = ValidatorDirBuilder::new(validator_dir_path) + .voting_keystore(keystore, password.as_ref()) + .store_withdrawal_keystore(false) + .build() + .map_err(|e| format!("failed to build validator directory: {:?}", e))?; + + // Drop validator dir so that `add_validator_keystore` can re-lock the keystore. + let voting_keystore_path = validator_dir.voting_keystore_path(); + drop(validator_dir); + + runtime + .block_on(validator_store.add_validator_keystore( + voting_keystore_path, + password, + true, + None, + )) + .map_err(|e| format!("failed to initialize validator: {:?}", e))?; + + Ok(ImportKeystoreStatus::Imported) +} + +pub fn delete( + request: DeleteKeystoresRequest, + validator_store: Arc>, + runtime: Weak, + log: Logger, +) -> Result { + // Remove from initialized validators. 
+ let initialized_validators_rwlock = validator_store.initialized_validators(); + let mut initialized_validators = initialized_validators_rwlock.write(); + + let statuses = request + .pubkeys + .iter() + .map(|pubkey_bytes| { + match delete_single_keystore(pubkey_bytes, &mut initialized_validators, runtime.clone()) + { + Ok(status) => Status::ok(status), + Err(error) => { + warn!( + log, + "Error deleting keystore"; + "pubkey" => ?pubkey_bytes, + "error" => ?error, + ); + Status::error(DeleteKeystoreStatus::Error, error) + } + } + }) + .collect(); + + let slashing_protection = validator_store + .export_slashing_protection_for_keys(&request.pubkeys) + .map_err(|e| { + custom_server_error(format!("error exporting slashing protection: {:?}", e)) + })?; + + Ok(DeleteKeystoresResponse { + statuses, + slashing_protection, + }) +} + +fn delete_single_keystore( + pubkey_bytes: &PublicKeyBytes, + initialized_validators: &mut InitializedValidators, + runtime: Weak, +) -> Result { + if let Some(runtime) = runtime.upgrade() { + let pubkey = pubkey_bytes + .decompress() + .map_err(|e| format!("invalid pubkey, {:?}: {:?}", pubkey_bytes, e))?; + + runtime + .block_on(initialized_validators.delete_definition_and_keystore(&pubkey)) + .map_err(|e| format!("unable to disable and delete: {:?}", e)) + } else { + Err("validator client shutdown".into()) + } +} diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index 5e0f3443a25..593edc36f45 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -1,5 +1,6 @@ mod api_secret; mod create_validator; +mod keystores; mod tests; use crate::ValidatorStore; @@ -106,7 +107,7 @@ pub fn serve( // Configure CORS. 
let cors_builder = { let builder = warp::cors() - .allow_methods(vec!["GET", "POST", "PATCH"]) + .allow_methods(vec!["GET", "POST", "PATCH", "DELETE"]) .allow_headers(vec!["Content-Type", "Authorization"]); warp_utils::cors::set_builder_origins( @@ -154,6 +155,9 @@ pub fn serve( }) }); + let inner_ctx = ctx.clone(); + let log_filter = warp::any().map(move || inner_ctx.log.clone()); + let inner_spec = Arc::new(ctx.spec.clone()); let spec_filter = warp::any().map(move || inner_spec.clone()); @@ -348,7 +352,7 @@ pub fn serve( .and(warp::path("keystore")) .and(warp::path::end()) .and(warp::body::json()) - .and(validator_dir_filter) + .and(validator_dir_filter.clone()) .and(validator_store_filter.clone()) .and(signer.clone()) .and(runtime_filter.clone()) @@ -451,9 +455,9 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path::end()) .and(warp::body::json()) - .and(validator_store_filter) - .and(signer) - .and(runtime_filter) + .and(validator_store_filter.clone()) + .and(signer.clone()) + .and(runtime_filter.clone()) .and_then( |validator_pubkey: PublicKey, body: api_types::ValidatorPatchRequest, @@ -495,6 +499,47 @@ pub fn serve( }, ); + // Standard key-manager endpoints. 
+ let eth_v1 = warp::path("eth").and(warp::path("v1")); + let std_keystores = eth_v1.and(warp::path("keystores")).and(warp::path::end()); + + // GET /eth/v1/keystores + let get_std_keystores = std_keystores + .and(signer.clone()) + .and(validator_store_filter.clone()) + .and_then(|signer, validator_store: Arc>| { + blocking_signed_json_task(signer, move || Ok(keystores::list(validator_store))) + }); + + // POST /eth/v1/keystores + let post_std_keystores = std_keystores + .and(warp::body::json()) + .and(signer.clone()) + .and(validator_dir_filter) + .and(validator_store_filter.clone()) + .and(runtime_filter.clone()) + .and(log_filter.clone()) + .and_then( + |request, signer, validator_dir, validator_store, runtime, log| { + blocking_signed_json_task(signer, move || { + keystores::import(request, validator_dir, validator_store, runtime, log) + }) + }, + ); + + // DELETE /eth/v1/keystores + let delete_std_keystores = std_keystores + .and(warp::body::json()) + .and(signer) + .and(validator_store_filter) + .and(runtime_filter) + .and(log_filter) + .and_then(|request, signer, validator_store, runtime, log| { + blocking_signed_json_task(signer, move || { + keystores::delete(request, validator_store, runtime, log) + }) + }); + let routes = warp::any() .and(authorization_header_filter) // Note: it is critical that the `authorization_header_filter` is applied to all routes. @@ -508,15 +553,18 @@ pub fn serve( .or(get_lighthouse_health) .or(get_lighthouse_spec) .or(get_lighthouse_validators) - .or(get_lighthouse_validators_pubkey), + .or(get_lighthouse_validators_pubkey) + .or(get_std_keystores), ) .or(warp::post().and( post_validators .or(post_validators_keystore) .or(post_validators_mnemonic) - .or(post_validators_web3signer), + .or(post_validators_web3signer) + .or(post_std_keystores), )) - .or(warp::patch().and(patch_validators)), + .or(warp::patch().and(patch_validators)) + .or(warp::delete().and(delete_std_keystores)), ) // Maps errors into HTTP responses. 
.recover(warp_utils::reject::handle_rejection) @@ -550,7 +598,7 @@ pub async fn blocking_signed_json_task( ) -> Result where S: Fn(&[u8]) -> String, - F: Fn() -> Result + Send + 'static, + F: FnOnce() -> Result + Send + 'static, T: Serialize + Send + 'static, { warp_utils::task::blocking_task(func) diff --git a/validator_client/src/http_api/tests.rs b/validator_client/src/http_api/tests.rs index c9ef869be50..ab7dc637952 100644 --- a/validator_client/src/http_api/tests.rs +++ b/validator_client/src/http_api/tests.rs @@ -1,6 +1,8 @@ #![cfg(test)] #![cfg(not(debug_assertions))] +mod keystores; + use crate::doppelganger_service::DoppelgangerService; use crate::{ http_api::{ApiSecret, Config as HttpConfig, Context}, @@ -12,13 +14,13 @@ use account_utils::{ ZeroizeString, }; use deposit_contract::decode_eth1_tx_data; -use environment::null_logger; use eth2::{ lighthouse_vc::{http_client::ValidatorClientHttpClient, types::*}, types::ErrorMessage as ApiErrorMessage, Error as ApiError, }; use eth2_keystore::KeystoreBuilder; +use logging::test_logger; use parking_lot::RwLock; use sensitive_url::SensitiveUrl; use slashing_protection::{SlashingDatabase, SLASHING_PROTECTION_FILENAME}; @@ -40,6 +42,7 @@ type E = MainnetEthSpec; struct ApiTester { client: ValidatorClientHttpClient, initialized_validators: Arc>, + validator_store: Arc>, url: SensitiveUrl, _server_shutdown: oneshot::Sender<()>, _validator_dir: TempDir, @@ -58,7 +61,7 @@ fn build_runtime() -> Arc { impl ApiTester { pub async fn new(runtime: std::sync::Weak) -> Self { - let log = null_logger().unwrap(); + let log = test_logger(); let validator_dir = tempdir().unwrap(); let secrets_dir = tempdir().unwrap(); @@ -92,7 +95,7 @@ impl ApiTester { let (shutdown_tx, _) = futures::channel::mpsc::channel(1); let executor = TaskExecutor::new(runtime.clone(), exit, log.clone(), shutdown_tx); - let validator_store = ValidatorStore::<_, E>::new( + let validator_store = Arc::new(ValidatorStore::<_, E>::new( initialized_validators, 
slashing_protection, Hash256::repeat_byte(42), @@ -101,7 +104,7 @@ impl ApiTester { slot_clock, executor, log.clone(), - ); + )); validator_store .register_all_in_doppelganger_protection_if_enabled() @@ -113,7 +116,7 @@ impl ApiTester { runtime, api_secret, validator_dir: Some(validator_dir.path().into()), - validator_store: Some(Arc::new(validator_store)), + validator_store: Some(validator_store.clone()), spec: E::default_spec(), config: HttpConfig { enabled: true, @@ -144,11 +147,12 @@ impl ApiTester { let client = ValidatorClientHttpClient::new(url.clone(), api_pubkey).unwrap(); Self { - initialized_validators, - _validator_dir: validator_dir, client, + initialized_validators, + validator_store, url, _server_shutdown: shutdown_tx, + _validator_dir: validator_dir, _runtime_shutdown: runtime_shutdown, } } diff --git a/validator_client/src/http_api/tests/keystores.rs b/validator_client/src/http_api/tests/keystores.rs new file mode 100644 index 00000000000..f1e76c6db9d --- /dev/null +++ b/validator_client/src/http_api/tests/keystores.rs @@ -0,0 +1,409 @@ +use super::*; +use account_utils::random_password_string; +use eth2::lighthouse_vc::std_types::*; +use eth2_keystore::Keystore; +use itertools::Itertools; +use rand::{rngs::SmallRng, Rng, SeedableRng}; +use std::collections::HashMap; + +fn new_keystore(password: ZeroizeString) -> Keystore { + let keypair = Keypair::random(); + KeystoreBuilder::new(&keypair, password.as_ref(), String::new()) + .unwrap() + .build() + .unwrap() +} + +fn run_test(f: F) +where + F: FnOnce(ApiTester) -> V, + V: Future, +{ + let runtime = build_runtime(); + let weak_runtime = Arc::downgrade(&runtime); + runtime.block_on(async { + let tester = ApiTester::new(weak_runtime).await; + f(tester).await + }); +} + +fn keystore_pubkey(keystore: &Keystore) -> PublicKeyBytes { + keystore.public_key().unwrap().compress() +} + +fn all_imported(keystores: &[Keystore]) -> impl Iterator + '_ { + keystores.iter().map(|_| ImportKeystoreStatus::Imported) +} 
+ +fn all_duplicate(keystores: &[Keystore]) -> impl Iterator + '_ { + keystores.iter().map(|_| ImportKeystoreStatus::Duplicate) +} + +fn all_deleted(keystores: &[Keystore]) -> impl Iterator + '_ { + keystores.iter().map(|_| DeleteKeystoreStatus::Deleted) +} + +fn check_get_response<'a>( + response: &ListKeystoresResponse, + expected_keystores: impl IntoIterator, +) { + for (ks1, ks2) in response.keystores.iter().zip_eq(expected_keystores) { + assert_eq!(ks1.validating_pubkey, keystore_pubkey(ks2)); + assert_eq!(ks1.derivation_path, ks2.path()); + } +} + +fn check_import_response<'a>( + response: &ImportKeystoresResponse, + expected_statuses: impl IntoIterator, +) { + for (status, expected_status) in response.statuses.iter().zip_eq(expected_statuses) { + assert_eq!( + status.status, expected_status, + "message: {:?}", + status.message + ); + } +} + +fn check_delete_response<'a>( + response: &DeleteKeystoresResponse, + expected_statuses: impl IntoIterator, +) { + for (status, expected_status) in response.statuses.iter().zip_eq(expected_statuses) { + assert_eq!( + status.status, expected_status, + "message: {:?}", + status.message + ); + } +} + +#[test] +fn get_empty_keystores() { + run_test(|tester| async move { + let res = tester.client.get_keystores().await.unwrap(); + assert_eq!(res, ListKeystoresResponse { keystores: vec![] }); + }) +} + +#[test] +fn import_new_keystores() { + run_test(|tester| async move { + let password = random_password_string(); + let keystores = (0..3) + .map(|_| new_keystore(password.clone())) + .collect::>(); + + let import_res = tester + .client + .post_keystores(&ImportKeystoresRequest { + keystores: keystores.clone(), + keystores_password: password, + slashing_protection: None, + }) + .await + .unwrap(); + + // All keystores should be imported. + check_import_response(&import_res, all_imported(&keystores)); + + // Check that GET lists all the imported keystores. 
+ let get_res = tester.client.get_keystores().await.unwrap(); + check_get_response(&get_res, &keystores); + }) +} + +#[test] +fn import_only_duplicate_keystores() { + run_test(|tester| async move { + let password = random_password_string(); + let keystores = (0..3) + .map(|_| new_keystore(password.clone())) + .collect::>(); + + let req = ImportKeystoresRequest { + keystores: keystores.clone(), + keystores_password: password, + slashing_protection: None, + }; + + // All keystores should be imported on first import. + let import_res = tester.client.post_keystores(&req).await.unwrap(); + check_import_response(&import_res, all_imported(&keystores)); + + // No keystores should be imported on repeat import. + let import_res = tester.client.post_keystores(&req).await.unwrap(); + check_import_response(&import_res, all_duplicate(&keystores)); + + // Check that GET lists all the imported keystores. + let get_res = tester.client.get_keystores().await.unwrap(); + check_get_response(&get_res, &keystores); + }) +} + +#[test] +fn import_some_duplicate_keystores() { + run_test(|tester| async move { + let password = random_password_string(); + let num_keystores = 5; + let keystores_all = (0..num_keystores) + .map(|_| new_keystore(password.clone())) + .collect::>(); + + // Import even numbered keystores first. + let keystores1 = keystores_all + .iter() + .enumerate() + .filter_map(|(i, keystore)| { + if i % 2 == 0 { + Some(keystore.clone()) + } else { + None + } + }) + .collect::>(); + + let req1 = ImportKeystoresRequest { + keystores: keystores1.clone(), + keystores_password: password.clone(), + slashing_protection: None, + }; + + let req2 = ImportKeystoresRequest { + keystores: keystores_all.clone(), + keystores_password: password, + slashing_protection: None, + }; + + let import_res = tester.client.post_keystores(&req1).await.unwrap(); + check_import_response(&import_res, all_imported(&keystores1)); + + // Check partial import. 
+ let expected = (0..num_keystores).map(|i| { + if i % 2 == 0 { + ImportKeystoreStatus::Duplicate + } else { + ImportKeystoreStatus::Imported + } + }); + let import_res = tester.client.post_keystores(&req2).await.unwrap(); + check_import_response(&import_res, expected); + }) +} + +// FIXME(sproul): finish these test stubs +// FIXME(sproul): add test involving web3signer validators +#[test] +fn import_keystores_wrong_password_all() {} + +#[test] +fn import_keystores_wrong_password_some() {} + +#[test] +fn import_keystores_full_slashing_protection() {} + +#[test] +fn import_keystores_partial_slashing_protection() {} + +#[test] +fn delete_some_keystores() {} + +#[test] +fn delete_all_keystores() {} + +#[test] +fn delete_same_keystores_twice() {} + +#[test] +fn delete_some_keystores_twice() {} + +#[test] +fn delete_nonexistent_keystores() {} + +#[test] +fn delete_some_nonexistent_keystores() {} + +fn make_attestation(source_epoch: u64, target_epoch: u64) -> Attestation { + Attestation { + aggregation_bits: BitList::with_capacity( + ::MaxValidatorsPerCommittee::to_usize(), + ) + .unwrap(), + data: AttestationData { + source: Checkpoint { + epoch: Epoch::new(source_epoch), + root: Hash256::from_low_u64_le(source_epoch), + }, + target: Checkpoint { + epoch: Epoch::new(target_epoch), + root: Hash256::from_low_u64_le(target_epoch), + }, + ..AttestationData::default() + }, + signature: AggregateSignature::empty(), + } +} + +#[test] +fn delete_concurrent_with_signing() { + let runtime = build_runtime(); + let num_keys = 8; + let num_signing_threads = 8; + let num_attestations = 100; + let num_delete_threads = 8; + let num_delete_attempts = 100; + let delete_prob = 0.01; + + assert!( + num_keys % num_signing_threads == 0, + "num_keys should be divisible by num threads for simplicity" + ); + + let weak_runtime = Arc::downgrade(&runtime); + runtime.block_on(async { + let tester = ApiTester::new(weak_runtime).await; + + // Generate a lot of keys and import them. 
+ let password = random_password_string(); + let keystores = (0..num_keys) + .map(|_| new_keystore(password.clone())) + .collect::>(); + let all_pubkeys = keystores.iter().map(keystore_pubkey).collect::>(); + + let import_res = tester + .client + .post_keystores(&ImportKeystoresRequest { + keystores: keystores.clone(), + keystores_password: password, + slashing_protection: None, + }) + .await + .unwrap(); + check_import_response(&import_res, all_imported(&keystores)); + + // Start several threads signing attestations at sequential epochs. + let mut join_handles = vec![]; + + for thread_index in 0..num_signing_threads { + let keys_per_thread = num_keys / num_signing_threads; + let validator_store = tester.validator_store.clone(); + let thread_pubkeys = all_pubkeys + [thread_index * keys_per_thread..(thread_index + 1) * keys_per_thread] + .to_vec(); + + let handle = runtime.spawn(async move { + for j in 0..num_attestations { + let mut att = make_attestation(j, j + 1); + for (validator_id, public_key) in thread_pubkeys.iter().enumerate() { + let res = validator_store + .sign_attestation(*public_key, 0, &mut att, Epoch::new(j + 1)) + .await; + + println!( + "attestation {}->{} for validator t{}/v{}: {:?}", + j, + j + 1, + thread_index, + validator_id, + res + ); + } + } + }); + join_handles.push(handle); + } + + // Concurrently, delete each validator one at a time. Store the slashing protection + // data so we can ensure it doesn't change after a key is exported. 
+ let mut delete_handles = vec![]; + for _ in 0..num_delete_threads { + let client = tester.client.clone(); + let all_pubkeys = all_pubkeys.clone(); + + let handle = runtime.spawn(async move { + // let mut rng = SmallRng::from_seed([42; 16]); + let mut rng = SmallRng::from_entropy(); + + let mut slashing_protection = vec![]; + for _ in 0..num_delete_attempts { + let to_delete = all_pubkeys + .iter() + .filter(|_| rng.gen_bool(delete_prob)) + .copied() + .collect::>(); + let delete_res = client + .delete_keystores(&DeleteKeystoresRequest { pubkeys: to_delete }) + .await + .unwrap(); + + for status in delete_res.statuses.iter() { + assert_ne!(status.status, DeleteKeystoreStatus::Error); + } + + slashing_protection.push(delete_res.slashing_protection); + } + slashing_protection + }); + + delete_handles.push(handle); + } + + // Collect slashing protection. + let mut slashing_protection_map = HashMap::new(); + let collected_slashing_protection = futures::future::join_all(delete_handles).await; + + for interchange in collected_slashing_protection + .into_iter() + .map(Result::unwrap) + .flatten() + { + for validator_data in interchange.data { + slashing_protection_map + .entry(validator_data.pubkey) + .and_modify(|existing| { + assert_eq!( + *existing, validator_data, + "slashing protection data changed after first export" + ) + }) + .or_insert(validator_data); + } + } + + futures::future::join_all(join_handles).await + }); +} + +#[test] +fn delete_then_reimport() { + run_test(|tester| async move { + let password = random_password_string(); + let keystores = (0..2) + .map(|_| new_keystore(password.clone())) + .collect::>(); + + // 1. Import all keystores. + let import_req = ImportKeystoresRequest { + keystores: keystores.clone(), + keystores_password: password, + slashing_protection: None, + }; + let import_res = tester.client.post_keystores(&import_req).await.unwrap(); + check_import_response(&import_res, all_imported(&keystores)); + + // 2. Delete all. 
+ let delete_res = tester + .client + .delete_keystores(&DeleteKeystoresRequest { + pubkeys: keystores.iter().map(keystore_pubkey).collect(), + }) + .await + .unwrap(); + check_delete_response(&delete_res, all_deleted(&keystores)); + + // 3. Re-import + let import_res = tester.client.post_keystores(&import_req).await.unwrap(); + check_import_response(&import_res, all_imported(&keystores)); + }) +} diff --git a/validator_client/src/initialized_validators.rs b/validator_client/src/initialized_validators.rs index 57585e2672f..b6521195272 100644 --- a/validator_client/src/initialized_validators.rs +++ b/validator_client/src/initialized_validators.rs @@ -14,19 +14,21 @@ use account_utils::{ }, ZeroizeString, }; +use eth2::lighthouse_vc::std_types::DeleteKeystoreStatus; use eth2_keystore::Keystore; use lighthouse_metrics::set_gauge; use lockfile::{Lockfile, LockfileError}; use reqwest::{Certificate, Client, Error as ReqwestError}; use slog::{debug, error, info, warn, Logger}; use std::collections::{HashMap, HashSet}; -use std::fs::File; +use std::fs::{self, File}; use std::io::{self, Read}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; use types::{Graffiti, Keypair, PublicKey, PublicKeyBytes}; use url::{ParseError, Url}; +use validator_dir::Builder as ValidatorDirBuilder; use crate::key_cache; use crate::key_cache::KeyCache; @@ -67,6 +69,10 @@ pub enum Error { UnableToSaveDefinitions(validator_definitions::Error), /// It is not legal to try and initialize a disabled validator definition. UnableToInitializeDisabledValidator, + /// There was an error while deleting a keystore file. + UnableToDeleteKeystore(PathBuf, io::Error), + /// There was an error while deleting a validator dir. + UnableToDeleteValidatorDir(PathBuf, io::Error), /// There was an error reading from stdin. UnableToReadPasswordFromUser(String), /// There was an error running a tokio async task. 
@@ -384,6 +390,20 @@ impl InitializedValidators { .map(|v| v.signing_method.clone()) } + pub async fn add_definition_replace_disabled( + &mut self, + def: ValidatorDefinition, + ) -> Result<(), Error> { + // Drop any disabled definitions with the same public key. + let delete_def = |existing_def: &ValidatorDefinition| { + !existing_def.enabled && existing_def.voting_public_key == def.voting_public_key + }; + self.definitions.retain(|def| !delete_def(def)); + + // Add the definition. + self.add_definition(def).await + } + /// Add a validator definition to `self`, overwriting the on-disk representation of `self`. pub async fn add_definition(&mut self, def: ValidatorDefinition) -> Result<(), Error> { if self @@ -406,6 +426,74 @@ impl InitializedValidators { Ok(()) } + pub async fn delete_definition_and_keystore( + &mut self, + pubkey: &PublicKey, + ) -> Result { + // 1. Disable the validator definition. + // + // We disable before removing so that in case of a crash the auto-discovery mechanism + // won't re-activate the keystore. + if let Some(def) = self.definitions.as_mut_slice().iter_mut().find(|def| { + &def.voting_public_key == pubkey && def.signing_definition.is_local_keystore() + }) { + def.enabled = false; + self.definitions + .save(&self.validators_dir) + .map_err(Error::UnableToSaveDefinitions)?; + } else { + return Ok(DeleteKeystoreStatus::NotFound); + } + + // 2. Delete from `self.validators`, which holds the signing method. + // Delete the keystore files. + if let Some(initialized_validator) = self.validators.remove(&pubkey.compress()) { + if let SigningMethod::LocalKeystore { + ref voting_keystore_path, + ref voting_keystore, + .. + } = *initialized_validator.signing_method + { + self.delete_keystore_or_validator_dir(voting_keystore_path, voting_keystore)?; + } + } + + // FIXME(sproul): remove key cache related warnings/optimise this? + self.update_validators().await?; + + // 3. Delete from validator definitions entirely. 
+ self.definitions + .retain(|def| &def.voting_public_key != pubkey); + self.definitions + .save(&self.validators_dir) + .map_err(Error::UnableToSaveDefinitions)?; + + Ok(DeleteKeystoreStatus::Deleted) + } + + fn delete_keystore_or_validator_dir( + &self, + voting_keystore_path: &Path, + voting_keystore: &Keystore, + ) -> Result<(), Error> { + // If the parent directory is a `ValidatorDir` within `self.validators_dir`, then + // delete the entire directory so that it may be recreated if the keystore is + // re-imported. + if let Some(validator_dir) = voting_keystore_path.parent() { + if validator_dir + == ValidatorDirBuilder::get_dir_path(&self.validators_dir, voting_keystore) + { + fs::remove_dir_all(validator_dir) + .map_err(|e| Error::UnableToDeleteValidatorDir(validator_dir.into(), e))?; + return Ok(()); + } + } + // Otherwise just delete the keystore file. + fs::remove_file(voting_keystore_path) + .map_err(|e| Error::UnableToDeleteKeystore(voting_keystore_path.into(), e))?; + Ok(()) + } + /// Returns a slice of all defined validators (regardless of their enabled state). 
pub fn validator_definitions(&self) -> &[ValidatorDefinition] { self.definitions.as_slice() @@ -469,7 +557,7 @@ impl InitializedValidators { ) -> Result { //read relevant key_stores let mut definitions_map = HashMap::new(); - for def in self.definitions.as_slice() { + for def in self.definitions.as_slice().iter().filter(|def| def.enabled) { match &def.signing_definition { SigningDefinition::LocalKeystore { voting_keystore_path, diff --git a/validator_client/src/validator_store.rs b/validator_client/src/validator_store.rs index d7efa806aef..ed2ff74ba7c 100644 --- a/validator_client/src/validator_store.rs +++ b/validator_client/src/validator_store.rs @@ -6,7 +6,9 @@ use crate::{ }; use account_utils::{validator_definitions::ValidatorDefinition, ZeroizeString}; use parking_lot::{Mutex, RwLock}; -use slashing_protection::{NotSafe, Safe, SlashingDatabase}; +use slashing_protection::{ + interchange::Interchange, InterchangeError, NotSafe, Safe, SlashingDatabase, +}; use slog::{crit, error, info, warn, Logger}; use slot_clock::SlotClock; use std::iter::FromIterator; @@ -183,7 +185,7 @@ impl ValidatorStore { self.validators .write() - .add_definition(validator_def.clone()) + .add_definition_replace_disabled(validator_def.clone()) .await .map_err(|e| format!("Unable to add definition: {:?}", e))?; @@ -693,6 +695,36 @@ impl ValidatorStore { Ok(SignedContributionAndProof { message, signature }) } + pub fn import_slashing_protection( + &self, + interchange: Interchange, + ) -> Result<(), InterchangeError> { + self.slashing_protection + .import_interchange_info(interchange, self.genesis_validators_root)?; + Ok(()) + } + + /// Export slashing protection data while also disabling the given keys in the database. 
+    pub fn export_slashing_protection_for_keys(
+        &self,
+        pubkeys: &[PublicKeyBytes],
+    ) -> Result<Interchange, NotSafe> {
+        self.slashing_protection.with_transaction(|txn| {
+            for pubkey in pubkeys {
+                let validator_id = self
+                    .slashing_protection
+                    .get_validator_id_ignoring_status(txn, pubkey)?;
+                self.slashing_protection
+                    .update_validator_status(txn, validator_id, false)?;
+            }
+            self.slashing_protection.export_interchange_info_in_txn(
+                self.genesis_validators_root,
+                Some(pubkeys),
+                txn,
+            )
+        })
+    }
+
     /// Prune the slashing protection database so that it remains performant.
     ///
     /// This function will only do actual pruning periodically, so it should usually be