diff --git a/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java b/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java
index 279cb1326..36ad2e5d7 100644
--- a/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java
+++ b/src/server/src/main/java/io/cassandrareaper/service/RepairManager.java
@@ -34,6 +34,8 @@
 import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 import com.codahale.metrics.InstrumentedScheduledExecutorService;
@@ -52,8 +54,9 @@ public final class RepairManager implements AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(RepairManager.class);
 
-  // Caching all active RepairRunners.
+  // State of all active RepairRunners
   final Map<UUID, RepairRunner> repairRunners = Maps.newConcurrentMap();
+  private final Lock repairRunnersLock = new ReentrantLock();
 
   private final AppContext context;
   private final Heart heart;
@@ -121,36 +124,53 @@ private void abortAllRunningSegmentsWithNoLeader(Collection<RepairRun> runningRe
   }
 
   private void resumeUnkownRunningRepairRuns(Collection<RepairRun> runningRepairRuns) throws ReaperException {
-    for (RepairRun repairRun : runningRepairRuns) {
-      if (!repairRunners.containsKey(repairRun.getId())) {
-        LOG.info("Restarting run id {} that has no runner", repairRun.getId());
-        // it may be that this repair is already "running" actively on other reaper instances
-        // nonetheless we need to make it actively running on this reaper instance as well
-        // so to help in running the queued segments
-        startRepairRun(repairRun);
+    try {
+      repairRunnersLock.lock();
+      for (RepairRun repairRun : runningRepairRuns) {
+        if (!repairRunners.containsKey(repairRun.getId())) {
+          LOG.info("Restarting run id {} that has no runner", repairRun.getId());
+          // it may be that this repair is already "running" actively on other reaper instances
+          // nonetheless we need to make it actively running on this reaper instance as well
+          // so to help in running the queued segments
+          startRepairRun(repairRun);
+        }
       }
+    } finally {
+      repairRunnersLock.unlock();
     }
   }
 
   private void abortAllRunningSegmentsInKnownPausedRepairRuns(Collection<RepairRun> pausedRepairRuns) {
-    pausedRepairRuns
-        .stream()
-        .filter((pausedRepairRun) -> repairRunners.containsKey(pausedRepairRun.getId()))
-        .forEach((pausedRepairRun) -> {
-          // Abort all running segments for paused repair runs
-          Collection<RepairSegment> runningSegments
-              = context.storage.getSegmentsWithState(pausedRepairRun.getId(), RepairSegment.State.RUNNING);
+    try {
+      repairRunnersLock.lock();
 
-          abortSegments(runningSegments, pausedRepairRun, false, false);
-        });
+      pausedRepairRuns
+          .stream()
+          .filter((pausedRepairRun) -> repairRunners.containsKey(pausedRepairRun.getId()))
+          .forEach((pausedRepairRun) -> {
+            // Abort all running segments for paused repair runs
+            Collection<RepairSegment> runningSegments
+                = context.storage.getSegmentsWithState(pausedRepairRun.getId(), RepairSegment.State.RUNNING);
+
+            abortSegments(runningSegments, pausedRepairRun, false, false);
+          });
+    } finally {
+      repairRunnersLock.unlock();
+    }
   }
 
   private void resumeUnknownPausedRepairRuns(Collection<RepairRun> pausedRepairRuns) {
-    pausedRepairRuns
-        .stream()
-        .filter((pausedRepairRun) -> (!repairRunners.containsKey(pausedRepairRun.getId())))
-        // add "paused" repair run to this reaper instance, so it can be visualised in UI
-        .forEachOrdered((pausedRepairRun) -> startRunner(pausedRepairRun.getId()));
+    try {
+      repairRunnersLock.lock();
+
+      pausedRepairRuns
+          .stream()
+          .filter((pausedRepairRun) -> (!repairRunners.containsKey(pausedRepairRun.getId())))
+          // add "paused" repair run to this reaper instance, so it can be visualised in UI
+          .forEachOrdered((pausedRepairRun) -> startRunner(pausedRepairRun.getId()));
+    } finally {
+      repairRunnersLock.unlock();
+    }
   }
 
   private void abortSegmentsWithNoLeader(RepairRun repairRun, Collection<RepairSegment> runningSegments) {
@@ -160,21 +180,25 @@ private void abortSegmentsWithNoLeader(RepairRun repairRun, Collection
             seg.getId()).collect(Collectors.toList()));
     }
-
-    if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) {
-      // When multiple Reapers are in use, we can get stuck segments when one instance is rebooted
-      // Any segment in RUNNING state but with no leader should be killed
-      List<UUID> leaders = context.storage instanceof IDistributedStorage
-          ? ((IDistributedStorage) context.storage).getLeaders()
-          : Collections.emptyList();
-
-      Collection<RepairSegment> orphanedSegments = runningSegments
-          .stream()
-          .filter(segment -> !leaders.contains(segment.getId()) && !leaders.contains(segment.getRunId()))
-          .collect(Collectors.toSet());
-
-      LOG.debug("No leader on the following segments : {}", orphanedSegments);
-      abortSegments(orphanedSegments, repairRun, false, true);
+    try {
+      repairRunnersLock.lock();
+      if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) {
+        // When multiple Reapers are in use, we can get stuck segments when one instance is rebooted
+        // Any segment in RUNNING state but with no leader should be killed
+        List<UUID> leaders = context.storage instanceof IDistributedStorage
+            ? ((IDistributedStorage) context.storage).getLeaders()
+            : Collections.emptyList();
+
+        Collection<RepairSegment> orphanedSegments = runningSegments
+            .stream()
+            .filter(segment -> !leaders.contains(segment.getId()) && !leaders.contains(segment.getRunId()))
+            .collect(Collectors.toSet());
+
+        LOG.debug("No leader on the following segments : {}", orphanedSegments);
+        abortSegments(orphanedSegments, repairRun, false, true);
+      }
+    } finally {
+      repairRunnersLock.unlock();
    }
   }
@@ -269,8 +293,6 @@ public RepairRun startRepairRun(RepairRun runToBeStarted) throws ReaperException
         return updatedRun;
       }
       case RUNNING:
-        Preconditions.checkState(
-            !repairRunners.containsKey(runId), "trying to re-trigger run that is already running, with id " + runId);
         LOG.info("re-trigger a running run after restart, with id {}", runId);
         startRunner(runId);
         return runToBeStarted;
@@ -296,8 +318,14 @@ public RepairRun updateRepairRunIntensity(RepairRun repairRun, Double intensity)
     return updatedRun;
   }
 
-  private synchronized void startRunner(UUID runId) {
-    if (!repairRunners.containsKey(runId)) {
+  private void startRunner(UUID runId) {
+    try {
+      repairRunnersLock.lock();
+
+      Preconditions.checkState(
+          !repairRunners.containsKey(runId),
+          "there is already a repair runner for run with id " + runId + ". This should not happen.");
+
       LOG.info("scheduling repair for repair run #{}", runId);
       try {
         RepairRunner newRunner = new RepairRunner(context, runId);
@@ -306,10 +334,8 @@ private synchronized void startRunner(UUID runId) {
       } catch (ReaperException e) {
         LOG.warn("Failed to schedule repair for repair run #" + runId, e);
       }
-    } else {
-      LOG.error(
-          "there is already a repair runner for run with id {}, so not starting new runner. This should not happen.",
-          runId);
+    } finally {
+      repairRunnersLock.unlock();
     }
   }
 
@@ -347,7 +373,12 @@ ListenableFuture<?> submitSegment(SegmentRunner runner) {
   }
 
   void removeRunner(RepairRunner runner) {
-    repairRunners.remove(runner.getRepairRunId());
+    try {
+      repairRunnersLock.lock();
+      repairRunners.remove(runner.getRepairRunId());
+    } finally {
+      repairRunnersLock.unlock();
+    }
   }
 
   private static boolean takeLead(AppContext context, UUID leaderElectionId) {
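The diff above replaces the method-level synchronized on startRunner with an explicit ReentrantLock (repairRunnersLock) that every check-then-act sequence on the repairRunners map now takes, so "is a runner already registered for this run?" and "register and start a runner" cannot interleave across threads or resume/abort paths. Below is a minimal, self-contained sketch of that pattern under the same idea; the class and method names (GuardedRunnerRegistry, startIfAbsent) are illustrative only and are not part of RepairManager.

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the runner registry; only the locking pattern matters here.
final class GuardedRunnerRegistry {

  private final Map<UUID, Runnable> runners = new ConcurrentHashMap<>();
  private final Lock runnersLock = new ReentrantLock();

  /** Registers and starts a runner only if none exists yet for the given id. */
  boolean startIfAbsent(UUID runId, Runnable runner) {
    runnersLock.lock();
    try {
      if (runners.containsKey(runId)) {
        return false; // another thread won the race; nothing to do
      }
      runners.put(runId, runner);
      new Thread(runner, "runner-" + runId).start();
      return true;
    } finally {
      runnersLock.unlock();
    }
  }

  /** Removal takes the same lock, so it cannot interleave with startIfAbsent. */
  void remove(UUID runId) {
    runnersLock.lock();
    try {
      runners.remove(runId);
    } finally {
      runnersLock.unlock();
    }
  }
}

The map itself is already a concurrent map, so individual get/put/remove calls are safe on their own; the lock exists only to make the compound containsKey-then-register sequence atomic, which is what lets the patched startRunner turn the old "runner already exists" log branch into a Preconditions.checkState.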