From d84e3e391e336a5819b8734a3cf285f8821204a7 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 4 Jul 2024 14:27:41 +1000 Subject: [PATCH] Store pubkey cache decompressed on disk (#5897) * Support uncompressed keys in crypto/bls * Use uncompressed keys in cache * Implement DB upgrade * Implement downgrade * More logging on v20 upgrade * Revert "More logging on v20 upgrade" This reverts commit cc5789b9d36bdebe30206c656d7bc6b788089146. * Merge remote-tracking branch 'origin/unstable' into uncompressed-pubkeys * Add a little more logging * Merge remote-tracking branch 'origin/unstable' into uncompressed-pubkeys --- beacon_node/beacon_chain/src/schema_change.rs | 9 ++ .../src/schema_change/migration_schema_v20.rs | 4 + .../src/schema_change/migration_schema_v21.rs | 83 +++++++++++++++++++ .../src/validator_pubkey_cache.rs | 58 ++++++++----- beacon_node/store/src/metadata.rs | 2 +- crypto/bls/src/generic_public_key.rs | 24 ++++++ crypto/bls/src/impls/blst.rs | 23 ++++- crypto/bls/src/impls/fake_crypto.rs | 12 ++- crypto/bls/src/lib.rs | 4 +- crypto/bls/tests/tests.rs | 5 ++ 10 files changed, 199 insertions(+), 25 deletions(-) create mode 100644 beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs diff --git a/beacon_node/beacon_chain/src/schema_change.rs b/beacon_node/beacon_chain/src/schema_change.rs index 06d189a8c01..3fe75e348ce 100644 --- a/beacon_node/beacon_chain/src/schema_change.rs +++ b/beacon_node/beacon_chain/src/schema_change.rs @@ -3,6 +3,7 @@ mod migration_schema_v17; mod migration_schema_v18; mod migration_schema_v19; mod migration_schema_v20; +mod migration_schema_v21; use crate::beacon_chain::BeaconChainTypes; use crate::types::ChainSpec; @@ -87,6 +88,14 @@ pub fn migrate_schema( let ops = migration_schema_v20::downgrade_from_v20::(db.clone(), log)?; db.store_schema_version_atomically(to, ops) } + (SchemaVersion(20), SchemaVersion(21)) => { + let ops = migration_schema_v21::upgrade_to_v21::(db.clone(), log)?; + 
db.store_schema_version_atomically(to, ops) + } + (SchemaVersion(21), SchemaVersion(20)) => { + let ops = migration_schema_v21::downgrade_from_v21::(db.clone(), log)?; + db.store_schema_version_atomically(to, ops) + } // Anything else is an error. (_, _) => Err(HotColdDBError::UnsupportedSchemaVersion { target_version: to, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs index 737fcd0a935..d556d5988d6 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v20.rs @@ -11,6 +11,8 @@ pub fn upgrade_to_v20( db: Arc>, log: Logger, ) -> Result, Error> { + info!(log, "Upgrading from v19 to v20"); + // Load a V15 op pool and transform it to V20. let Some(PersistedOperationPoolV15:: { attestations_v15, @@ -52,6 +54,8 @@ pub fn downgrade_from_v20( db: Arc>, log: Logger, ) -> Result, Error> { + info!(log, "Downgrading from v20 to v19"); + // Load a V20 op pool and transform it to V15. let Some(PersistedOperationPoolV20:: { attestations, diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs new file mode 100644 index 00000000000..4042d328207 --- /dev/null +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs @@ -0,0 +1,83 @@ +use crate::beacon_chain::BeaconChainTypes; +use crate::validator_pubkey_cache::DatabasePubkey; +use slog::{info, Logger}; +use ssz::{Decode, Encode}; +use std::sync::Arc; +use store::{ + get_key_for_col, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, +}; +use types::{Hash256, PublicKey}; + +const LOG_EVERY: usize = 200_000; + +pub fn upgrade_to_v21( + db: Arc>, + log: Logger, +) -> Result, Error> { + info!(log, "Upgrading from v20 to v21"); + + let mut ops = vec![]; + + // Iterate through all pubkeys and decompress them. 
+ for (i, res) in db + .hot_db + .iter_column::(DBColumn::PubkeyCache) + .enumerate() + { + let (key, value) = res?; + let pubkey = PublicKey::from_ssz_bytes(&value)?; + let decompressed = DatabasePubkey::from_pubkey(&pubkey); + ops.push(decompressed.as_kv_store_op(key)); + + if i > 0 && i % LOG_EVERY == 0 { + info!( + log, + "Public key decompression in progress"; + "keys_decompressed" => i + ); + } + } + info!(log, "Public key decompression complete"); + + Ok(ops) +} + +pub fn downgrade_from_v21( + db: Arc>, + log: Logger, +) -> Result, Error> { + info!(log, "Downgrading from v21 to v20"); + + let mut ops = vec![]; + + // Iterate through all pubkeys and recompress them. + for (i, res) in db + .hot_db + .iter_column::(DBColumn::PubkeyCache) + .enumerate() + { + let (key, value) = res?; + let decompressed = DatabasePubkey::from_ssz_bytes(&value)?; + let (_, pubkey_bytes) = decompressed.as_pubkey().map_err(|e| Error::DBError { + message: format!("{e:?}"), + })?; + + let db_key = get_key_for_col(DBColumn::PubkeyCache.into(), key.as_bytes()); + ops.push(KeyValueStoreOp::PutKeyValue( + db_key, + pubkey_bytes.as_ssz_bytes(), + )); + + if i > 0 && i % LOG_EVERY == 0 { + info!( + log, + "Public key compression in progress"; + "keys_compressed" => i + ); + } + } + + info!(log, "Public key compression complete"); + + Ok(ops) +} diff --git a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs index e1b50706286..576fbf0fd1f 100644 --- a/beacon_node/beacon_chain/src/validator_pubkey_cache.rs +++ b/beacon_node/beacon_chain/src/validator_pubkey_cache.rs @@ -1,6 +1,9 @@ use crate::errors::BeaconChainError; use crate::{BeaconChainTypes, BeaconStore}; +use bls::PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN; +use smallvec::SmallVec; use ssz::{Decode, Encode}; +use ssz_derive::{Decode, Encode}; use std::collections::HashMap; use std::marker::PhantomData; use store::{DBColumn, Error as StoreError, StoreItem, StoreOp}; @@ -49,14 +52,13 @@ 
impl ValidatorPubkeyCache { let mut pubkey_bytes = vec![]; for validator_index in 0.. { - if let Some(DatabasePubkey(pubkey)) = + if let Some(db_pubkey) = store.get_item(&DatabasePubkey::key_for_index(validator_index))? { - pubkeys.push((&pubkey).try_into().map_err(|e| { - BeaconChainError::ValidatorPubkeyCacheError(format!("{:?}", e)) - })?); - pubkey_bytes.push(pubkey); - indices.insert(pubkey, validator_index); + let (pk, pk_bytes) = DatabasePubkey::as_pubkey(&db_pubkey)?; + pubkeys.push(pk); + indices.insert(pk_bytes, validator_index); + pubkey_bytes.push(pk_bytes); } else { break; } @@ -104,29 +106,29 @@ impl ValidatorPubkeyCache { self.indices.reserve(validator_keys.len()); let mut store_ops = Vec::with_capacity(validator_keys.len()); - for pubkey in validator_keys { + for pubkey_bytes in validator_keys { let i = self.pubkeys.len(); - if self.indices.contains_key(&pubkey) { + if self.indices.contains_key(&pubkey_bytes) { return Err(BeaconChainError::DuplicateValidatorPublicKey); } + let pubkey = (&pubkey_bytes) + .try_into() + .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?; + // Stage the new validator key for writing to disk. // It will be committed atomically when the block that introduced it is written to disk. // Notably it is NOT written while the write lock on the cache is held. // See: https://github.com/sigp/lighthouse/issues/2327 store_ops.push(StoreOp::KeyValueOp( - DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)), + DatabasePubkey::from_pubkey(&pubkey) + .as_kv_store_op(DatabasePubkey::key_for_index(i)), )); - self.pubkeys.push( - (&pubkey) - .try_into() - .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?, - ); - self.pubkey_bytes.push(pubkey); - - self.indices.insert(pubkey, i); + self.pubkeys.push(pubkey); + self.pubkey_bytes.push(pubkey_bytes); + self.indices.insert(pubkey_bytes, i); } Ok(store_ops) @@ -166,7 +168,10 @@ impl ValidatorPubkeyCache { /// Wrapper for a public key stored in the database. 
/// /// Keyed by the validator index as `Hash256::from_low_u64_be(index)`. -struct DatabasePubkey(PublicKeyBytes); +#[derive(Encode, Decode)] +pub struct DatabasePubkey { + pubkey: SmallVec<[u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN]>, +} impl StoreItem for DatabasePubkey { fn db_column() -> DBColumn { @@ -174,11 +179,11 @@ impl StoreItem for DatabasePubkey { } fn as_store_bytes(&self) -> Vec { - self.0.as_ssz_bytes() + self.as_ssz_bytes() } fn from_store_bytes(bytes: &[u8]) -> Result { - Ok(Self(PublicKeyBytes::from_ssz_bytes(bytes)?)) + Ok(Self::from_ssz_bytes(bytes)?) } } @@ -186,6 +191,19 @@ impl DatabasePubkey { fn key_for_index(index: usize) -> Hash256 { Hash256::from_low_u64_be(index as u64) } + + pub fn from_pubkey(pubkey: &PublicKey) -> Self { + Self { + pubkey: pubkey.serialize_uncompressed().into(), + } + } + + pub fn as_pubkey(&self) -> Result<(PublicKey, PublicKeyBytes), BeaconChainError> { + let pubkey = PublicKey::deserialize_uncompressed(&self.pubkey) + .map_err(BeaconChainError::InvalidValidatorPubkeyBytes)?; + let pubkey_bytes = pubkey.compress(); + Ok((pubkey, pubkey_bytes)) + } } #[cfg(test)] diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 116926ad3f1..a22dc4aab4c 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -4,7 +4,7 @@ use ssz::{Decode, Encode}; use ssz_derive::{Decode, Encode}; use types::{Checkpoint, Hash256, Slot}; -pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(20); +pub const CURRENT_SCHEMA_VERSION: SchemaVersion = SchemaVersion(21); // All the keys that get stored under the `BeaconMeta` column. // diff --git a/crypto/bls/src/generic_public_key.rs b/crypto/bls/src/generic_public_key.rs index 462e4cb2cb0..80b42dfa714 100644 --- a/crypto/bls/src/generic_public_key.rs +++ b/crypto/bls/src/generic_public_key.rs @@ -11,6 +11,9 @@ use tree_hash::TreeHash; /// The byte-length of a BLS public key when serialized in compressed form. 
pub const PUBLIC_KEY_BYTES_LEN: usize = 48; +/// The byte-length of a BLS public key when serialized in uncompressed form. +pub const PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN: usize = 96; + /// Represents the public key at infinity. pub const INFINITY_PUBLIC_KEY: [u8; PUBLIC_KEY_BYTES_LEN] = [ 0xc0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -23,8 +26,17 @@ pub trait TPublicKey: Sized + Clone { /// Serialize `self` as compressed bytes. fn serialize(&self) -> [u8; PUBLIC_KEY_BYTES_LEN]; + /// Serialize `self` as uncompressed bytes. + fn serialize_uncompressed(&self) -> [u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN]; + /// Deserialize `self` from compressed bytes. fn deserialize(bytes: &[u8]) -> Result; + + /// Deserialize `self` from uncompressed bytes. + /// + /// This function *does not* perform thorough checks of the input bytes and should only be + /// used with bytes output from `Self::serialize_uncompressed`. + fn deserialize_uncompressed(bytes: &[u8]) -> Result; } /// A BLS public key that is generic across some BLS point (`Pub`). @@ -65,6 +77,11 @@ where self.point.serialize() } + /// Serialize `self` as uncompressed bytes. + pub fn serialize_uncompressed(&self) -> [u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN] { + self.point.serialize_uncompressed() + } + /// Deserialize `self` from compressed bytes. pub fn deserialize(bytes: &[u8]) -> Result { if bytes == &INFINITY_PUBLIC_KEY[..] { @@ -75,6 +92,13 @@ } } + + /// Deserialize `self` from uncompressed bytes. 
+ pub fn deserialize_uncompressed(bytes: &[u8]) -> Result { + Ok(Self { + point: Pub::deserialize_uncompressed(bytes)?, + }) + } } impl Eq for GenericPublicKey {} diff --git a/crypto/bls/src/impls/blst.rs b/crypto/bls/src/impls/blst.rs index 0049d79cc55..54c7ad2944e 100644 --- a/crypto/bls/src/impls/blst.rs +++ b/crypto/bls/src/impls/blst.rs @@ -1,10 +1,12 @@ use crate::{ generic_aggregate_public_key::TAggregatePublicKey, generic_aggregate_signature::TAggregateSignature, - generic_public_key::{GenericPublicKey, TPublicKey, PUBLIC_KEY_BYTES_LEN}, + generic_public_key::{ + GenericPublicKey, TPublicKey, PUBLIC_KEY_BYTES_LEN, PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN, + }, generic_secret_key::TSecretKey, generic_signature::{TSignature, SIGNATURE_BYTES_LEN}, - Error, Hash256, ZeroizeHash, INFINITY_SIGNATURE, + BlstError, Error, Hash256, ZeroizeHash, INFINITY_SIGNATURE, }; pub use blst::min_pk as blst_core; use blst::{blst_scalar, BLST_ERROR}; @@ -121,6 +123,10 @@ impl TPublicKey for blst_core::PublicKey { self.compress() } + fn serialize_uncompressed(&self) -> [u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN] { + blst_core::PublicKey::serialize(self) + } + fn deserialize(bytes: &[u8]) -> Result { // key_validate accepts uncompressed bytes too so enforce byte length here. // It also does subgroup checks, noting infinity check is done in `generic_public_key.rs`. @@ -132,6 +138,19 @@ impl TPublicKey for blst_core::PublicKey { } Self::key_validate(bytes).map_err(Into::into) } + + fn deserialize_uncompressed(bytes: &[u8]) -> Result { + if bytes.len() != PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN { + return Err(Error::InvalidByteLength { + got: bytes.len(), + expected: PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN, + }); + } + // Ensure we use the `blst` function rather than the one from this trait. + let result: Result = Self::deserialize(bytes); + let key = result?; + Ok(key) + } } /// A wrapper that allows for `PartialEq` and `Clone` impls. 
diff --git a/crypto/bls/src/impls/fake_crypto.rs b/crypto/bls/src/impls/fake_crypto.rs index f2d8b79b986..a09fb347e6b 100644 --- a/crypto/bls/src/impls/fake_crypto.rs +++ b/crypto/bls/src/impls/fake_crypto.rs @@ -1,7 +1,9 @@ use crate::{ generic_aggregate_public_key::TAggregatePublicKey, generic_aggregate_signature::TAggregateSignature, - generic_public_key::{GenericPublicKey, TPublicKey, PUBLIC_KEY_BYTES_LEN}, + generic_public_key::{ + GenericPublicKey, TPublicKey, PUBLIC_KEY_BYTES_LEN, PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN, + }, generic_secret_key::{TSecretKey, SECRET_KEY_BYTES_LEN}, generic_signature::{TSignature, SIGNATURE_BYTES_LEN}, Error, Hash256, ZeroizeHash, INFINITY_PUBLIC_KEY, INFINITY_SIGNATURE, @@ -46,11 +48,19 @@ impl TPublicKey for PublicKey { self.0 } + fn serialize_uncompressed(&self) -> [u8; PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN] { + panic!("fake_crypto does not support uncompressed keys") + } + fn deserialize(bytes: &[u8]) -> Result { let mut pubkey = Self::infinity(); pubkey.0[..].copy_from_slice(&bytes[0..PUBLIC_KEY_BYTES_LEN]); Ok(pubkey) } + + fn deserialize_uncompressed(_: &[u8]) -> Result { + panic!("fake_crypto does not support uncompressed keys") + } } impl Eq for PublicKey {} diff --git a/crypto/bls/src/lib.rs b/crypto/bls/src/lib.rs index fef9804b784..af269b943d7 100644 --- a/crypto/bls/src/lib.rs +++ b/crypto/bls/src/lib.rs @@ -33,7 +33,9 @@ mod zeroize_hash; pub mod impls; -pub use generic_public_key::{INFINITY_PUBLIC_KEY, PUBLIC_KEY_BYTES_LEN}; +pub use generic_public_key::{ + INFINITY_PUBLIC_KEY, PUBLIC_KEY_BYTES_LEN, PUBLIC_KEY_UNCOMPRESSED_BYTES_LEN, +}; pub use generic_secret_key::SECRET_KEY_BYTES_LEN; pub use generic_signature::{INFINITY_SIGNATURE, SIGNATURE_BYTES_LEN}; pub use get_withdrawal_credentials::get_withdrawal_credentials; diff --git a/crypto/bls/tests/tests.rs b/crypto/bls/tests/tests.rs index 478c1b7dc26..dac2e97f407 100644 --- a/crypto/bls/tests/tests.rs +++ b/crypto/bls/tests/tests.rs @@ -341,6 +341,11 @@ macro_rules! 
test_suite { .assert_single_message_verify(true) } + #[test] + fn deserialize_infinity_public_key() { + PublicKey::deserialize(&bls::INFINITY_PUBLIC_KEY).unwrap_err(); + } + /// A helper struct to make it easer to deal with `SignatureSet` lifetimes. struct OwnedSignatureSet { signature: AggregateSignature,