Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Discard known remote transactions prior to validation #1548

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ public int size() {
return pendingTransactions.size();
}

public boolean containsTransaction(final Hash transactionHash) {
return pendingTransactions.containsKey(transactionHash);
}

public Optional<Transaction> getTransactionByHash(final Hash transactionHash) {
return Optional.ofNullable(pendingTransactions.get(transactionHash))
.map(TransactionInfo::getTransaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator;
import tech.pegasys.pantheon.ethereum.mainnet.TransactionValidator.TransactionInvalidReason;
import tech.pegasys.pantheon.ethereum.mainnet.ValidationResult;
import tech.pegasys.pantheon.metrics.Counter;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.util.Collection;
import java.util.HashSet;
Expand All @@ -53,11 +57,14 @@ public class TransactionPool implements BlockAddedObserver {

private static final Logger LOG = getLogger();
private static final long SYNC_TOLERANCE = 100L;
private static final String REMOTE = "remote";
private static final String LOCAL = "local";
private final PendingTransactions pendingTransactions;
private final ProtocolSchedule<?> protocolSchedule;
private final ProtocolContext<?> protocolContext;
private final TransactionBatchAddedListener transactionBatchAddedListener;
private final SyncState syncState;
private final LabelledMetric<Counter> duplicateTransactionCounter;
private Optional<AccountFilter> accountFilter = Optional.empty();
private final PeerTransactionTracker peerTransactionTracker;

Expand All @@ -68,20 +75,28 @@ public TransactionPool(
final TransactionBatchAddedListener transactionBatchAddedListener,
final SyncState syncState,
final EthContext ethContext,
final PeerTransactionTracker peerTransactionTracker) {
final PeerTransactionTracker peerTransactionTracker,
final MetricsSystem metricsSystem) {
this.pendingTransactions = pendingTransactions;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.transactionBatchAddedListener = transactionBatchAddedListener;
this.syncState = syncState;
this.peerTransactionTracker = peerTransactionTracker;

duplicateTransactionCounter =
metricsSystem.createLabelledCounter(
MetricCategory.TRANSACTION_POOL,
"transactions_duplicates_total",
"Total number of duplicate transactions received",
"source");

ethContext.getEthPeers().subscribeConnect(this::handleConnect);
}

private void handleConnect(final EthPeer peer) {
List<Transaction> localTransactions = getLocalTransactions();
for (Transaction transaction : localTransactions) {
final List<Transaction> localTransactions = getLocalTransactions();
for (final Transaction transaction : localTransactions) {
peerTransactionTracker.addToPeerSendQueue(peer, transaction);
}
}
Expand All @@ -100,6 +115,8 @@ public ValidationResult<TransactionInvalidReason> addLocalTransaction(
final boolean added = pendingTransactions.addLocalTransaction(transaction);
if (added) {
transactionBatchAddedListener.onTransactionsAdded(singletonList(transaction));
} else {
duplicateTransactionCounter.labels(LOCAL).inc();
}
});
return validationResult;
Expand All @@ -111,12 +128,19 @@ public void addRemoteTransactions(final Collection<Transaction> transactions) {
}
final Set<Transaction> addedTransactions = new HashSet<>();
for (final Transaction transaction : transactions) {
if (pendingTransactions.containsTransaction(transaction.hash())) {
// We already have this transaction, don't even validate it.
duplicateTransactionCounter.labels(REMOTE).inc();
continue;
}
final ValidationResult<TransactionInvalidReason> validationResult =
validateTransaction(transaction);
if (validationResult.isValid()) {
final boolean added = pendingTransactions.addRemoteTransaction(transaction);
if (added) {
addedTransactions.add(transaction);
} else {
duplicateTransactionCounter.labels(REMOTE).inc();
}
} else {
LOG.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public static TransactionPool createTransactionPool(
new TransactionSender(transactionTracker, transactionsMessageSender, ethContext),
syncState,
ethContext,
transactionTracker);
transactionTracker,
metricsSystem);

final TransactionsMessageHandler transactionsMessageHandler =
new TransactionsMessageHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ public void setUp() {
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);
peerTransactionTracker,
metricsSystem);
blockchain.observeBlockAdded(transactionPool);
}

Expand Down Expand Up @@ -335,6 +336,29 @@ public void shouldAllowSequenceOfTransactionsWithIncreasingNonceFromSameSender()
assertTransactionPending(transaction3);
}

@Test
public void shouldDiscardRemoteTransactionThatAlreadyExistsBeforeValidation() {
final PendingTransactions pendingTransactions = mock(PendingTransactions.class);
final TransactionPool transactionPool =
new TransactionPool(
pendingTransactions,
protocolSchedule,
protocolContext,
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker,
metricsSystem);

when(pendingTransactions.containsTransaction(transaction1.hash())).thenReturn(true);

transactionPool.addRemoteTransactions(singletonList(transaction1));

verify(pendingTransactions).containsTransaction(transaction1.hash());
verifyZeroInteractions(transactionValidator);
verifyNoMoreInteractions(pendingTransactions);
}

@Test
public void shouldNotNotifyBatchListenerWhenRemoteTransactionDoesNotReplaceExisting() {
final TransactionTestFixture builder = new TransactionTestFixture();
Expand Down Expand Up @@ -465,7 +489,8 @@ public void shouldRejectRemoteTransactionsWhenNotInSync() {
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);
peerTransactionTracker,
metricsSystem);

final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transaction1 = builder.nonce(1).createTransaction(KEY_PAIR1);
Expand Down Expand Up @@ -529,7 +554,8 @@ public void shouldSendOnlyLocalTransactionToNewlyConnectedPeer() {
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);
peerTransactionTracker,
metricsSystem);

final TransactionTestFixture builder = new TransactionTestFixture();
final Transaction transactionLocal = builder.nonce(1).createTransaction(KEY_PAIR1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public void setUp() {
batchAddedListener,
syncState,
ethContext,
peerTransactionTracker);
peerTransactionTracker,
metricsSystem);
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, protocolContext.getWorldStateArchive());
filterManager =
Expand Down