Skip to content

Commit

Permalink
For Cassandra storage minimise the number of tombstones (NULL entries…
Browse files Browse the repository at this point in the history
…) in the repair_run table.

Remove RepairSegment.repairCommandId as no one was using/storing it.

Make explicit which fields in RepairSegment can be null and which can be reset to null.
Hide the RepairSegment.Builder constructor.

Re-read segments once leader-election is done, ensuring we're not writing back stale data.

Add Preconditions and asserts to ensure the lifecycle state is correct and that null fields only exist as/when appropriate.
Preconditions are used where the check is fast and provides fail-fast behaviour. Asserts are used where the check is expensive and/or is a secondary check.

ref:
 - #240
 - #244
  • Loading branch information
michaelsembwever committed Oct 19, 2017
1 parent 22afbdb commit e717ad8
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 107 deletions.
52 changes: 33 additions & 19 deletions src/server/src/main/java/io/cassandrareaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.math.BigInteger;
import java.util.UUID;
import javax.annotation.Nullable;

import com.google.common.base.Preconditions;
import org.joda.time.DateTime;

public final class RepairSegment {
Expand All @@ -30,23 +32,25 @@ public final class RepairSegment {
private final int failCount;
private final State state;
private final String coordinatorHost;
private final Integer repairCommandId; // received when triggering repair in Cassandra
private final DateTime startTime;
private final DateTime endTime;

private RepairSegment(Builder builder, UUID id) {
private RepairSegment(Builder builder, @Nullable UUID id) {
this.id = id;
this.runId = builder.runId;
this.repairUnitId = builder.repairUnitId;
this.tokenRange = builder.tokenRange;
this.failCount = builder.failCount;
this.state = builder.state;
this.coordinatorHost = builder.coordinatorHost;
this.repairCommandId = builder.repairCommandId;
this.startTime = builder.startTime;
this.endTime = builder.endTime;
}

/**
 * Static factory for a {@link Builder}; delegates directly to the Builder constructor.
 *
 * @param tokenRange the token range handed to the new builder
 * @param repairUnitId the repair unit id handed to the new builder
 * @return a new builder initialised with the given token range and repair unit id
 */
public static Builder builder(RingRange tokenRange, UUID repairUnitId) {
return new Builder(tokenRange, repairUnitId);
}

public UUID getId() {
return id;
}
Expand Down Expand Up @@ -79,18 +83,17 @@ public RepairSegment.State getState() {
return state;
}

/**
 * Returns the coordinator host stored on this segment.
 *
 * @return the coordinator host, or {@code null} when none is set
 */
@Nullable
public String getCoordinatorHost() {
return coordinatorHost;
}

public Integer getRepairCommandId() {
return repairCommandId;
}

/**
 * Returns the start time stored on this segment.
 *
 * @return the start time, or {@code null} when it is unset
 */
@Nullable
public DateTime getStartTime() {
return startTime;
}

/**
 * Returns the end time stored on this segment.
 *
 * @return the end time, or {@code null} when it is unset
 */
@Nullable
public DateTime getEndTime() {
return endTime;
}
Expand All @@ -105,19 +108,20 @@ public enum State {
DONE
}

public static class Builder {
public static final class Builder {

public final RingRange tokenRange;
private final UUID repairUnitId;
private UUID runId;
private int failCount;
private State state;
private String coordinatorHost;
private Integer repairCommandId;
private DateTime startTime;
private DateTime endTime;

public Builder(RingRange tokenRange, UUID repairUnitId) {
private Builder(RingRange tokenRange, UUID repairUnitId) {
Preconditions.checkNotNull(tokenRange);
Preconditions.checkNotNull(repairUnitId);
this.repairUnitId = repairUnitId;
this.tokenRange = tokenRange;
this.failCount = 0;
Expand All @@ -131,12 +135,12 @@ private Builder(RepairSegment original) {
failCount = original.failCount;
state = original.state;
coordinatorHost = original.coordinatorHost;
repairCommandId = original.repairCommandId;
startTime = original.startTime;
endTime = original.endTime;
}

/**
 * Sets the run id on this builder.
 *
 * @param runId the run id; must not be {@code null}
 * @return this builder, to allow call chaining
 * @throws NullPointerException if {@code runId} is {@code null}
 */
public Builder withRunId(UUID runId) {
// fail fast: reject a null run id here rather than when the segment is built
Preconditions.checkNotNull(runId);
this.runId = runId;
return this;
}
Expand All @@ -147,32 +151,42 @@ public Builder failCount(int failCount) {
}

/**
 * Sets the segment state on this builder.
 *
 * @param state the new state; must not be {@code null}
 * @return this builder, to allow call chaining
 * @throws NullPointerException if {@code state} is {@code null}
 */
public Builder state(State state) {
// fail fast: a segment state can never be null
Preconditions.checkNotNull(state);
this.state = state;
return this;
}

public Builder coordinatorHost(String coordinatorHost) {
public Builder coordinatorHost(@Nullable String coordinatorHost) {
this.coordinatorHost = coordinatorHost;
return this;
}

public Builder repairCommandId(Integer repairCommandId) {
this.repairCommandId = repairCommandId;
return this;
}
public Builder startTime(@Nullable DateTime startTime) {
Preconditions.checkState(
null != startTime || null == endTime,
"unsetting startTime only permitted if endTime unset");

public Builder startTime(DateTime startTime) {
this.startTime = startTime;
return this;
}

/**
 * Sets the end time on this builder.
 *
 * <p>The parameter is not annotated {@code @Nullable} and a null value is rejected,
 * so this method cannot be used to unset the end time.
 *
 * @param endTime the end time; must not be {@code null}
 * @return this builder, to allow call chaining
 * @throws NullPointerException if {@code endTime} is {@code null}
 */
public Builder endTime(DateTime endTime) {
// fail fast: this setter only accepts a concrete end time
Preconditions.checkNotNull(endTime);
this.endTime = endTime;
return this;
}

public RepairSegment build(UUID id) {
return new RepairSegment(this, id);
public RepairSegment build(@Nullable UUID segmentId) {
// a null segmentId is a special case where the storage uses a sequence for it
Preconditions.checkNotNull(runId);
Preconditions.checkState(null != startTime || null == endTime, "if endTime is set, so must startTime be set");
Preconditions.checkState(null == endTime || State.DONE == state, "endTime can only be set if segment is DONE");

Preconditions.checkState(
null != startTime || State.NOT_STARTED == state,
"startTime can only be unset if segment is NOT_STARTED");

return new RepairSegment(this, segmentId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private static List<RepairSegment.Builder> createRepairSegments(
RepairUnit repairUnit) {

List<RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();
tokenSegments.forEach(range -> repairSegmentBuilders.add(new RepairSegment.Builder(range, repairUnit.getId())));
tokenSegments.forEach(range -> repairSegmentBuilders.add(RepairSegment.builder(range, repairUnit.getId())));
return repairSegmentBuilders;
}

Expand All @@ -260,7 +260,7 @@ private static List<RepairSegment.Builder> createRepairSegmentsForIncrementalRep
.forEach(
range
-> repairSegmentBuilders.add(
new RepairSegment.Builder(range.getValue(), repairUnit.getId()).coordinatorHost(range.getKey())));
RepairSegment.builder(range.getValue(), repairUnit.getId()).coordinatorHost(range.getKey())));

return repairSegmentBuilders;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,11 @@ public void resumeRunningRepairRuns(AppContext context) throws ReaperException {
}

private void abortSegmentsWithNoLeader(
AppContext context, RepairRun repairRun, Collection<RepairSegment> runningSegments) {
if (context.storage instanceof IDistributedStorage
|| !repairRunners.containsKey(repairRun.getId())) {
AppContext context,
RepairRun repairRun,
Collection<RepairSegment> runningSegments) {

if (context.storage instanceof IDistributedStorage || !repairRunners.containsKey(repairRun.getId())) {
// When multiple Reapers are in use, we can get stuck segments when one instance is rebooted
// Any segment in RUNNING state but with no leader should be killed
List<UUID> activeLeaders =
Expand All @@ -141,20 +143,24 @@ public void abortSegments(
for (RepairSegment segment : runningSegments) {
UUID leaderElectionId = repairUnit.getIncrementalRepair() ? repairRun.getId() : segment.getId();
if (takeLead(context, leaderElectionId) || renewLead(context, leaderElectionId)) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(
segment.getCoordinatorHost(), context.config.getJmxConnectionTimeoutInSeconds())) {

SegmentRunner.abort(context, segment, jmxProxy);
} catch (ReaperException e) {
LOG.debug(
"Tried to abort repair on segment {} marked as RUNNING, "
+ "but the host was down (so abortion won't be needed)",
segment.getId(),
e);
} finally {
// if someone else does hold the lease, ie renewLead(..) was true,
// then their writes to repair_run table and any call to releaseLead(..) will throw an exception
releaseLead(context, leaderElectionId);
// refresh segment once we're inside leader-election
segment = context.storage.getRepairSegment(repairRun.getId(), segment.getId()).get();
if (RepairSegment.State.RUNNING == segment.getState()) {
try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(
segment.getCoordinatorHost(), context.config.getJmxConnectionTimeoutInSeconds())) {

SegmentRunner.abort(context, segment, jmxProxy);
} catch (ReaperException e) {
LOG.debug(
"Tried to abort repair on segment {} marked as RUNNING, "
+ "but the host was down (so abortion won't be needed)",
segment.getId(),
e);
} finally {
// if someone else does hold the lease, ie renewLead(..) was true,
// then their writes to repair_run table and any call to releaseLead(..) will throw an exception
releaseLead(context, leaderElectionId);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ public SegmentRunner(

@Override
public void run() {
final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
Thread.currentThread().setName(clusterName + ":" + segment.getRunId() + ":" + segmentId);
if (takeLead()) {
try {
if (runRepair()) {
Expand All @@ -147,7 +145,7 @@ public void run() {
}
}

public static void postpone(AppContext context, RepairSegment segment, Optional<RepairUnit> repairUnit) {
private static void postpone(AppContext context, RepairSegment segment, Optional<RepairUnit> repairUnit) {
LOG.info("Postponing segment {}", segment.getId());
try {
context.storage.updateRepairSegment(
Expand All @@ -158,7 +156,6 @@ public static void postpone(AppContext context, RepairSegment segment, Optional<
repairUnit.isPresent() && repairUnit.get().getIncrementalRepair()
? segment.getCoordinatorHost()
: null) // set coordinator host to null only for full repairs
.repairCommandId(null)
.startTime(null)
.failCount(segment.getFailCount() + 1)
.build(segment.getId()));
Expand Down Expand Up @@ -211,6 +208,7 @@ private static long getOpenFilesAmount() {
private boolean runRepair() {
LOG.debug("Run repair for segment #{}", segmentId);
final RepairSegment segment = context.storage.getRepairSegment(repairRunner.getRepairRunId(), segmentId).get();
Thread.currentThread().setName(clusterName + ":" + segment.getRunId() + ":" + segmentId);

try (Timer.Context cxt = context.metricRegistry.timer(metricNameForRunRepair(segment)).time();
JmxProxy coordinator = context.jmxConnectionFactory.connectAny(
Expand Down Expand Up @@ -292,8 +290,7 @@ protected Set<String> initialize() {
}

long timeout = repairUnit.getIncrementalRepair() ? timeoutMillis * MAX_TIMEOUT_EXTENSIONS : timeoutMillis;
context.storage.updateRepairSegment(
segment.with().coordinatorHost(coordinator.getHost()).repairCommandId(commandId).build(segmentId));
context.storage.updateRepairSegment(segment.with().coordinatorHost(coordinator.getHost()).build(segmentId));
String eventMsg
= String.format("Triggered repair of segment %s via host %s", segment.getId(), coordinator.getHost());

Expand Down
Loading

0 comments on commit e717ad8

Please sign in to comment.