Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Events API: Transaction dropped, sync status, and renames #1919

Merged
merged 24 commits into from
Sep 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public void startNode(final PantheonNode node) {
PantheonEvents.class,
new PantheonEventsImpl(
pantheonController.getProtocolManager().getBlockBroadcaster(),
pantheonController.getTransactionPool()));
pantheonController.getTransactionPool(),
pantheonController.getSyncState()));
pantheonPluginContext.startPlugins();

final Runner runner =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import java.util.Objects;

public final class SyncStatus {
public final class SyncStatus implements tech.pegasys.pantheon.plugin.data.SyncStatus {

private final long startingBlock;
private final long currentBlock;
Expand All @@ -26,18 +26,22 @@ public SyncStatus(final long startingBlock, final long currentBlock, final long
this.highestBlock = highestBlock;
}

@Override
public long getStartingBlock() {
return startingBlock;
}

@Override
public long getCurrentBlock() {
return currentBlock;
}

@Override
public long getHighestBlock() {
return highestBlock;
}

@Override
public boolean inSync() {
return currentBlock == highestBlock;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
*/
package tech.pegasys.pantheon.ethereum.core;

import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.plugin.services.PantheonEvents;

import java.util.Optional;

/** Provides an interface to block synchronization processes. */
Expand All @@ -27,12 +30,7 @@ public interface Synchronizer {
*/
Optional<SyncStatus> getSyncStatus();

long observeSyncStatus(final SyncStatusListener listener);
long observeSyncStatus(final PantheonEvents.SyncStatusListener listener);

boolean removeObserver(long observerId);

@FunctionalInterface
interface SyncStatusListener {
void onSyncStatus(final SyncStatus status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastDownloaderFactory;
Expand All @@ -29,7 +28,9 @@
import tech.pegasys.pantheon.ethereum.worldstate.Pruner;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.plugin.services.MetricsSystem;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.Subscribers;

Expand Down Expand Up @@ -176,10 +177,11 @@ public Optional<SyncStatus> getSyncStatus() {
if (!running.get()) {
return Optional.empty();
}
if (syncState.syncStatus().getCurrentBlock() == syncState.syncStatus().getHighestBlock()) {
final SyncStatus syncStatus = syncState.syncStatus();
if (syncStatus.inSync()) {
return Optional.empty();
}
return Optional.of(syncState.syncStatus());
return Optional.of(syncStatus);
}

@Override
Expand All @@ -194,6 +196,6 @@ public boolean removeObserver(final long observerId) {
}

private void syncStatusCallback(final SyncStatus status) {
syncStatusListeners.forEach(c -> c.onSyncStatus(status));
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(status));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
import tech.pegasys.pantheon.ethereum.chain.ChainHead;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;
import tech.pegasys.pantheon.util.Subscribers;

import java.util.Optional;

import com.google.common.annotations.VisibleForTesting;

public class SyncState {
private static final long SYNC_TOLERANCE = 5;
private final Blockchain blockchain;
Expand All @@ -49,17 +51,22 @@ public SyncState(final Blockchain blockchain, final EthPeers ethPeers) {
});
}

private void publishSyncStatus() {
@VisibleForTesting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(discussion) : why @VisibleForTesting if public ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know this before but @VisibleForTesting is for documentation purposes only! I used to think it actually changed the visibility for the tests in some special way. It's to communicate that if not for the tests, this method would be less visible.

public void publishSyncStatus() {
final SyncStatus syncStatus = syncStatus();
syncStatusListeners.forEach(c -> c.onSyncStatus(syncStatus));
syncStatusListeners.forEach(c -> c.onSyncStatusChanged(syncStatus));
}

public void addInSyncListener(final InSyncListener observer) {
inSyncListeners.subscribe(observer);
}

public void addSyncStatusListener(final SyncStatusListener observer) {
syncStatusListeners.subscribe(observer);
public long addSyncStatusListener(final SyncStatusListener observer) {
return syncStatusListeners.subscribe(observer);
}

public void removeSyncStatusListener(final long listenerId) {
syncStatusListeners.unsubscribe(listenerId);
}

public SyncStatus syncStatus() {
Expand Down Expand Up @@ -141,10 +148,10 @@ public long bestChainHeight(final long localChainHeight) {
}

private synchronized void checkInSync() {
final boolean currentSyncStatus = isInSync();
if (lastInSync != currentSyncStatus) {
lastInSync = currentSyncStatus;
inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentSyncStatus));
final boolean currentInSync = isInSync();
if (lastInSync != currentInSync) {
lastInSync = currentInSync;
inSyncListeners.forEach(c -> c.onSyncStatusChanged(currentInSync));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.util.Collections;
Expand Down Expand Up @@ -237,7 +237,7 @@ public void shouldSendSyncStatusWhenBlockIsAddedToTheChain() {
new BlockBody(Collections.emptyList(), Collections.emptyList()))),
blockchain);

verify(syncStatusListener).onSyncStatus(eq(syncState.syncStatus()));
verify(syncStatusListener).onSyncStatusChanged(eq(syncState.syncStatus()));
}

private void setupOutOfSyncState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import tech.pegasys.pantheon.ethereum.core.Account;
import tech.pegasys.pantheon.ethereum.core.Address;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.core.Transaction;
import tech.pegasys.pantheon.ethereum.core.WorldState;
Expand All @@ -38,6 +37,7 @@
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.Capability;
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;
import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.util.bytes.Bytes32;
import tech.pegasys.pantheon.util.bytes.BytesValue;
import tech.pegasys.pantheon.util.uint.UInt256;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.graphql.internal.pojoadapter;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import java.util.Optional;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
*/
package tech.pegasys.pantheon.ethereum.jsonrpc.health;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.jsonrpc.health.HealthService.HealthCheck;
import tech.pegasys.pantheon.ethereum.jsonrpc.health.HealthService.ParamSource;
import tech.pegasys.pantheon.ethereum.p2p.network.P2PNetwork;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.jsonrpc.internal.results;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import java.util.Objects;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

/**
* The SubscriptionManager is responsible for managing subscriptions and sending messages to the
* clients that have an active subscription subscription.
* clients that have an active subscription.
*/
public class SubscriptionManager extends AbstractVerticle {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
*/
package tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.syncing;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.Subscription;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

public class SyncingSubscriptionService {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.jsonrpc.health.HealthService.ParamSource;
import tech.pegasys.pantheon.ethereum.p2p.network.P2PNetwork;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -141,6 +141,7 @@ public void shouldNotBeReadyWhenCustomMaxBlocksBehindIsInvalid() {
}

private Optional<SyncStatus> createSyncStatus(final int currentBlock, final int highestBlock) {
return Optional.of(new SyncStatus(0, currentBlock, highestBlock));
return Optional.of(
new tech.pegasys.pantheon.ethereum.core.SyncStatus(0, currentBlock, highestBlock));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If only java had type aliases on import...

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.JsonRpcRequest;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcResponse;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.response.JsonRpcSuccessResponse;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult;
import tech.pegasys.pantheon.plugin.data.SyncStatus;

import java.util.Optional;

Expand Down Expand Up @@ -66,7 +66,8 @@ public void shouldReturnFalseWhenSyncStatusIsEmpty() {
@Test
public void shouldReturnExpectedValueWhenSyncStatusIsNotEmpty() {
final JsonRpcRequest request = requestWithParams();
final SyncStatus expectedSyncStatus = new SyncStatus(0, 1, 2);
final SyncStatus expectedSyncStatus =
new tech.pegasys.pantheon.ethereum.core.SyncStatus(0, 1, 2);
final JsonRpcResponse expectedResponse =
new JsonRpcSuccessResponse(request.getId(), new SyncingResult(expectedSyncStatus));
final Optional<SyncStatus> optionalSyncStatus = Optional.of(expectedSyncStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.core.Synchronizer.SyncStatusListener;
import tech.pegasys.pantheon.ethereum.jsonrpc.internal.results.SyncingResult;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;
import tech.pegasys.pantheon.plugin.services.PantheonEvents.SyncStatusListener;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -69,7 +69,7 @@ public void shouldSendSyncStatusWhenReceiveSyncStatus() {
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatus(syncStatus);
syncStatusListener.onSyncStatusChanged(syncStatus);

verify(subscriptionManager)
.sendMessage(eq(subscription.getSubscriptionId()), eq(expectedSyncingResult));
Expand All @@ -91,7 +91,7 @@ public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() {
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatus(syncStatus);
syncStatusListener.onSyncStatusChanged(syncStatus);

verify(subscriptionManager)
.sendMessage(eq(subscription.getSubscriptionId()), any(NotSynchronisingResult.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
import tech.pegasys.pantheon.ethereum.p2p.peers.EnodeURL;
import tech.pegasys.pantheon.ethereum.permissioning.node.NodePermissioningProvider;
import tech.pegasys.pantheon.metrics.PantheonMetricCategory;
import tech.pegasys.pantheon.plugin.data.SyncStatus;
import tech.pegasys.pantheon.plugin.services.MetricsSystem;
import tech.pegasys.pantheon.plugin.services.metrics.Counter;

Expand Down
Loading