Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PAN-2547] Modified JSON-RPC subscription processing to avoid blockin… (
Browse files Browse the repository at this point in the history
#1519)

* [PAN-2547] Modified JSON-RPC subscription processing to avoid blocking important threads.

* [PAN-2547] Changed executor to Vertx managed.

* [PAN-2547] Refactored vertx thread handling to super + refactored tests.

* Update ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java

Error logging improvement.

Co-Authored-By: Adrian Sutton <[email protected]>
  • Loading branch information
mark-terry and ajsutton authored Jun 6, 2019
1 parent 1b25f6e commit 816f7fa
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -160,4 +161,22 @@ public void sendMessage(final Long subscriptionId, final JsonRpcResult msg) {
.findFirst()
.ifPresent(connectionId -> vertx.eventBus().send(connectionId, Json.encode(response)));
}

public <T> Void notifySubscribersOnWorkerThread(
final SubscriptionType subscriptionType,
final Class<T> clazz,
final Consumer<List<T>> runnable) {
vertx.executeBlocking(
future -> {
final List<T> syncingSubscriptions = subscriptionsOfType(subscriptionType, clazz);
runnable.accept(syncingSubscriptions);
future.complete();
},
result -> {
if (result.failed()) {
LOG.error("Failed to notify subscribers.", result.cause());
}
});
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import java.util.List;

public class NewBlockHeadersSubscriptionService implements BlockAddedObserver {

private final SubscriptionManager subscriptionManager;
Expand All @@ -38,20 +36,21 @@ public NewBlockHeadersSubscriptionService(

@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
final List<NewBlockHeadersSubscription> subscribers =
subscriptionManager.subscriptionsOfType(
SubscriptionType.NEW_BLOCK_HEADERS, NewBlockHeadersSubscription.class);

final Hash newBlockHash = event.getBlock().getHash();
subscriptionManager.notifySubscribersOnWorkerThread(
SubscriptionType.NEW_BLOCK_HEADERS,
NewBlockHeadersSubscription.class,
subscribers -> {
final Hash newBlockHash = event.getBlock().getHash();

for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
? blockWithCompleteTransaction(newBlockHash)
: blockWithTransactionHash(newBlockHash);
for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
? blockWithCompleteTransaction(newBlockHash)
: blockWithTransactionHash(newBlockHash);

subscriptionManager.sendMessage(subscription.getId(), newBlock);
}
subscriptionManager.sendMessage(subscription.getId(), newBlock);
}
});
}

private BlockResult blockWithCompleteTransaction(final Hash hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import java.util.List;

public class SyncingSubscriptionService {

private final SubscriptionManager subscriptionManager;
Expand All @@ -32,15 +30,17 @@ public SyncingSubscriptionService(
}

private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {
final List<Subscription> syncingSubscriptions =
subscriptionManager.subscriptionsOfType(SubscriptionType.SYNCING, Subscription.class);

if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult()));
} else {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus)));
}
subscriptionManager.notifySubscribersOnWorkerThread(
SubscriptionType.SYNCING,
Subscription.class,
syncingSubscriptions -> {
if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult()));
} else {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus)));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -39,6 +40,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

import com.google.common.collect.Lists;
import org.junit.Before;
Expand Down Expand Up @@ -74,6 +76,7 @@ public void before() {
@Test
public void shouldSendMessageWhenBlockAdded() {
final NewBlockHeadersSubscription subscription = createSubscription(false);
final List<NewBlockHeadersSubscription> subscriptions = Collections.singletonList(subscription);
final BlockWithMetadata<Hash, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(
blockHeader, Collections.emptyList(), Collections.emptyList(), UInt256.ONE, 1);
Expand All @@ -82,6 +85,15 @@ public void shouldSendMessageWhenBlockAdded() {
when(blockchainQueries.blockByHashWithTxHashes(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));

doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

simulateAddingBlock();

verify(subscriptionManager)
Expand All @@ -98,6 +110,7 @@ public void shouldSendMessageWhenBlockAdded() {
@Test
public void shouldReturnTxHashesWhenIncludeTransactionsFalse() {
final NewBlockHeadersSubscription subscription = createSubscription(false);
final List<NewBlockHeadersSubscription> subscriptions = Collections.singletonList(subscription);
final List<Hash> txHashList = transactionsWithHashOnly();
final BlockWithMetadata<Hash, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(blockHeader, txHashList, Collections.emptyList(), UInt256.ONE, 1);
Expand All @@ -106,6 +119,15 @@ public void shouldReturnTxHashesWhenIncludeTransactionsFalse() {
when(blockchainQueries.blockByHashWithTxHashes(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));

doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

simulateAddingBlock();

verify(subscriptionManager)
Expand All @@ -127,6 +149,7 @@ public void shouldReturnTxHashesWhenIncludeTransactionsFalse() {
@Test
public void shouldReturnCompleteTxWhenParameterTrue() {
final NewBlockHeadersSubscription subscription = createSubscription(true);
final List<NewBlockHeadersSubscription> subscriptions = Collections.singletonList(subscription);
final List<TransactionWithMetadata> txHashList = transactionsWithMetadata();
final BlockWithMetadata<TransactionWithMetadata, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(
Expand All @@ -137,6 +160,15 @@ public void shouldReturnCompleteTxWhenParameterTrue() {
when(blockchainQueries.blockByHash(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));

doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

simulateAddingBlock();

verify(subscriptionManager)
Expand Down Expand Up @@ -184,8 +216,6 @@ private List<Hash> transactionsWithHashOnly() {
private NewBlockHeadersSubscription createSubscription(final boolean includeTransactions) {
final NewBlockHeadersSubscription headerSub =
new NewBlockHeadersSubscription(1L, includeTransactions);
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(headerSub));
return headerSub;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -24,7 +25,10 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -51,11 +55,19 @@ public void before() {
@Test
public void shouldSendSyncStatusWhenReceiveSyncStatus() {
final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING);
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(subscription));
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);
final SyncStatus syncStatus = new SyncStatus(0L, 1L, 3L);
final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatus(syncStatus);

verify(subscriptionManager).sendMessage(eq(subscription.getId()), eq(expectedSyncingResult));
Expand All @@ -64,10 +76,18 @@ public void shouldSendSyncStatusWhenReceiveSyncStatus() {
@Test
public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() {
final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING);
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(subscription));
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);
final SyncStatus syncStatus = new SyncStatus(0L, 1L, 1L);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatus(syncStatus);

verify(subscriptionManager)
Expand Down

0 comments on commit 816f7fa

Please sign in to comment.