Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PIE-1707] Implement a timeout in TransactionMessageProcessor #1604

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.time.Instant.now;

import tech.pegasys.pantheon.ethereum.eth.manager.EthMessage;
import tech.pegasys.pantheon.ethereum.eth.manager.EthMessages.MessageCallback;
import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler;
import tech.pegasys.pantheon.ethereum.eth.messages.TransactionsMessage;

import java.time.Duration;
import java.time.Instant;

class TransactionsMessageHandler implements MessageCallback {

private static final Duration TX_KEEP_ALIVE = Duration.ofMinutes(1);
private final TransactionsMessageProcessor transactionsMessageProcessor;
private final EthScheduler scheduler;

Expand All @@ -32,9 +38,10 @@ public TransactionsMessageHandler(
@Override
public void exec(final EthMessage message) {
final TransactionsMessage transactionsMessage = TransactionsMessage.readFrom(message.getData());
final Instant startedAt = now();
scheduler.scheduleTxWorkerTask(
() ->
transactionsMessageProcessor.processTransactionsMessage(
message.getPeer(), transactionsMessage));
message.getPeer(), transactionsMessage, startedAt, TX_KEEP_ALIVE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.time.Instant.now;
import static org.apache.logging.log4j.LogManager.getLogger;

import tech.pegasys.pantheon.ethereum.core.Transaction;
Expand All @@ -20,6 +21,8 @@
import tech.pegasys.pantheon.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.ethereum.rlp.RLPException;

import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Set;

Expand All @@ -39,6 +42,17 @@ public TransactionsMessageProcessor(
}

void processTransactionsMessage(
final EthPeer peer,
final TransactionsMessage transactionsMessage,
final Instant startedAt,
final Duration keepAlive) {
// Check if message not expired.
if (startedAt.plus(keepAlive).isAfter(now())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you provide any log or some feedback in case you skip the message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because potentially we will drop thousands of messages and logging would be a bottleneck. A potential follow up PR would gather information and maintain a counter as a prometheus metric to see the total number of dropped messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will create a follow up PR after this one to keep it small.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in #1586 we have to deal with up to 500 tx / second. We want to drop oldest messages because they are more likely to contain non relevant transactions. So we check if the message has expired prior to process it. Since we would probably have to skip thousands of messages we don't want to log each individual message skipped. A follow up PR will be created after this one to ensure we have proper feedback.

this.processTransactionsMessage(peer, transactionsMessage);
}
}

private void processTransactionsMessage(
final EthPeer peer, final TransactionsMessage transactionsMessage) {
try {
LOG.trace("Received transactions message from {}", peer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@
*/
package tech.pegasys.pantheon.ethereum.eth.transactions;

import static java.time.Duration.ofMillis;
import static java.time.Duration.ofMinutes;
import static java.time.Instant.now;
import static java.util.Arrays.asList;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;

import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
import tech.pegasys.pantheon.ethereum.core.Transaction;
Expand All @@ -41,7 +45,10 @@ public class TransactionsMessageProcessorTest {
@Test
public void shouldMarkAllReceivedTransactionsAsSeen() {
messageHandler.processTransactionsMessage(
peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3)));
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now(),
ofMinutes(1));

verify(transactionTracker)
.markTransactionsAsSeen(peer1, ImmutableSet.of(transaction1, transaction2, transaction3));
Expand All @@ -50,9 +57,31 @@ public void shouldMarkAllReceivedTransactionsAsSeen() {
@Test
public void shouldAddReceivedTransactionsToTransactionPool() {
messageHandler.processTransactionsMessage(
peer1, TransactionsMessage.create(asList(transaction1, transaction2, transaction3)));

peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now(),
ofMinutes(1));
verify(transactionPool)
.addRemoteTransactions(ImmutableSet.of(transaction1, transaction2, transaction3));
}

@Test
public void shouldNotMarkAllReceivedTransactionsAsSeenIfExpired() {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
messageHandler.processTransactionsMessage(
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now().minus(ofMinutes(1)),
ofMillis(1));
verifyZeroInteractions(transactionTracker);
}

@Test
public void shouldNotAddReceivedTransactionsToTransactionPoolIfExpired() {
AbdelStark marked this conversation as resolved.
Show resolved Hide resolved
messageHandler.processTransactionsMessage(
peer1,
TransactionsMessage.create(asList(transaction1, transaction2, transaction3)),
now().minus(ofMinutes(1)),
ofMillis(1));
verifyZeroInteractions(transactionPool);
}
}