From 02bbf297bdf6b0dfe14b4132c2ccaa54c425a00f Mon Sep 17 00:00:00 2001 From: Radovan Zvoncek Date: Tue, 20 Jan 2015 20:19:21 +0100 Subject: [PATCH] Add endpoints for starting, pausing and resuming a repair run. Because we're just modifying an object, eshr said this should be done using PUT method. This caused a bit of rework in RepairRunResource. --- README.md | 5 + bin/spreaper | 40 +++++++- .../com/spotify/reaper/core/RepairRun.java | 2 +- .../reaper/resources/RepairRunResource.java | 98 ++++++++++++++++--- .../spotify/reaper/service/RepairRunner.java | 1 - .../resources/RepairRunResourceTest.java | 79 +++++++++++++-- 6 files changed, 195 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 8e90792c4..8d4c78100 100644 --- a/README.md +++ b/README.md @@ -149,3 +149,8 @@ REST API * POST /repair_run/{id} (com.spotify.reaper.resources.RepairRunResource) * Expected query parameters: *None* * Triggers a repair run identified by the "id" path parameter. + +* PUT /repair_run/{id} (com.spotify.reaper.resources.RepairRunResource) + * Expected query parameters: + * *state*: new value for the state of the repair run, e.g. "RUNNING" or "PAUSED" + * Changes the state of a repair run identified by the "id" path parameter (starts, pauses or resumes it). 
diff --git a/bin/spreaper b/bin/spreaper index 828a4fd56..7a8d8e13d 100644 --- a/bin/spreaper +++ b/bin/spreaper @@ -52,6 +52,8 @@ class ReaperCaller(object): r = requests.get(the_url, params=params) elif http_method == 'POST': r = requests.post(the_url, params=params) + elif http_method == 'PUT': + r = requests.put(the_url, params=params) else: assert False, "invalid HTTP method: {0}".format(http_method) log.info("HTTP %s return code %s with content of length %s", @@ -67,6 +69,10 @@ class ReaperCaller(object): the_url = urlparse.urljoin(self.base_url, endpoint) return self._http_req("POST", the_url, params) + def put(self, endpoint, **params): + the_url = urlparse.urljoin(self.base_url, endpoint) + return self._http_req("PUT", the_url, params) + # === Arguments for commands ============================================================ @@ -139,6 +145,12 @@ def _arguments_for_trigger(parser): parser.add_argument("run_id", help="ID of the run to trigger") +def _arguments_for_pause(parser): + """Arguments needed for pausing a repair + """ + parser.add_argument("run_id", help="ID of the run to pause") + + def _parse_arguments(command, description, usage=None, extra_arguments=None): """Generic argument parsing done by every command """ @@ -166,6 +178,8 @@ Usage: spreaper [] [] add-table Register a table for a previously added cluster repair Create a repair run, optionally triggering it trigger Start a repair run + pause Pause a repair run + resume Resume a previously paused run ping Test connectivity to to the Reaper service """ @@ -270,18 +284,36 @@ class ReaperCLI(object): run_id = json.loads(reply)['id'] print "# run with id={0} created".format(run_id) if args.trigger: - self._trigger_run(reaper, run_id, args) + self._trigger_run(reaper, run_id, args, "RUNNING") def trigger(self): args = _parse_arguments(command='trigger', description='trigger a new or paused repair run', extra_arguments=_arguments_for_trigger) reaper = self.init_reaper(args) - 
self._trigger_run(reaper, args.id, args) + self._trigger_run(reaper, args.run_id, args, "RUNNING") + + def pause(self): + args = _parse_arguments(command='pause', + description='pause a running repair run', + extra_arguments=_arguments_for_pause) + reaper = self.init_reaper(args) + print "# pausing run with id: {0}".format(args.run_id) + reaper.put("repair_run/{0}".format(args.run_id), state="PAUSED") + print "# run paused" + + def resume(self): + args = _parse_arguments(command='resume', + description='resume a repair run', + extra_arguments=_arguments_for_pause) + reaper = self.init_reaper(args) + print "# resuming a run with id: {0}".format(args.run_id) + reaper.put("repair_run/{0}".format(args.run_id), state="RUNNING") + print "# run resumed" - def _trigger_run(self, reaper, run_id, args): + def _trigger_run(self, reaper, run_id, args, st): print "# triggering run with id: {0}".format(run_id) - reaper.post("repair_run/{0}".format(run_id), owner=args.owner, cause=args.cause) + reaper.put("repair_run/{0}".format(run_id), owner=args.owner, cause=args.cause, state=st) print "# run triggered" diff --git a/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/main/java/com/spotify/reaper/core/RepairRun.java index 7ef7d0cdf..f30a7a52c 100644 --- a/src/main/java/com/spotify/reaper/core/RepairRun.java +++ b/src/main/java/com/spotify/reaper/core/RepairRun.java @@ -119,7 +119,7 @@ public Builder(String clusterName, long columnFamilyId, DateTime creationTime, this.intensity = intensity; } - private Builder(RepairRun original) { + public Builder(RepairRun original) { clusterName = original.clusterName; columnFamilyId = original.columnFamilyId; runState = original.runState; diff --git a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java index 08651ac4e..6796f04a3 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java +++ 
b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java @@ -30,6 +30,7 @@ import com.spotify.reaper.service.SegmentGenerator; import com.spotify.reaper.storage.IStorage; +import org.apache.cassandra.db.Column; import org.joda.time.DateTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,6 +47,7 @@ import javax.ws.rs.GET; import javax.ws.rs.POST; +import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -126,35 +128,99 @@ public Response addRepairRun( } /** - * Triggers an orchestration of a repair run. - * @return 201 if all goes well, 500 in case of errors. + * Modifies the state of a repair run. + * + * Currently supports NOT_STARTED|PAUSED -> RUNNING and RUNNING -> PAUSED. + * @return OK if all goes well, NOT_MODIFIED if the new state is the same as the old one, + * and 501 (NOT_IMPLEMENTED) if the transition is not supported. */ - @POST + @PUT @Path("/{id}") - public Response triggerRepairRun( - @Context UriInfo uriInfo, - @PathParam("id") Long repairRunId) { + public Response modifyRunState( + @Context UriInfo uriInfo, + @PathParam("id") Long repairRunId, + @QueryParam("state") Optional state) { - LOG.info("trigger repair run called with: runId = {}", repairRunId); + LOG.info("pause repair run called with: runId = {}", repairRunId); + + if (!state.isPresent()) { + return Response.status(Response.Status.BAD_REQUEST.getStatusCode()) + .entity("New state not specified") + .build(); + } try { RepairRun repairRun = fetchRepairRun(repairRunId); - // TODO(zvo): this prevents PAUSED runs to resume, will fix later - if (repairRun.getRunState() != RepairRun.RunState.NOT_STARTED) { - throw new ReaperException(String.format("Repair run \"%d\" already running", repairRunId)); + ColumnFamily table = storage.getColumnFamily(repairRun.getColumnFamilyId()); + RepairRun.RunState newState = RepairRun.RunState.valueOf(state.get()); + RepairRun.RunState oldState = repairRun.getRunState(); + + if (oldState == 
newState) { + return Response.status(Response.Status.NOT_MODIFIED).build(); } - ColumnFamily table = getTable(repairRun.getColumnFamilyId()); - RepairRunner.startNewRepairRun(storage, repairRun.getId(), jmxFactory); - return Response.created(buildRepairRunURI(uriInfo, repairRun)) - .entity(new RepairRunStatus(repairRun, table)) - .build(); + + if (isStarting(oldState, newState)) { + return startRun(repairRun, table); + } + if (isPausing(oldState, newState)) { + return pauseRun(repairRun, table); + } + if (isResuming(oldState, newState)) { + return resumeRun(repairRun, table); + } + String errMsg = String.format("Transition %s->%s not supported.", newState.toString(), + oldState.toString()); + LOG.error(errMsg); + return Response.status(501).entity(errMsg).build(); } catch (ReaperException e) { LOG.error(e.getMessage()); e.printStackTrace(); - return Response.status(500).entity(e.getMessage()).build(); + return Response.status(Response.Status.NOT_FOUND).entity(e.getMessage()).build(); } } + private boolean isStarting(RepairRun.RunState oldState, RepairRun.RunState newState) { + return oldState == RepairRun.RunState.NOT_STARTED && newState == RepairRun.RunState.RUNNING; + } + + private boolean isPausing(RepairRun.RunState oldState, RepairRun.RunState newState) { + return oldState == RepairRun.RunState.RUNNING && newState == RepairRun.RunState.PAUSED; + } + + private boolean isResuming(RepairRun.RunState oldState, RepairRun.RunState newState) { + return oldState == RepairRun.RunState.PAUSED && newState == RepairRun.RunState.RUNNING; + } + + private Response startRun(RepairRun repairRun, ColumnFamily table) { + LOG.info("Starting run {}", repairRun.getId()); + RepairRun updatedRun = new RepairRun.Builder(repairRun) + .runState(RepairRun.RunState.RUNNING) + .startTime(DateTime.now()) + .build(repairRun.getId()); + storage.updateRepairRun(updatedRun); + RepairRunner.startNewRepairRun(storage, repairRun.getId(), jmxFactory); + return 
Response.status(Response.Status.OK).entity(new RepairRunStatus(repairRun, table)) + .build(); + } + + private Response pauseRun(RepairRun repairRun, ColumnFamily table) { + LOG.info("Pausing run {}", repairRun.getId()); + RepairRun updatedRun = new RepairRun.Builder(repairRun) + .runState(RepairRun.RunState.PAUSED) + .build(repairRun.getId()); + storage.updateRepairRun(updatedRun); + return Response.ok().entity(new RepairRunStatus(repairRun, table)).build(); + } + + private Response resumeRun(RepairRun repairRun, ColumnFamily table) { + LOG.info("Resuming run {}", repairRun.getId()); + RepairRun updatedRun = new RepairRun.Builder(repairRun) + .runState(RepairRun.RunState.RUNNING) + .build(repairRun.getId()); + storage.updateRepairRun(updatedRun); + return Response.ok().entity(new RepairRunStatus(repairRun, table)).build(); + } + /** * @return detailed information about a repair run. */ diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index c2bbfc9aa..6fdbc8660 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -82,7 +82,6 @@ public static void startNewRepairRun(IStorage storage, long repairRunID, } } - private final IStorage storage; private final long repairRunId; private final JmxConnectionFactory jmxConnectionFactory; diff --git a/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java b/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java index f16f24423..78a07cab5 100644 --- a/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java +++ b/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java @@ -152,9 +152,10 @@ public void testTriggerRepairRun() throws Exception { long runId = repairRunStatus.getId(); DateTimeUtils.setCurrentMillisFixed(TIME_START); - response = resource.triggerRepairRun(uriInfo, runId); + Optional newState = 
Optional.of(RepairRun.RunState.RUNNING.toString()); + response = resource.modifyRunState(uriInfo, runId, newState); - assertEquals(201, response.getStatus()); + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); assertTrue(response.getEntity() instanceof RepairRunStatus); // the thing we get as a reply from the endpoint is a not started run. This is because the // executor didn't have time to start the run @@ -176,8 +177,9 @@ public void testTriggerRepairRun() throws Exception { @Test public void testTriggerNotExistingRun() { RepairRunResource resource = new RepairRunResource(config, storage, factory); - Response response = resource.triggerRepairRun(uriInfo, 42l); - assertEquals(500, response.getStatus()); + Optional newState = Optional.of(RepairRun.RunState.RUNNING.toString()); + Response response = resource.modifyRunState(uriInfo, 42l, newState); + assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus()); assertTrue(response.getEntity() instanceof String); assertTrue(response.getEntity().toString().contains("not found")); } @@ -193,11 +195,11 @@ public void testTriggerAlreadyRunningRun() throws InterruptedException { long runId = repairRunStatus.getId(); DateTimeUtils.setCurrentMillisFixed(TIME_START); - resource.triggerRepairRun(uriInfo, runId); + Optional newState = Optional.of(RepairRun.RunState.RUNNING.toString()); + resource.modifyRunState(uriInfo, runId, newState); Thread.sleep(1000); - response = resource.triggerRepairRun(uriInfo, runId); - assertEquals(500, response.getStatus()); - assertTrue(response.getEntity().toString().contains("already running")); + response = resource.modifyRunState(uriInfo, runId, newState); + assertEquals(Response.Status.NOT_MODIFIED.getStatusCode(), response.getStatus()); } @Test @@ -232,4 +234,65 @@ public void testTriggerRunMissingArgument() { assertTrue(response.getEntity() instanceof String); assertTrue(response.getEntity().toString().contains("argument missing")); } + + @Test + 
public void testPauseRunningRun() throws InterruptedException { + // first trigger a run + DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); + RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); + RepairRunResource resource = new RepairRunResource(config, storage, factory); + Response response = resource.addRepairRun(uriInfo, CLUSTER_NAME, KEYSPACE, TABLE, OWNER, + Optional.absent()); + RepairRunStatus repairRunStatus = (RepairRunStatus) response.getEntity(); + long runId = repairRunStatus.getId(); + DateTimeUtils.setCurrentMillisFixed(TIME_START); + Optional newState = Optional.of(RepairRun.RunState.RUNNING.toString()); + resource.modifyRunState(uriInfo, runId, newState); + + Thread.sleep(200); + + // now pause it + response = resource.modifyRunState(uriInfo, runId, + Optional.of(RepairRun.RunState.PAUSED.toString())); + Thread.sleep(200); + + assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); + RepairRun repairRun = storage.getRepairRun(runId); + // the run should be paused + assertEquals(RepairRun.RunState.PAUSED, repairRun.getRunState()); + // but the running segment should be untouched + assertEquals(1, storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING)); + } + + @Test + public void testPauseNotRunningRun() throws InterruptedException { + DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); + RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); + RepairRunResource resource = new RepairRunResource(config, storage, factory); + Response response = resource.addRepairRun(uriInfo, CLUSTER_NAME, KEYSPACE, TABLE, OWNER, + Optional.absent()); + RepairRunStatus repairRunStatus = (RepairRunStatus) response.getEntity(); + long runId = repairRunStatus.getId(); + + response = resource.modifyRunState(uriInfo, runId, + Optional.of(RepairRun.RunState.PAUSED.toString())); + Thread.sleep(200); + + assertEquals(501, 
response.getStatus()); + RepairRun repairRun = storage.getRepairRun(runId); + // the run should still be not started + assertEquals(RepairRun.RunState.NOT_STARTED, repairRun.getRunState()); + // and no segment should be running + assertEquals(0, storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING)); + } + + @Test + public void testPauseNotExistingRun() throws InterruptedException { + RepairRunResource resource = new RepairRunResource(config, storage, factory); + Response response = resource.modifyRunState(uriInfo, 42l, + Optional.of(RepairRun.RunState.PAUSED.toString())); + assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus()); + assertEquals(0, storage.getAllRunningRepairRuns().size()); + } + }