Skip to content

Commit

Permalink
Merge branch 'unstable' of https://github.com/sigp/lighthouse into re…
Browse files Browse the repository at this point in the history
…alized-unrealized-experimentation
  • Loading branch information
realbigsean committed Jul 12, 2022
2 parents e939bd4 + 4212f22 commit 828d5bc
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 75 deletions.
2 changes: 0 additions & 2 deletions beacon_node/execution_layer/src/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ impl From<builder_client::Error> for Error {
}
}

pub struct EngineApi;

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum PayloadStatusV1Status {
Valid,
Expand Down
14 changes: 5 additions & 9 deletions beacon_node/execution_layer/src/engine_api/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use reqwest::header::CONTENT_TYPE;
use sensitive_url::SensitiveUrl;
use serde::de::DeserializeOwned;
use serde_json::json;
use std::marker::PhantomData;

use std::time::Duration;
use types::EthSpec;
Expand Down Expand Up @@ -169,7 +168,7 @@ pub mod deposit_log {
/// state of the deposit contract.
pub mod deposit_methods {
use super::Log;
use crate::{EngineApi, HttpJsonRpc};
use crate::HttpJsonRpc;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::fmt;
Expand Down Expand Up @@ -298,7 +297,7 @@ pub mod deposit_methods {
}
}

impl HttpJsonRpc<EngineApi> {
impl HttpJsonRpc {
/// Get the eth1 chain id of the given endpoint.
pub async fn get_chain_id(&self, timeout: Duration) -> Result<Eth1Id, String> {
let chain_id: String = self
Expand Down Expand Up @@ -517,20 +516,18 @@ pub mod deposit_methods {
}
}

pub struct HttpJsonRpc<T = EngineApi> {
pub struct HttpJsonRpc {
pub client: Client,
pub url: SensitiveUrl,
auth: Option<Auth>,
_phantom: PhantomData<T>,
}

impl<T> HttpJsonRpc<T> {
impl HttpJsonRpc {
pub fn new(url: SensitiveUrl) -> Result<Self, Error> {
Ok(Self {
client: Client::builder().build()?,
url,
auth: None,
_phantom: PhantomData,
})
}

Expand All @@ -539,7 +536,6 @@ impl<T> HttpJsonRpc<T> {
client: Client::builder().build()?,
url,
auth: Some(auth),
_phantom: PhantomData,
})
}

Expand Down Expand Up @@ -592,7 +588,7 @@ impl std::fmt::Display for HttpJsonRpc {
}
}

impl HttpJsonRpc<EngineApi> {
impl HttpJsonRpc {
pub async fn upcheck(&self) -> Result<(), Error> {
let result: serde_json::Value = self
.rpc_request(ETH_SYNCING, json!([]), ETH_SYNCING_TIMEOUT)
Expand Down
76 changes: 31 additions & 45 deletions beacon_node/execution_layer/src/engines.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Provides generic behaviour for multiple execution engines, specifically fallback behaviour.
use crate::engine_api::{
EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId,
Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId,
};
use crate::HttpJsonRpc;
use lru::LruCache;
Expand Down Expand Up @@ -55,20 +55,32 @@ struct PayloadIdCacheKey {
pub suggested_fee_recipient: Address,
}

#[derive(Debug)]
pub enum EngineError {
Offline,
Api { error: EngineApiError },
BuilderApi { error: EngineApiError },
Auth,
}

/// An execution engine.
pub struct Engine<T> {
pub api: HttpJsonRpc<T>,
pub struct Engine {
pub api: HttpJsonRpc,
payload_id_cache: Mutex<LruCache<PayloadIdCacheKey, PayloadId>>,
state: RwLock<EngineState>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub log: Logger,
}

impl<T> Engine<T> {
impl Engine {
/// Creates a new, offline engine.
pub fn new(api: HttpJsonRpc<T>) -> Self {
pub fn new(api: HttpJsonRpc, log: &Logger) -> Self {
Self {
api,
payload_id_cache: Mutex::new(LruCache::new(PAYLOAD_ID_LRU_CACHE_SIZE)),
state: RwLock::new(EngineState::Offline),
latest_forkchoice_state: Default::default(),
log: log.clone(),
}
}

Expand All @@ -90,9 +102,7 @@ impl<T> Engine<T> {
})
.cloned()
}
}

impl Engine<EngineApi> {
pub async fn notify_forkchoice_updated(
&self,
forkchoice_state: ForkChoiceState,
Expand Down Expand Up @@ -120,26 +130,7 @@ impl Engine<EngineApi> {

Ok(response)
}
}

// This structure used to hold multiple execution engines managed in a fallback manner. This
// functionality has been removed following https://github.com/sigp/lighthouse/issues/3118 and this
// struct will likely be removed in the future.
pub struct Engines {
pub engine: Engine<EngineApi>,
pub latest_forkchoice_state: RwLock<Option<ForkChoiceState>>,
pub log: Logger,
}

#[derive(Debug)]
pub enum EngineError {
Offline,
Api { error: EngineApiError },
BuilderApi { error: EngineApiError },
Auth,
}

impl Engines {
async fn get_latest_forkchoice_state(&self) -> Option<ForkChoiceState> {
*self.latest_forkchoice_state.read().await
}
Expand Down Expand Up @@ -169,12 +160,7 @@ impl Engines {

// For simplicity, payload attributes are never included in this call. It may be
// reasonable to include them in the future.
if let Err(e) = self
.engine
.api
.forkchoice_updated_v1(forkchoice_state, None)
.await
{
if let Err(e) = self.api.forkchoice_updated_v1(forkchoice_state, None).await {
debug!(
self.log,
"Failed to issue latest head to engine";
Expand All @@ -191,14 +177,14 @@ impl Engines {

/// Returns `true` if the engine has a "synced" status.
pub async fn is_synced(&self) -> bool {
*self.engine.state.read().await == EngineState::Synced
*self.state.read().await == EngineState::Synced
}
/// Run the `EngineApi::upcheck` function if the node's last known state is not synced. This
/// might be used to recover the node if offline.
pub async fn upcheck_not_synced(&self, logging: Logging) {
let mut state_lock = self.engine.state.write().await;
let mut state_lock = self.state.write().await;
if *state_lock != EngineState::Synced {
match self.engine.api.upcheck().await {
match self.api.upcheck().await {
Ok(()) => {
if logging.is_enabled() {
info!(
Expand Down Expand Up @@ -261,7 +247,7 @@ impl Engines {
/// upcheck it and then run the function again.
pub async fn first_success<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
F: Fn(&'a Engine) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.first_success_without_retry(func).await {
Expand All @@ -282,26 +268,26 @@ impl Engines {
func: F,
) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G,
F: Fn(&'a Engine) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let (engine_synced, engine_auth_failed) = {
let state = self.engine.state.read().await;
let state = self.state.read().await;
(
*state == EngineState::Synced,
*state == EngineState::AuthFailed,
)
};
if engine_synced {
match func(&self.engine).await {
match func(self).await {
Ok(result) => Ok(result),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
);
*self.engine.state.write().await = EngineState::Offline;
*self.state.write().await = EngineState::Offline;
Err(EngineError::Api { error })
}
}
Expand All @@ -318,7 +304,7 @@ impl Engines {
/// it runs, it will try to upcheck all offline nodes and then run the function again.
pub async fn broadcast<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G + Copy,
F: Fn(&'a Engine) -> G + Copy,
G: Future<Output = Result<H, EngineApiError>>,
{
match self.broadcast_without_retry(func).await {
Expand All @@ -333,22 +319,22 @@ impl Engines {
/// Runs `func` on the node if it's last state is not offline.
pub async fn broadcast_without_retry<'a, F, G, H>(&'a self, func: F) -> Result<H, EngineError>
where
F: Fn(&'a Engine<EngineApi>) -> G,
F: Fn(&'a Engine) -> G,
G: Future<Output = Result<H, EngineApiError>>,
{
let func = &func;
if *self.engine.state.read().await == EngineState::Offline {
if *self.state.read().await == EngineState::Offline {
Err(EngineError::Offline)
} else {
match func(&self.engine).await {
match func(self).await {
Ok(res) => Ok(res),
Err(error) => {
debug!(
self.log,
"Execution engine call failed";
"error" => ?error,
);
*self.engine.state.write().await = EngineState::Offline;
*self.state.write().await = EngineState::Offline;
Err(EngineError::Api { error })
}
}
Expand Down
31 changes: 13 additions & 18 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use engine_api::Error as ApiError;
pub use engine_api::*;
pub use engine_api::{http, http::deposit_methods, http::HttpJsonRpc};
pub use engines::ForkChoiceState;
use engines::{Engine, EngineError, Engines, Logging};
use engines::{Engine, EngineError, Logging};
use lru::LruCache;
use payload_status::process_payload_status;
pub use payload_status::PayloadStatus;
Expand Down Expand Up @@ -64,7 +64,7 @@ const CONFIG_POLL_INTERVAL: Duration = Duration::from_secs(60);

#[derive(Debug)]
pub enum Error {
NoEngines,
NoEngine,
NoPayloadBuilder,
ApiError(ApiError),
Builder(builder_client::Error),
Expand Down Expand Up @@ -101,7 +101,7 @@ pub struct Proposer {
}

struct Inner<E: EthSpec> {
engines: Engines,
engine: Engine,
builder: Option<BuilderHttpClient>,
execution_engine_forkchoice_lock: Mutex<()>,
suggested_fee_recipient: Option<Address>,
Expand Down Expand Up @@ -162,7 +162,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
if urls.len() > 1 {
warn!(log, "Only the first execution engine url will be used");
}
let execution_url = urls.into_iter().next().ok_or(Error::NoEngines)?;
let execution_url = urls.into_iter().next().ok_or(Error::NoEngine)?;

// Use the default jwt secret path if not provided via cli.
let secret_file = secret_files
Expand Down Expand Up @@ -198,24 +198,19 @@ impl<T: EthSpec> ExecutionLayer<T> {
.map_err(Error::InvalidJWTSecret)
}?;

let engine: Engine<EngineApi> = {
let engine: Engine = {
let auth = Auth::new(jwt_key, jwt_id, jwt_version);
debug!(log, "Loaded execution endpoint"; "endpoint" => %execution_url, "jwt_path" => ?secret_file.as_path());
let api = HttpJsonRpc::<EngineApi>::new_with_auth(execution_url, auth)
.map_err(Error::ApiError)?;
Engine::<EngineApi>::new(api)
let api = HttpJsonRpc::new_with_auth(execution_url, auth).map_err(Error::ApiError)?;
Engine::new(api, &log)
};

let builder = builder_url
.map(|url| BuilderHttpClient::new(url).map_err(Error::Builder))
.transpose()?;

let inner = Inner {
engines: Engines {
engine,
latest_forkchoice_state: <_>::default(),
log: log.clone(),
},
engine,
builder,
execution_engine_forkchoice_lock: <_>::default(),
suggested_fee_recipient,
Expand All @@ -234,8 +229,8 @@ impl<T: EthSpec> ExecutionLayer<T> {
}

impl<T: EthSpec> ExecutionLayer<T> {
fn engines(&self) -> &Engines {
&self.inner.engines
fn engines(&self) -> &Engine {
&self.inner.engine
}

pub fn builder(&self) -> &Option<BuilderHttpClient> {
Expand Down Expand Up @@ -1004,7 +999,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
/// https://github.com/ethereum/consensus-specs/blob/v1.1.5/specs/merge/validator.md
async fn get_pow_block_hash_at_total_difficulty(
&self,
engine: &Engine<EngineApi>,
engine: &Engine,
spec: &ChainSpec,
) -> Result<Option<ExecutionBlockHash>, ApiError> {
let mut block = engine
Expand Down Expand Up @@ -1118,7 +1113,7 @@ impl<T: EthSpec> ExecutionLayer<T> {
/// https://github.com/ethereum/consensus-specs/issues/2636
async fn get_pow_block(
&self,
engine: &Engine<EngineApi>,
engine: &Engine,
hash: ExecutionBlockHash,
) -> Result<Option<ExecutionBlock>, ApiError> {
if let Some(cached) = self.execution_blocks().await.get(&hash).copied() {
Expand Down Expand Up @@ -1153,7 +1148,7 @@ impl<T: EthSpec> ExecutionLayer<T> {

async fn get_payload_by_block_hash_from_engine(
&self,
engine: &Engine<EngineApi>,
engine: &Engine,
hash: ExecutionBlockHash,
) -> Result<Option<ExecutionPayload<T>>, ApiError> {
let _timer = metrics::start_timer(&metrics::EXECUTION_LAYER_GET_PAYLOAD_BY_BLOCK_HASH);
Expand Down
9 changes: 8 additions & 1 deletion common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub struct Timeouts {
pub liveness: Duration,
pub proposal: Duration,
pub proposer_duties: Duration,
pub sync_committee_contribution: Duration,
pub sync_duties: Duration,
}

Expand All @@ -121,6 +122,7 @@ impl Timeouts {
liveness: timeout,
proposal: timeout,
proposer_duties: timeout,
sync_committee_contribution: timeout,
sync_duties: timeout,
}
}
Expand Down Expand Up @@ -907,7 +909,12 @@ impl BeaconNodeHttpClient {
.push("validator")
.push("contribution_and_proofs");

self.post(path, &signed_contributions).await?;
self.post_with_timeout(
path,
&signed_contributions,
self.timeouts.sync_committee_contribution,
)
.await?;

Ok(())
}
Expand Down
Loading

0 comments on commit 828d5bc

Please sign in to comment.