Skip to content

Commit

Permalink
[Java] Introduce SnapshotDurationTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
eliquinox committed Nov 14, 2023
1 parent 3bf1c54 commit 88c6199
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 14 deletions.
10 changes: 10 additions & 0 deletions aeron-client/src/main/java/io/aeron/AeronCounters.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,16 @@ public final class AeronCounters
*/
public static final int NODE_CONTROL_TOGGLE_TYPE_ID = 233;

/**
* The type id of the {@link Counter} used for keeping track of the maximum time it took to snapshot the
* consensus module.
*/
public static final int CLUSTER_CONSENSUS_MODULE_MAX_SNAPSHOT_DURATION_TYPE_ID = 234;

/**
* The type id of the {@link Counter} used for keeping track of the number of times the consensus module
* snapshot duration has exceeded the threshold.
*/
public static final int CLUSTER_CONSENSUS_MODULE_SNAPSHOT_DURATION_THRESHOLD_EXCEEDED_TYPE_ID = 235;

private AeronCounters()
{
}
Expand Down
90 changes: 90 additions & 0 deletions aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,17 @@ public static final class Configuration
*/
public static final long CYCLE_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000);

/**
* Property name for the consensus module snapshot duration threshold. A snapshot which takes longer than this
* threshold is counted as a breach.
*/
public static final String SNAPSHOT_DURATION_THRESHOLD_PROP_NAME =
"aeron.cluster.consensus.module.snapshot.threshold";

/**
* Default consensus module snapshot duration threshold, 1000 milliseconds expressed in nanoseconds, which is
* used for tracking snapshot duration breaches.
*
* @see #SNAPSHOT_DURATION_THRESHOLD_PROP_NAME
*/
public static final long SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS = TimeUnit.MILLISECONDS.toNanos(1000);

/**
* Default timeout a leader will wait on getting termination ACKs from followers.
*/
Expand Down Expand Up @@ -1097,6 +1108,17 @@ public static long cycleThresholdNs()
return getDurationInNanos(CYCLE_THRESHOLD_PROP_NAME, CYCLE_THRESHOLD_DEFAULT_NS);
}

/**
 * Threshold against which the duration of a consensus module snapshot is compared when monitoring for
 * breaches. The value is looked up under {@link #SNAPSHOT_DURATION_THRESHOLD_PROP_NAME} with
 * {@link #SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS} passed as the default.
 *
 * @return threshold value in nanoseconds.
 * @see #SNAPSHOT_DURATION_THRESHOLD_PROP_NAME
 */
public static long snapshotDurationThresholdNs()
{
    final long thresholdNs = getDurationInNanos(
        SNAPSHOT_DURATION_THRESHOLD_PROP_NAME,
        SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS);

    return thresholdNs;
}

/**
* Size in bytes of the error buffer in the mark file.
*
Expand Down Expand Up @@ -1389,6 +1411,7 @@ public static final class Context implements Cloneable
private long electionStatusIntervalNs = Configuration.electionStatusIntervalNs();
private long terminationTimeoutNs = Configuration.terminationTimeoutNs();
private long cycleThresholdNs = Configuration.cycleThresholdNs();
private long snapshotDurationThresholdNs = Configuration.snapshotDurationThresholdNs();

private String agentRoleName = Configuration.agentRoleName();
private ThreadFactory threadFactory;
Expand Down Expand Up @@ -1422,6 +1445,7 @@ public static final class Context implements Cloneable
private LogPublisher logPublisher;
private EgressPublisher egressPublisher;
private DutyCycleTracker dutyCycleTracker;
private SnapshotDurationTracker snapshotDurationTracker;
private AppVersionValidator appVersionValidator;
private boolean isLogMdc;
private boolean useAgentInvoker = false;
Expand Down Expand Up @@ -1717,6 +1741,26 @@ public void conclude()
cycleThresholdNs);
}

if (null == snapshotDurationTracker)
{
snapshotDurationTracker = new SnapshotDurationTracker(
ClusterCounters.allocate(
aeron,
buffer,
"Consensus module max snapshot duration in ns",
AeronCounters.CLUSTER_CONSENSUS_MODULE_MAX_SNAPSHOT_DURATION_TYPE_ID,
clusterId),
ClusterCounters.allocate(
aeron,
buffer,
"Consensus module max snapshot duration exceeded count: threshold=" +
snapshotDurationThresholdNs,
AeronCounters.CLUSTER_CONSENSUS_MODULE_SNAPSHOT_DURATION_THRESHOLD_EXCEEDED_TYPE_ID,
clusterId),
snapshotDurationThresholdNs);
}


if (null == threadFactory)
{
threadFactory = Thread::new;
Expand Down Expand Up @@ -3020,6 +3064,52 @@ public DutyCycleTracker dutyCycleTracker()
return dutyCycleTracker;
}

/**
 * Configure the consensus module snapshot duration threshold. A snapshot taking longer than this value
 * results in a counter being incremented.
 *
 * @param thresholdNs threshold value in nanoseconds.
 * @return this for a fluent API.
 * @see ConsensusModule.Configuration#SNAPSHOT_DURATION_THRESHOLD_PROP_NAME
 * @see ConsensusModule.Configuration#SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS
 */
public ConsensusModule.Context snapshotDurationThresholdNs(final long thresholdNs)
{
    snapshotDurationThresholdNs = thresholdNs;

    return this;
}

/**
 * The consensus module snapshot duration threshold. A snapshot taking longer than this value results in a
 * counter being incremented.
 *
 * @return threshold value in nanoseconds.
 * @see ConsensusModule.Configuration#SNAPSHOT_DURATION_THRESHOLD_PROP_NAME
 */
public long snapshotDurationThresholdNs()
{
    return this.snapshotDurationThresholdNs;
}

/**
 * Supply the tracker to be used for monitoring the duration of consensus module snapshots.
 *
 * @param snapshotDurationTracker tracker to be used for monitoring snapshot duration.
 * @return this for a fluent API.
 */
public ConsensusModule.Context snapshotDurationTracker(final SnapshotDurationTracker snapshotDurationTracker)
{
    this.snapshotDurationTracker = snapshotDurationTracker;

    return this;
}

/**
 * The tracker used for monitoring the duration of consensus module snapshots.
 *
 * @return the snapshot duration tracker.
 */
public SnapshotDurationTracker snapshotDurationTracker()
{
    return this.snapshotDurationTracker;
}

/**
* Get the {@link Agent#roleName()} to be used for the consensus module agent. If {@code null} then one will
* be generated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
import static io.aeron.archive.client.ReplayMerge.LIVE_ADD_MAX_WINDOW;
import static io.aeron.archive.codecs.SourceLocation.LOCAL;
import static io.aeron.cluster.ClusterSession.State.*;
import static io.aeron.cluster.ConsensusModule.CLUSTER_ACTION_FLAGS_STANDBY_SNAPSHOT;
import static io.aeron.cluster.ConsensusModule.CLUSTER_ACTION_FLAGS_DEFAULT;
import static io.aeron.cluster.ConsensusModule.CLUSTER_ACTION_FLAGS_STANDBY_SNAPSHOT;
import static io.aeron.cluster.ConsensusModule.Configuration.*;
import static io.aeron.cluster.client.AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION;
import static io.aeron.cluster.service.ClusteredServiceContainer.Configuration.MARK_FILE_UPDATE_INTERVAL_NS;
Expand Down Expand Up @@ -147,6 +147,8 @@ final class ConsensusModuleAgent implements Agent, TimerService.TimerHandler, Co
private final IdleStrategy idleStrategy;
private final RecordingLog recordingLog;
private final DutyCycleTracker dutyCycleTracker;
private final SnapshotDurationTracker snapshotDurationTracker;

private RecordingLog.RecoveryPlan recoveryPlan;
private AeronArchive archive;
private RecordingSignalPoller recordingSignalPoller;
Expand Down Expand Up @@ -188,6 +190,7 @@ final class ConsensusModuleAgent implements Agent, TimerService.TimerHandler, Co
Arrays.fill(serviceClientIds, NULL_VALUE);
this.serviceAckQueues = ServiceAck.newArrayOfQueues(ctx.serviceCount());
this.dutyCycleTracker = ctx.dutyCycleTracker();
this.snapshotDurationTracker = ctx.snapshotDurationTracker();

aeronClientInvoker = aeron.conductorAgentInvoker();
aeronClientInvoker.invoke();
Expand Down Expand Up @@ -1275,6 +1278,8 @@ void onServiceAck(
{
ClusterControl.ToggleState.reset(controlToggle);
}

snapshotDurationTracker.onSnapshotTaken(clusterClock.timeNanos());
}
}
else if (ConsensusModule.State.QUITTING == state)
Expand Down Expand Up @@ -2247,6 +2252,7 @@ private int checkClusterControlToggle(final long nowNs)
if (ConsensusModule.State.ACTIVE == state && appendAction(ClusterAction.SNAPSHOT))
{
state(ConsensusModule.State.SNAPSHOT);
snapshotDurationTracker.onSnapshotStart(nowNs);
}
break;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2014-2023 Real Logic Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aeron.cluster.service;

import org.agrona.concurrent.status.AtomicCounter;

// TODO: check against duty cycle tracker
// TODO: worry about false sharing?
// TODO: mike b -- think about being able to delegate measurements to an aggregator

/**
 * Snapshot duration tracker that tracks maximum snapshot duration and also keeps count of how many times a predefined
 * duration threshold is breached.
 * <p>
 * A duration is measured from a call to {@link #onSnapshotStart(long)} to the next call to
 * {@link #onSnapshotTaken(long)}. Each recorded start is consumed by the completion that follows it, so a completion
 * which has no matching start is ignored rather than being measured against an earlier snapshot.
 * <p>
 * The start time is held in a plain field, so both callbacks are expected to be made from the same thread.
 */
public class SnapshotDurationTracker
{
    /**
     * Sentinel indicating that no snapshot start has been recorded.
     */
    private static final long NOT_STARTED = Long.MIN_VALUE;

    private final AtomicCounter maxSnapshotDuration;
    private final AtomicCounter snapshotDurationThresholdExceededCount;
    private final long durationThresholdNs;
    private long snapshotStartTimeNs = NOT_STARTED;

    /**
     * Create a tracker to track max snapshot duration and breaches of a threshold.
     *
     * @param maxSnapshotDuration                    counter to which each measured duration is proposed as a new max.
     * @param snapshotDurationThresholdExceededCount counter incremented each time the threshold is exceeded.
     * @param durationThresholdNs                    threshold in nanoseconds to use for tracking breaches.
     */
    public SnapshotDurationTracker(
        final AtomicCounter maxSnapshotDuration,
        final AtomicCounter snapshotDurationThresholdExceededCount,
        final long durationThresholdNs)
    {
        this.maxSnapshotDuration = maxSnapshotDuration;
        this.snapshotDurationThresholdExceededCount = snapshotDurationThresholdExceededCount;
        this.durationThresholdNs = durationThresholdNs;
    }

    /**
     * Get max snapshot duration counter.
     *
     * @return max snapshot duration counter.
     */
    public AtomicCounter maxSnapshotDuration()
    {
        return maxSnapshotDuration;
    }

    /**
     * Get counter tracking the number of times the configured duration threshold was exceeded.
     *
     * @return duration threshold exceeded counter.
     */
    public AtomicCounter snapshotDurationThresholdExceededCount()
    {
        return snapshotDurationThresholdExceededCount;
    }

    /**
     * Called when snapshotting has started.
     *
     * @param timeNanos snapshot start time in nanoseconds.
     */
    public void onSnapshotStart(final long timeNanos)
    {
        snapshotStartTimeNs = timeNanos;
    }

    /**
     * Called when snapshot has been taken. If a start time has been recorded then the elapsed duration is proposed
     * to the max duration counter and, when it is greater than the threshold, the exceeded counter is incremented.
     * The call is a no-op when no start time is outstanding.
     *
     * @param timeNanos snapshot end time in nanoseconds.
     */
    public void onSnapshotTaken(final long timeNanos)
    {
        final long startTimeNs = snapshotStartTimeNs;
        if (NOT_STARTED == startTimeNs)
        {
            return;
        }

        // Consume the start time so a later completion without a matching start is not measured against a
        // stale timestamp, which would inflate the max duration and the breach count.
        snapshotStartTimeNs = NOT_STARTED;

        final long snapshotDurationNs = timeNanos - startTimeNs;
        if (snapshotDurationNs > durationThresholdNs)
        {
            snapshotDurationThresholdExceededCount.increment();
        }

        maxSnapshotDuration.proposeMax(snapshotDurationNs);
    }
}
69 changes: 56 additions & 13 deletions aeron-system-tests/src/test/java/io/aeron/cluster/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.aeron.cluster.codecs.AdminResponseEncoder;
import io.aeron.cluster.codecs.MessageHeaderDecoder;
import io.aeron.cluster.codecs.MessageHeaderEncoder;
import io.aeron.cluster.service.SnapshotDurationTracker;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import io.aeron.security.AuthorisationService;
Expand All @@ -45,6 +46,7 @@
import org.agrona.collections.Hashing;
import org.agrona.collections.MutableBoolean;
import org.agrona.collections.MutableInteger;
import org.agrona.concurrent.status.AtomicCounter;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -61,23 +63,13 @@
import static io.aeron.logbuffer.FrameDescriptor.computeMaxMessageLength;
import static io.aeron.test.SystemTestWatcher.UNKNOWN_HOST_FILTER;
import static io.aeron.test.Tests.awaitAvailableWindow;
import static io.aeron.test.cluster.ClusterTests.NO_OP_MSG;
import static io.aeron.test.cluster.ClusterTests.REGISTER_TIMER_MSG;
import static io.aeron.test.cluster.ClusterTests.startPublisherThread;
import static io.aeron.test.cluster.TestCluster.aCluster;
import static io.aeron.test.cluster.TestCluster.awaitElectionClosed;
import static io.aeron.test.cluster.TestCluster.awaitElectionState;
import static io.aeron.test.cluster.TestCluster.awaitLossOfLeadership;
import static io.aeron.test.cluster.ClusterTests.*;
import static io.aeron.test.cluster.TestCluster.*;
import static io.aeron.test.cluster.TestNode.atMost;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.agrona.BitUtil.SIZE_OF_INT;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;

@SlowTest
@ExtendWith({ EventLogExtension.class, InterruptingTestCallback.class })
Expand Down Expand Up @@ -1922,6 +1914,57 @@ void shouldTakeASnapshotAfterReceivingAdminRequestOfTypeSnapshot()
cluster.awaitNeutralControlToggle(leader);
}

// Verifies that a snapshot requested through an admin request updates both counters exposed by the leader's
// SnapshotDurationTracker: the threshold exceeded count and the max snapshot duration.
@Test
@InterruptAfter(20)
void shouldTrackSnapshotDuration()
{
// Three static nodes, each supplied with a service configured with a 101 ms sleepNsOnTakeSnapshot value.
cluster = aCluster()
.withServiceSupplier((i) -> new TestNode.TestService[]{
new TestNode.SleepOnSnapshotTestService()
.sleepNsOnTakeSnapshot(TimeUnit.MILLISECONDS.toNanos(101)).index(i)
})
.withStaticNodes(3)
.withAuthorisationServiceSupplier(() -> AuthorisationService.ALLOW_ALL)
.start();
systemTestWatcher.cluster(cluster);

final TestNode leader = cluster.awaitLeader();

// The counters under test are taken from the tracker held by the leader's consensus module context.
final SnapshotDurationTracker snapshotDurationTracker = leader.consensusModule().context()
.snapshotDurationTracker();

final AtomicCounter snapshotDurationThresholdExceededCount = snapshotDurationTracker
.snapshotDurationThresholdExceededCount();

final AtomicCounter maxSnapshotDuration = snapshotDurationTracker.maxSnapshotDuration();

// The correlation id only needs to identify this request; nanoTime is used as a convenient value.
final long requestCorrelationId = System.nanoTime();
// presumably the returned flag is set once an OK snapshot response with this correlation id arrives -- confirm
// against injectAdminResponseEgressListener.
final MutableBoolean hasResponse = injectAdminResponseEgressListener(
requestCorrelationId, AdminRequestType.SNAPSHOT, AdminResponseCode.OK, EMPTY_MSG);

final AeronCluster client = cluster.connectClient();

// Baseline: nothing has been recorded before a snapshot is requested.
assertEquals(0L, snapshotDurationThresholdExceededCount.get());
assertEquals(0, maxSnapshotDuration.get());

// Retry until the snapshot admin request is accepted.
while (!client.sendAdminRequestToTakeASnapshot(requestCorrelationId))
{
Tests.yield();
}

// Poll egress until the response flag has been set.
while (!hasResponse.get())
{
client.pollEgress();
Tests.yield();
}

cluster.awaitSnapshotCount(1);
cluster.awaitNeutralControlToggle(leader);

// NOTE(review): an exceeded count of exactly 1 presumes the threshold in effect for this cluster is below the
// snapshot duration. ConsensusModule.Configuration.SNAPSHOT_DURATION_THRESHOLD_DEFAULT_NS is 1000 ms while the
// service sleep above is 101 ms -- confirm the test cluster configures a lower threshold.
assertEquals(1L, snapshotDurationThresholdExceededCount.get());
assertTrue(maxSnapshotDuration.get() > 0);
}

@Test
@InterruptAfter(20)
void shouldTakeASnapshotAfterReceivingAdminRequestOfTypeSnapshotAndNotifyViaControlledPoll()
Expand Down
Loading

0 comments on commit 88c6199

Please sign in to comment.