Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PIE-1632] newHeads subscription emits events only for canonical blocks #1798

Merged
merged 2 commits into from
Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,23 @@ public NewBlockHeadersSubscriptionService(

@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
subscriptionManager.notifySubscribersOnWorkerThread(
SubscriptionType.NEW_BLOCK_HEADERS,
NewBlockHeadersSubscription.class,
subscribers -> {
final Hash newBlockHash = event.getBlock().getHash();
if (event.isNewCanonicalHead()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick ... if (!event.isNewCanonicalHead()) { return; }

would cause less code change :)

subscriptionManager.notifySubscribersOnWorkerThread(
SubscriptionType.NEW_BLOCK_HEADERS,
NewBlockHeadersSubscription.class,
subscribers -> {
final Hash newBlockHash = event.getBlock().getHash();

for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
? blockWithCompleteTransaction(newBlockHash)
: blockWithTransactionHash(newBlockHash);
for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
? blockWithCompleteTransaction(newBlockHash)
: blockWithTransactionHash(newBlockHash);

subscriptionManager.sendMessage(subscription.getSubscriptionId(), newBlock);
}
});
subscriptionManager.sendMessage(subscription.getSubscriptionId(), newBlock);
}
});
}
}

private BlockResult blockWithCompleteTransaction(final Hash hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.crypto.SECP256K1.KeyPair;
Expand Down Expand Up @@ -74,127 +75,115 @@ public void before() {
}

@Test
public void shouldSendMessageWhenBlockAdded() {
public void shouldSendMessageWhenBlockAddedOnCanonicalChain() {
final NewBlockHeadersSubscription subscription = createSubscription(false);
final List<NewBlockHeadersSubscription> subscriptions = Collections.singletonList(subscription);
final BlockWithMetadata<Hash, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(
blockHeader, Collections.emptyList(), Collections.emptyList(), UInt256.ONE, 1);
final BlockResult expectedNewBlock = blockResultFactory.transactionHash(testBlockWithMetadata);
mockSubscriptionManagerNotifyMethod(subscription);
final BlockResult expectedNewBlock = expectedBlockWithTransactions(Collections.emptyList());

when(blockchainQueries.blockByHashWithTxHashes(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));

doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

simulateAddingBlock();
simulateAddingBlockOnCanonicalChain();

verify(subscriptionManager)
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
assertThat(subscriptionIdCaptor.getValue()).isEqualTo(subscription.getSubscriptionId());
assertThat(responseCaptor.getValue())
.isEqualToComparingFieldByFieldRecursively(expectedNewBlock);
}

assertThat(actualSubscriptionId).isEqualTo(subscription.getSubscriptionId());
assertThat(actualBlock).isEqualToComparingFieldByFieldRecursively(expectedNewBlock);
@Test
public void shouldNotSendMessageWhenBlockAddedIsNotOnCanonicalChain() {
simulateAddingBlockOnNonCanonicalChain();

verify(subscriptionManager, times(1)).sendMessage(any(), any());
verifyZeroInteractions(subscriptionManager);
}

@Test
public void shouldReturnTxHashesWhenIncludeTransactionsFalse() {
final NewBlockHeadersSubscription subscription = createSubscription(false);
final List<NewBlockHeadersSubscription> subscriptions = Collections.singletonList(subscription);
mockSubscriptionManagerNotifyMethod(subscription);
final List<Hash> txHashList = transactionsWithHashOnly();
final BlockWithMetadata<Hash, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(blockHeader, txHashList, Collections.emptyList(), UInt256.ONE, 1);
final BlockResult expectedNewBlock = blockResultFactory.transactionHash(testBlockWithMetadata);
final BlockResult expectedNewBlock = expectedBlockWithTransactions(txHashList);

when(blockchainQueries.blockByHashWithTxHashes(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));

doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

simulateAddingBlock();
simulateAddingBlockOnCanonicalChain();

verify(subscriptionManager)
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
assertThat(subscriptionIdCaptor.getValue()).isEqualTo(subscription.getSubscriptionId());
final Object actualBlock = responseCaptor.getValue();

assertThat(actualSubscriptionId).isEqualTo(subscription.getSubscriptionId());
assertThat(actualBlock).isInstanceOf(BlockResult.class);
final BlockResult actualBlockResult = (BlockResult) actualBlock;
assertThat(actualBlockResult.getTransactions()).hasSize(txHashList.size());
assertThat(((BlockResult) actualBlock).getTransactions()).hasSize(txHashList.size());
assertThat(actualBlock).isEqualToComparingFieldByFieldRecursively(expectedNewBlock);

verify(subscriptionManager, times(1)).sendMessage(any(), any());
verify(blockchainQueries, times(1)).blockByHashWithTxHashes(any());
verify(blockchainQueries, times(0)).blockByHash(any());
}

@Test
public void shouldReturnCompleteTxWhenParameterTrue() {
final NewBlockHeadersSubscription subscription = createSubscription(true);
final List<NewBlockHeadersSubscription> subscriptions = Collections.singletonList(subscription);
mockSubscriptionManagerNotifyMethod(subscription);
final List<TransactionWithMetadata> txHashList = transactionsWithMetadata();
final BlockWithMetadata<TransactionWithMetadata, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(
blockHeader, txHashList, Collections.emptyList(), blockHeader.getDifficulty(), 0);
final BlockResult expectedNewBlock =
blockResultFactory.transactionComplete(testBlockWithMetadata);

when(blockchainQueries.blockByHash(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));

doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

simulateAddingBlock();
simulateAddingBlockOnCanonicalChain();

verify(subscriptionManager)
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();

assertThat(actualSubscriptionId).isEqualTo(subscription.getSubscriptionId());
assertThat(subscriptionIdCaptor.getValue()).isEqualTo(subscription.getSubscriptionId());

final Object actualBlock = responseCaptor.getValue();
assertThat(actualBlock).isInstanceOf(BlockResult.class);
final BlockResult actualBlockResult = (BlockResult) actualBlock;
assertThat(actualBlockResult.getTransactions()).hasSize(txHashList.size());
assertThat(((BlockResult) actualBlock).getTransactions()).hasSize(txHashList.size());
assertThat(actualBlock).isEqualToComparingFieldByFieldRecursively(expectedNewBlock);

verify(subscriptionManager, times(1)).sendMessage(any(), any());
verify(blockchainQueries, times(0)).blockByHashWithTxHashes(any());
verify(blockchainQueries, times(1)).blockByHash(any());
}

private void simulateAddingBlock() {
private BlockResult expectedBlockWithTransactions(final List<Hash> objects) {
final BlockWithMetadata<Hash, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(blockHeader, objects, Collections.emptyList(), UInt256.ONE, 1);
final BlockResult expectedNewBlock = blockResultFactory.transactionHash(testBlockWithMetadata);

when(blockchainQueries.blockByHashWithTxHashes(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));
return expectedNewBlock;
}

private void mockSubscriptionManagerNotifyMethod(final NewBlockHeadersSubscription subscription) {
doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(Collections.singletonList(subscription));
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());
}

private void simulateAddingBlockOnCanonicalChain() {
final BlockBody blockBody = new BlockBody(Collections.emptyList(), Collections.emptyList());
final Block testBlock = new Block(blockHeader, blockBody);
newBlockHeadersSubscriptionService.onBlockAdded(
BlockAddedEvent.createForHeadAdvancement(testBlock), blockchainQueries.getBlockchain());
verify(blockchainQueries, times(1)).getBlockchain();
}

private void simulateAddingBlockOnNonCanonicalChain() {
final BlockBody blockBody = new BlockBody(Collections.emptyList(), Collections.emptyList());
final Block testBlock = new Block(blockHeader, blockBody);
newBlockHeadersSubscriptionService.onBlockAdded(
BlockAddedEvent.createForFork(testBlock), blockchainQueries.getBlockchain());
verify(blockchainQueries, times(1)).getBlockchain();
}

private List<TransactionWithMetadata> transactionsWithMetadata() {
final TransactionWithMetadata t1 =
new TransactionWithMetadata(
Expand Down