From 2c20dfbef49204458ba9c30aa5360fca97173837 Mon Sep 17 00:00:00 2001 From: Paul Harris Date: Wed, 27 Nov 2024 17:24:10 +1000 Subject: [PATCH 1/4] alter ActiveP2pNetwork concept of close to in sync - `isInSync` is often set when there's no other peers to sync to so its not a great measure - moved the isCloseToInSync into recentChaindata Signed-off-by: Paul Harris --- .../beacon/sync/events/SyncStateTracker.java | 1 + .../logic/common/util/ForkChoiceUtil.java | 2 +- .../networking/eth2/ActiveEth2P2PNetwork.java | 23 +----- .../teku/networking/eth2/Eth2P2PNetwork.java | 2 +- .../eth2/mock/NoOpEth2P2PNetwork.java | 2 +- .../eth2/ActiveEth2P2PNetworkTest.java | 76 +++++-------------- .../beaconchain/BeaconChainController.java | 2 +- .../services/beaconchain/SlotProcessor.java | 2 +- .../teku/storage/client/RecentChainData.java | 26 +++++++ .../storage/client/RecentChainDataTest.java | 44 +++++++++++ 10 files changed, 97 insertions(+), 83 deletions(-) diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/events/SyncStateTracker.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/events/SyncStateTracker.java index 432c4e78747..5a22c27e02f 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/events/SyncStateTracker.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/events/SyncStateTracker.java @@ -136,6 +136,7 @@ private void updateCurrentState() { currentState = SyncState.SYNCING; } else if (startingUp) { currentState = SyncState.START_UP; + } else { currentState = SyncState.IN_SYNC; } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/util/ForkChoiceUtil.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/util/ForkChoiceUtil.java index 859b5fe6e81..2e6ce3e6d39 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/util/ForkChoiceUtil.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/util/ForkChoiceUtil.java @@ -68,7 +68,7 @@ public ForkChoiceUtil( } public UInt64 getSlotsSinceGenesis(final ReadOnlyStore store, final boolean useUnixTime) { - UInt64 time = + final UInt64 time = useUnixTime ? UInt64.valueOf(Instant.now().getEpochSecond()) : store.getTimeSeconds(); return getCurrentSlot(time, store.getGenesisTime()); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java index 6dd13b98f27..ef6dfbfa16b 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java @@ -130,7 +130,7 @@ private synchronized void startup() { processedAttestationSubscriptionProvider.subscribe(gossipForkManager::publishAttestation); eventChannels.subscribe(BlockGossipChannel.class, gossipForkManager::publishBlock); eventChannels.subscribe(BlobSidecarGossipChannel.class, gossipForkManager::publishBlobSidecar); - if (isCloseToInSync()) { + if (recentChainData.isCloseToInSync()) { startGossip(); } } @@ -167,36 +167,19 @@ private synchronized void stopGossip() { } @Override - public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) { + public void onSyncStateChanged(final boolean isOptimistic) { gossipForkManager.onOptimisticHeadChanged(isOptimistic); if (state.get() != State.RUNNING) { return; } - if (isInSync || isCloseToInSync()) { + if (recentChainData.isCloseToInSync()) { startGossip(); } else { stopGossip(); } } - @VisibleForTesting - boolean isCloseToInSync() { - final Optional currentEpoch = recentChainData.getCurrentEpoch(); - if (currentEpoch.isEmpty()) { - return false; - } - - final int maxLookaheadEpochs = spec.getSpecConfig(currentEpoch.get()).getMaxSeedLookahead(); - final int slotsPerEpoch = spec.slotsPerEpoch(currentEpoch.get()); - final int maxLookaheadSlots = slotsPerEpoch * maxLookaheadEpochs; - - return recentChainData - .getChainHeadSlotsBehind() - .orElse(UInt64.MAX_VALUE) - .isLessThanOrEqualTo(maxLookaheadSlots); - } - private void setTopicScoringParams() { gossipUpdateTask = asyncRunner.runWithFixedDelay( diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java index 0b7dbc34c49..df8a0c239c7 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java @@ -28,7 +28,7 @@ public interface Eth2P2PNetwork extends P2PNetwork { void onEpoch(UInt64 epoch); - void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic); + void onSyncStateChanged(final boolean isOptimistic); void subscribeToAttestationSubnetId(int subnetId); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java index f80fab25fea..bf18322d9a4 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java @@ -37,7 +37,7 @@ public NoOpEth2P2PNetwork(final Spec spec) { public void onEpoch(final UInt64 epoch) {} @Override - public void onSyncStateChanged(final boolean isInSync, final boolean isOptimistic) {} + public void onSyncStateChanged(final boolean isOptimistic) {} @Override public void subscribeToAttestationSubnetId(final int subnetId) {} diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java index 91810bc2cf4..d4231b56b87 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java @@ -81,10 +81,6 @@ public class ActiveEth2P2PNetworkTest { private Fork altairFork; private Bytes32 genesisValidatorsRoot; - private final int maxFollowDistanceSlots = - spec.getGenesisSpecConfig().getMaxSeedLookahead() - * spec.slotsPerEpoch(storageSystem.combinedChainDataClient().getCurrentEpoch()); - @BeforeEach public void setup() { when(discoveryNetwork.start()).thenReturn(SafeFuture.completedFuture(null)); @@ -192,13 +188,11 @@ public void unsubscribeFromSyncCommitteeSubnetId_shouldUpdateDiscoveryENR() { @Test void onSyncStateChanged_shouldEnableGossipWhenInSync() { // Current slot is a long way beyond the chain head - storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000)); + storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(32)); assertThat(network.start()).isCompleted(); - // Won't start gossip as chain head is too old - verify(gossipForkManager, never()).configureGossipForEpoch(any()); - network.onSyncStateChanged(true, false); + network.onSyncStateChanged(false); // Even though we're a long way behind, start gossip because we believe we're in sync verify(gossipForkManager).configureGossipForEpoch(any()); @@ -210,90 +204,56 @@ void onSyncStateChanged_shouldStopGossipWhenTooFarBehindAndNotInSync() { storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000)); assertThat(network.start()).isCompleted(); - network.onSyncStateChanged(true, false); - // Even though we're a long way behind, start gossip because we believe we're in sync - verify(gossipForkManager).configureGossipForEpoch(any()); + network.onSyncStateChanged(false); + // based on network time we know we're too far behind, so we don't start gossip + verify(gossipForkManager, never()).configureGossipForEpoch(any()); - network.onSyncStateChanged(false, false); - verify(gossipForkManager).stopGossip(); + network.onSyncStateChanged(false); + verify(gossipForkManager, never()).stopGossip(); } @Test void onSyncStateChanged_shouldNotifyForkManagerOfOptimisticSyncState() { assertThat(network.start()).isCompleted(); - network.onSyncStateChanged(false, true); + network.onSyncStateChanged(true); verify(gossipForkManager).onOptimisticHeadChanged(true); - network.onSyncStateChanged(false, false); + network.onSyncStateChanged(false); verify(gossipForkManager).onOptimisticHeadChanged(false); - network.onSyncStateChanged(true, true); + network.onSyncStateChanged(true); verify(gossipForkManager, times(2)).onOptimisticHeadChanged(true); - network.onSyncStateChanged(true, false); + network.onSyncStateChanged(false); verify(gossipForkManager, times(2)).onOptimisticHeadChanged(false); } @Test void onSyncStateChanged_shouldNotResultInMultipleSubscriptions() { // Current slot is a long way beyond the chain head - storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000)); + storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(32)); assertThat(network.start()).isCompleted(); // Won't start gossip as chain head is too old - verify(gossipForkManager, never()).configureGossipForEpoch(any()); + // verify(gossipForkManager, never()).configureGossipForEpoch(any()); - network.onSyncStateChanged(true, false); + network.onSyncStateChanged(false); verify(gossipForkManager).configureGossipForEpoch(any()); assertThat(subscribers.getSubscriberCount()).isEqualTo(1); verify(eventChannels, times(1)).subscribe(eq(BlockGossipChannel.class), any()); - network.onSyncStateChanged(false, false); + storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(100)); + network.onSyncStateChanged(false); verify(gossipForkManager).stopGossip(); - network.onSyncStateChanged(true, false); - verify(gossipForkManager, times(2)).configureGossipForEpoch(any()); + network.onSyncStateChanged(false); + verify(gossipForkManager).configureGossipForEpoch(any()); // Can't unsubscribe from these so should only subscribe once assertThat(subscribers.getSubscriberCount()).isEqualTo(1); verify(eventChannels, times(1)).subscribe(eq(BlockGossipChannel.class), any()); } - @Test - void isCloseToInSync_shouldCalculateWhenDistanceOutOfRange() { - storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(maxFollowDistanceSlots + 1)); - assertThat(network.isCloseToInSync()).isFalse(); - } - - @Test - void isCloseToInSync_shouldCalculateWhenDistanceInRange() { - storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(maxFollowDistanceSlots)); - assertThat(network.isCloseToInSync()).isTrue(); - } - - @Test - void isCloseToInSync_shouldReturnFalseWhenEmptyCurrentEpoch() { - final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(); - final RecentChainData recentChainData = storageSystem.recentChainData(); - final ActiveEth2P2PNetwork network = - new ActiveEth2P2PNetwork( - spec, - asyncRunner, - discoveryNetwork, - peerManager, - gossipForkManager, - eventChannels, - recentChainData, - attestationSubnetService, - syncCommitteeSubnetService, - gossipEncoding, - gossipConfigurator, - processedAttestationSubscriptionProvider, - true); - - assertThat(network.isCloseToInSync()).isFalse(); - } - @SuppressWarnings("unchecked") private ArgumentCaptor> subnetIdCaptor() { return ArgumentCaptor.forClass(Iterable.class); diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index d57ef3bb489..83e1f7cdfd2 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -1327,7 +1327,7 @@ public void initSyncService() { // p2pNetwork subscription so gossip can be enabled and disabled appropriately syncService.subscribeToSyncStateChangesAndUpdate( - state -> p2pNetwork.onSyncStateChanged(state.isInSync(), state.isOptimistic())); + state -> p2pNetwork.onSyncStateChanged(state.isOptimistic())); } protected void initOperationsReOrgManager() { diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java index 752ddba91fb..75521aece80 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java @@ -151,7 +151,7 @@ private void processEpochPrecompute(final UInt64 epoch) { } private void processSlotWhileSyncing(final SyncState currentSyncState) { - UInt64 slot = nodeSlot.getValue(); + final UInt64 slot = nodeSlot.getValue(); this.forkChoiceTrigger.onSlotStartedWhileSyncing(slot); if (currentSyncState == SyncState.AWAITING_EL) { eventLog.syncEventAwaitingEL(slot, recentChainData.getHeadSlot(), p2pNetwork.getPeerCount()); diff --git a/storage/src/main/java/tech/pegasys/teku/storage/client/RecentChainData.java b/storage/src/main/java/tech/pegasys/teku/storage/client/RecentChainData.java index 4b274001d9a..2edb38d37d1 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/client/RecentChainData.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/client/RecentChainData.java @@ -15,6 +15,7 @@ import static tech.pegasys.teku.infrastructure.time.TimeUtilities.secondsToMillis; +import com.google.common.annotations.VisibleForTesting; import java.util.Collections; import java.util.List; import java.util.Map; @@ -41,6 +42,7 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.SpecVersion; +import tech.pegasys.teku.spec.config.SpecConfig; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary; @@ -58,6 +60,7 @@ import tech.pegasys.teku.spec.datastructures.state.ForkInfo; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconStateCache; +import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers; import tech.pegasys.teku.spec.logic.common.util.BeaconStateUtil; import tech.pegasys.teku.storage.api.ChainHeadChannel; import tech.pegasys.teku.storage.api.FinalizedCheckpointChannel; @@ -194,6 +197,29 @@ public UInt64 computeTimeAtSlot(final UInt64 slot) { return genesisTime.plus(slot.times(spec.getSecondsPerSlot(slot))); } + @VisibleForTesting + boolean isCloseToInSync(final Optional currentEpoch, final UInt64 currentTimeSeconds) { + final SpecVersion specVersion = spec.atEpoch(currentEpoch.orElseThrow()); + final MiscHelpers miscHelpers = specVersion.miscHelpers(); + final SpecConfig specConfig = specVersion.getConfig(); + final UInt64 networkSlot = miscHelpers.computeSlotAtTime(getGenesisTime(), currentTimeSeconds); + + final int maxLookaheadEpochs = specConfig.getMaxSeedLookahead(); + final int slotsPerEpoch = specVersion.getSlotsPerEpoch(); + final int maxLookaheadSlots = slotsPerEpoch * maxLookaheadEpochs; + + return networkSlot.minusMinZero(getHeadSlot()).isLessThanOrEqualTo(maxLookaheadSlots); + } + + public boolean isCloseToInSync() { + // current epoch is time based, and gives network current epoch + final Optional currentEpoch = getCurrentEpoch(); + if (currentEpoch.isEmpty()) { + return false; + } + return isCloseToInSync(currentEpoch, getStore().getTimeInSeconds()); + } + public Optional getGenesisData() { return genesisData; } diff --git a/storage/src/test/java/tech/pegasys/teku/storage/client/RecentChainDataTest.java b/storage/src/test/java/tech/pegasys/teku/storage/client/RecentChainDataTest.java index c040f5e291b..464a2402c86 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/client/RecentChainDataTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/client/RecentChainDataTest.java @@ -752,6 +752,50 @@ public void getBlockRootBySlotWithHeadRoot_forUnknownHeadRoot() { assertThat(recentChainData.getBlockRootInEffectBySlot(bestBlock.getSlot(), headRoot)).isEmpty(); } + @Test + public void isCloseToInSync_preGenesis() { + initPreGenesis(); + assertThat(recentChainData.isCloseToInSync()).isFalse(); + } + + @Test + public void isCloseToSync_belowBoundary() { + initPostGenesis(); + final SpecConfig specConfig = spec.getGenesisSpecConfig(); + final int seconds = + specConfig.getMaxSeedLookahead() + * specConfig.getSlotsPerEpoch() + * specConfig.getSecondsPerSlot(); + assertThat( + recentChainData.isCloseToInSync(Optional.of(UInt64.ZERO), UInt64.valueOf(seconds - 1))) + .isTrue(); + } + + @Test + public void isCloseToSync_atBoundary() { + initPostGenesis(); + final SpecConfig specConfig = spec.getGenesisSpecConfig(); + final int seconds = + specConfig.getMaxSeedLookahead() + * specConfig.getSlotsPerEpoch() + * specConfig.getSecondsPerSlot(); + assertThat(recentChainData.isCloseToInSync(Optional.of(UInt64.ZERO), UInt64.valueOf(seconds))) + .isTrue(); + } + + @Test + public void isCloseToSync_aboveBoundary() { + initPostGenesis(); + final SpecConfig specConfig = spec.getGenesisSpecConfig(); + final int seconds = + specConfig.getMaxSeedLookahead() + * specConfig.getSlotsPerEpoch() + * specConfig.getSecondsPerSlot(); + assertThat( + recentChainData.isCloseToInSync(Optional.of(UInt64.ZERO), UInt64.valueOf(seconds + 8))) + .isFalse(); + } + @Test public void getBlockRootBySlotWithHeadRoot_withForkRoot() { initPostGenesis(); From 64a06959be75519e87947f6cd20c14612e6adc34 Mon Sep 17 00:00:00 2001 From: Paul Harris Date: Thu, 28 Nov 2024 10:43:14 +1000 Subject: [PATCH 2/4] fix AT. Signed-off-by: Paul Harris --- .../pegasys/teku/storage/client/RecentChainData.java | 10 ++++------ .../teku/storage/client/RecentChainDataTest.java | 11 +++-------- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/storage/src/main/java/tech/pegasys/teku/storage/client/RecentChainData.java b/storage/src/main/java/tech/pegasys/teku/storage/client/RecentChainData.java index 2edb38d37d1..9c18c758156 100644 --- a/storage/src/main/java/tech/pegasys/teku/storage/client/RecentChainData.java +++ b/storage/src/main/java/tech/pegasys/teku/storage/client/RecentChainData.java @@ -198,8 +198,8 @@ public UInt64 computeTimeAtSlot(final UInt64 slot) { } @VisibleForTesting - boolean isCloseToInSync(final Optional currentEpoch, final UInt64 currentTimeSeconds) { - final SpecVersion specVersion = spec.atEpoch(currentEpoch.orElseThrow()); + boolean isCloseToInSync(final UInt64 currentTimeSeconds) { + final SpecVersion specVersion = spec.getGenesisSpec(); final MiscHelpers miscHelpers = specVersion.miscHelpers(); final SpecConfig specConfig = specVersion.getConfig(); final UInt64 networkSlot = miscHelpers.computeSlotAtTime(getGenesisTime(), currentTimeSeconds); @@ -212,12 +212,10 @@ boolean isCloseToInSync(final Optional currentEpoch, final UInt64 curren } public boolean isCloseToInSync() { - // current epoch is time based, and gives network current epoch - final Optional currentEpoch = getCurrentEpoch(); - if (currentEpoch.isEmpty()) { + if (store == null) { return false; } - return isCloseToInSync(currentEpoch, getStore().getTimeInSeconds()); + return isCloseToInSync(store.getTimeInSeconds()); } public Optional getGenesisData() { diff --git a/storage/src/test/java/tech/pegasys/teku/storage/client/RecentChainDataTest.java b/storage/src/test/java/tech/pegasys/teku/storage/client/RecentChainDataTest.java index 464a2402c86..ae88e4465d1 100644 --- a/storage/src/test/java/tech/pegasys/teku/storage/client/RecentChainDataTest.java +++ b/storage/src/test/java/tech/pegasys/teku/storage/client/RecentChainDataTest.java @@ -766,9 +766,7 @@ public void isCloseToSync_belowBoundary() { specConfig.getMaxSeedLookahead() * specConfig.getSlotsPerEpoch() * specConfig.getSecondsPerSlot(); - assertThat( - recentChainData.isCloseToInSync(Optional.of(UInt64.ZERO), UInt64.valueOf(seconds - 1))) - .isTrue(); + assertThat(recentChainData.isCloseToInSync(UInt64.valueOf(seconds - 1))).isTrue(); } @Test @@ -779,8 +777,7 @@ public void isCloseToSync_atBoundary() { specConfig.getMaxSeedLookahead() * specConfig.getSlotsPerEpoch() * specConfig.getSecondsPerSlot(); - assertThat(recentChainData.isCloseToInSync(Optional.of(UInt64.ZERO), UInt64.valueOf(seconds))) - .isTrue(); + assertThat(recentChainData.isCloseToInSync(UInt64.valueOf(seconds))).isTrue(); } @Test @@ -791,9 +788,7 @@ public void isCloseToSync_aboveBoundary() { specConfig.getMaxSeedLookahead() * specConfig.getSlotsPerEpoch() * specConfig.getSecondsPerSlot(); - assertThat( - recentChainData.isCloseToInSync(Optional.of(UInt64.ZERO), UInt64.valueOf(seconds + 8))) - .isFalse(); + assertThat(recentChainData.isCloseToInSync(UInt64.valueOf(seconds + 8))).isFalse(); } @Test From 33155be43491bbbbf395c35110084abd77291c39 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Fri, 29 Nov 2024 16:21:11 +0100 Subject: [PATCH 3/4] make sure we start gossip when peers connects and we are in sync --- .../networking/eth2/ActiveEth2P2PNetwork.java | 10 +++++++ .../eth2/ActiveEth2P2PNetworkTest.java | 29 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java index ef6dfbfa16b..178b30f9b0c 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java @@ -133,6 +133,16 @@ private synchronized void startup() { if (recentChainData.isCloseToInSync()) { startGossip(); } + peerManager.subscribeConnect(peer -> onPeerConnected()); + } + + private void onPeerConnected() { + if (gossipStarted.get() || state.get() != State.RUNNING) { + return; + } + if (recentChainData.isCloseToInSync()) { + startGossip(); + } } private synchronized void startGossip() { diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java index d4231b56b87..d27094fb2f7 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java @@ -39,8 +39,10 @@ import tech.pegasys.teku.networking.eth2.gossip.encoding.GossipEncoding; import tech.pegasys.teku.networking.eth2.gossip.forks.GossipForkManager; import tech.pegasys.teku.networking.eth2.gossip.topics.ProcessedAttestationSubscriptionProvider; +import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; import tech.pegasys.teku.networking.eth2.peers.Eth2PeerManager; import tech.pegasys.teku.networking.p2p.discovery.DiscoveryNetwork; +import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.attestation.ProcessedAttestationListener; @@ -185,6 +187,33 @@ public void unsubscribeFromSyncCommitteeSubnetId_shouldUpdateDiscoveryENR() { assertThat(capturedValues.get(3)).containsExactlyInAnyOrder(1, 3); } + @Test + void shouldStartGossipOnPeerConnect() { + @SuppressWarnings("unchecked") + final ArgumentCaptor> peerManagerCaptor = + ArgumentCaptor.forClass(PeerConnectedSubscriber.class); + // Current slot is a long way beyond the chain head + storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(64)); + + assertThat(network.start()).isCompleted(); + verify(peerManager).subscribeConnect(peerManagerCaptor.capture()); + + network.onSyncStateChanged(false); + // based on network time we know we're too far behind, so we don't start gossip + verify(gossipForkManager, never()).configureGossipForEpoch(any()); + + // we are still too far behind, so on peer connect gossip is not started + peerManagerCaptor.getValue().onConnected(mock(Eth2Peer.class)); + verify(gossipForkManager, never()).configureGossipForEpoch(any()); + + // Advance the chain + storageSystem.chainUpdater().updateBestBlock(storageSystem.chainUpdater().advanceChain(64)); + + // on peer connect gossip is started + peerManagerCaptor.getValue().onConnected(mock(Eth2Peer.class)); + verify(gossipForkManager).configureGossipForEpoch(any()); + } + @Test void onSyncStateChanged_shouldEnableGossipWhenInSync() { // Current slot is a long way beyond the chain head From 04c46cb51eddb8bf1c3a719817102f81b94496f3 Mon Sep 17 00:00:00 2001 From: Paul Harris Date: Mon, 2 Dec 2024 06:46:07 +1000 Subject: [PATCH 4/4] put isCloseToInSync into the onSyncStateChanged fn Signed-off-by: Paul Harris --- .../beacon/sync/events/SyncStateTracker.java | 1 - .../networking/eth2/ActiveEth2P2PNetwork.java | 4 +-- .../teku/networking/eth2/Eth2P2PNetwork.java | 2 +- .../eth2/mock/NoOpEth2P2PNetwork.java | 2 +- .../eth2/ActiveEth2P2PNetworkTest.java | 35 +++++++++---------- .../beaconchain/BeaconChainController.java | 3 +- 6 files changed, 23 insertions(+), 24 deletions(-) diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/events/SyncStateTracker.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/events/SyncStateTracker.java index 5a22c27e02f..432c4e78747 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/events/SyncStateTracker.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/events/SyncStateTracker.java @@ -136,7 +136,6 @@ private void updateCurrentState() { currentState = SyncState.SYNCING; } else if (startingUp) { currentState = SyncState.START_UP; - } else { currentState = SyncState.IN_SYNC; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java index 178b30f9b0c..53fcc248414 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetwork.java @@ -177,13 +177,13 @@ private synchronized void stopGossip() { } @Override - public void onSyncStateChanged(final boolean isOptimistic) { + public void onSyncStateChanged(final boolean isCloseToInSync, final boolean isOptimistic) { gossipForkManager.onOptimisticHeadChanged(isOptimistic); if (state.get() != State.RUNNING) { return; } - if (recentChainData.isCloseToInSync()) { + if (isCloseToInSync) { startGossip(); } else { stopGossip(); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java index df8a0c239c7..96dce96b2c5 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/Eth2P2PNetwork.java @@ -28,7 +28,7 @@ public interface Eth2P2PNetwork extends P2PNetwork { void onEpoch(UInt64 epoch); - void onSyncStateChanged(final boolean isOptimistic); + void onSyncStateChanged(final boolean isCloseToInSync, final boolean isOptimistic); void subscribeToAttestationSubnetId(int subnetId); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java index bf18322d9a4..e230b90b5d3 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/mock/NoOpEth2P2PNetwork.java @@ -37,7 +37,7 @@ public NoOpEth2P2PNetwork(final Spec spec) { public void onEpoch(final UInt64 epoch) {} @Override - public void onSyncStateChanged(final boolean isOptimistic) {} + public void onSyncStateChanged(final boolean isCloseToInSync, final boolean isOptimistic) {} @Override public void subscribeToAttestationSubnetId(final int subnetId) {} diff --git a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java index d27094fb2f7..55fdbcd61c3 100644 --- a/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java +++ b/networking/eth2/src/test/java/tech/pegasys/teku/networking/eth2/ActiveEth2P2PNetworkTest.java @@ -198,7 +198,7 @@ void shouldStartGossipOnPeerConnect() { assertThat(network.start()).isCompleted(); verify(peerManager).subscribeConnect(peerManagerCaptor.capture()); - network.onSyncStateChanged(false); + network.onSyncStateChanged(false, false); // based on network time we know we're too far behind, so we don't start gossip verify(gossipForkManager, never()).configureGossipForEpoch(any()); @@ -221,7 +221,7 @@ void onSyncStateChanged_shouldEnableGossipWhenInSync() { assertThat(network.start()).isCompleted(); - network.onSyncStateChanged(false); + network.onSyncStateChanged(true, false); // Even though we're a long way behind, start gossip because we believe we're in sync verify(gossipForkManager).configureGossipForEpoch(any()); @@ -233,51 +233,50 @@ void onSyncStateChanged_shouldStopGossipWhenTooFarBehindAndNotInSync() { storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000)); assertThat(network.start()).isCompleted(); - network.onSyncStateChanged(false); - // based on network time we know we're too far behind, so we don't start gossip - verify(gossipForkManager, never()).configureGossipForEpoch(any()); + network.onSyncStateChanged(true, false); + // Even though we're a long way behind, start gossip because we believe we're in sync + verify(gossipForkManager).configureGossipForEpoch(any()); - network.onSyncStateChanged(false); - verify(gossipForkManager, never()).stopGossip(); + network.onSyncStateChanged(false, false); + verify(gossipForkManager).stopGossip(); } @Test void onSyncStateChanged_shouldNotifyForkManagerOfOptimisticSyncState() { assertThat(network.start()).isCompleted(); - network.onSyncStateChanged(true); + network.onSyncStateChanged(false, true); verify(gossipForkManager).onOptimisticHeadChanged(true); - network.onSyncStateChanged(false); + network.onSyncStateChanged(false, false); verify(gossipForkManager).onOptimisticHeadChanged(false); - network.onSyncStateChanged(true); + network.onSyncStateChanged(true, true); verify(gossipForkManager, times(2)).onOptimisticHeadChanged(true); - network.onSyncStateChanged(false); + network.onSyncStateChanged(true, false); verify(gossipForkManager, times(2)).onOptimisticHeadChanged(false); } @Test void onSyncStateChanged_shouldNotResultInMultipleSubscriptions() { // Current slot is a long way beyond the chain head - storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(32)); + storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(1000)); assertThat(network.start()).isCompleted(); // Won't start gossip as chain head is too old - // verify(gossipForkManager, never()).configureGossipForEpoch(any()); + verify(gossipForkManager, never()).configureGossipForEpoch(any()); - network.onSyncStateChanged(false); + network.onSyncStateChanged(true, false); verify(gossipForkManager).configureGossipForEpoch(any()); assertThat(subscribers.getSubscriberCount()).isEqualTo(1); verify(eventChannels, times(1)).subscribe(eq(BlockGossipChannel.class), any()); - storageSystem.chainUpdater().setCurrentSlot(UInt64.valueOf(100)); - network.onSyncStateChanged(false); + network.onSyncStateChanged(false, false); verify(gossipForkManager).stopGossip(); - network.onSyncStateChanged(false); - verify(gossipForkManager).configureGossipForEpoch(any()); + network.onSyncStateChanged(true, false); + verify(gossipForkManager, times(2)).configureGossipForEpoch(any()); // Can't unsubscribe from these so should only subscribe once assertThat(subscribers.getSubscriberCount()).isEqualTo(1); verify(eventChannels, times(1)).subscribe(eq(BlockGossipChannel.class), any()); diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index a36e15fccfd..47e0d1540e4 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -1326,7 +1326,8 @@ public void initSyncService() { // p2pNetwork subscription so gossip can be enabled and disabled appropriately syncService.subscribeToSyncStateChangesAndUpdate( - state -> p2pNetwork.onSyncStateChanged(state.isOptimistic())); + state -> + p2pNetwork.onSyncStateChanged(recentChainData.isCloseToInSync(), state.isOptimistic())); } protected void initOperationsReOrgManager() {