From 769a2fae4f7abd47f3cbb0837b9d78bef29cb693 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta Date: Tue, 25 Jun 2019 14:10:58 +0200 Subject: [PATCH 1/2] [PIE-1707] Implement a timeout in TransactionMessageProcessor - `processTransactionsMessage` now takes a `keepAlive` parameter - don't process the message if expired - add unit tests - use a default timeout for transactions (1 minute) --- .../TransactionsMessageHandler.java | 9 ++++- .../TransactionsMessageProcessor.java | 14 ++++++++ .../TransactionsMessageProcessorTest.java | 35 +++++++++++++++++-- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java index e802ad5864..99a7df59ef 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageHandler.java @@ -12,13 +12,19 @@ */ package tech.pegasys.pantheon.ethereum.eth.transactions; +import static java.time.Instant.now; + import tech.pegasys.pantheon.ethereum.eth.manager.EthMessage; import tech.pegasys.pantheon.ethereum.eth.manager.EthMessages.MessageCallback; import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage; +import java.time.Duration; +import java.time.Instant; + class TransactionsMessageHandler implements MessageCallback { + private static final Duration TX_KEEP_ALIVE = Duration.ofMinutes(1); private final TransactionsMessageProcessor transactionsMessageProcessor; private final EthScheduler scheduler; @@ -32,9 +38,10 @@ public TransactionsMessageHandler( @Override public void exec(final EthMessage message) { final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData()); + final Instant startedAt = now(); scheduler.scheduleTxWorkerTask( () -> transactionsMessageProcessor.processTransactionsMessage( - message.getPeer(), transactionsMessage)); + message.getPeer(), transactionsMessage, startedAt, TX_KEEP_ALIVE)); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java index 7829804676..5a50edcaa3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessor.java @@ -12,6 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.eth.transactions; +import static java.time.Instant.now; import static org.apache.logging.log4j.LogManager.getLogger; import tech.pegasys.pantheon.ethereum.core.Transaction; @@ -20,6 +21,8 @@ import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import tech.pegasys.pantheon.ethereum.rlp.RLPException; +import java.time.Duration; +import java.time.Instant; import java.util.Iterator; import java.util.Set; @@ -39,6 +42,17 @@ public TransactionsMessageProcessor( } void processTransactionsMessage( + final EthPeer peer, + final TransactionsMessage transactionsMessage, + final Instant startedAt, + final Duration keepAlive) { + // Check if message not expired. + if (startedAt.plus(keepAlive).isAfter(now())) { + this.processTransactionsMessage(peer, transactionsMessage); + } + } + + private void processTransactionsMessage( final EthPeer peer, final TransactionsMessage transactionsMessage) { try { LOG.trace("Received transactions message from {}", peer); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java index 9c29bfffeb..eda6d3e84d 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java @@ -12,9 +12,13 @@ */ package tech.pegasys.pantheon.ethereum.eth.transactions; +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofMinutes; +import static java.time.Instant.now; import static java.util.Arrays.asList; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; import tech.pegasys.pantheon.ethereum.core.Transaction; @@ -41,7 +45,10 @@ public class TransactionsMessageProcessorTest { @Test public void shouldMarkAllReceivedTransactionsAsSeen() { messageHandler.processTransactionsMessage( - peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3))); + peer1, + TransactionsMessage.create(asList(transaction1, transaction2, transaction3)), + now(), + ofMinutes(1)); verify(transactionTracker) .markTransactionsAsSeen(peer1, ImmutableSet.of(transaction1, transaction2, transaction3)); @@ -50,9 +57,31 @@ public void shouldMarkAllReceivedTransactionsAsSeen() { @Test public void shouldAddReceivedTransactionsToTransactionPool() { messageHandler.processTransactionsMessage( - peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3))); - + peer1, + TransactionsMessage.create(asList(transaction1, transaction2, transaction3)), + now(), + ofMinutes(1)); verify(transactionPool) .addRemoteTransactions(ImmutableSet.of(transaction1, transaction2, transaction3)); } + + @Test + public void shouldNotMarkAllReceivedTransactionsAsSeenIfExpired() { + messageHandler.processTransactionsMessage( + peer1, + TransactionsMessage.create(asList(transaction1, transaction2, transaction3)), + now().minus(ofMinutes(1)), + ofMillis(1)); + verifyZeroInteractions(transactionTracker); + } + + @Test + public void shouldNotAddReceivedTransactionsToTransactionPoolIfExpired() { + messageHandler.processTransactionsMessage( + peer1, + TransactionsMessage.create(asList(transaction1, transaction2, transaction3)), + now().minus(ofMinutes(1)), + ofMillis(1)); + verifyZeroInteractions(transactionPool); + } } From 42f770ac05cd44a8a5f575bc34bfce0401408e42 Mon Sep 17 00:00:00 2001 From: Abdelhamid Bakhta <45264458+abdelhamidbakhta@users.noreply.github.com> Date: Tue, 25 Jun 2019 15:22:23 +0200 Subject: [PATCH 2/2] Update ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java Co-Authored-By: Nicolas MASSART --- .../eth/transactions/TransactionsMessageProcessorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java index eda6d3e84d..2fcc38c4d9 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/transactions/TransactionsMessageProcessorTest.java @@ -66,7 +66,7 @@ public void shouldAddReceivedTransactionsToTransactionPool() { } @Test - public void shouldNotMarkAllReceivedTransactionsAsSeenIfExpired() { + public void shouldNotMarkReceivedExpiredTransactionsAsSeen() { messageHandler.processTransactionsMessage( peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),