Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PAN-3183] Less verbose synching subscriptions #59

Merged
merged 1 commit into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,14 @@ public class NotSynchronisingResult implements JsonRpcResult {
public boolean getResult() {
return false;
}

@Override
public boolean equals(final Object o) {
return (this == o) || (o != null && getClass() == o.getClass());
}

@Override
public int hashCode() {
return "NotSyncingResult".hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.plugin.data.SyncStatus;

import java.util.Optional;

public class SyncingSubscriptionService {

private final SubscriptionManager subscriptionManager;
private Optional<Boolean> lastMessageWasInSync = Optional.empty();

public SyncingSubscriptionService(
final SubscriptionManager subscriptionManager, final Synchronizer synchronizer) {
Expand All @@ -37,15 +40,21 @@ private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {
Subscription.class,
syncingSubscriptions -> {
if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s ->
subscriptionManager.sendMessage(
s.getSubscriptionId(), new NotSynchronisingResult()));
if (!lastMessageWasInSync.orElse(Boolean.FALSE)) {
syncingSubscriptions.forEach(
s ->
subscriptionManager.sendMessage(
s.getSubscriptionId(), new NotSynchronisingResult()));
lastMessageWasInSync = Optional.of(Boolean.TRUE);
}
} else {
syncingSubscriptions.forEach(
s ->
subscriptionManager.sendMessage(
s.getSubscriptionId(), new SyncingResult(syncStatus)));
if (lastMessageWasInSync.orElse(Boolean.TRUE)) {
syncingSubscriptions.forEach(
s ->
subscriptionManager.sendMessage(
s.getSubscriptionId(), new SyncingResult(syncStatus)));
lastMessageWasInSync = Optional.of(Boolean.FALSE);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.syncing;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.JsonRpcResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.SyncingResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
Expand Down Expand Up @@ -102,4 +106,89 @@ public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() {
ArgumentMatchers.eq(subscription.getSubscriptionId()),
any(NotSynchronisingResult.class));
}

@Test
public void shouldNotRepeatOutOfSyncMessages() {
final SyncingSubscription subscription =
new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING);
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);
final SyncStatus syncStatus = new SyncStatus(0L, 1L, 3L);
final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatusChanged(syncStatus);
syncStatusListener.onSyncStatusChanged(syncStatus);

verify(subscriptionManager, atMostOnce())
.sendMessage(
ArgumentMatchers.eq(subscription.getSubscriptionId()), eq(expectedSyncingResult));
}

@Test
public void shouldNotRepeatInSyncMessages() {
final SyncingSubscription subscription =
new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING);
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);
final SyncStatus syncStatus = new SyncStatus(0L, 3L, 3L);
final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatusChanged(syncStatus);
syncStatusListener.onSyncStatusChanged(syncStatus);

verify(subscriptionManager, atMostOnce())
.sendMessage(
ArgumentMatchers.eq(subscription.getSubscriptionId()), eq(expectedSyncingResult));
}

@Test
public void shouldOnlyReportSyncChange() {
final SyncingSubscription subscription =
new SyncingSubscription(9L, "conn", SubscriptionType.SYNCING);
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);

final SyncStatus inSyncStatus = new SyncStatus(0L, 3L, 3L);
final SyncStatus outOfSyncStatus = new SyncStatus(0L, 1L, 3L);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatusChanged(outOfSyncStatus);
syncStatusListener.onSyncStatusChanged(inSyncStatus);
syncStatusListener.onSyncStatusChanged(inSyncStatus);
syncStatusListener.onSyncStatusChanged(outOfSyncStatus);
syncStatusListener.onSyncStatusChanged(outOfSyncStatus);
syncStatusListener.onSyncStatusChanged(inSyncStatus);
syncStatusListener.onSyncStatusChanged(inSyncStatus);

final var resultCaptor = ArgumentCaptor.forClass(JsonRpcResult.class);
final NotSynchronisingResult inSyncResult = new NotSynchronisingResult();
final SyncingResult outOfSyncingResult = new SyncingResult(outOfSyncStatus);

verify(subscriptionManager, times(4)).sendMessage(any(), resultCaptor.capture());
assertThat(resultCaptor.getAllValues())
.containsOnly(outOfSyncingResult, inSyncResult, outOfSyncingResult, inSyncResult);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ public final class SyncStatus implements org.hyperledger.besu.plugin.data.SyncSt
private final long startingBlock;
private final long currentBlock;
private final long highestBlock;
private final boolean inSync;

public SyncStatus(final long startingBlock, final long currentBlock, final long highestBlock) {
this(startingBlock, currentBlock, highestBlock, currentBlock == highestBlock);
}

public SyncStatus(
final long startingBlock,
final long currentBlock,
final long highestBlock,
final boolean inSync) {
this.startingBlock = startingBlock;
this.currentBlock = currentBlock;
this.highestBlock = highestBlock;
this.inSync = inSync;
}

@Override
Expand All @@ -45,7 +55,7 @@ public long getHighestBlock() {

@Override
public boolean inSync() {
return currentBlock == highestBlock;
return inSync;
}

@Override
Expand All @@ -56,14 +66,15 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
SyncStatus that = (SyncStatus) o;
final SyncStatus that = (SyncStatus) o;
return startingBlock == that.startingBlock
&& currentBlock == that.currentBlock
&& highestBlock == that.highestBlock;
&& highestBlock == that.highestBlock
&& inSync == that.inSync;
}

@Override
public int hashCode() {
return Objects.hash(startingBlock, currentBlock, highestBlock);
return Objects.hash(startingBlock, currentBlock, highestBlock, inSync);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SyncState {
private final Blockchain blockchain;
private final EthPeers ethPeers;

private final long startingBlock;
private long startingBlock;
private boolean lastInSync = true;
private final Subscribers<InSyncListener> inSyncListeners = Subscribers.create();
private final Subscribers<SyncStatusListener> syncStatusListeners = Subscribers.create();
Expand All @@ -49,7 +49,17 @@ public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
if (event.isNewCanonicalHead()) {
checkInSync();
}
publishSyncStatus();
switch (event.getEventType()) {
case CHAIN_REORG:
publishReorg();
// fall through
case HEAD_ADVANCED:
publishSyncStatus();
break;
case FORK:
// don't broadcast detected forks
break;
}
});
}

Expand All @@ -59,6 +69,15 @@ public void publishSyncStatus() {
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus));
}

private void publishReorg() {
final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber();
final SyncStatus syncStatus =
new SyncStatus(
startingBlock, chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber), false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why a reorg should always be considered out-of-sync


syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus));
}

public void addInSyncListener(final InSyncListener observer) {
inSyncListeners.subscribe(observer);
}
Expand All @@ -74,11 +93,7 @@ public void removeSyncStatusListener(final long listenerId) {
public SyncStatus syncStatus() {
final long chainHeadBlockNumber = blockchain.getChainHeadBlockNumber();
return new SyncStatus(
startingBlock(), chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber));
}

public long startingBlock() {
return startingBlock;
startingBlock, chainHeadBlockNumber, bestChainHeight(chainHeadBlockNumber));
}

public Optional<SyncTarget> syncTarget() {
Expand Down Expand Up @@ -153,6 +168,10 @@ private synchronized void checkInSync() {
final boolean currentInSync = isInSync();
if (lastInSync != currentInSync) {
lastInSync = currentInSync;
if (!currentInSync) {
// when we fall out of sync change our starting block
startingBlock = blockchain.getChainHeadBlockNumber();
}
inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentInSync));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
Expand All @@ -33,6 +34,7 @@
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.SyncStatus;
import org.hyperledger.besu.ethereum.eth.manager.ChainState;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
Expand All @@ -57,6 +59,7 @@ public class SyncStateTest {
private final Blockchain blockchain = mock(Blockchain.class);
private final EthPeers ethPeers = mock(EthPeers.class);
private final SyncState.InSyncListener inSyncListener = mock(SyncState.InSyncListener.class);
private final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class);
private final EthPeer syncTargetPeer = mock(EthPeer.class);
private final ChainState syncTargetPeerChainState = spy(new ChainState());
private final EthPeer otherPeer = mock(EthPeer.class);
Expand All @@ -80,6 +83,7 @@ public void setUp() {
syncState = new SyncState(blockchain, ethPeers);
blockAddedObserver = captor.getValue();
syncState.addInSyncListener(inSyncListener);
syncState.addSyncStatusListener(syncStatusListener);
}

@Test
Expand Down Expand Up @@ -229,7 +233,7 @@ public void shouldBecomeInSyncWhenOurBlockchainCatchesUp() {

@Test
public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() {
SyncStatusListener syncStatusListener = mock(SyncStatusListener.class);
final SyncStatusListener syncStatusListener = mock(SyncStatusListener.class);
syncState.addSyncStatusListener(syncStatusListener);

blockAddedObserver.onBlockAdded(
Expand All @@ -242,6 +246,26 @@ public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() {
verify(syncStatusListener).onSyncStatusChanged(eq(syncState.syncStatus()));
}

@Test
public void shouldReportReorgEvents() {
when(blockchain.getChainHeadBlockNumber()).thenReturn(TARGET_CHAIN_HEIGHT);

blockAddedObserver.onBlockAdded(
BlockAddedEvent.createForChainReorg(
new Block(
targetBlockHeader(),
new BlockBody(Collections.emptyList(), Collections.emptyList())),
Collections.emptyList(),
Collections.emptyList()),
blockchain);

assertThat(syncState.isInSync()).isTrue();
final ArgumentCaptor<SyncStatus> captor = ArgumentCaptor.forClass(SyncStatus.class);
verify(syncStatusListener, times(2)).onSyncStatusChanged(captor.capture());
assertThat(captor.getAllValues().get(0).inSync()).isFalse();
assertThat(captor.getAllValues().get(1).inSync()).isTrue();
}

private void setupOutOfSyncState() {
updateChainState(syncTargetPeerChainState, TARGET_CHAIN_HEIGHT, TARGET_DIFFICULTY);
syncState.setSyncTarget(syncTargetPeer, blockHeaderAt(0L));
Expand Down