From 1fcada8a328be02987e8135bde0bd37f0eae59b0 Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Thu, 10 Aug 2023 00:10:09 +0000 Subject: [PATCH 01/11] Improve transport connection errors (#4540) ## Issue Addressed #4538 ## Proposed Changes add newtype wrapper around DialError that extracts error messages and logs them in a more readable format ## Additional Info I was able to test Transport Dial Errors in the situation where a libp2p instance attempts to ping a nonexistent peer. That error message should look something like `A transport level error has ocurred: Connection refused (os error 61)` AgeManning mentioned we should try fetching only the most inner error (in situations where theres a nested error). I took a stab at implementing that For non transport DialErrors, I wrote out the error messages explicitly (as per the docs). Could potentially clean things up here if thats not necessary Co-authored-by: Age Manning --- .../lighthouse_network/src/discovery/mod.rs | 4 +- beacon_node/lighthouse_network/src/lib.rs | 41 +++++++++++++++++++ .../src/peer_manager/network_behaviour.rs | 4 +- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 0f8ddc53c1b..82a371d8a20 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -7,9 +7,9 @@ pub(crate) mod enr; pub mod enr_ext; // Allow external use of the lighthouse ENR builder -use crate::metrics; use crate::service::TARGET_SUBNET_PEERS; use crate::{error, Enr, NetworkConfig, NetworkGlobals, Subnet, SubnetDiscovery}; +use crate::{metrics, ClearDialError}; use discv5::{enr::NodeId, Discv5, Discv5Event}; pub use enr::{ build_enr, create_enr_builder_from_config, load_enr_from_disk, use_or_load_enr, CombinedKey, @@ -1111,7 +1111,7 @@ impl Discovery { | DialError::Transport(_) | DialError::WrongPeerId { .. } => { // set peer as disconnected in discovery DHT - debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id); + debug!(self.log, "Marking peer disconnected in DHT"; "peer_id" => %peer_id, "error" => %ClearDialError(error)); self.disconnect_peer(&peer_id); } DialError::DialPeerConditionFalse(_) | DialError::Aborted => {} diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index 3d539af3b28..7467fb7f067 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -17,6 +17,7 @@ pub mod rpc; pub mod types; pub use config::gossip_max_size; +use libp2p::swarm::DialError; pub use listen_addr::*; use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; @@ -63,6 +64,46 @@ impl<'de> Deserialize<'de> for PeerIdSerialized { } } +// A wrapper struct that prints a dial error nicely. +struct ClearDialError<'a>(&'a DialError); + +impl<'a> ClearDialError<'a> { + fn most_inner_error(err: &(dyn std::error::Error)) -> &(dyn std::error::Error) { + let mut current = err; + while let Some(source) = current.source() { + current = source; + } + current + } +} + +impl<'a> std::fmt::Display for ClearDialError<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { + match &self.0 { + DialError::Transport(errors) => { + for (_, transport_error) in errors { + match transport_error { + libp2p::TransportError::MultiaddrNotSupported(multiaddr_error) => { + write!(f, "Multiaddr not supported: {multiaddr_error}")?; + } + libp2p::TransportError::Other(other_error) => { + let inner_error = ClearDialError::most_inner_error(other_error); + write!(f, "Transport error: {inner_error}")?; + } + } + } + Ok(()) + } + DialError::LocalPeerId { .. } => write!(f, "The peer being dialed is the local peer."), + DialError::NoAddresses => write!(f, "No addresses for the peer to dial."), + DialError::DialPeerConditionFalse(_) => write!(f, "PeerCondition evaluation failed."), + DialError::Aborted => write!(f, "Connection aborted."), + DialError::WrongPeerId { .. } => write!(f, "Wrong peer id."), + DialError::Denied { cause } => write!(f, "Connection denied: {:?}", cause), + } + } +} + pub use crate::types::{ error, Enr, EnrSyncCommitteeBitfield, GossipTopic, NetworkGlobals, PubsubMessage, Subnet, SubnetDiscovery, diff --git a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs index ce374bb9ab4..70f421681a3 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/network_behaviour.rs @@ -12,9 +12,9 @@ use libp2p::swarm::{ConnectionId, NetworkBehaviour, PollParameters, ToSwarm}; use slog::{debug, error}; use types::EthSpec; -use crate::metrics; use crate::rpc::GoodbyeReason; use crate::types::SyncState; +use crate::{metrics, ClearDialError}; use super::peerdb::BanResult; use super::{ConnectingType, PeerManager, PeerManagerEvent, ReportSource}; @@ -132,7 +132,7 @@ impl NetworkBehaviour for PeerManager { error, connection_id: _, }) => { - debug!(self.log, "Failed to dial peer"; "peer_id"=> ?peer_id, "error" => %error); + debug!(self.log, "Failed to dial peer"; "peer_id"=> ?peer_id, "error" => %ClearDialError(error)); self.on_dial_failure(peer_id); } FromSwarm::ExternalAddrConfirmed(_) => { From f1ac12f23ae581ace17f1cabcff36a6fb3a3e13f Mon Sep 17 00:00:00 2001 From: zhiqiangxu <652732310@qq.com> Date: Mon, 14 Aug 2023 00:29:43 +0000 Subject: [PATCH 02/11] Fix some typos (#4565) --- beacon_node/beacon_chain/src/beacon_chain.rs | 2 +- beacon_node/beacon_chain/src/beacon_proposer_cache.rs | 4 ++-- beacon_node/execution_layer/src/lib.rs | 2 +- beacon_node/http_api/src/lib.rs | 6 +++--- beacon_node/operation_pool/src/reward_cache.rs | 2 +- consensus/fork_choice/src/fork_choice.rs | 7 ++----- consensus/proto_array/src/proto_array.rs | 2 +- validator_client/src/duties_service.rs | 2 +- validator_client/src/lib.rs | 2 +- 9 files changed, 13 insertions(+), 16 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 25964ed2165..987ea9e7c33 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -164,7 +164,7 @@ pub enum WhenSlotSkipped { /// /// This is how the HTTP API behaves. None, - /// If the slot it a skip slot, return the previous non-skipped block. + /// If the slot is a skip slot, return the previous non-skipped block. /// /// This is generally how the specification behaves. Prev, diff --git a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs index e76a5a80588..eae71bd63ea 100644 --- a/beacon_node/beacon_chain/src/beacon_proposer_cache.rs +++ b/beacon_node/beacon_chain/src/beacon_proposer_cache.rs @@ -135,7 +135,7 @@ impl BeaconProposerCache { /// Compute the proposer duties using the head state without cache. pub fn compute_proposer_duties_from_head( - current_epoch: Epoch, + request_epoch: Epoch, chain: &BeaconChain, ) -> Result<(Vec, Hash256, ExecutionStatus, Fork), BeaconChainError> { // Atomically collect information about the head whilst holding the canonical head `Arc` as @@ -159,7 +159,7 @@ pub fn compute_proposer_duties_from_head( .ok_or(BeaconChainError::HeadMissingFromForkChoice(head_block_root))?; // Advance the state into the requested epoch. - ensure_state_is_in_epoch(&mut state, head_state_root, current_epoch, &chain.spec)?; + ensure_state_is_in_epoch(&mut state, head_state_root, request_epoch, &chain.spec)?; let indices = state .get_beacon_proposer_indices(&chain.spec) diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index 579bebdacba..b57bba7518e 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -76,7 +76,7 @@ const DEFAULT_SUGGESTED_FEE_RECIPIENT: [u8; 20] = /// A payload alongside some information about where it came from. pub enum ProvenancedPayload

{ - /// A good ol' fashioned farm-to-table payload from your local EE. + /// A good old fashioned farm-to-table payload from your local EE. Local(P), /// A payload from a builder (e.g. mev-boost). Builder(P), diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 9512d18aba8..4d28326d181 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -1426,10 +1426,10 @@ pub fn serve( ); /* - * beacon/blocks + * beacon/blinded_blocks */ - // POST beacon/blocks + // POST beacon/blinded_blocks let post_beacon_blinded_blocks = eth_v1 .and(warp::path("beacon")) .and(warp::path("blinded_blocks")) @@ -3208,7 +3208,7 @@ pub fn serve( }, ); - // POST validator/duties/sync + // POST validator/duties/sync/{epoch} let post_validator_duties_sync = eth_v1 .and(warp::path("validator")) .and(warp::path("duties")) diff --git a/beacon_node/operation_pool/src/reward_cache.rs b/beacon_node/operation_pool/src/reward_cache.rs index 5b9d4258e91..9e4c424bd7d 100644 --- a/beacon_node/operation_pool/src/reward_cache.rs +++ b/beacon_node/operation_pool/src/reward_cache.rs @@ -12,7 +12,7 @@ struct Initialization { #[derive(Debug, Clone, Default)] pub struct RewardCache { initialization: Option, - /// `BitVec` of validator indices which don't have default participation flags for the prev. epoch. + /// `BitVec` of validator indices which don't have default participation flags for the prev epoch. /// /// We choose to only track whether validators have *any* participation flag set because /// it's impossible to include a new attestation which is better than the existing participation diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index e60774fc86e..059494453cf 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -466,13 +466,10 @@ where // for lower slots to account for skip slots. .find(|(_, slot)| *slot <= ancestor_slot) .map(|(root, _)| root)), - Ordering::Less => Ok(Some(block_root)), - Ordering::Equal => // Root is older than queried slot, thus a skip slot. Return most recent root prior // to slot. - { - Ok(Some(block_root)) - } + Ordering::Less => Ok(Some(block_root)), + Ordering::Equal => Ok(Some(block_root)), } } diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 88111b461d5..6cb10e3d204 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -910,7 +910,7 @@ impl ProtoArray { Ok(()) } - /// Indicates if the node itself is viable for the head, or if it's best descendant is viable + /// Indicates if the node itself is viable for the head, or if its best descendant is viable /// for the head. fn node_leads_to_viable_head( &self, diff --git a/validator_client/src/duties_service.rs b/validator_client/src/duties_service.rs index 535f6aeb0a7..a3b3cabcccd 100644 --- a/validator_client/src/duties_service.rs +++ b/validator_client/src/duties_service.rs @@ -1021,7 +1021,7 @@ async fn fill_in_selection_proofs( /// 2. We won't miss a block if the duties for the current slot happen to change with this poll. /// /// This sounds great, but is it safe? Firstly, the additional notification will only contain block -/// producers that were not included in the first notification. This should be safety enough. +/// producers that were not included in the first notification. This should be safe enough. /// However, we also have the slashing protection as a second line of defence. These two factors /// provide an acceptable level of safety. /// diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index f7a80f0a8e7..6ca8db87d53 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -524,7 +524,7 @@ impl ProductionValidatorClient { pub fn start_service(&mut self) -> Result<(), String> { // We use `SLOTS_PER_EPOCH` as the capacity of the block notification channel, because - // we don't except notifications to be delayed by more than a single slot, let alone a + // we don't expect notifications to be delayed by more than a single slot, let alone a // whole epoch! let channel_capacity = T::slots_per_epoch() as usize; let (block_service_tx, block_service_rx) = mpsc::channel(channel_capacity); From 9d8b2764ef894cb6075722669e28dcc68243228c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Oliveira?= Date: Mon, 14 Aug 2023 00:29:44 +0000 Subject: [PATCH 03/11] align editorconfig with rustfmt (#4600) ## Issue Addressed There seems to be a conflict between `editorconfig` and `rustfmt`. `editorconfig` is configured with [`insert_final_newline=false`](https://github.com/sigp/lighthouse/blob/stable/.editorconfig#L9C1-L9C21) which [removes the newline](https://github.com/editorconfig/editorconfig/wiki/EditorConfig-Properties#insert_final_newline), whereas `rustfmt` [adds a newline](https://rust-lang.github.io/rustfmt/?version=v1.6.0&search=#newline_style). ## Proposed Changes Align `.editorconfig` with `rustfmt` --- .editorconfig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.editorconfig b/.editorconfig index a14dd7a5168..149415d1271 100644 --- a/.editorconfig +++ b/.editorconfig @@ -6,4 +6,4 @@ end_of_line=lf charset=utf-8 trim_trailing_whitespace=true max_line_length=100 -insert_final_newline=false +insert_final_newline=true \ No newline at end of file From 501ce62d7c9acd5be23b885fcd86e01331df903d Mon Sep 17 00:00:00 2001 From: zhiqiangxu <652732310@qq.com> Date: Mon, 14 Aug 2023 00:29:45 +0000 Subject: [PATCH 04/11] minor optimize process_active_validator: avoid a call to `state.get_validator` (#4608) --- .../src/per_epoch_processing/altair/participation_cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs index a5caddd0455..d67e7874cb9 100644 --- a/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs +++ b/consensus/state_processing/src/per_epoch_processing/altair/participation_cache.rs @@ -111,8 +111,8 @@ impl SingleEpochParticipationCache { current_epoch: Epoch, relative_epoch: RelativeEpoch, ) -> Result<(), BeaconStateError> { - let val_balance = state.get_effective_balance(val_index)?; let validator = state.get_validator(val_index)?; + let val_balance = validator.effective_balance; // Sanity check to ensure the validator is active. let epoch = relative_epoch.into_epoch(current_epoch); From fa93b58257c34c313f52debba184b3fa871c51df Mon Sep 17 00:00:00 2001 From: zhiqiangxu <652732310@qq.com> Date: Mon, 14 Aug 2023 00:29:46 +0000 Subject: [PATCH 05/11] remove `optional_eth2_network_config` (#4611) It seems the passed [`optional_config`](https://github.com/sigp/lighthouse/blob/dfcb3363c757671eb19d5f8e519b4b94ac74677a/lighthouse/src/main.rs#L515) is always `Some` instead of `None`. --- lighthouse/environment/src/lib.rs | 12 ------------ lighthouse/src/main.rs | 2 +- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/lighthouse/environment/src/lib.rs b/lighthouse/environment/src/lib.rs index 53915b52d96..fc7ab8d52c5 100644 --- a/lighthouse/environment/src/lib.rs +++ b/lighthouse/environment/src/lib.rs @@ -344,18 +344,6 @@ impl EnvironmentBuilder { Ok(self) } - /// Optionally adds a network configuration to the environment. - pub fn optional_eth2_network_config( - self, - optional_config: Option, - ) -> Result { - if let Some(config) = optional_config { - self.eth2_network_config(config) - } else { - Ok(self) - } - } - /// Consumes the builder, returning an `Environment`. pub fn build(self) -> Result, String> { let (signal, exit) = exit_future::signal(); diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 73e042342af..8b3271f43b8 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -513,7 +513,7 @@ fn run( let mut environment = builder .multi_threaded_tokio_runtime()? - .optional_eth2_network_config(Some(eth2_network_config))? + .eth2_network_config(eth2_network_config)? .build()?; let log = environment.core_context().log().clone(); From e92359b756124c948a0a9167844a2a9faa36449d Mon Sep 17 00:00:00 2001 From: zhiqiangxu <652732310@qq.com> Date: Mon, 14 Aug 2023 00:29:47 +0000 Subject: [PATCH 06/11] use account_manager::CMD instead of magic string (#4612) Make the code style a bit more consistent with following lines. --- lighthouse/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lighthouse/src/main.rs b/lighthouse/src/main.rs index 8b3271f43b8..d8b522307c4 100644 --- a/lighthouse/src/main.rs +++ b/lighthouse/src/main.rs @@ -559,7 +559,7 @@ fn run( (Some(_), Some(_)) => panic!("CLI prevents both --network and --testnet-dir"), }; - if let Some(sub_matches) = matches.subcommand_matches("account_manager") { + if let Some(sub_matches) = matches.subcommand_matches(account_manager::CMD) { eprintln!("Running account manager for {} network", network_name); // Pass the entire `environment` to the account manager so it can run blocking operations. account_manager::run(sub_matches, environment)?; From 842b42297b9bd44cb78a8348b7ed3096af6ffdf6 Mon Sep 17 00:00:00 2001 From: zhiqiangxu <652732310@qq.com> Date: Mon, 14 Aug 2023 00:29:47 +0000 Subject: [PATCH 07/11] Fix bug of `init_from_beacon_node` (#4613) --- validator_client/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index 6ca8db87d53..6f071055a4a 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -627,8 +627,8 @@ async fn init_from_beacon_node( let num_available = beacon_nodes.num_available().await; let num_total = beacon_nodes.num_total(); - let proposer_available = beacon_nodes.num_available().await; - let proposer_total = beacon_nodes.num_total(); + let proposer_available = proposer_nodes.num_available().await; + let proposer_total = proposer_nodes.num_total(); if proposer_total > 0 && proposer_available == 0 { warn!( From ca050053bf2758467f4f17d601d939c89fb24c9d Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 14 Aug 2023 03:16:03 +0000 Subject: [PATCH 08/11] Use the native `concurrency` property to cancel workflows (#4572) I noticed that some of our workflows aren't getting cancelled when a new one has been triggered, so we ended up having a long queue in our CI when multiple changes are triggered in a short period. Looking at the comment here, I noticed the list of workflow IDs are outdated and no longer exist, and some new ones are missing: https://github.com/sigp/lighthouse/blob/dfcb3363c757671eb19d5f8e519b4b94ac74677a/.github/workflows/cancel-previous-runs.yml#L12-L13 I attempted to update these, and came across this comment on the [`cancel-workflow-action`](https://github.com/styfle/cancel-workflow-action) repo: > You probably don't need to install this custom action. > > Instead, use the native [concurrency](https://github.blog/changelog/2021-04-19-github-actions-limit-workflow-run-or-job-concurrency/) property to cancel workflows, for example: So I thought instead of updating the workflow and maintaining the workflow IDs, perhaps we can try experimenting the [native `concurrency` property](https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#concurrency). --- .github/workflows/book.yml | 4 ++++ .github/workflows/cancel-previous-runs.yml | 14 -------------- .github/workflows/docker-antithesis.yml | 4 ++++ .github/workflows/docker.yml | 4 ++++ .github/workflows/linkcheck.yml | 4 ++++ .github/workflows/local-testnet.yml | 4 ++++ .github/workflows/release.yml | 4 ++++ .github/workflows/test-suite.yml | 5 +++++ 8 files changed, 29 insertions(+), 14 deletions(-) delete mode 100644 .github/workflows/cancel-previous-runs.yml diff --git a/.github/workflows/book.yml b/.github/workflows/book.yml index 598754368e9..db458a3dbfd 100644 --- a/.github/workflows/book.yml +++ b/.github/workflows/book.yml @@ -5,6 +5,10 @@ on: branches: - unstable +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: build-and-upload-to-s3: runs-on: ubuntu-20.04 diff --git a/.github/workflows/cancel-previous-runs.yml b/.github/workflows/cancel-previous-runs.yml deleted file mode 100644 index 2eaefa40ca0..00000000000 --- a/.github/workflows/cancel-previous-runs.yml +++ /dev/null @@ -1,14 +0,0 @@ -name: cancel previous runs -on: [push] -jobs: - cancel: - name: 'Cancel Previous Runs' - runs-on: ubuntu-latest - timeout-minutes: 3 - steps: - # https://github.com/styfle/cancel-workflow-action/releases - - uses: styfle/cancel-workflow-action@514c783324374c6940d1b92bfb962d0763d22de3 # 0.7.0 - with: - # https://api.github.com/repos/sigp/lighthouse/actions/workflows - workflow_id: 697364,2434944,4462424,308241,2883401,316 - access_token: ${{ github.token }} diff --git a/.github/workflows/docker-antithesis.yml b/.github/workflows/docker-antithesis.yml index 84f5541a3cc..a96431fafbd 100644 --- a/.github/workflows/docker-antithesis.yml +++ b/.github/workflows/docker-antithesis.yml @@ -5,6 +5,10 @@ on: branches: - unstable +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + env: ANTITHESIS_PASSWORD: ${{ secrets.ANTITHESIS_PASSWORD }} ANTITHESIS_USERNAME: ${{ secrets.ANTITHESIS_USERNAME }} diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index c3119db3780..21ca4940d9c 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -8,6 +8,10 @@ on: tags: - v* +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + env: DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} diff --git a/.github/workflows/linkcheck.yml b/.github/workflows/linkcheck.yml index 8428c0a3b0a..19236691f63 100644 --- a/.github/workflows/linkcheck.yml +++ b/.github/workflows/linkcheck.yml @@ -9,6 +9,10 @@ on: - 'book/**' merge_group: +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: linkcheck: name: Check broken links diff --git a/.github/workflows/local-testnet.yml b/.github/workflows/local-testnet.yml index ea4c1e24887..1269aee6270 100644 --- a/.github/workflows/local-testnet.yml +++ b/.github/workflows/local-testnet.yml @@ -8,6 +8,10 @@ on: pull_request: merge_group: +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: run-local-testnet: strategy: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 30e4211b88c..e38b03daf70 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -5,6 +5,10 @@ on: tags: - v* +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + env: DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }} diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml index ab31b3a92bc..91a0b734537 100644 --- a/.github/workflows/test-suite.yml +++ b/.github/workflows/test-suite.yml @@ -9,6 +9,11 @@ on: - 'pr/*' pull_request: merge_group: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + env: # Deny warnings in CI # Disable debug info (see https://github.com/sigp/lighthouse/issues/4005) From dfab24bf92fad9bdb10311851a63963ed620ea55 Mon Sep 17 00:00:00 2001 From: zhiqiangxu <652732310@qq.com> Date: Mon, 14 Aug 2023 03:16:04 +0000 Subject: [PATCH 09/11] opt `maybe_update_best_child_and_descendant`: remove an impossible case (#4583) Here `child.weight == best_child.weight` is impossible since it's already checked [above](https://github.com/sigp/lighthouse/blob/dfcb3363c757671eb19d5f8e519b4b94ac74677a/consensus/proto_array/src/proto_array.rs#L878). --- consensus/proto_array/src/proto_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consensus/proto_array/src/proto_array.rs b/consensus/proto_array/src/proto_array.rs index 6cb10e3d204..7b6afb94f54 100644 --- a/consensus/proto_array/src/proto_array.rs +++ b/consensus/proto_array/src/proto_array.rs @@ -884,7 +884,7 @@ impl ProtoArray { } } else { // Choose the winner by weight. - if child.weight >= best_child.weight { + if child.weight > best_child.weight { change_to_child } else { no_change From 912f869829689dbf7881e3f2098d5d864a59d041 Mon Sep 17 00:00:00 2001 From: zhiqiangxu <652732310@qq.com> Date: Mon, 14 Aug 2023 04:06:34 +0000 Subject: [PATCH 10/11] ForkChoice: remove `head_block_root` field (#4590) This field is redundant with `ForkchoiceUpdateParameters.head_root`, the same `head_root` is assigned to both fields in [`get_head`](https://github.com/sigp/lighthouse/blob/dfcb3363c757671eb19d5f8e519b4b94ac74677a/consensus/fork_choice/src/fork_choice.rs#L508-L523). --- consensus/fork_choice/src/fork_choice.rs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/consensus/fork_choice/src/fork_choice.rs b/consensus/fork_choice/src/fork_choice.rs index 059494453cf..4f563f86398 100644 --- a/consensus/fork_choice/src/fork_choice.rs +++ b/consensus/fork_choice/src/fork_choice.rs @@ -289,9 +289,10 @@ pub enum AttestationFromBlock { False, } -/// Parameters which are cached between calls to `Self::get_head`. +/// Parameters which are cached between calls to `ForkChoice::get_head`. #[derive(Clone, Copy)] pub struct ForkchoiceUpdateParameters { + /// The most recent result of running `ForkChoice::get_head`. pub head_root: Hash256, pub head_hash: Option, pub justified_hash: Option, @@ -324,8 +325,6 @@ pub struct ForkChoice { queued_attestations: Vec, /// Stores a cache of the values required to be sent to the execution layer. forkchoice_update_parameters: ForkchoiceUpdateParameters, - /// The most recent result of running `Self::get_head`. - head_block_root: Hash256, _phantom: PhantomData, } @@ -410,14 +409,13 @@ where head_hash: None, justified_hash: None, finalized_hash: None, + // This will be updated during the next call to `Self::get_head`. head_root: Hash256::zero(), }, - // This will be updated during the next call to `Self::get_head`. - head_block_root: Hash256::zero(), _phantom: PhantomData, }; - // Ensure that `fork_choice.head_block_root` is updated. + // Ensure that `fork_choice.forkchoice_update_parameters.head_root` is updated. fork_choice.get_head(current_slot, spec)?; Ok(fork_choice) @@ -502,8 +500,6 @@ where spec, )?; - self.head_block_root = head_root; - // Cache some values for the next forkchoiceUpdate call to the execution layer. let head_hash = self .get_block(&head_root) @@ -607,7 +603,7 @@ where /// have *differing* finalized and justified information. pub fn cached_fork_choice_view(&self) -> ForkChoiceView { ForkChoiceView { - head_block_root: self.head_block_root, + head_block_root: self.forkchoice_update_parameters.head_root, justified_checkpoint: self.justified_checkpoint(), finalized_checkpoint: self.finalized_checkpoint(), } @@ -1518,10 +1514,9 @@ where head_hash: None, justified_hash: None, finalized_hash: None, + // Will be updated in the following call to `Self::get_head`. head_root: Hash256::zero(), }, - // Will be updated in the following call to `Self::get_head`. - head_block_root: Hash256::zero(), _phantom: PhantomData, }; From 249f85f1d9c643a0ec4f952966909bb4f7d4c64f Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 14 Aug 2023 04:06:37 +0000 Subject: [PATCH 11/11] Improve HTTP API error messages + tweaks (#4595) ## Issue Addressed Closes #3404 (mostly) ## Proposed Changes - Remove all uses of Warp's `and_then` (which backtracks) in favour of `then` (which doesn't). - Bump the priority of the `POST` method for `v2/blocks` to `P0`. Publishing a block needs to happen quickly. - Run the new SSZ POST endpoints on the beacon processor. I think this was missed in between merging #4462 and #4504/#4479. - Fix a minor issue in the validator registrations endpoint whereby an error from spawning the task on the beacon processor would be dropped. ## Additional Info I've tested this manually and can confirm that we no longer get the dreaded `Unsupported endpoint version` errors for queries like: ``` $ curl -X POST -H "Content-Type: application/json" --data @block.json "http://localhost:5052/eth/v2/beacon/blocks" | jq { "code": 400, "message": "BAD_REQUEST: WeakSubjectivityConflict", "stacktraces": [] } ``` ``` $ curl -X POST -H "Content-Type: application/octet-stream" --data @block.json "http://localhost:5052/eth/v2/beacon/blocks" | jq { "code": 400, "message": "BAD_REQUEST: invalid SSZ: OffsetOutOfBounds(572530811)", "stacktraces": [] } ``` ``` $ curl "http://localhost:5052/eth/v2/validator/blocks/7067595" {"code":400,"message":"BAD_REQUEST: invalid query: Invalid query string","stacktraces":[]} ``` However, I can still trigger it by leaving off the `Content-Type`. We can re-test this aspect with #4575. --- beacon_node/http_api/src/lib.rs | 362 +++++++++++------------ beacon_node/http_api/src/task_spawner.rs | 92 +++--- 2 files changed, 227 insertions(+), 227 deletions(-) diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 4d28326d181..4d5b98a8238 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -516,7 +516,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let genesis_data = api_types::GenesisData { @@ -549,7 +549,7 @@ pub fn serve( .clone() .and(warp::path("root")) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -570,7 +570,7 @@ pub fn serve( .clone() .and(warp::path("fork")) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -591,7 +591,7 @@ pub fn serve( .clone() .and(warp::path("finality_checkpoints")) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -627,7 +627,7 @@ pub fn serve( .and(warp::path("validator_balances")) .and(warp::path::end()) .and(multi_key_query::()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -685,7 +685,7 @@ pub fn serve( .and(warp::path("validators")) .and(warp::path::end()) .and(multi_key_query::()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -769,7 +769,7 @@ pub fn serve( )) })) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -837,7 +837,7 @@ pub fn serve( .and(warp::path("committees")) .and(warp::query::()) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -1020,7 +1020,7 @@ pub fn serve( .and(warp::path("sync_committees")) .and(warp::query::()) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -1086,7 +1086,7 @@ pub fn serve( .and(warp::path("randao")) .and(warp::query::()) .and(warp::path::end()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, @@ -1128,7 +1128,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query: api_types::HeadersQuery, task_spawner: TaskSpawner, chain: Arc>| { @@ -1228,7 +1228,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { @@ -1276,7 +1276,7 @@ pub fn serve( .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block: Arc>, task_spawner: TaskSpawner, chain: Arc>, @@ -1302,33 +1302,35 @@ pub fn serve( .and(warp::path("blocks")) .and(warp::path::end()) .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block_bytes: Bytes, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - let block = match SignedBeaconBlock::::from_ssz_bytes( - &block_bytes, - &chain.spec, - ) { - Ok(data) => data, - Err(e) => { - return Err(warp_utils::reject::custom_bad_request(format!("{:?}", e))) - } - }; - publish_blocks::publish_block( - None, - ProvenancedBlock::local(Arc::new(block)), - chain, - &network_tx, - log, - BroadcastValidation::default(), - ) - .await - .map(|()| warp::reply().into_response()) + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let block = + SignedBeaconBlock::::from_ssz_bytes(&block_bytes, &chain.spec) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "invalid SSZ: {e:?}" + )) + })?; + publish_blocks::publish_block( + None, + ProvenancedBlock::local(Arc::new(block)), + chain, + &network_tx, + log, + BroadcastValidation::default(), + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); @@ -1349,8 +1351,8 @@ pub fn serve( chain: Arc>, network_tx: UnboundedSender>, log: Logger| { - task_spawner.spawn_async(Priority::P1, async move { - match publish_blocks::publish_block( + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + publish_blocks::publish_block( None, ProvenancedBlock::local(block), chain, @@ -1359,17 +1361,7 @@ pub fn serve( validation_level.broadcast_validation, ) .await - { - Ok(()) => warp::reply().into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - StatusCode::INTERNAL_SERVER_ERROR, - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } + .map(|()| warp::reply().into_response()) }) }, ); @@ -1380,48 +1372,36 @@ pub fn serve( .and(warp::query::()) .and(warp::path::end()) .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) .then( |validation_level: api_types::BroadcastValidationQuery, block_bytes: Bytes, + task_spawner: TaskSpawner, chain: Arc>, network_tx: UnboundedSender>, - log: Logger| async move { - let block = match SignedBeaconBlock::::from_ssz_bytes( - &block_bytes, - &chain.spec, - ) { - Ok(data) => data, - Err(_) => { - return warp::reply::with_status( - StatusCode::BAD_REQUEST, - eth2::StatusCode::BAD_REQUEST, - ) - .into_response(); - } - }; - match publish_blocks::publish_block( - None, - ProvenancedBlock::local(Arc::new(block)), - chain, - &network_tx, - log, - validation_level.broadcast_validation, - ) - .await - { - Ok(()) => warp::reply().into_response(), - Err(e) => match warp_utils::reject::handle_rejection(e).await { - Ok(reply) => reply.into_response(), - Err(_) => warp::reply::with_status( - StatusCode::INTERNAL_SERVER_ERROR, - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(), - }, - } + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let block = + SignedBeaconBlock::::from_ssz_bytes(&block_bytes, &chain.spec) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!( + "invalid SSZ: {e:?}" + )) + })?; + publish_blocks::publish_block( + None, + ProvenancedBlock::local(Arc::new(block)), + chain, + &network_tx, + log, + validation_level.broadcast_validation, + ) + .await + .map(|()| warp::reply().into_response()) + }) }, ); @@ -1439,7 +1419,7 @@ pub fn serve( .and(chain_filter.clone()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |block: SignedBeaconBlock>, task_spawner: TaskSpawner, chain: Arc>, @@ -1460,33 +1440,29 @@ pub fn serve( ); // POST beacon/blocks - let post_beacon_blinded_blocks_ssz = - eth_v1 - .and(warp::path("beacon")) - .and(warp::path("blinded_blocks")) - .and(warp::path::end()) - .and(warp::body::bytes()) - .and(chain_filter.clone()) - .and(network_tx_filter.clone()) - .and(log_filter.clone()) - .and_then( - |block_bytes: Bytes, - chain: Arc>, - network_tx: UnboundedSender>, - log: Logger| async move { - let block = - match SignedBeaconBlock::>::from_ssz_bytes( - &block_bytes, - &chain.spec, - ) { - Ok(data) => data, - Err(e) => { - return Err(warp_utils::reject::custom_bad_request(format!( - "{:?}", - e - ))) - } - }; + let post_beacon_blinded_blocks_ssz = eth_v1 + .and(warp::path("beacon")) + .and(warp::path("blinded_blocks")) + .and(warp::path::end()) + .and(warp::body::bytes()) + .and(task_spawner_filter.clone()) + .and(chain_filter.clone()) + .and(network_tx_filter.clone()) + .and(log_filter.clone()) + .then( + |block_bytes: Bytes, + task_spawner: TaskSpawner, + chain: Arc>, + network_tx: UnboundedSender>, + log: Logger| { + task_spawner.spawn_async_with_rejection(Priority::P0, async move { + let block = SignedBeaconBlock::>::from_ssz_bytes( + &block_bytes, + &chain.spec, + ) + .map_err(|e| { + warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}")) + })?; publish_blocks::publish_blinded_block( block, chain, @@ -1496,8 +1472,9 @@ pub fn serve( ) .await .map(|()| warp::reply().into_response()) - }, - ); + }) + }, + ); let post_beacon_blinded_blocks_v2 = eth_v2 .and(warp::path("beacon")) @@ -1617,7 +1594,7 @@ pub fn serve( .clone() .and(warp::path::end()) .and(warp::header::optional::("accept")) - .and_then( + .then( |endpoint_version: EndpointVersion, block_id: BlockId, task_spawner: TaskSpawner, @@ -1660,7 +1637,7 @@ pub fn serve( .clone() .and(warp::path("root")) .and(warp::path::end()) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { @@ -1680,7 +1657,7 @@ pub fn serve( .clone() .and(warp::path("attestations")) .and(warp::path::end()) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>| { @@ -1704,7 +1681,7 @@ pub fn serve( .and(chain_filter.clone()) .and(warp::path::end()) .and(warp::header::optional::("accept")) - .and_then( + .then( |block_id: BlockId, task_spawner: TaskSpawner, chain: Arc>, @@ -1762,7 +1739,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, attestations: Vec>, @@ -1904,7 +1881,7 @@ pub fn serve( .and(warp::path("attestations")) .and(warp::path::end()) .and(warp::query::()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, query: api_types::AttestationPoolQuery| { @@ -1937,7 +1914,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, slashing: AttesterSlashing, @@ -1979,7 +1956,7 @@ pub fn serve( .clone() .and(warp::path("attester_slashings")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_attester_slashings(); @@ -1995,7 +1972,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, slashing: ProposerSlashing, @@ -2037,7 +2014,7 @@ pub fn serve( .clone() .and(warp::path("proposer_slashings")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_proposer_slashings(); @@ -2053,7 +2030,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(network_tx_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, exit: SignedVoluntaryExit, @@ -2093,7 +2070,7 @@ pub fn serve( .clone() .and(warp::path("voluntary_exits")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let attestations = chain.op_pool.get_all_voluntary_exits(); @@ -2110,7 +2087,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, signatures: Vec, @@ -2130,7 +2107,7 @@ pub fn serve( .clone() .and(warp::path("bls_to_execution_changes")) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let address_changes = chain.op_pool.get_all_bls_to_execution_changes(); @@ -2147,7 +2124,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, address_changes: Vec, @@ -2239,7 +2216,7 @@ pub fn serve( .and(warp::header::optional::("accept")) .and(task_spawner_filter.clone()) .and(eth1_service_filter.clone()) - .and_then( + .then( |accept_header: Option, task_spawner: TaskSpawner, eth1_service: eth1::Service| { @@ -2293,7 +2270,7 @@ pub fn serve( .and(warp::path("blocks")) .and(block_id_or_err) .and(warp::path::end()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, block_id: BlockId| { @@ -2326,7 +2303,7 @@ pub fn serve( .and(warp::path::param::()) .and(warp::path::end()) .and(warp::body::json()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, epoch: Epoch, @@ -2378,7 +2355,7 @@ pub fn serve( .and(warp::path::end()) .and(warp::body::json()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, block_id: BlockId, @@ -2411,7 +2388,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let forks = ForkName::list_all() @@ -2430,7 +2407,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( move |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { let config_and_preset = @@ -2446,7 +2423,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( @@ -2477,7 +2454,7 @@ pub fn serve( .and(warp::header::optional::("accept")) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |endpoint_version: EndpointVersion, state_id: StateId, accept_header: Option, @@ -2537,7 +2514,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |endpoint_version: EndpointVersion, task_spawner: TaskSpawner, chain: Arc>| { @@ -2576,7 +2553,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let beacon_fork_choice = chain.canonical_head.fork_choice_read_lock(); @@ -2631,7 +2608,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -2687,7 +2664,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>, chain: Arc>| { @@ -2738,7 +2715,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(network_globals.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>, chain: Arc>| { @@ -2786,7 +2763,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |requested_peer_id: String, task_spawner: TaskSpawner, network_globals: Arc>| { @@ -2846,7 +2823,7 @@ pub fn serve( .and(multi_key_query::()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |query_res: Result, task_spawner: TaskSpawner, network_globals: Arc>| { @@ -2916,7 +2893,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -2969,7 +2946,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |epoch: Epoch, task_spawner: TaskSpawner, chain: Arc>, @@ -2995,7 +2972,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |endpoint_version: EndpointVersion, slot: Slot, query: api_types::ValidatorBlocksQuery, @@ -3064,7 +3041,7 @@ pub fn serve( .and(warp::query::()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |slot: Slot, query: api_types::ValidatorBlocksQuery, task_spawner: TaskSpawner, @@ -3121,7 +3098,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query: api_types::ValidatorAttestationDataQuery, task_spawner: TaskSpawner, chain: Arc>| { @@ -3156,7 +3133,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query: api_types::ValidatorAggregateAttestationQuery, task_spawner: TaskSpawner, chain: Arc>| { @@ -3197,7 +3174,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, @@ -3223,7 +3200,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, indices: api_types::ValidatorIndexData, task_spawner: TaskSpawner, @@ -3243,7 +3220,7 @@ pub fn serve( .and(not_while_syncing_filter.clone()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |sync_committee_data: SyncContributionData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3277,7 +3254,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, aggregates: Vec>, @@ -3390,7 +3367,7 @@ pub fn serve( .and(warp::body::json()) .and(network_tx_filter) .and(log_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, contributions: Vec>, @@ -3418,7 +3395,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |subscriptions: Vec, validator_subscription_tx: Sender, task_spawner: TaskSpawner, @@ -3470,7 +3447,7 @@ pub fn serve( .and(chain_filter.clone()) .and(log_filter.clone()) .and(warp::body::json()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, log: Logger, @@ -3521,15 +3498,15 @@ pub fn serve( .and(chain_filter.clone()) .and(log_filter.clone()) .and(warp::body::json()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>, log: Logger, register_val_data: Vec| async { let (tx, rx) = oneshot::channel(); - task_spawner - .spawn_async_with_rejection(Priority::P0, async move { + let initial_result = task_spawner + .spawn_async_with_rejection_no_conversion(Priority::P0, async move { let execution_layer = chain .execution_layer .as_ref() @@ -3671,17 +3648,22 @@ pub fn serve( // from what is sent back down the channel. Ok(warp::reply::reply().into_response()) }) - .await?; + .await; + + if initial_result.is_err() { + return task_spawner::convert_rejection(initial_result).await; + } // Await a response from the builder without blocking a // `BeaconProcessor` worker. - rx.await.unwrap_or_else(|_| { + task_spawner::convert_rejection(rx.await.unwrap_or_else(|_| { Ok(warp::reply::with_status( warp::reply::json(&"No response from channel"), eth2::StatusCode::INTERNAL_SERVER_ERROR, ) .into_response()) - }) + })) + .await }, ); // POST validator/sync_committee_subscriptions @@ -3694,7 +3676,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |subscriptions: Vec, validator_subscription_tx: Sender, task_spawner: TaskSpawner, @@ -3738,7 +3720,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, indices: Vec, task_spawner: TaskSpawner, @@ -3779,7 +3761,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |request_data: api_types::LivenessRequestData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3823,7 +3805,7 @@ pub fn serve( .and(warp::path("health")) .and(warp::path::end()) .and(task_spawner_filter.clone()) - .and_then(|task_spawner: TaskSpawner| { + .then(|task_spawner: TaskSpawner| { task_spawner.blocking_json_task(Priority::P0, move || { eth2::lighthouse::Health::observe() .map(api_types::GenericResponse::from) @@ -3841,7 +3823,7 @@ pub fn serve( .and(app_start_filter) .and(data_dir_filter) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, sysinfo, app_start: std::time::Instant, @@ -3866,7 +3848,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { ui::get_validator_count(chain).map(api_types::GenericResponse::from) @@ -3882,7 +3864,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |request_data: ui::ValidatorMetricsRequestData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3901,7 +3883,7 @@ pub fn serve( .and(warp::body::json()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |request_data: ui::ValidatorInfoRequestData, task_spawner: TaskSpawner, chain: Arc>| { @@ -3918,7 +3900,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P0, move || { @@ -3934,7 +3916,7 @@ pub fn serve( .and(warp::path("nat")) .and(task_spawner_filter.clone()) .and(warp::path::end()) - .and_then(|task_spawner: TaskSpawner| { + .then(|task_spawner: TaskSpawner| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( lighthouse_network::metrics::NAT_OPEN @@ -3952,7 +3934,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -3976,7 +3958,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(network_globals) - .and_then( + .then( |task_spawner: TaskSpawner, network_globals: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { @@ -3999,7 +3981,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_response_task(Priority::P1, move || { Ok::<_, warp::Rejection>(warp::reply::json( @@ -4023,7 +4005,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, validator_id: ValidatorId, task_spawner: TaskSpawner, @@ -4043,7 +4025,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |epoch: Epoch, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { validator_inclusion::global_validator_inclusion_data(epoch, &chain) @@ -4059,7 +4041,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { let current_slot_opt = chain.slot().ok(); @@ -4092,7 +4074,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(eth1_service_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, eth1_service: eth1::Service| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( @@ -4114,7 +4096,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(eth1_service_filter) - .and_then( + .then( |task_spawner: TaskSpawner, eth1_service: eth1::Service| { task_spawner.blocking_json_task(Priority::P1, move || { Ok(api_types::GenericResponse::from( @@ -4139,7 +4121,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>| { @@ -4166,7 +4148,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { if chain.eth1_chain.is_some() { @@ -4190,7 +4172,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || database::info(chain)) }, @@ -4203,7 +4185,7 @@ pub fn serve( .and(not_while_syncing_filter) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { chain.store_migrator.process_reconstruction(); @@ -4220,7 +4202,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |blocks: Vec>>, task_spawner: TaskSpawner, chain: Arc>, @@ -4246,7 +4228,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then(|query, task_spawner: TaskSpawner, chain, log| { + .then(|query, task_spawner: TaskSpawner, chain, log| { task_spawner.blocking_json_task(Priority::P1, move || { block_rewards::get_block_rewards(query, chain, log) }) @@ -4261,7 +4243,7 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( |blocks, task_spawner: TaskSpawner, chain, log| { task_spawner.blocking_json_task(Priority::P1, move || { block_rewards::compute_block_rewards(blocks, chain, log) @@ -4278,7 +4260,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |target, query, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { attestation_performance::get_attestation_performance(target, query, chain) @@ -4294,7 +4276,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |query, task_spawner: TaskSpawner, chain: Arc>| { task_spawner.blocking_json_task(Priority::P1, move || { block_packing_efficiency::get_block_packing_efficiency(query, chain) @@ -4308,7 +4290,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter.clone()) .and(chain_filter.clone()) - .and_then( + .then( |task_spawner: TaskSpawner, chain: Arc>| { task_spawner.spawn_async_with_rejection(Priority::P1, async move { let merge_readiness = chain.check_merge_readiness().await; @@ -4326,7 +4308,7 @@ pub fn serve( .and(multi_key_query::()) .and(task_spawner_filter.clone()) .and(chain_filter) - .and_then( + .then( |topics_res: Result, task_spawner: TaskSpawner, chain: Arc>| { @@ -4403,7 +4385,7 @@ pub fn serve( .and(warp::path::end()) .and(task_spawner_filter) .and(sse_component_filter) - .and_then( + .then( |task_spawner: TaskSpawner, sse_component: Option| { task_spawner.blocking_response_task(Priority::P1, move || { if let Some(logging_components) = sse_component { diff --git a/beacon_node/http_api/src/task_spawner.rs b/beacon_node/http_api/src/task_spawner.rs index b4da67f77cf..503faff717b 100644 --- a/beacon_node/http_api/src/task_spawner.rs +++ b/beacon_node/http_api/src/task_spawner.rs @@ -35,6 +35,24 @@ pub struct TaskSpawner { beacon_processor_send: Option>, } +/// Convert a warp `Rejection` into a `Response`. +/// +/// This function should *always* be used to convert rejections into responses. This prevents warp +/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404 +pub async fn convert_rejection(res: Result) -> Response { + match res { + Ok(response) => response.into_response(), + Err(e) => match warp_utils::reject::handle_rejection(e).await { + Ok(reply) => reply.into_response(), + Err(_) => warp::reply::with_status( + warp::reply::json(&"unhandled error"), + eth2::StatusCode::INTERNAL_SERVER_ERROR, + ) + .into_response(), + }, + } +} + impl TaskSpawner { pub fn new(beacon_processor_send: Option>) -> Self { Self { @@ -43,11 +61,7 @@ impl TaskSpawner { } /// Executes a "blocking" (non-async) task which returns a `Response`. - pub async fn blocking_response_task( - self, - priority: Priority, - func: F, - ) -> Result + pub async fn blocking_response_task(self, priority: Priority, func: F) -> Response where F: FnOnce() -> Result + Send + Sync + 'static, T: Reply + Send + 'static, @@ -65,31 +79,25 @@ impl TaskSpawner { }; // Send the function to the beacon processor for execution at some arbitrary time. - match send_to_beacon_processor( + let result = send_to_beacon_processor( beacon_processor_send, priority, BlockingOrAsync::Blocking(Box::new(process_fn)), rx, ) .await - { - Ok(result) => result.map(Reply::into_response), - Err(error_response) => Ok(error_response), - } + .and_then(|x| x); + convert_rejection(result).await } else { // There is no beacon processor so spawn a task directly on the // tokio executor. - warp_utils::task::blocking_response_task(func).await + convert_rejection(warp_utils::task::blocking_response_task(func).await).await } } /// Executes a "blocking" (non-async) task which returns a JSON-serializable /// object. - pub async fn blocking_json_task( - self, - priority: Priority, - func: F, - ) -> Result + pub async fn blocking_json_task(self, priority: Priority, func: F) -> Response where F: FnOnce() -> Result + Send + Sync + 'static, T: Serialize + Send + 'static, @@ -98,11 +106,26 @@ impl TaskSpawner { self.blocking_response_task(priority, func).await } - /// Executes an async task which may return a `warp::Rejection`. + /// Executes an async task which may return a `Rejection`, which will be converted to a response. pub async fn spawn_async_with_rejection( self, priority: Priority, func: impl Future> + Send + Sync + 'static, + ) -> Response { + let result = self + .spawn_async_with_rejection_no_conversion(priority, func) + .await; + convert_rejection(result).await + } + + /// Same as `spawn_async_with_rejection` but returning a result with the unhandled rejection. + /// + /// If you call this function you MUST convert the rejection to a response and not let it + /// propagate into Warp's filters. See `convert_rejection`. + pub async fn spawn_async_with_rejection_no_conversion( + self, + priority: Priority, + func: impl Future> + Send + Sync + 'static, ) -> Result { if let Some(beacon_processor_send) = &self.beacon_processor_send { // Create a wrapper future that will execute `func` and send the @@ -124,18 +147,16 @@ impl TaskSpawner { rx, ) .await - .unwrap_or_else(Result::Ok) + .and_then(|x| x) } else { // There is no beacon processor so spawn a task directly on the // tokio executor. - tokio::task::spawn(func).await.unwrap_or_else(|e| { - let response = warp::reply::with_status( - warp::reply::json(&format!("Tokio did not execute task: {e:?}")), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(); - Ok(response) - }) + tokio::task::spawn(func) + .await + .map_err(|_| { + warp_utils::reject::custom_server_error("Tokio failed to spawn task".into()) + }) + .and_then(|x| x) } } @@ -158,14 +179,14 @@ impl TaskSpawner { }; // Send the function to the beacon processor for execution at some arbitrary time. - send_to_beacon_processor( + let result = send_to_beacon_processor( beacon_processor_send, priority, BlockingOrAsync::Async(Box::pin(process_fn)), rx, ) - .await - .unwrap_or_else(|error_response| error_response) + .await; + convert_rejection(result).await } else { // There is no beacon processor so spawn a task directly on the // tokio executor. @@ -182,14 +203,14 @@ impl TaskSpawner { /// Send a task to the beacon processor and await execution. /// -/// If the task is not executed, return an `Err(response)` with an error message +/// If the task is not executed, return an `Err` with an error message /// for the API consumer. async fn send_to_beacon_processor( beacon_processor_send: &BeaconProcessorSend, priority: Priority, process_fn: BlockingOrAsync, rx: oneshot::Receiver, -) -> Result { +) -> Result { let error_message = match beacon_processor_send.try_send(priority.work_event(process_fn)) { Ok(()) => { match rx.await { @@ -205,10 +226,7 @@ async fn send_to_beacon_processor( Err(TrySendError::Closed(_)) => "The task was dropped. The server is shutting down.", }; - let error_response = warp::reply::with_status( - warp::reply::json(&error_message), - eth2::StatusCode::INTERNAL_SERVER_ERROR, - ) - .into_response(); - Err(error_response) + Err(warp_utils::reject::custom_server_error( + error_message.to_string(), + )) }