Skip to content

Commit

Permalink
Fix duplicate service messages during failover/restart when using mul…
Browse files Browse the repository at this point in the history
…tiple services (#1703)

* [Java] Add test to show duplicate messages being received when failing over with uncommitted pending service messages when running with multiple services.

* [Java] Fix consensus module to use correct service ID when enqueuing pending service messages.
  • Loading branch information
JPWatson authored Jan 7, 2025
1 parent c2f0810 commit 80a93bb
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,7 @@ void onServiceCloseSession(final long clusterSessionId)

void onServiceMessage(final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length)
{
final int i = PendingServiceMessageTracker.serviceId(clusterSessionId);
final int i = (int)clusterSessionId;
pendingServiceMessageTrackers[i].enqueueMessage((MutableDirectBuffer)buffer, offset, length);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@
package io.aeron.cluster;

import io.aeron.cluster.service.Cluster;
import io.aeron.test.EventLogExtension;
import io.aeron.test.InterruptAfter;
import io.aeron.test.InterruptingTestCallback;
import io.aeron.test.SlowTest;
import io.aeron.test.SystemTestWatcher;
import io.aeron.test.*;
import io.aeron.test.cluster.ClusterTests;
import io.aeron.test.cluster.TestCluster;
import io.aeron.test.cluster.TestNode;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.collections.Int2IntCounterMap;
import org.agrona.collections.IntArrayList;
import org.agrona.collections.IntHashSet;
import org.agrona.collections.LongArrayList;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Collections;
import java.util.function.IntFunction;

import static io.aeron.test.cluster.TestCluster.aCluster;
Expand Down Expand Up @@ -129,6 +128,66 @@ void shouldProcessServiceMessagesWithoutDuplicates()
assertTrackedMessages(cluster, -1, messageCount);
}

@Test
@SlowTest
@InterruptAfter(60)
void shouldProcessServiceMessagesWithoutDuplicatesDuringFailoverWithUncommittedPendingServiceMessages()
{
final IntFunction<TestNode.TestService[]> serviceSupplier =
(i) -> new TestNode.TestService[]
{
new TestNode.MessageTrackingService(1, i),
new TestNode.MessageTrackingService(2, i),
new TestNode.MessageTrackingService(3, i)
};
final TestCluster cluster = aCluster()
.withStaticNodes(3)
.withTimerServiceSupplier(new PriorityHeapTimerServiceSupplier())
.withServiceSupplier(serviceSupplier)
.start();
systemTestWatcher.cluster(cluster);

final TestNode oldLeader = cluster.awaitLeaderAndClosedElection();
cluster.connectClient();

final ExpandableArrayBuffer msgBuffer = cluster.msgBuffer();
int messageCount = 0;
for (int i = 0; i < 50; i++)
{
msgBuffer.putInt(0, ++messageCount, LITTLE_ENDIAN);
cluster.pollUntilMessageSent(SIZE_OF_INT);
}

// wait for at least one message but don't wait for all messages to be processed as
// we want uncommitted messages in flight
cluster.awaitResponseMessageCount(1);
cluster.stopNode(oldLeader);

final TestNode newLeader = cluster.awaitLeader();
final int leaderMessageCount = newLeader.services()[0].messageCount();

final TestNode follower = cluster.node(3 - oldLeader.index() - newLeader.index());

for (final TestNode.TestService service : follower.services())
{
service.awaitServiceMessageCount(leaderMessageCount, () -> {}, follower);
}

for (int i = 0; i < 3; i++)
{
final TestNode node = cluster.node(i);
final TestNode.TestService[] services = node.services();
for (final TestNode.TestService service : services)
{
final TestNode.MessageTrackingService trackingService1 = (TestNode.MessageTrackingService)service;

final IntArrayList actualServiceMessages = trackingService1.serviceMessages();

assertNoDuplicates(node, actualServiceMessages);
}
}
}

@Test
@SlowTest
@InterruptAfter(40)
Expand Down Expand Up @@ -434,6 +493,37 @@ private static void assertTrackedServiceState(
fail("memberId=" + node.index() + ", role=" + node.role() + ": Timers diverged: expected=" +
expectedTimers.size() + ", actual=" + actualTimers.size());
}

assertNoDuplicates(node, actualServiceMessages);
}
}

private static void assertNoDuplicates(final TestNode node, final IntArrayList actualServiceMessages)
{
final IntHashSet set = new IntHashSet(actualServiceMessages.size());
set.addAll(actualServiceMessages);

if (set.size() != actualServiceMessages.size())
{
final Int2IntCounterMap messageCounts = new Int2IntCounterMap(0);
for (final int messageId : actualServiceMessages)
{
messageCounts.incrementAndGet(messageId);
}

final IntArrayList duplicateMessageIds = new IntArrayList();
messageCounts.forEach((messageId, count) ->
{
if (count > 1)
{
duplicateMessageIds.add(messageId);
}
});

Collections.sort(duplicateMessageIds);

fail("memberId=" + node.index() + ", role=" + node.role() + ": Duplicate messages found: " +
duplicateMessageIds);
}
}
}

0 comments on commit 80a93bb

Please sign in to comment.