Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PAN-2547] Modified JSON-RPC subscription processing to avoid blockin…
Browse files Browse the repository at this point in the history
…g important threads.
  • Loading branch information
mark-terry committed Jun 2, 2019
1 parent 900562b commit a600ac0
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NewBlockHeadersSubscriptionService implements BlockAddedObserver {

Expand All @@ -38,20 +40,23 @@ public NewBlockHeadersSubscriptionService(

@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
final List<NewBlockHeadersSubscription> subscribers =
subscriptionManager.subscriptionsOfType(
SubscriptionType.NEW_BLOCK_HEADERS, NewBlockHeadersSubscription.class);
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() ->{
final List<NewBlockHeadersSubscription> subscribers =
subscriptionManager.subscriptionsOfType(
SubscriptionType.NEW_BLOCK_HEADERS, NewBlockHeadersSubscription.class);

final Hash newBlockHash = event.getBlock().getHash();
final Hash newBlockHash = event.getBlock().getHash();

for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
? blockWithCompleteTransaction(newBlockHash)
: blockWithTransactionHash(newBlockHash);
for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
? blockWithCompleteTransaction(newBlockHash)
: blockWithTransactionHash(newBlockHash);

subscriptionManager.sendMessage(subscription.getId(), newBlock);
}
subscriptionManager.sendMessage(subscription.getId(), newBlock);
}
});
}

private BlockResult blockWithCompleteTransaction(final Hash hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SyncingSubscriptionService {

Expand All @@ -32,15 +34,18 @@ public SyncingSubscriptionService(
}

private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {
final List<Subscription> syncingSubscriptions =
subscriptionManager.subscriptionsOfType(SubscriptionType.SYNCING, Subscription.class);
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
final List<Subscription> syncingSubscriptions =
subscriptionManager.subscriptionsOfType(SubscriptionType.SYNCING, Subscription.class);

if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult()));
} else {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus)));
}
if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult()));
} else {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus)));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -84,7 +85,7 @@ public void shouldSendMessageWhenBlockAdded() {

simulateAddingBlock();

verify(subscriptionManager)
verify(subscriptionManager, timeout(100))
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
Expand All @@ -108,7 +109,7 @@ public void shouldReturnTxHashesWhenIncludeTransactionsFalse() {

simulateAddingBlock();

verify(subscriptionManager)
verify(subscriptionManager, timeout(100))
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
Expand Down Expand Up @@ -139,7 +140,7 @@ public void shouldReturnCompleteTxWhenParameterTrue() {

simulateAddingBlock();

verify(subscriptionManager)
verify(subscriptionManager, timeout(100))
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -58,7 +59,7 @@ public void shouldSendSyncStatusWhenReceiveSyncStatus() {

syncStatusListener.onSyncStatus(syncStatus);

verify(subscriptionManager).sendMessage(eq(subscription.getId()), eq(expectedSyncingResult));
verify(subscriptionManager, timeout(100)).sendMessage(eq(subscription.getId()), eq(expectedSyncingResult));
}

@Test
Expand All @@ -70,7 +71,7 @@ public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() {

syncStatusListener.onSyncStatus(syncStatus);

verify(subscriptionManager)
verify(subscriptionManager, timeout(100))
.sendMessage(eq(subscription.getId()), any(NotSynchronisingResult.class));
}
}

0 comments on commit a600ac0

Please sign in to comment.