Skip to content

Commit

Permalink
Fix potential memory leak (hyperledger#7076)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <[email protected]>
  • Loading branch information
fab-10 authored May 8, 2024
1 parent 65b3cb1 commit b4b6adc
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,7 +85,7 @@ private void processNewPooledTransactionHashesMessage(

LOG.atTrace()
.setMessage(
"Received pooled transaction hashes message from {}... incoming hashes {}, incoming list {}")
"Received pooled transaction hashes message from {} incoming hashes {}, incoming list {}")
.addArgument(() -> peer == null ? null : peer.getLoggableId())
.addArgument(incomingTransactionHashes::size)
.addArgument(incomingTransactionHashes)
Expand Down Expand Up @@ -121,7 +120,7 @@ private void processNewPooledTransactionHashesMessage(
bufferedTask.addHashes(
incomingTransactionHashes.stream()
.filter(hash -> transactionPool.getTransactionByHash(hash).isEmpty())
.collect(Collectors.toList()));
.toList());
} catch (final RLPException ex) {
if (peer != null) {
LOG.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,34 @@
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
private static final Logger LOG = LoggerFactory.getLogger(PeerTransactionTracker.class);

private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;

private final EthPeers ethPeers;
private final Map<EthPeer, Set<Hash>> seenTransactions = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionsToSend = new ConcurrentHashMap<>();
private final Map<EthPeer, Set<Transaction>> transactionHashesToSend = new ConcurrentHashMap<>();

public PeerTransactionTracker(final EthPeers ethPeers) {
this.ethPeers = ethPeers;
}

public void reset() {
seenTransactions.clear();
transactionsToSend.clear();
Expand Down Expand Up @@ -119,8 +133,46 @@ protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {

@Override
public void onDisconnect(final EthPeer peer) {
seenTransactions.remove(peer);
transactionsToSend.remove(peer);
transactionHashesToSend.remove(peer);
LOG.atTrace().setMessage("onDisconnect for peer {}").addArgument(peer::getLoggableId).log();

// here we reconcile all the trackers with the active peers, since due to the asynchronous
// processing of incoming messages it could seldom happen that a tracker is recreated just
// after a peer was disconnected, resulting in a memory leak.
final Set<EthPeer> trackedPeers = new HashSet<>(seenTransactions.keySet());
trackedPeers.addAll(transactionsToSend.keySet());
trackedPeers.addAll(transactionHashesToSend.keySet());

LOG.atTrace()
.setMessage("{} tracked peers ({})")
.addArgument(trackedPeers.size())
.addArgument(() -> logPeerSet(trackedPeers))
.log();

final Set<EthPeer> connectedPeers =
ethPeers.streamAllPeers().collect(Collectors.toUnmodifiableSet());

final var disconnectedPeers = trackedPeers;
disconnectedPeers.removeAll(connectedPeers);
LOG.atTrace()
.setMessage("Removing {} transaction trackers for disconnected peers ({})")
.addArgument(disconnectedPeers.size())
.addArgument(() -> logPeerSet(disconnectedPeers))
.log();

disconnectedPeers.stream()
.forEach(
disconnectedPeer -> {
seenTransactions.remove(disconnectedPeer);
transactionsToSend.remove(disconnectedPeer);
transactionHashesToSend.remove(disconnectedPeer);
LOG.atTrace()
.setMessage("Removed transaction trackers for disconnected peer {}")
.addArgument(disconnectedPeer::getLoggableId)
.log();
});
}

private String logPeerSet(final Set<EthPeer> peers) {
return peers.stream().map(EthPeer::getLoggableId).collect(Collectors.joining(","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public static TransactionPool createTransactionPool(

final TransactionPoolMetrics metrics = new TransactionPoolMetrics(metricsSystem);

final PeerTransactionTracker transactionTracker = new PeerTransactionTracker();
final PeerTransactionTracker transactionTracker =
new PeerTransactionTracker(ethContext.getEthPeers());
final TransactionsMessageSender transactionsMessageSender =
new TransactionsMessageSender(transactionTracker);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.PeerTransactionTracker;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
Expand Down Expand Up @@ -57,6 +58,7 @@ public class BufferedGetPooledTransactionsFromPeerFetcherTest {
@Mock TransactionPool transactionPool;
@Mock EthContext ethContext;
@Mock EthScheduler ethScheduler;
@Mock EthPeers ethPeers;

private final BlockDataGenerator generator = new BlockDataGenerator();

Expand All @@ -67,7 +69,8 @@ public class BufferedGetPooledTransactionsFromPeerFetcherTest {
@BeforeEach
public void setup() {
metricsSystem = new StubMetricsSystem();
transactionTracker = new PeerTransactionTracker();
when(ethContext.getEthPeers()).thenReturn(ethPeers);
transactionTracker = new PeerTransactionTracker(ethPeers);
when(ethContext.getScheduler()).thenReturn(ethScheduler);
ScheduledFuture<?> mock = mock(ScheduledFuture.class);
fetcher =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public void setUp() {
doNothing().when(ethScheduler).scheduleSyncWorkerTask(syncTaskCapture.capture());
doReturn(ethScheduler).when(ethContext).getScheduler();

peerTransactionTracker = new PeerTransactionTracker();
peerTransactionTracker = new PeerTransactionTracker(ethContext.getEthPeers());
transactionBroadcaster =
spy(
new TransactionBroadcaster(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.manager.MockPeerConnection;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.eth.messages.NewPooledTransactionHashesMessage;
Expand All @@ -47,6 +48,7 @@
import org.mockito.ArgumentCaptor;

public class NewPooledTransactionHashesMessageSenderTest {
private final EthPeers ethPeers = mock(EthPeers.class);

private final EthPeer peer1 = mock(EthPeer.class);
private final EthPeer peer2 = mock(EthPeer.class);
Expand All @@ -63,7 +65,7 @@ public class NewPooledTransactionHashesMessageSenderTest {

@BeforeEach
public void setUp() {
transactionTracker = new PeerTransactionTracker();
transactionTracker = new PeerTransactionTracker(ethPeers);
messageSender = new NewPooledTransactionHashesMessageSender(transactionTracker);
final Transaction tx = mock(Transaction.class);
pendingTransactions = mock(PendingTransactions.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,26 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;

import java.util.List;
import java.util.stream.Stream;

import com.google.common.collect.ImmutableSet;
import org.junit.jupiter.api.Test;

public class PeerTransactionTrackerTest {
private final EthPeers ethPeers = mock(EthPeers.class);

private final EthPeer ethPeer1 = mock(EthPeer.class);
private final EthPeer ethPeer2 = mock(EthPeer.class);
private final BlockDataGenerator generator = new BlockDataGenerator();
private final PeerTransactionTracker tracker = new PeerTransactionTracker();
private final PeerTransactionTracker tracker = new PeerTransactionTracker(ethPeers);
private final Transaction transaction1 = generator.transaction();
private final Transaction transaction2 = generator.transaction();
private final Transaction transaction3 = generator.transaction();
Expand Down Expand Up @@ -79,6 +85,7 @@ public void shouldClearDataWhenPeerDisconnects() {
tracker.addToPeerSendQueue(ethPeer1, transaction2);
tracker.addToPeerSendQueue(ethPeer2, transaction3);

when(ethPeers.streamAllPeers()).thenReturn(Stream.of(ethPeer2));
tracker.onDisconnect(ethPeer1);

assertThat(tracker.getEthPeersWithUnsentTransactions()).containsOnly(ethPeer2);
Expand All @@ -90,4 +97,32 @@ public void shouldClearDataWhenPeerDisconnects() {
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer1)).containsOnly(transaction1);
assertThat(tracker.claimTransactionsToSendToPeer(ethPeer2)).containsOnly(transaction3);
}

@Test
public void shouldClearDataForAllDisconnectedPeers() {
tracker.markTransactionsAsSeen(ethPeer1, List.of(transaction1));
tracker.markTransactionsAsSeen(ethPeer2, List.of(transaction2));

when(ethPeers.streamAllPeers()).thenReturn(Stream.of(ethPeer2));
tracker.onDisconnect(ethPeer1);

// false because tracker removed for ethPeer1
assertThat(tracker.hasPeerSeenTransaction(ethPeer1, transaction1)).isFalse();
assertThat(tracker.hasPeerSeenTransaction(ethPeer2, transaction2)).isTrue();

// simulate a concurrent interaction, that just after the disconnection of the peer,
// recreates the transaction tackers for it
tracker.markTransactionsAsSeen(ethPeer1, List.of(transaction1));
// ethPeer1 is here again, due to the above interaction with the tracker
assertThat(tracker.hasPeerSeenTransaction(ethPeer1, transaction1)).isTrue();

// disconnection of ethPeers2 will reconcile the tracker, removing also all the other
// disconnected peers
when(ethPeers.streamAllPeers()).thenReturn(Stream.of());
tracker.onDisconnect(ethPeer2);

// since no peers are connected, all the transaction trackers have been removed
assertThat(tracker.hasPeerSeenTransaction(ethPeer1, transaction1)).isFalse();
assertThat(tracker.hasPeerSeenTransaction(ethPeer2, transaction2)).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.EthPeers;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.TransactionsMessage;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
Expand All @@ -37,6 +38,7 @@
import org.mockito.ArgumentCaptor;

public class TransactionsMessageSenderTest {
private final EthPeers ethPeers = mock(EthPeers.class);

private final EthPeer peer1 = mock(EthPeer.class);
private final EthPeer peer2 = mock(EthPeer.class);
Expand All @@ -46,7 +48,7 @@ public class TransactionsMessageSenderTest {
private final Transaction transaction2 = generator.transaction();
private final Transaction transaction3 = generator.transaction();

private final PeerTransactionTracker transactionTracker = new PeerTransactionTracker();
private final PeerTransactionTracker transactionTracker = new PeerTransactionTracker(ethPeers);
private final TransactionsMessageSender messageSender =
new TransactionsMessageSender(transactionTracker);

Expand Down

0 comments on commit b4b6adc

Please sign in to comment.