diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/NotSynchronisingResult.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/NotSynchronisingResult.java index 71f4ad25deb..d67ba7b9f1b 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/NotSynchronisingResult.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/NotSynchronisingResult.java @@ -24,4 +24,14 @@ public class NotSynchronisingResult implements JsonRpcResult { public boolean getResult() { return false; } + + @Override + public boolean equals(final Object o) { + return (this == o) || (o != null && getClass() == o.getClass()); + } + + @Override + public int hashCode() { + return "NotSyncingResult".hashCode(); + } } diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java index 55749f67545..3f5d108c27a 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java @@ -21,9 +21,12 @@ import org.hyperledger.besu.ethereum.core.Synchronizer; import org.hyperledger.besu.plugin.data.SyncStatus; +import java.util.Optional; + public class SyncingSubscriptionService { private final SubscriptionManager subscriptionManager; + private Optional lastMessageWasInSync = Optional.empty(); public SyncingSubscriptionService( final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) { @@ -37,15 +40,21 @@ private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) { Subscription.class, syncingSubscriptions -> { if (syncStatus.inSync()) { - syncingSubscriptions.forEach( - s -> - subscriptionManager.sendMessage( - s.getSubscriptionId(), new NotSynchronisingResult())); + if (!lastMessageWasInSync.orElse(Boolean.FALSE)) { + syncingSubscriptions.forEach( + s -> + subscriptionManager.sendMessage( + s.getSubscriptionId(), new NotSynchronisingResult())); + lastMessageWasInSync = Optional.of(Boolean.TRUE); + } } else { - syncingSubscriptions.forEach( - s -> - subscriptionManager.sendMessage( - s.getSubscriptionId(), new SyncingResult(syncStatus))); + if (lastMessageWasInSync.orElse(Boolean.TRUE)) { + syncingSubscriptions.forEach( + s -> + subscriptionManager.sendMessage( + s.getSubscriptionId(), new SyncingResult(syncStatus))); + lastMessageWasInSync = Optional.of(Boolean.FALSE); + } } }); } diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java index 9b475f92d66..13987413ca1 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java @@ -14,12 +14,16 @@ */ package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.syncing; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.JsonRpcResult; import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.SyncingResult; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType; @@ -102,4 +106,89 @@ public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() { ArgumentMatchers.eq(subscription.getSubscriptionId()), any(NotSynchronisingResult.class)); } + + @Test + public void shouldNotRepeatOutOfSyncMessages() { + final SyncingSubscription subscription = + new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING); + final List subscriptions = Collections.singletonList(subscription); + final SyncStatus syncStatus = new SyncStatus(0L, 1L, 3L); + final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus); + + doAnswer( + invocation -> { + Consumer> consumer = invocation.getArgument(2); + consumer.accept(subscriptions); + return null; + }) + .when(subscriptionManager) + .notifySubscribersOnWorkerThread(any(), any(), any()); + + syncStatusListener.onSyncStatusChanged(syncStatus); + syncStatusListener.onSyncStatusChanged(syncStatus); + + verify(subscriptionManager, atMostOnce()) + .sendMessage( + ArgumentMatchers.eq(subscription.getSubscriptionId()), eq(expectedSyncingResult)); + } + + @Test + public void shouldNotRepeatInSyncMessages() { + final SyncingSubscription subscription = + new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING); + final List subscriptions = Collections.singletonList(subscription); + final SyncStatus syncStatus = new SyncStatus(0L, 3L, 3L); + final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus); + + doAnswer( + invocation -> { + Consumer> consumer = invocation.getArgument(2); + consumer.accept(subscriptions); + return null; + }) + .when(subscriptionManager) + .notifySubscribersOnWorkerThread(any(), any(), any()); + + syncStatusListener.onSyncStatusChanged(syncStatus); + syncStatusListener.onSyncStatusChanged(syncStatus); + + verify(subscriptionManager, atMostOnce()) + .sendMessage( + ArgumentMatchers.eq(subscription.getSubscriptionId()), eq(expectedSyncingResult)); + } + + @Test + public void shouldOnlyReportSyncChange() { + final SyncingSubscription subscription = + new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING); + final List subscriptions = Collections.singletonList(subscription); + + final SyncStatus inSyncStatus = new SyncStatus(0L, 3L, 3L); + final SyncStatus outOfSyncStatus = new SyncStatus(0L, 1L, 3L); + + doAnswer( + invocation -> { + Consumer> consumer = invocation.getArgument(2); + consumer.accept(subscriptions); + return null; + }) + .when(subscriptionManager) + .notifySubscribersOnWorkerThread(any(), any(), any()); + + syncStatusListener.onSyncStatusChanged(outOfSyncStatus); + syncStatusListener.onSyncStatusChanged(inSyncStatus); + syncStatusListener.onSyncStatusChanged(inSyncStatus); + syncStatusListener.onSyncStatusChanged(outOfSyncStatus); + syncStatusListener.onSyncStatusChanged(outOfSyncStatus); + syncStatusListener.onSyncStatusChanged(inSyncStatus); + syncStatusListener.onSyncStatusChanged(inSyncStatus); + + final var resultCaptor = ArgumentCaptor.forClass(JsonRpcResult.class); + final NotSynchronisingResult inSyncResult = new NotSynchronisingResult(); + final SyncingResult outOfSyncingResult = new SyncingResult(outOfSyncStatus); + + verify(subscriptionManager, times(4)).sendMessage(any(), resultCaptor.capture()); + assertThat(resultCaptor.getAllValues()) + .containsOnly(outOfSyncingResult, inSyncResult, outOfSyncingResult, inSyncResult); + } } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/SyncStatus.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/SyncStatus.java index 036aa85b131..8740885a4f0 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/SyncStatus.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/SyncStatus.java @@ -21,11 +21,21 @@ public final class SyncStatus implements org.hyperledger.besu.plugin.data.SyncSt private final long startingBlock; private final long currentBlock; private final long highestBlock; + private final boolean inSync; public SyncStatus(final long startingBlock, final long currentBlock, final long highestBlock) { + this(startingBlock, currentBlock, highestBlock, currentBlock == highestBlock); + } + + public SyncStatus( + final long startingBlock, + final long currentBlock, + final long highestBlock, + final boolean inSync) { this.startingBlock = startingBlock; this.currentBlock = currentBlock; this.highestBlock = highestBlock; + this.inSync = inSync; } @Override @@ -45,7 +55,7 @@ public long getHighestBlock() { @Override public boolean inSync() { - return currentBlock == highestBlock; + return inSync; } @Override @@ -56,14 +66,15 @@ public boolean equals(final Object o) { if (o == null || getClass() != o.getClass()) { return false; } - SyncStatus that = (SyncStatus) o; + final SyncStatus that = (SyncStatus) o; return startingBlock == that.startingBlock && currentBlock == that.currentBlock - && highestBlock == that.highestBlock; + && highestBlock == that.highestBlock + && inSync == that.inSync; } @Override public int hashCode() { - return Objects.hash(startingBlock, currentBlock, highestBlock); + return Objects.hash(startingBlock, currentBlock, highestBlock, inSync); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java index 301ea6dc621..159c03b9789 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncState.java @@ -33,7 +33,7 @@ public class SyncState { private final Blockchain blockchain; private final EthPeers ethPeers; - private final long startingBlock; + private long startingBlock; private boolean lastInSync = true; private final Subscribers inSyncListeners = Subscribers.create(); private final Subscribers syncStatusListeners = Subscribers.create(); @@ -49,7 +49,17 @@ public SyncState(final Blockchain blockchain, final EthPeers ethPeers) { if (event.isNewCanonicalHead()) { checkInSync(); } - publishSyncStatus(); + switch (event.getEventType()) { + case CHAIN_REORG: + publishReorg(); + // fall through + case HEAD_ADVANCED: + publishSyncStatus(); + break; + case FORK: + // don't broadcast detected forks + break; + } }); } @@ -59,6 +69,15 @@ public void publishSyncStatus() { syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus)); } + private void publishReorg() { + final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber(); + final SyncStatus syncStatus = + new SyncStatus( + startingBlock, chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber), false); + + syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus)); + } + public void addInSyncListener(final InSyncListener observer) { inSyncListeners.subscribe(observer); } @@ -74,11 +93,7 @@ public void removeSyncStatusListener(final long listenerId) { public SyncStatus syncStatus() { final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber(); return new SyncStatus( - startingBlock(), chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber)); - } - - public long startingBlock() { - return startingBlock; + startingBlock, chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber)); } public Optional syncTarget() { @@ -153,6 +168,10 @@ private synchronized void checkInSync() { final boolean currentInSync = isInSync(); if (lastInSync != currentInSync) { lastInSync = currentInSync; + if (!currentInSync) { + // when we fall out of sync change our starting block + startingBlock = blockchain.getChainHeadBlockNumber(); + } inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentInSync)); } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncStateTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncStateTest.java index 2ee1cf661d1..9af33988f3f 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncStateTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/state/SyncStateTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -33,6 +34,7 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.SyncStatus; import org.hyperledger.besu.ethereum.eth.manager.ChainState; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.EthPeers; @@ -57,6 +59,7 @@ public class SyncStateTest { private final Blockchain blockchain = mock(Blockchain.class); private final EthPeers ethPeers = mock(EthPeers.class); private final SyncState.InSyncListener inSyncListener = mock(SyncState.InSyncListener.class); + private final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class); private final EthPeer syncTargetPeer = mock(EthPeer.class); private final ChainState syncTargetPeerChainState = spy(new ChainState()); private final EthPeer otherPeer = mock(EthPeer.class); @@ -80,6 +83,7 @@ public void setUp() { syncState = new SyncState(blockchain, ethPeers); blockAddedObserver = captor.getValue(); syncState.addInSyncListener(inSyncListener); + syncState.addSyncStatusListener(syncStatusListener); } @Test @@ -229,7 +233,7 @@ public void shouldBecomeInSyncWhenOurBlockchainCatchesUp() { @Test public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() { - SyncStatusListener syncStatusListener = mock(SyncStatusListener.class); + final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class); syncState.addSyncStatusListener(syncStatusListener); blockAddedObserver.onBlockAdded( @@ -242,6 +246,26 @@ public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() { verify(syncStatusListener).onSyncStatusChanged(eq(syncState.syncStatus())); } + @Test + public void shouldReportReorgEvents() { + when(blockchain.getChainHeadBlockNumber()).thenReturn(TARGET_CHAIN_HEIGHT); + + blockAddedObserver.onBlockAdded( + BlockAddedEvent.createForChainReorg( + new Block( + targetBlockHeader(), + new BlockBody(Collections.emptyList(), Collections.emptyList())), + Collections.emptyList(), + Collections.emptyList()), + blockchain); + + assertThat(syncState.isInSync()).isTrue(); + final ArgumentCaptor captor = ArgumentCaptor.forClass(SyncStatus.class); + verify(syncStatusListener, times(2)).onSyncStatusChanged(captor.capture()); + assertThat(captor.getAllValues().get(0).inSync()).isFalse(); + assertThat(captor.getAllValues().get(1).inSync()).isTrue(); + } + private void setupOutOfSyncState() { updateChainState(syncTargetPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY); syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L));