Skip to content

Commit

Permalink
[Java] Poll for remote Archive errors while awaiting log recording se…
Browse files Browse the repository at this point in the history
…ssion to be created.
  • Loading branch information
vyazelenko committed Jan 9, 2025
1 parent 57318d7 commit 45b4cd4
Showing 1 changed file with 33 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -849,10 +849,22 @@ private int liveLogReplay(final long nowMs)
}
else if (NULL_VALUE == liveLogReplaySessionId)
{
if (pollForResponse(clusterArchive, correlationId))
final ControlResponsePoller poller = clusterArchive.controlResponsePoller();
if (0 != poller.poll() && poller.isPollComplete() &&
poller.controlSessionId() == clusterArchive.controlSessionId() &&
poller.correlationId() == correlationId)
{
liveLogReplaySessionId = clusterArchive.controlResponsePoller().relevantId();
timeOfLastProgressMs = nowMs;
switch (poller.code())
{
case OK ->
{
liveLogReplaySessionId = poller.relevantId();
timeOfLastProgressMs = nowMs;
workCount++;
}
case ERROR -> throwReplayFailedException(poller);
default -> throw new ClusterException("Live log replay failed: " + poller.code());
}
}
}
else if (NULL_COUNTER_ID == liveLogRecordingCounterId)
Expand All @@ -868,6 +880,17 @@ else if (NULL_COUNTER_ID == liveLogRecordingCounterId)
timeOfLastBackupQueryMs = nowMs;
timeOfLastProgressMs = nowMs;
state(UPDATE_RECORDING_LOG, nowMs);
workCount++;
}
else
{
final ControlResponsePoller poller = clusterArchive.controlResponsePoller();
if (0 != poller.poll() && poller.isPollComplete() &&
poller.controlSessionId() == clusterArchive.controlSessionId() &&
ControlResponseCode.ERROR == poller.code())
{
throwReplayFailedException(poller);
}
}
}
}
Expand All @@ -880,6 +903,13 @@ else if (NULL_COUNTER_ID == liveLogRecordingCounterId)
return workCount;
}

private void throwReplayFailedException(final ControlResponsePoller poller)
{
throw new ClusterException("Live log replay failed (replaySessionId=" + liveLogReplaySessionId + "):" +
" errorMessage=" + poller.errorMessage() + ", errorCode=" +
ArchiveException.errorCodeAsString((int)poller.relevantId()));
}

private int updateRecordingLog(final long nowMs)
{
boolean wasRecordingLogUpdated = false;
Expand Down Expand Up @@ -1016,27 +1046,6 @@ private void logStateChange(
//System.out.println("ClusterBackup: " + oldState + " -> " + newState + " nowMs=" + nowMs);
}

private static boolean pollForResponse(final AeronArchive archive, final long correlationId)
{
final ControlResponsePoller poller = archive.controlResponsePoller();

if (poller.poll() > 0 && poller.isPollComplete())
{
if (poller.controlSessionId() == archive.controlSessionId())
{
final ControlResponseCode code = poller.code();
if (ControlResponseCode.ERROR == code)
{
throw new ArchiveException(poller.errorMessage(), (int)poller.relevantId(), poller.correlationId());
}

return ControlResponseCode.OK == code && poller.correlationId() == correlationId;
}
}

return false;
}

private int pollBackupArchiveEvents()
{
int workCount = 0;
Expand Down

0 comments on commit 45b4cd4

Please sign in to comment.