Skip to content

Commit

Permalink
Merge pull request #95 from spotify/Bj0rnen/FixZombieRepairs
Browse files Browse the repository at this point in the history
Synchronize all get & update operations of RepairRun objects
  • Loading branch information
varjoranta committed Apr 21, 2015
2 parents ec8622b + bb13ed3 commit e8ff457
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 69 deletions.
4 changes: 4 additions & 0 deletions src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ public enum RunState {
/**
 * Reports whether this state is one of the active states.
 *
 * @return {@code true} when this state is RUNNING or PAUSED, {@code false} otherwise
 */
public boolean isActive() {
  switch (this) {
    case RUNNING:
    case PAUSED:
      return true;
    default:
      return false;
  }
}

/**
 * Reports whether this state is one of the terminal states.
 *
 * @return {@code true} when this state is DONE, ERROR, ABORTED or DELETED,
 *         {@code false} otherwise
 */
public boolean isTerminated() {
  switch (this) {
    case DONE:
    case ERROR:
    case ABORTED:
    case DELETED:
      return true;
    default:
      return false;
  }
}
}

public static class Builder {
Expand Down
110 changes: 68 additions & 42 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.core.RepairUnit;

import org.apache.cassandra.repair.RepairParallelism;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -132,9 +134,10 @@ public void run() {

Thread.currentThread().setName(clusterName + ":" + repairRunId);

Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
try {
if (!repairRun.isPresent()) {
Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
if ((!repairRun.isPresent() || repairRun.get().getRunState().isTerminated()) &&
context.repairManager.repairRunners.containsKey(repairRunId)) {
// this might happen if a run is deleted while paused etc.
LOG.warn("RepairRun \"" + repairRunId + "\" does not exist. Killing "
+ "RepairRunner for this run instance.");
Expand All @@ -153,25 +156,24 @@ public void run() {
case PAUSED:
context.repairManager.scheduleRetry(this);
break;
case DONE:
// We're done. Let go of thread.
context.repairManager.removeRunner(this);
break;
}
} catch (ReaperException | RuntimeException e) {
LOG.error("RepairRun FAILURE");
LOG.error(e.toString());
LOG.error(Arrays.toString(e.getStackTrace()));
e.printStackTrace();
if (repairRun.isPresent()) {
context.storage.updateRepairRun(repairRun.get()
.with()
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("Exception: %s", e.getMessage()))
.endTime(DateTime.now())
.build(repairRunId));
synchronized (this) {
Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
if (repairRun.isPresent()) {
context.storage.updateRepairRun(repairRun.get()
.with()
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("Exception: %s", e.getMessage()))
.endTime(DateTime.now())
.build(repairRunId));
}
context.repairManager.removeRunner(this);
}
context.repairManager.removeRunner(this);
}
}

Expand All @@ -180,13 +182,12 @@ public void run() {
*/
private void start() throws ReaperException {
LOG.info("Repairs for repair run #{} starting", repairRunId);
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
boolean success = context.storage.updateRepairRun(repairRun.with()
.runState(RepairRun.RunState.RUNNING)
.startTime(DateTime.now())
.build(repairRun.getId()));
if (!success) {
LOG.error("failed updating repair run " + repairRun.getId());
synchronized (this) {
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
context.storage.updateRepairRun(repairRun.with()
.runState(RepairRun.RunState.RUNNING)
.startTime(DateTime.now())
.build(repairRun.getId()));
}
startNextSegment();
}
Expand All @@ -196,14 +197,14 @@ private void start() throws ReaperException {
*/
private void end() {
LOG.info("Repairs for repair run #{} done", repairRunId);
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
boolean success = context.storage.updateRepairRun(repairRun.with()
.runState(RepairRun.RunState.DONE)
.endTime(DateTime.now())
.lastEvent("All done")
.build(repairRun.getId()));
if (!success) {
LOG.error("failed updating repair run " + repairRun.getId());
synchronized (this) {
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
context.storage.updateRepairRun(repairRun.with()
.runState(RepairRun.RunState.DONE)
.endTime(DateTime.now())
.lastEvent("All done")
.build(repairRun.getId()));
context.repairManager.removeRunner(this);
}
}

Expand Down Expand Up @@ -255,10 +256,19 @@ private void startNextSegment() throws ReaperException {
* @param tokenRange token range of the segment to repair.
*/
private void repairSegment(final int rangeIndex, final long segmentId, RingRange tokenRange) throws ReaperException {
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
RepairUnit repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId()).get();
final long unitId;
final double intensity;
final RepairParallelism validationParallelism;
{
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
unitId = repairRun.getRepairUnitId();
intensity = repairRun.getIntensity();
validationParallelism = repairRun.getRepairParallelism();
}

RepairUnit repairUnit = context.storage.getRepairUnit(unitId).get();
String keyspace = repairUnit.getKeyspaceName();
LOG.debug("preparing to repair segment {} on run with id {}", segmentId, repairRun.getId());
LOG.debug("preparing to repair segment {} on run with id {}", segmentId, repairRunId);

if (jmxConnection == null || !jmxConnection.isConnectionAlive()) {
try {
Expand All @@ -278,20 +288,22 @@ private void repairSegment(final int rangeIndex, final long segmentId, RingRange
List<String> potentialCoordinators = jmxConnection.tokenRangeToEndpoint(keyspace, tokenRange);
if (potentialCoordinators.isEmpty()) {
// This segment has a faulty token range. Abort the entire repair run.
boolean success = context.storage.updateRepairRun(repairRun
.with()
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("No coordinators for range %s", tokenRange.toString()))
.endTime(DateTime.now())
.build(repairRun.getId()));
if (!success) {
LOG.error("failed updating repair run " + repairRun.getId());
synchronized (this) {
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
context.storage.updateRepairRun(repairRun
.with()
.runState(RepairRun.RunState.ERROR)
.lastEvent(String.format("No coordinators for range %s", tokenRange.toString()))
.endTime(DateTime.now())
.build(repairRunId));
context.repairManager.removeRunner(this);
}
return;
}

SegmentRunner segmentRunner = new SegmentRunner(context, segmentId, potentialCoordinators,
context.repairManager.getRepairTimeoutMillis(), repairRun.getIntensity(), clusterName);
context.repairManager.getRepairTimeoutMillis(), intensity, validationParallelism,
clusterName, this);

ListenableFuture<?> segmentResult = context.repairManager.submitSegment(segmentRunner);
Futures.addCallback(segmentResult, new FutureCallback<Object>() {
Expand All @@ -312,7 +324,7 @@ private void handleResult(long segmentId) {
RepairSegment segment = context.storage.getRepairSegment(segmentId).get();
RepairSegment.State state = segment.getState();
LOG.debug("In repair run #{}, triggerRepair on segment {} ended with state {}",
repairRunId, segmentId, state);
repairRunId, segmentId, state);
switch (state) {
case NOT_STARTED:
// Unsuccessful repair
Expand All @@ -331,4 +343,18 @@ private void handleResult(long segmentId) {
throw new RuntimeException(msg);
}
}

/**
 * Stores a new "last event" message on the repair run handled by this runner.
 *
 * <p>The stored run is read and written back while holding this runner's monitor, so the
 * read-modify-write is not interleaved with other code synchronizing on the same runner.
 * Nothing is written when the run no longer exists in storage or has already reached a
 * terminated state; in both cases the message is only logged.
 *
 * @param newEvent message to record as the run's last event
 */
public void updateLastEvent(String newEvent) {
  synchronized (this) {
    Optional<RepairRun> storedRun = context.storage.getRepairRun(repairRunId);
    if (!storedRun.isPresent()) {
      // Storage hands back an Optional, and a run can be removed while segment work is still
      // in flight. Calling get() on an absent value would throw inside the caller's thread,
      // so drop the event instead of failing.
      LOG.warn("Will not update lastEvent of run #{} because it no longer exists. The message "
          + "was: \"{}\"", repairRunId, newEvent);
      return;
    }
    RepairRun repairRun = storedRun.get();
    if (repairRun.getRunState().isTerminated()) {
      LOG.warn("Will not update lastEvent of run that has already terminated. The message was: "
          + "\"{}\"", newEvent);
    } else {
      context.storage.updateRepairRun(repairRun.with()
          .lastEvent(newEvent)
          .build(repairRunId));
    }
  }
}
}
42 changes: 19 additions & 23 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
import com.spotify.reaper.cassandra.RepairStatusHandler;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.core.RepairUnit;

import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.SimpleCondition;
import org.joda.time.DateTime;
Expand All @@ -48,26 +48,26 @@ public final class SegmentRunner implements RepairStatusHandler, Runnable {
private final Collection<String> potentialCoordinators;
private final long timeoutMillis;
private final double intensity;
private String clusterName;
private final RepairParallelism validationParallelism;
private final String clusterName;
private final RepairRunner repairRunner;
private int commandId;

// Caching all active SegmentRunners.
@VisibleForTesting
public static Map<Long, SegmentRunner> segmentRunners = Maps.newConcurrentMap();

// private SegmentRunner(AppContext context, long segmentId) {
// this.context = context;
// this.segmentId = segmentId;
// }

public SegmentRunner(AppContext context, long segmentId, Collection<String> potentialCoordinators,
long timeoutMillis, double intensity, String clusterName) throws ReaperException {
long timeoutMillis, double intensity, RepairParallelism validationParallelism, String clusterName, RepairRunner repairRunner)
throws ReaperException {
this.context = context;
this.segmentId = segmentId;
this.potentialCoordinators = potentialCoordinators;
this.timeoutMillis = timeoutMillis;
this.intensity = intensity;
this.validationParallelism = validationParallelism;
this.clusterName = clusterName;
this.repairRunner = repairRunner;
}

@Override
Expand Down Expand Up @@ -105,7 +105,6 @@ public static void abort(AppContext context, RepairSegment segment, JmxProxy jmx

private void runRepair() {
final RepairSegment segment = context.storage.getRepairSegment(segmentId).get();
final RepairRun repairRun = context.storage.getRepairRun(segment.getRunId()).get();
try (JmxProxy coordinator = context.jmxConnectionFactory
.connectAny(Optional.<RepairStatusHandler>of(this), potentialCoordinators)) {

Expand All @@ -118,15 +117,14 @@ private void runRepair() {
RepairUnit repairUnit = context.storage.getRepairUnit(segment.getRepairUnitId()).get();
String keyspace = repairUnit.getKeyspaceName();

if (!canRepair(segment, keyspace, coordinator, repairRun)) {
if (!canRepair(segment, keyspace, coordinator)) {
postpone(segment);
return;
}

synchronized (condition) {
commandId = coordinator.triggerRepair(segment.getStartToken(), segment.getEndToken(),
keyspace, repairRun.getRepairParallelism(),
repairUnit.getColumnFamilies());
keyspace, validationParallelism, repairUnit.getColumnFamilies());

if (commandId == 0) {
// From cassandra source in "forceRepairAsync":
Expand All @@ -148,8 +146,7 @@ private void runRepair() {
.build(segmentId));
String eventMsg = String.format("Triggered repair of segment %d via host %s",
segment.getId(), coordinator.getHost());
context.storage.updateRepairRun(
repairRun.with().lastEvent(eventMsg).build(repairRun.getId()));
repairRunner.updateLastEvent(eventMsg);
LOG.info("Repair for segment {} started, status wait will timeout in {} millis", segmentId,
timeoutMillis);
try {
Expand All @@ -176,14 +173,13 @@ private void runRepair() {
}
} catch (ReaperException e) {
LOG.warn("Failed to connect to a coordinator node for segment {}", segmentId);
String msg = String.format("Postponed because couldn't any of the coordinators");
context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId()));
String msg = String.format("Postponed a segment because no coordinator was reachable");
repairRunner.updateLastEvent(msg);
postpone(segment);
}
}

boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
RepairRun repairRun) {
boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator) {
Collection<String> allHosts;
try {
// when hosts are coming up or going down, this method can throw an
Expand All @@ -192,7 +188,7 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
} catch (RuntimeException e) {
LOG.warn("SegmentRunner couldn't get token ranges from coordinator: ", e);
String msg = String.format("SegmentRunner couldn't get token ranges from coordinator");
context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId()));
repairRunner.updateLastEvent(msg);
return false;
}

Expand All @@ -207,27 +203,27 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
hostProxy.getHost());
String msg = String.format("Postponed due to pending compactions (%d)",
pendingCompactions);
context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId()));
repairRunner.updateLastEvent(msg);
return false;
}
if (hostProxy.isRepairRunning()) {
LOG.warn("SegmentRunner declined to repair segment {} because one of the hosts ({}) was "
+ "already involved in a repair", segmentId, hostProxy.getHost());
String msg = String.format("Postponed due to affected hosts already doing repairs");
context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId()));
repairRunner.updateLastEvent(msg);
return false;
}
} catch (ReaperException e) {
LOG.warn("SegmentRunner declined to repair segment {} because one of the hosts ({}) could "
+ "not be connected with", segmentId, hostName);
String msg = String.format("Postponed due to inability to connect host %s", hostName);
context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId()));
repairRunner.updateLastEvent(msg);
return false;
} catch (RuntimeException e) {
LOG.warn("SegmentRunner declined to repair segment {} because of an error collecting "
+ "information from one of the hosts ({}): {}", segmentId, hostName, e);
String msg = String.format("Postponed due to inability to collect information from host %s", hostName);
context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId()));
repairRunner.updateLastEvent(msg);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.core.RepairUnit;
import com.spotify.reaper.service.RepairRunner;
import com.spotify.reaper.service.RingRange;
import com.spotify.reaper.service.SegmentRunner;
import com.spotify.reaper.storage.IStorage;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void timeoutTest() throws InterruptedException, ReaperException, Executio
new RepairUnit.Builder("reaper", "reaper", Sets.newHashSet("reaper")));
RepairRun run = context.storage.addRepairRun(
new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5, 1,
RepairParallelism.PARALLEL));
RepairParallelism.PARALLEL));
context.storage.addRepairSegments(Collections.singleton(
new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ONE, BigInteger.ZERO),
cf.getId())), run.getId());
Expand Down Expand Up @@ -110,7 +111,9 @@ public void run() {
return jmx;
}
};
SegmentRunner sr = new SegmentRunner(context, segmentId, Collections.singleton(""), 100, 0.5, "reaper");
RepairRunner rr = mock(RepairRunner.class);
SegmentRunner sr = new SegmentRunner(context, segmentId, Collections.singleton(""), 100, 0.5,
RepairParallelism.PARALLEL, "reaper", rr);
sr.run();

future.getValue().get();
Expand Down Expand Up @@ -182,7 +185,9 @@ public void run() {
return jmx;
}
};
SegmentRunner sr = new SegmentRunner(context, segmentId, Collections.singleton(""), 1000, 0.5, "reaper");
RepairRunner rr = mock(RepairRunner.class);
SegmentRunner sr = new SegmentRunner(context, segmentId, Collections.singleton(""), 1000, 0.5,
RepairParallelism.PARALLEL, "reaper", rr);
sr.run();

future.getValue().get();
Expand Down Expand Up @@ -252,7 +257,9 @@ public void run() {
return jmx;
}
};
SegmentRunner sr = new SegmentRunner(context, segmentId, Collections.singleton(""), 1000, 0.5, "reaper");
RepairRunner rr = mock(RepairRunner.class);
SegmentRunner sr = new SegmentRunner(context, segmentId, Collections.singleton(""), 1000, 0.5,
RepairParallelism.PARALLEL, "reaper", rr);
sr.run();

future.getValue().get();
Expand Down

0 comments on commit e8ff457

Please sign in to comment.