From 88477016922d4e11506f706faead3b3c27863c40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Hegerfors?= Date: Sun, 1 Feb 2015 01:27:38 +0100 Subject: [PATCH 01/10] Fix printout with negative duration --- src/main/java/com/spotify/reaper/service/SegmentRunner.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index 286552e0f..8f1e9bfb7 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -136,8 +136,9 @@ private void runRepair(Collection potentialCoordinators, } else if (resultingSegment.getState().equals(RepairSegment.State.DONE)) { LOG.debug("Repair segment with id '{}' was repaired in {} seconds", resultingSegment.getId(), - Seconds.secondsBetween(resultingSegment.getEndTime(), - resultingSegment.getStartTime())); + Seconds.secondsBetween( + resultingSegment.getStartTime(), + resultingSegment.getEndTime())); segmentRunners.remove(resultingSegment.getId()); } } From d4532f02fd8e40bcf66c72b3ab188bb16ec72257 Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Mon, 2 Feb 2015 10:16:31 +0100 Subject: [PATCH 02/10] fix version compare to support revision strings in version --- .../spotify/reaper/cassandra/JmxProxy.java | 36 +++++++++++++------ .../reaper/cassandra/JmxProxyTest.java | 2 ++ 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java index a2ec22bc2..f2f974867 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.math.BigInteger; import java.net.MalformedURLException; +import java.text.NumberFormat; import java.util.AbstractMap; import java.util.Collection; import java.util.HashSet; @@ -385,7 +386,7 @@ public void close() throws ReaperException { } /** - * NOTICE: This code is copied from StackOverflow answer: + * NOTICE: This code is loosely based on StackOverflow answer: * http://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java * * Compares two version strings. @@ -402,24 +403,39 @@ public void close() throws ReaperException { */ public static Integer versionCompare(String str1, String str2) throws ReaperException { try { - str1 = str1.split(" ")[0].replaceAll("[^0-9.]", ""); - str2 = str2.split(" ")[0].replaceAll("[^0-9.]", "");; - String[] vals1 = str1.split("\\."); - String[] vals2 = str2.split("\\."); + str1 = str1.split(" ")[0].replaceAll("[-_~]", "."); + str2 = str2.split(" ")[0].replaceAll("[-_~]", "."); + String[] parts1 = str1.split("\\."); + String[] parts2 = str2.split("\\."); int i = 0; // set index to first non-equal ordinal or length of shortest version string - while (i < vals1.length && i < vals2.length && vals1[i].equals(vals2[i])) { - i++; + while (i < parts1.length && i < parts2.length) { + try { + Integer.parseInt(parts1[i]); + Integer.parseInt(parts2[i]); + } catch (NumberFormatException ex) { + if (i == 0) { + throw ex; // just comparing two non-version strings should fail + } + // first non integer part, so let's just stop comparison here and ignore this part + i--; + break; + } + if (parts1[i].equals(parts2[i])) { + i++; + continue; + } + break; } // compare first non-equal ordinal number - if (i < vals1.length && i < vals2.length) { - int diff = Integer.valueOf(vals1[i]).compareTo(Integer.valueOf(vals2[i])); + if (i < parts1.length && i < parts2.length) { + int diff = Integer.valueOf(parts1[i]).compareTo(Integer.valueOf(parts2[i])); return Integer.signum(diff); } // the strings are equal or one string is a substring of the other // e.g. "1.2.3" = "1.2.3" or "1.2.3" < "1.2.3.4" else { - return Integer.signum(vals1.length - vals2.length); + return Integer.signum(parts1.length - parts2.length); } } catch (Exception ex) { LOG.error("failed comparing strings for versions: '{}' '{}'", str1, str2); diff --git a/src/test/java/com/spotify/reaper/cassandra/JmxProxyTest.java b/src/test/java/com/spotify/reaper/cassandra/JmxProxyTest.java index d09dd49ea..5047e41bb 100644 --- a/src/test/java/com/spotify/reaper/cassandra/JmxProxyTest.java +++ b/src/test/java/com/spotify/reaper/cassandra/JmxProxyTest.java @@ -32,6 +32,8 @@ public void testVersionCompare() throws ReaperException { assertEquals(Integer.valueOf(1), JmxProxy.versionCompare("99.0.0", "9.0")); assertEquals(Integer.valueOf(1), JmxProxy.versionCompare("99.0.10", "99.0.1")); assertEquals(Integer.valueOf(-1), JmxProxy.versionCompare("99.0.10~1", "99.0.10~2")); + assertEquals(Integer.valueOf(0), JmxProxy.versionCompare("1.2.18-1~1.2.15.219.gec18fb4.9", + "1.2.18-1~1.2.15.219.gec17fb4.10")); } } From 6badbd0fe0464298ee4e73636a428eccacf17277 Mon Sep 17 00:00:00 2001 From: Radovan Zvoncek Date: Mon, 2 Feb 2015 15:44:36 +0100 Subject: [PATCH 03/10] Fix rounding bug by using float literals --- .../resources/view/RepairRunStatus.java | 8 ++++++- .../resources/view/RepairRunStatusTest.java | 23 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) create mode 100644 src/test/java/com/spotify/reaper/resources/view/RepairRunStatusTest.java diff --git a/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java b/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java index af7e944b8..f08a45f92 100644 --- a/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java +++ b/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java @@ -15,6 +15,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairUnit; @@ -87,11 +88,16 @@ public RepairRunStatus(RepairRun repairRun, RepairUnit repairUnit) { this.startTime = repairRun.getStartTime(); this.endTime = repairRun.getEndTime(); this.pauseTime = repairRun.getPauseTime(); - this.intensity = Math.round(repairRun.getIntensity() * 10000) / 10000; + this.intensity = roundIntensity(repairRun.getIntensity()); this.segmentCount = repairUnit.getSegmentCount(); this.repairParallelism = repairUnit.getRepairParallelism().name().toLowerCase(); } + @VisibleForTesting + protected static double roundIntensity(double intensity) { + return Math.round(intensity * 10000f) / 10000f; + } + @JsonProperty("creation_time") public String getCreationTimeISO8601() { if (creationTime == null) { diff --git a/src/test/java/com/spotify/reaper/resources/view/RepairRunStatusTest.java b/src/test/java/com/spotify/reaper/resources/view/RepairRunStatusTest.java new file mode 100644 index 000000000..70be9f7f4 --- /dev/null +++ b/src/test/java/com/spotify/reaper/resources/view/RepairRunStatusTest.java @@ -0,0 +1,23 @@ +package com.spotify.reaper.resources.view; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class RepairRunStatusTest { + + @Test + public void testRoundIntensity() throws Exception { + assertEquals(0.0f, RepairRunStatus.roundIntensity(0.0f), 0.00000f); + assertEquals(0.1f, RepairRunStatus.roundIntensity(0.1f), 0.00001f); + assertEquals(0.2f, RepairRunStatus.roundIntensity(0.2f), 0.00001f); + assertEquals(0.3f, RepairRunStatus.roundIntensity(0.3f), 0.00001f); + assertEquals(0.4f, RepairRunStatus.roundIntensity(0.4f), 0.00001f); + assertEquals(0.5f, RepairRunStatus.roundIntensity(0.5f), 0.00001f); + assertEquals(0.6f, RepairRunStatus.roundIntensity(0.6f), 0.00001f); + assertEquals(0.7f, RepairRunStatus.roundIntensity(0.7f), 0.00001f); + assertEquals(0.8f, RepairRunStatus.roundIntensity(0.8f), 0.00001f); + assertEquals(0.9f, RepairRunStatus.roundIntensity(0.9f), 0.00001f); + assertEquals(1.0f, RepairRunStatus.roundIntensity(1.0f), 0.00001f); + } +} From 53c75f7cf39a23a6b143dde65752aa9910df203a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Hegerfors?= Date: Mon, 2 Feb 2015 18:43:11 +0100 Subject: [PATCH 04/10] Unset currentlyRunningSegmentId when not running a segment --- src/main/java/com/spotify/reaper/service/RepairRunner.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index 5c54b71e3..ffa03be1c 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -121,7 +121,7 @@ public static void startRepairRun(IStorage storage, long repairRunID, @VisibleForTesting public Long getCurrentlyRunningSegmentId() { - return this.currentlyRunningSegmentId; + return currentlyRunningSegmentId; } /** @@ -245,9 +245,10 @@ private void repairSegment(long segmentId, RingRange tokenRange) throws ReaperEx return; } - this.currentlyRunningSegmentId = Long.valueOf(segmentId); + currentlyRunningSegmentId = segmentId; SegmentRunner.triggerRepair(storage, segmentId, potentialCoordinators, repairTimeoutMillis, jmxConnectionFactory); + currentlyRunningSegmentId = null; handleResult(segmentId); } From 25295c47c94e1070a2b593e69fa3157ddb4577de Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Tue, 3 Feb 2015 15:52:36 +0100 Subject: [PATCH 05/10] add acceptance tests, refactor Reaper to use context --- pom.xml | 21 +- src/main/db/reaper_db.sql | 2 +- .../java/com/spotify/reaper/AppContext.java | 15 + .../com/spotify/reaper/ReaperApplication.java | 36 ++- .../cassandra/JmxConnectionFactory.java | 18 +- .../spotify/reaper/cassandra/JmxProxy.java | 10 +- .../reaper/resources/ClusterResource.java | 31 +- .../reaper/resources/ReaperHealthCheck.java | 9 +- .../reaper/resources/RepairRunResource.java | 73 +++-- .../spotify/reaper/service/RepairRunner.java | 109 ++++---- .../com/spotify/reaper/service/RingRange.java | 2 +- .../reaper/service/SegmentGenerator.java | 6 +- .../spotify/reaper/service/SegmentRunner.java | 88 +++--- .../com/spotify/reaper/IntegrationTest.java | 25 -- .../spotify/reaper/acceptance/BasicSteps.java | 59 ++++ .../acceptance/ReaperTestJettyRunner.java | 158 +++++++++++ .../reaper/acceptance/TestContext.java | 7 + .../reaper/service/SegmentRunnerTest.java | 255 ----------------- .../{ => unit}/cassandra/JmxProxyTest.java | 4 +- .../reaper/{ => unit}/core/ClusterTest.java | 5 +- .../resources/ClusterResourceTest.java | 34 +-- .../resources/RepairRunResourceTest.java | 100 +++---- .../{ => unit}/service/RepairRunnerTest.java | 46 +-- .../{ => unit}/service/RingRangeTest.java | 5 +- .../service/SegmentGeneratorTest.java | 26 +- .../unit/service/SegmentRunnerTest.java | 264 ++++++++++++++++++ src/test/resources/cassandra-reaper-at.yaml | 66 +++++ src/test/resources/cassandra-reaper.yaml | 3 +- .../basic_reaper_functionality.feature | 10 + 29 files changed, 909 insertions(+), 578 deletions(-) create mode 100644 src/main/java/com/spotify/reaper/AppContext.java delete mode 100644 src/test/java/com/spotify/reaper/IntegrationTest.java create mode 100644 src/test/java/com/spotify/reaper/acceptance/BasicSteps.java create mode 100644 src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java create mode 100644 src/test/java/com/spotify/reaper/acceptance/TestContext.java delete mode 100644 src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java rename src/test/java/com/spotify/reaper/{ => unit}/cassandra/JmxProxyTest.java (94%) rename src/test/java/com/spotify/reaper/{ => unit}/core/ClusterTest.java (91%) rename src/test/java/com/spotify/reaper/{ => unit}/resources/ClusterResourceTest.java (75%) rename src/test/java/com/spotify/reaper/{ => unit}/resources/RepairRunResourceTest.java (82%) rename src/test/java/com/spotify/reaper/{ => unit}/service/RepairRunnerTest.java (89%) rename src/test/java/com/spotify/reaper/{ => unit}/service/RingRangeTest.java (97%) rename src/test/java/com/spotify/reaper/{ => unit}/service/SegmentGeneratorTest.java (92%) create mode 100644 src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java create mode 100644 src/test/resources/cassandra-reaper-at.yaml create mode 100644 src/test/resources/com.spotify.reaper.acceptance/basic_reaper_functionality.feature diff --git a/pom.xml b/pom.xml index 70a6e18ea..b5cff04ad 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,9 @@ UTF-8 - 0.7.0 + 0.7.1 2.0.12 + 1.1.3 @@ -52,12 +53,30 @@ 4.8.1 test + + io.dropwizard + dropwizard-testing + ${dropwizard.version} + test + org.mockito mockito-all 1.9.5 test + + info.cukes + cucumber-java + ${cucumber.version} + test + + + info.cukes + cucumber-junit + ${cucumber.version} + test + diff --git a/src/main/db/reaper_db.sql b/src/main/db/reaper_db.sql index 6c18c4166..7985f6086 100644 --- a/src/main/db/reaper_db.sql +++ b/src/main/db/reaper_db.sql @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS "cluster" ( "partitioner" TEXT NOT NULL, "seed_hosts" TEXT[] NOT NULL ); - +§ -- Repair unit is basically a keyspace with a set of column families. -- Cassandra supports repairing multiple column families in one go. -- diff --git a/src/main/java/com/spotify/reaper/AppContext.java b/src/main/java/com/spotify/reaper/AppContext.java new file mode 100644 index 000000000..2e9cd2d0b --- /dev/null +++ b/src/main/java/com/spotify/reaper/AppContext.java @@ -0,0 +1,15 @@ +package com.spotify.reaper; + +import com.spotify.reaper.cassandra.JmxConnectionFactory; +import com.spotify.reaper.storage.IStorage; + +/** + * Single class to hold all application global interfacing objects, + * and app global options. + */ +public class AppContext { + + public IStorage storage; + public JmxConnectionFactory jmxConnectionFactory; + public ReaperApplicationConfiguration config; +} diff --git a/src/main/java/com/spotify/reaper/ReaperApplication.java b/src/main/java/com/spotify/reaper/ReaperApplication.java index ffc52a5d5..d069fb361 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplication.java +++ b/src/main/java/com/spotify/reaper/ReaperApplication.java @@ -13,7 +13,8 @@ */ package com.spotify.reaper; -import com.spotify.reaper.cassandra.JmxConnectionFactory; +import com.google.common.annotations.VisibleForTesting; + import com.spotify.reaper.resources.ClusterResource; import com.spotify.reaper.resources.PingResource; import com.spotify.reaper.resources.ReaperHealthCheck; @@ -41,6 +42,21 @@ public class ReaperApplication extends Application bootstrap) { public void run(ReaperApplicationConfiguration config, Environment environment) throws ReaperException { checkConfiguration(config); + context.config = config; addSignalHandlers(); // SIGHUP, etc. @@ -73,14 +90,17 @@ public void run(ReaperApplicationConfiguration config, config.getRepairRunThreadCount(), config.getHangingRepairTimeoutMins(), TimeUnit.MINUTES, 30, TimeUnit.SECONDS); - JmxConnectionFactory jmxConnectionFactory = new JmxConnectionFactory(); - LOG.info("initializing storage of type: {}", config.getStorageType()); - IStorage storage = initializeStorage(config, environment); + if (context.storage == null) { + LOG.info("initializing storage of type: {}", config.getStorageType()); + context.storage = initializeStorage(config, environment); + } else { + LOG.info("storage already given in context, not initializing a new one"); + } LOG.info("creating and registering health checks"); // Notice that health checks are registered under the admin application on /healthcheck - final ReaperHealthCheck healthCheck = new ReaperHealthCheck(storage); + final ReaperHealthCheck healthCheck = new ReaperHealthCheck(context); environment.healthChecks().register("reaper", healthCheck); environment.jersey().register(healthCheck); @@ -88,16 +108,16 @@ public void run(ReaperApplicationConfiguration config, final PingResource pingResource = new PingResource(); environment.jersey().register(pingResource); - final ClusterResource addClusterResource = new ClusterResource(storage, jmxConnectionFactory); + final ClusterResource addClusterResource = new ClusterResource(context); environment.jersey().register(addClusterResource); - final RepairRunResource addRepairRunResource = new RepairRunResource(config, storage); + final RepairRunResource addRepairRunResource = new RepairRunResource(context); environment.jersey().register(addRepairRunResource); LOG.info("Reaper is ready to accept connections"); LOG.info("resuming pending repair runs"); - RepairRunner.resumeRunningRepairRuns(storage, jmxConnectionFactory); + RepairRunner.resumeRunningRepairRuns(context); } private IStorage initializeStorage(ReaperApplicationConfiguration config, diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxConnectionFactory.java b/src/main/java/com/spotify/reaper/cassandra/JmxConnectionFactory.java index 26a6524e4..7ca8442f0 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxConnectionFactory.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxConnectionFactory.java @@ -14,12 +14,12 @@ package com.spotify.reaper.cassandra; import com.google.common.base.Optional; -import com.google.common.collect.Lists; import com.spotify.reaper.ReaperException; import com.spotify.reaper.core.Cluster; import java.util.Collection; +import java.util.Set; public class JmxConnectionFactory { @@ -34,20 +34,18 @@ public final JmxProxy connect(String host) throws ReaperException { public final JmxProxy connectAny(Optional handler, Collection hosts) throws ReaperException { + if (hosts == null || hosts.isEmpty()) { + throw new ReaperException("no hosts given for connectAny"); + } return connect(handler, hosts.iterator().next()); } public final JmxProxy connectAny(Cluster cluster) throws ReaperException { - return connectAny(Optional.absent(), cluster.getSeedHosts()); - } - - public final Collection connectAll(Collection hosts) - throws ReaperException { - Collection connections = Lists.newArrayList(); - for (String host : hosts) { - connections.add(connect(host)); + Set hosts = cluster.getSeedHosts(); + if (hosts == null || hosts.isEmpty()) { + throw new ReaperException("no seeds in cluster with name: " + cluster.getName()); } - return connections; + return connectAny(Optional.absent(), hosts); } } diff --git a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java index f2f974867..090a5c5f8 100644 --- a/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java +++ b/src/main/java/com/spotify/reaper/cassandra/JmxProxy.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.math.BigInteger; import java.net.MalformedURLException; -import java.text.NumberFormat; import java.util.AbstractMap; import java.util.Collection; import java.util.HashSet; @@ -76,8 +75,7 @@ public class JmxProxy implements NotificationListener, AutoCloseable { private JmxProxy(Optional handler, String host, JMXConnector jmxConnector, StorageServiceMBean ssProxy, ObjectName ssMbeanName, - MBeanServerConnection mbeanServer, - CompactionManagerMBean cmProxy) { + MBeanServerConnection mbeanServer, CompactionManagerMBean cmProxy) { this.host = host; this.jmxConnector = jmxConnector; this.ssMbeanName = ssMbeanName; @@ -95,6 +93,7 @@ private JmxProxy(Optional handler, String host, JMXConnecto */ public static JmxProxy connect(Optional handler, String host) throws ReaperException { + assert null != host : "null host given to JmxProxy.connect()"; String[] parts = host.split(":"); if (parts.length == 2) { return connect(handler, parts[0], Integer.valueOf(parts[1])); @@ -164,7 +163,6 @@ public BigInteger apply(String s) { /** * @return all hosts owning a range of tokens */ - @Nullable public List tokenRangeToEndpoint(String keyspace, RingRange tokenRange) { checkNotNull(ssProxy, "Looks like the proxy is not connected"); Set, List>> entries = @@ -176,7 +174,7 @@ public List tokenRangeToEndpoint(String keyspace, RingRange tokenRange) return entry.getValue(); } } - return null; + return Lists.newArrayList(); } /** @@ -417,7 +415,7 @@ public static Integer versionCompare(String str1, String str2) throws ReaperExce if (i == 0) { throw ex; // just comparing two non-version strings should fail } - // first non integer part, so let's just stop comparison here and ignore this part + // first non integer part, so let's just stop comparison here and ignore the rest i--; break; } diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java index a007ac870..f4a3e331a 100644 --- a/src/main/java/com/spotify/reaper/resources/ClusterResource.java +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -16,14 +16,13 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; +import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperException; -import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.core.Cluster; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.resources.view.ClusterStatus; import com.spotify.reaper.resources.view.KeyspaceStatus; -import com.spotify.reaper.storage.IStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,18 +51,16 @@ public class ClusterResource { private static final Logger LOG = LoggerFactory.getLogger(ClusterResource.class); - private final JmxConnectionFactory jmxFactory; - private final IStorage storage; + private final AppContext context; - public ClusterResource(IStorage storage, JmxConnectionFactory jmxFactory) { - this.storage = storage; - this.jmxFactory = jmxFactory; + public ClusterResource(AppContext context) { + this.context = context; } @GET public Response getClusterList() { LOG.info("get cluster list called"); - Collection clusters = storage.getClusters(); + Collection clusters = context.storage.getClusters(); List clusterNames = new ArrayList<>(); for (Cluster cluster : clusters) { clusterNames.add(cluster.getName()); @@ -75,7 +72,7 @@ public Response getClusterList() { @Path("/{cluster_name}") public Response getCluster(@PathParam("cluster_name") String clusterName) { LOG.info("get cluster called with cluster_name: {}", clusterName); - Optional cluster = storage.getCluster(clusterName); + Optional cluster = context.storage.getCluster(clusterName); if (cluster.isPresent()) { return viewCluster(cluster.get(), Optional.absent()); } else { @@ -90,7 +87,7 @@ public Response getCluster(@PathParam("cluster_name") String clusterName, @PathParam("keyspace_name") String keyspaceName) { LOG.info("get cluster/keyspace called with cluster_name: {}, and keyspace_name: {}", clusterName, keyspaceName); - Optional cluster = storage.getCluster(clusterName); + Optional cluster = context.storage.getCluster(clusterName); if (cluster.isPresent()) { return viewKeyspace(cluster.get(), keyspaceName); } else { @@ -116,7 +113,7 @@ public Response addCluster( return Response.status(400) .entity("failed to create cluster with seed host: " + seedHost.get()).build(); } - Optional existingCluster = storage.getCluster(newCluster.getName()); + Optional existingCluster = context.storage.getCluster(newCluster.getName()); if (existingCluster.isPresent()) { LOG.info("cluster already stored with this name: {}", existingCluster); return Response.status(403) @@ -124,7 +121,7 @@ public Response addCluster( .build(); } else { LOG.info("creating new cluster based on given seed host: {}", newCluster); - storage.addCluster(newCluster); + context.storage.addCluster(newCluster); } URI createdURI; @@ -144,7 +141,7 @@ public Cluster createClusterWithSeedHost(String seedHost) throws ReaperException { String clusterName; String partitioner; - try (JmxProxy jmxProxy = jmxFactory.connect(seedHost)) { + try (JmxProxy jmxProxy = context.jmxConnectionFactory.connect(seedHost)) { clusterName = jmxProxy.getClusterName(); partitioner = jmxProxy.getPartitioner(); } catch (ReaperException e) { @@ -158,15 +155,15 @@ public Cluster createClusterWithSeedHost(String seedHost) private Response viewCluster(Cluster cluster, Optional createdURI) { ClusterStatus view = new ClusterStatus(cluster); Collection> runIdTuples = Lists.newArrayList(); - for (Long repairRunId : storage.getRepairRunIdsForCluster(cluster.getName())) { - Optional repairRun = storage.getRepairRun(repairRunId); + for (Long repairRunId : context.storage.getRepairRunIdsForCluster(cluster.getName())) { + Optional repairRun = context.storage.getRepairRun(repairRunId); if (repairRun.isPresent()) { runIdTuples .add(Lists.newArrayList(new Object[]{repairRunId, repairRun.get().getRunState()})); } } view.setRepairRunIds(runIdTuples); - try (JmxProxy jmx = this.jmxFactory.connectAny(cluster)) { + try (JmxProxy jmx = context.jmxConnectionFactory.connectAny(cluster)) { view.setKeyspaces(jmx.getKeyspaces()); } catch (ReaperException e) { e.printStackTrace(); @@ -182,7 +179,7 @@ private Response viewCluster(Cluster cluster, Optional createdURI) { private Response viewKeyspace(Cluster cluster, String keyspaceName) { KeyspaceStatus view = new KeyspaceStatus(cluster); - try (JmxProxy jmx = this.jmxFactory.connectAny(cluster)) { + try (JmxProxy jmx = context.jmxConnectionFactory.connectAny(cluster)) { if (jmx.getKeyspaces().contains(keyspaceName)) { view.setTables(jmx.getTableNamesForKeyspace(keyspaceName)); } else { diff --git a/src/main/java/com/spotify/reaper/resources/ReaperHealthCheck.java b/src/main/java/com/spotify/reaper/resources/ReaperHealthCheck.java index f05de2634..873c4533c 100644 --- a/src/main/java/com/spotify/reaper/resources/ReaperHealthCheck.java +++ b/src/main/java/com/spotify/reaper/resources/ReaperHealthCheck.java @@ -1,5 +1,6 @@ package com.spotify.reaper.resources; +import com.spotify.reaper.AppContext; import com.spotify.reaper.storage.IStorage; /** @@ -7,16 +8,16 @@ */ public class ReaperHealthCheck extends com.codahale.metrics.health.HealthCheck { - private IStorage storage; + private AppContext context; - public ReaperHealthCheck(IStorage storage) { - this.storage = storage; + public ReaperHealthCheck(AppContext context) { + this.context = context; } @Override protected Result check() throws Exception { // Should check some other pre-conditions here for a healthy Reaper instance? - if (storage.isStorageConnected()) { + if (context.storage.isStorageConnected()) { return Result.healthy(); } return Result.unhealthy("storage not connected"); diff --git a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java index cc6713fa5..2e9e4361e 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java +++ b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java @@ -19,10 +19,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperApplication; -import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperException; -import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.core.Cluster; import com.spotify.reaper.core.RepairRun; @@ -32,7 +31,6 @@ import com.spotify.reaper.service.RepairRunner; import com.spotify.reaper.service.RingRange; import com.spotify.reaper.service.SegmentGenerator; -import com.spotify.reaper.storage.IStorage; import org.apache.cassandra.repair.RepairParallelism; import org.joda.time.DateTime; @@ -70,22 +68,13 @@ public class RepairRunResource { public static final Splitter COMMA_SEPARATED_LIST_SPLITTER = Splitter.on(',').trimResults(CharMatcher.anyOf(" ()[]\"'")).omitEmptyStrings(); + private static final Logger LOG = LoggerFactory.getLogger(RepairRunResource.class); - private final IStorage storage; - private final ReaperApplicationConfiguration config; - private final JmxConnectionFactory jmxFactory; - - public RepairRunResource(ReaperApplicationConfiguration config, IStorage storage) { - this.config = config; - this.storage = storage; - this.jmxFactory = new JmxConnectionFactory(); - } - public RepairRunResource(ReaperApplicationConfiguration config, IStorage storage, - JmxConnectionFactory jmxFactory) { - this.storage = storage; - this.config = config; - this.jmxFactory = jmxFactory; + private final AppContext context; + + public RepairRunResource(AppContext context) { + this.context = context; } /** @@ -134,14 +123,16 @@ public Response addRepairRun( } } - Optional cluster = storage.getCluster(Cluster.toSymbolicName(clusterName.get())); + Optional + cluster = context.storage.getCluster(Cluster.toSymbolicName(clusterName.get())); if (!cluster.isPresent()) { return Response.status(Response.Status.NOT_FOUND).entity( "No cluster found with name \"" + clusterName.get() + "\", did you register your cluster first?").build(); } - JmxProxy jmxProxy = jmxFactory.connect(cluster.get().getSeedHosts().iterator().next()); + JmxProxy jmxProxy = context.jmxConnectionFactory.connect( + cluster.get().getSeedHosts().iterator().next()); Set knownTables = jmxProxy.getTableNamesForKeyspace(keyspace.get()); if (knownTables.size() == 0) { LOG.debug("no known tables for keyspace {} in cluster {}", keyspace.get(), @@ -165,7 +156,7 @@ public Response addRepairRun( } Optional storedRepairUnit = - storage.getRepairUnit(cluster.get().getName(), keyspace.get(), tableNames); + context.storage.getRepairUnit(cluster.get().getName(), keyspace.get(), tableNames); RepairUnit theRepairUnit; if (storedRepairUnit.isPresent()) { // TODO: should we drop the RepairUnit not to get these issues with existing values? @@ -182,21 +173,21 @@ public Response addRepairRun( cluster.get().getName(), keyspace.get(), tableNames); theRepairUnit = storedRepairUnit.get(); } else { - int segments = config.getSegmentCount(); + int segments = context.config.getSegmentCount(); if (segmentCount.isPresent()) { LOG.debug("using given segment count {} instead of configured value {}", - segmentCount.get(), config.getSegmentCount()); + segmentCount.get(), context.config.getSegmentCount()); segments = segmentCount.get(); } - String repairParallelismStr = config.getRepairParallelism(); + String repairParallelismStr = context.config.getRepairParallelism(); if (repairParallelism.isPresent()) { LOG.debug("using given repair parallelism {} instead of configured value {}", - repairParallelism.get(), config.getRepairParallelism()); + repairParallelism.get(), context.config.getRepairParallelism()); repairParallelismStr = repairParallelism.get(); } LOG.info("create new repair unit for cluster '{}', keyspace '{}', and column families: {}", cluster.get().getName(), keyspace.get(), tableNames); - theRepairUnit = storage.addRepairUnit( + theRepairUnit = context.storage.addRepairUnit( new RepairUnit.Builder(cluster.get().getName(), keyspace.get(), tableNames, segments, RepairParallelism.valueOf(repairParallelismStr.toUpperCase()))); } @@ -232,14 +223,16 @@ public Response modifyRunState( .entity("\"state\" argument missing").build(); } - Optional repairRun = storage.getRepairRun(repairRunId); + Optional repairRun = context.storage.getRepairRun(repairRunId); if (!repairRun.isPresent()) { return Response.status(Response.Status.NOT_FOUND).entity("repair run with id " + repairRunId + " not found") .build(); } - Optional repairUnit = storage.getRepairUnit(repairRun.get().getRepairUnitId()); + Optional + repairUnit = + context.storage.getRepairUnit(repairRun.get().getRepairUnitId()); if (!repairUnit.isPresent()) { String errMsg = "repair unit with id " + repairRun.get().getRepairUnitId() + " not found"; LOG.error(errMsg); @@ -285,10 +278,10 @@ private Response startRun(RepairRun repairRun, RepairUnit repairUnit) { .runState(RepairRun.RunState.RUNNING) .startTime(DateTime.now()) .build(repairRun.getId()); - if (!storage.updateRepairRun(updatedRun)) { + if (!context.storage.updateRepairRun(updatedRun)) { throw new RuntimeException("failed updating repair run " + updatedRun.getId()); } - RepairRunner.startRepairRun(storage, repairRun.getId(), jmxFactory); + RepairRunner.startRepairRun(context, repairRun.getId()); return Response.status(Response.Status.OK).entity(new RepairRunStatus(repairRun, repairUnit)) .build(); } @@ -299,7 +292,7 @@ private Response pauseRun(RepairRun repairRun, RepairUnit repairUnit) { .runState(RepairRun.RunState.PAUSED) .pauseTime(DateTime.now()) .build(repairRun.getId()); - if (!storage.updateRepairRun(updatedRun)) { + if (!context.storage.updateRepairRun(updatedRun)) { throw new RuntimeException("failed updating repair run " + updatedRun.getId()); } return Response.ok().entity(new RepairRunStatus(repairRun, repairUnit)).build(); @@ -311,7 +304,7 @@ private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit) { .runState(RepairRun.RunState.RUNNING) .pauseTime(null) .build(repairRun.getId()); - if (!storage.updateRepairRun(updatedRun)) { + if (!context.storage.updateRepairRun(updatedRun)) { throw new RuntimeException("failed updating repair run " + updatedRun.getId()); } return Response.ok().entity(new RepairRunStatus(repairRun, repairUnit)).build(); @@ -324,7 +317,7 @@ private Response resumeRun(RepairRun repairRun, RepairUnit repairUnit) { @Path("/{id}") public Response getRepairRun(@PathParam("id") Long repairRunId) { LOG.info("get repair_run called with: id = {}", repairRunId); - Optional repairRun = storage.getRepairRun(repairRunId); + Optional repairRun = context.storage.getRepairRun(repairRunId); if (repairRun.isPresent()) { return Response.ok().entity(getRepairRunStatus(repairRun.get())).build(); } else { @@ -340,7 +333,7 @@ public Response getRepairRun(@PathParam("id") Long repairRunId) { @Path("/cluster/{cluster_name}") public Response getRepairRunsForCluster(@PathParam("cluster_name") String clusterName) { LOG.info("get repair run for cluster called with: cluster_name = {}", clusterName); - Collection repairRuns = storage.getRepairRunsForCluster(clusterName); + Collection repairRuns = context.storage.getRepairRunsForCluster(clusterName); Collection repairRunViews = new ArrayList<>(); for (RepairRun repairRun : repairRuns) { repairRunViews.add(getRepairRunStatus(repairRun)); @@ -402,7 +395,7 @@ private List generateSegments(Cluster targetCluster, RepairUnit repai } for (String host : seedHosts) { try { - JmxProxy jmxProxy = jmxFactory.connect(host); + JmxProxy jmxProxy = context.jmxConnectionFactory.connect(host); List tokens = jmxProxy.getTokens(); segments = sg.generateSegments(repairUnit.getSegmentCount(), tokens); jmxProxy.close(); @@ -430,10 +423,10 @@ private RepairRun storeNewRepairRun(Cluster cluster, RepairUnit repairUnit, Optional cause, String owner) throws ReaperException { RepairRun.Builder runBuilder = new RepairRun.Builder(cluster.getName(), repairUnit.getId(), DateTime.now(), - config.getRepairIntensity()); + context.config.getRepairIntensity()); runBuilder.cause(cause.isPresent() ? cause.get() : "no cause specified"); runBuilder.owner(owner); - RepairRun newRepairRun = storage.addRepairRun(runBuilder); + RepairRun newRepairRun = context.storage.addRepairRun(runBuilder); if (newRepairRun == null) { String errMsg = String.format("failed storing repair run for cluster \"%s\", " + "keyspace \"%s\", and column families: %s", @@ -457,11 +450,11 @@ private void storeNewRepairSegments(List tokenSegments, RepairRun rep repairUnit.getId()); repairSegmentBuilders.add(repairSegment); } - storage.addRepairSegments(repairSegmentBuilders, repairRun.getId()); + context.storage.addRepairSegments(repairSegmentBuilders, repairRun.getId()); if (repairUnit.getSegmentCount() != tokenSegments.size()) { LOG.debug("created segment amount differs from expected default {} != {}", repairUnit.getSegmentCount(), tokenSegments.size()); - storage.updateRepairUnit( + context.storage.updateRepairUnit( repairUnit.with().segmentCount(tokenSegments.size()).build(repairUnit.getId())); } } @@ -470,12 +463,12 @@ private void storeNewRepairSegments(List tokenSegments, RepairRun rep * @return only a status of a repair run, not the entire repair run info. */ private RepairRunStatus getRepairRunStatus(RepairRun repairRun) { - Optional repairUnit = storage.getRepairUnit(repairRun.getRepairUnitId()); + Optional repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId()); assert repairUnit.isPresent() : "no repair unit found with id: " + repairRun.getRepairUnitId(); RepairRunStatus repairRunStatus = new RepairRunStatus(repairRun, repairUnit.get()); if (repairRun.getRunState() != RepairRun.RunState.NOT_STARTED) { int segmentsRepaired = - storage.getSegmentAmountForRepairRun(repairRun.getId(), RepairSegment.State.DONE); + context.storage.getSegmentAmountForRepairRun(repairRun.getId(), RepairSegment.State.DONE); repairRunStatus.setSegmentsRepaired(segmentsRepaired); } return repairRunStatus; diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index ffa03be1c..8b5e221c2 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -17,14 +17,13 @@ import com.google.common.base.Optional; import com.google.common.collect.Maps; +import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperException; -import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.core.Cluster; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.core.RepairUnit; -import com.spotify.reaper.storage.IStorage; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -45,9 +44,8 @@ public class RepairRunner implements Runnable { private static ScheduledExecutorService executor = null; private static long repairTimeoutMillis; private static long retryDelayMillis; - private final IStorage storage; + private final AppContext context; private final long repairRunId; - private final JmxConnectionFactory jmxConnectionFactory; private JmxProxy jmxConnection; private Long currentlyRunningSegmentId; @@ -55,16 +53,15 @@ public class RepairRunner implements Runnable { @VisibleForTesting public static Map repairRunners = Maps.newConcurrentMap(); - private RepairRunner(IStorage storage, long repairRunId, - JmxConnectionFactory jmxConnectionFactory) + private RepairRunner(AppContext context, long repairRunId) throws ReaperException { - this.storage = storage; + this.context = context; this.repairRunId = repairRunId; - this.jmxConnectionFactory = jmxConnectionFactory; - Optional repairRun = storage.getRepairRun(repairRunId); + Optional repairRun = context.storage.getRepairRun(repairRunId); assert repairRun.isPresent() : "No RepairRun with ID " + repairRunId + " found from storage"; - jmxConnection = this.jmxConnectionFactory.connectAny( - storage.getCluster(repairRun.get().getClusterName()).get()); + Optional cluster = context.storage.getCluster(repairRun.get().getClusterName()); + assert cluster.isPresent() : "No Cluster with name " + repairRun.get().getClusterName() + + " found from storage"; } public static void initializeThreadPool(int threadAmount, long repairTimeout, @@ -78,39 +75,41 @@ public static void initializeThreadPool(int threadAmount, long repairTimeout, /** * Consult storage to see if any repairs are running, and resume those repair runs. * - * @param storage Reaper's internal storage. + * @param context Reaper's application context. */ - public static void resumeRunningRepairRuns(IStorage storage, - JmxConnectionFactory jmxConnectionFactory) { - for (RepairRun repairRun : storage.getRepairRunsWithState(RepairRun.RunState.RUNNING)) { + public static void resumeRunningRepairRuns(AppContext context) { + Collection running = + context.storage.getRepairRunsWithState(RepairRun.RunState.RUNNING); + for (RepairRun repairRun : running) { Collection runningSegments = - storage.getSegmentsWithState(repairRun.getId(), RepairSegment.State.RUNNING); + context.storage.getSegmentsWithState(repairRun.getId(), RepairSegment.State.RUNNING); for (RepairSegment segment : runningSegments) { try { - SegmentRunner.abort(storage, segment, - jmxConnectionFactory.connect(segment.getCoordinatorHost())); + SegmentRunner.abort(context, segment, + context.jmxConnectionFactory.connect(segment.getCoordinatorHost())); } catch (ReaperException e) { LOG.debug("Tried to abort repair on segment {} marked as RUNNING, but the host was down" + " (so abortion won't be needed)", segment.getId()); - SegmentRunner.postpone(storage, segment); + SegmentRunner.postpone(context, segment); } } - RepairRunner.startRepairRun(storage, repairRun.getId(), jmxConnectionFactory); + RepairRunner.startRepairRun(context, repairRun.getId()); } - for (RepairRun pausedRepairRun : storage.getRepairRunsWithState(RepairRun.RunState.PAUSED)) { - RepairRunner.startRepairRun(storage, pausedRepairRun.getId(), jmxConnectionFactory); + Collection paused = + context.storage.getRepairRunsWithState(RepairRun.RunState.PAUSED); + for (RepairRun pausedRepairRun : paused) { + RepairRunner.startRepairRun(context, pausedRepairRun.getId()); } } - public static void startRepairRun(IStorage storage, long repairRunID, - JmxConnectionFactory jmxConnectionFactory) { + public static void startRepairRun(AppContext context, long repairRunID) { assert null != executor : "you need to initialize the thread pool first"; LOG.info("scheduling repair for repair run #{}", repairRunID); try { if (repairRunners.containsKey(repairRunID)) { throw new ReaperException("RepairRunner for repair run " + repairRunID + " already exists"); } - RepairRunner newRunner = new RepairRunner(storage, repairRunID, jmxConnectionFactory); + RepairRunner newRunner = new RepairRunner(context, repairRunID); repairRunners.put(repairRunID, newRunner); executor.submit(newRunner); } catch (ReaperException e) { @@ -129,7 +128,7 @@ public Long getCurrentlyRunningSegmentId() { */ @Override public void run() { - RepairRun repairRun = storage.getRepairRun(repairRunId).get(); + RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); try { RepairRun.RunState state = repairRun.getRunState(); LOG.debug("run() called for repair run #{} with run state {}", repairRunId, state); @@ -153,10 +152,10 @@ public void run() { LOG.error(e.toString()); LOG.error(Arrays.toString(e.getStackTrace())); e.printStackTrace(); - storage.updateRepairRun(repairRun.with() - .runState(RepairRun.RunState.ERROR) - .endTime(DateTime.now()) - .build(repairRun.getId())); + context.storage.updateRepairRun(repairRun.with() + .runState(RepairRun.RunState.ERROR) + .endTime(DateTime.now()) + .build(repairRun.getId())); repairRunners.remove(repairRunId); } } @@ -166,11 +165,11 @@ public void run() { */ private void start() throws ReaperException { LOG.info("Repairs for repair run #{} starting", repairRunId); - RepairRun repairRun = storage.getRepairRun(repairRunId).get(); - boolean success = storage.updateRepairRun(repairRun.with() - .runState(RepairRun.RunState.RUNNING) - .startTime(DateTime.now()) - .build(repairRun.getId())); + RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); + boolean success = context.storage.updateRepairRun(repairRun.with() + .runState(RepairRun.RunState.RUNNING) + .startTime(DateTime.now()) + .build(repairRun.getId())); if (!success) { LOG.error("failed updating repair run " + repairRun.getId()); } @@ -182,11 +181,11 @@ private void start() throws ReaperException { */ private void end() { LOG.info("Repairs for repair run #{} done", repairRunId); - RepairRun repairRun = storage.getRepairRun(repairRunId).get(); - boolean success = storage.updateRepairRun(repairRun.with() - .runState(RepairRun.RunState.DONE) - .endTime(DateTime.now()) - .build(repairRun.getId())); + RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); + boolean success = context.storage.updateRepairRun(repairRun.with() + .runState(RepairRun.RunState.DONE) + .endTime(DateTime.now()) + .build(repairRun.getId())); if (!success) { LOG.error("failed updating repair run " + repairRun.getId()); } @@ -197,8 +196,9 @@ private void end() { */ private void startNextSegment() throws ReaperException { // Currently not allowing parallel repairs. - assert storage.getSegmentAmountForRepairRun(repairRunId, RepairSegment.State.RUNNING) == 0; - Optional nextSegment = storage.getNextFreeSegment(repairRunId); + assert + context.storage.getSegmentAmountForRepairRun(repairRunId, RepairSegment.State.RUNNING) == 0; + Optional nextSegment = context.storage.getNextFreeSegment(repairRunId); if (nextSegment.isPresent()) { repairSegment(nextSegment.get().getId(), nextSegment.get().getTokenRange()); } else { @@ -213,16 +213,16 @@ private void startNextSegment() throws ReaperException { * @param tokenRange token range of the segment to repair. */ private void repairSegment(long segmentId, RingRange tokenRange) throws ReaperException { - RepairRun repairRun = storage.getRepairRun(repairRunId).get(); - RepairUnit repairUnit = storage.getRepairUnit(repairRun.getRepairUnitId()).get(); + RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); + RepairUnit repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId()).get(); String keyspace = repairUnit.getKeyspaceName(); LOG.debug("preparing to repair segment {} on run with id {}", segmentId, repairRun.getId()); - if (!jmxConnection.isConnectionAlive()) { + if (jmxConnection == null || !jmxConnection.isConnectionAlive()) { try { - LOG.debug("reestablishing JMX proxy for repair runner on run id: {}", repairRunId); - Cluster cluster = storage.getCluster(repairUnit.getClusterName()).get(); - jmxConnection = jmxConnectionFactory.connectAny(cluster); + LOG.debug("connecting JMX proxy for repair runner on run id: {}", repairRunId); + Cluster cluster = context.storage.getCluster(repairUnit.getClusterName()).get(); + jmxConnection = context.jmxConnectionFactory.connectAny(cluster); } catch (ReaperException e) { e.printStackTrace(); LOG.warn("Failed to reestablish JMX connection in runner #{}, reattempting in {} seconds", @@ -236,9 +236,9 @@ private void repairSegment(long segmentId, RingRange tokenRange) throws ReaperEx List potentialCoordinators = jmxConnection.tokenRangeToEndpoint(keyspace, tokenRange); if (potentialCoordinators == null) { // This segment has a faulty token range. Abort the entire repair run. - boolean success = storage.updateRepairRun(repairRun.with() - .runState(RepairRun.RunState.ERROR) - .build(repairRun.getId())); + boolean success = context.storage.updateRepairRun(repairRun.with() + .runState(RepairRun.RunState.ERROR) + .build(repairRun.getId())); if (!success) { LOG.error("failed updating repair run " + repairRun.getId()); } @@ -246,15 +246,14 @@ private void repairSegment(long segmentId, RingRange tokenRange) throws ReaperEx } currentlyRunningSegmentId = segmentId; - SegmentRunner.triggerRepair(storage, segmentId, potentialCoordinators, repairTimeoutMillis, - jmxConnectionFactory); + SegmentRunner.triggerRepair(context, segmentId, potentialCoordinators, repairTimeoutMillis); currentlyRunningSegmentId = null; handleResult(segmentId); } private void handleResult(long segmentId) { - RepairSegment segment = storage.getRepairSegment(segmentId).get(); + RepairSegment segment = context.storage.getRepairSegment(segmentId).get(); RepairSegment.State state = segment.getState(); LOG.debug("In repair run #{}, triggerRepair on segment {} ended with state {}", repairRunId, segmentId, state); @@ -284,7 +283,7 @@ private void handleResult(long segmentId) { * @return the delay in milliseconds. */ long intensityBasedDelayMillis(RepairSegment repairSegment) { - RepairRun repairRun = storage.getRepairRun(repairRunId).get(); + RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); assert repairSegment.getEndTime() != null && repairSegment.getStartTime() != null; long repairEnd = repairSegment.getEndTime().getMillis(); long repairStart = repairSegment.getStartTime().getMillis(); diff --git a/src/main/java/com/spotify/reaper/service/RingRange.java b/src/main/java/com/spotify/reaper/service/RingRange.java index 5ff5063c2..d31a32990 100644 --- a/src/main/java/com/spotify/reaper/service/RingRange.java +++ b/src/main/java/com/spotify/reaper/service/RingRange.java @@ -71,7 +71,7 @@ public boolean encloses(RingRange other) { * @return true if 0 is inside of this range. Note that if start == end, then wrapping is true */ @VisibleForTesting - protected boolean isWrapping() { + public boolean isWrapping() { return SegmentGenerator.greaterThanOrEqual(start, end); } diff --git a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java index 51015e1ba..23189cd38 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java +++ b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java @@ -156,11 +156,7 @@ public List generateSegments(int totalSegmentCount, List } protected boolean inRange(BigInteger token) { - if (lowerThan(token, RANGE_MIN) || greaterThan(token, RANGE_MAX)) { - return false; - } else { - return true; - } + return !(lowerThan(token, RANGE_MIN) || greaterThan(token, RANGE_MAX)); } } diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index 8f1e9bfb7..3c995aae3 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -17,13 +17,12 @@ import com.google.common.base.Optional; import com.google.common.collect.Maps; +import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperException; -import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.core.RepairUnit; -import com.spotify.reaper.storage.IStorage; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.SimpleCondition; @@ -42,7 +41,7 @@ public final class SegmentRunner implements RepairStatusHandler { private static final Logger LOG = LoggerFactory.getLogger(SegmentRunner.class); private static final int MAX_PENDING_COMPACTIONS = 20; - private final IStorage storage; + private final AppContext context; private final long segmentId; private final Condition condition = new SimpleCondition(); private int commandId; @@ -51,41 +50,39 @@ public final class SegmentRunner implements RepairStatusHandler { @VisibleForTesting public static Map segmentRunners = Maps.newConcurrentMap(); - private SegmentRunner(IStorage storage, long segmentId) { - this.storage = storage; + private SegmentRunner(AppContext context, long segmentId) { + this.context = context; this.segmentId = segmentId; } /** * Triggers a repair for a segment. Is blocking call. - * @throws ReaperException */ - public static void triggerRepair(IStorage storage, long segmentId, - Collection potentialCoordinators, long timeoutMillis, - JmxConnectionFactory jmxConnectionFactory) + public static void triggerRepair(AppContext context, long segmentId, + Collection potentialCoordinators, long timeoutMillis) throws ReaperException { if (segmentRunners.containsKey(segmentId)) { throw new ReaperException("SegmentRunner already exists for segment with ID: " + segmentId); } - SegmentRunner newSegmentRunner = new SegmentRunner(storage, segmentId); + SegmentRunner newSegmentRunner = new SegmentRunner(context, segmentId); segmentRunners.put(segmentId, newSegmentRunner); - newSegmentRunner.runRepair(potentialCoordinators, jmxConnectionFactory, timeoutMillis); + newSegmentRunner.runRepair(potentialCoordinators, timeoutMillis); } - public static void postpone(IStorage storage, RepairSegment segment) { + public static void postpone(AppContext context, RepairSegment segment) { LOG.warn("Postponing segment {}", segment.getId()); - storage.updateRepairSegment(segment.with() - .state(RepairSegment.State.NOT_STARTED) - .coordinatorHost(null) - .repairCommandId(null) - .startTime(null) - .failCount(segment.getFailCount() + 1) - .build(segment.getId())); + context.storage.updateRepairSegment(segment.with() + .state(RepairSegment.State.NOT_STARTED) + .coordinatorHost(null) + .repairCommandId(null) + .startTime(null) + .failCount(segment.getFailCount() + 1) + .build(segment.getId())); segmentRunners.remove(segment.getId()); } - public static void abort(IStorage storage, RepairSegment segment, JmxProxy jmxConnection) { - postpone(storage, segment); + public static void abort(AppContext context, RepairSegment segment, JmxProxy jmxConnection) { + postpone(context, segment); LOG.info("Aborting repair on segment with id {} on coordinator {}", segment.getId(), segment.getCoordinatorHost()); jmxConnection.cancelAllRepairs(); @@ -96,15 +93,14 @@ public int getCurrentCommandId() { return this.commandId; } - private void runRepair(Collection potentialCoordinators, - JmxConnectionFactory jmxConnectionFactory, long timeoutMillis) { - final RepairSegment segment = storage.getRepairSegment(segmentId).get(); - try (JmxProxy coordinator = jmxConnectionFactory + private void runRepair(Collection potentialCoordinators, long timeoutMillis) { + final RepairSegment segment = context.storage.getRepairSegment(segmentId).get(); + try (JmxProxy coordinator = context.jmxConnectionFactory .connectAny(Optional.of(this), potentialCoordinators)) { - RepairUnit repairUnit = storage.getRepairUnit(segment.getRepairUnitId()).get(); + RepairUnit repairUnit = context.storage.getRepairUnit(segment.getRepairUnitId()).get(); String keyspace = repairUnit.getKeyspaceName(); - if (!canRepair(segment, keyspace, coordinator, jmxConnectionFactory)) { + if (!canRepair(segment, keyspace, coordinator)) { postpone(segment); return; } @@ -114,10 +110,10 @@ private void runRepair(Collection potentialCoordinators, keyspace, repairUnit.getRepairParallelism(), repairUnit.getColumnFamilies()); LOG.debug("Triggered repair with command id {}", commandId); - storage.updateRepairSegment(segment.with() - .coordinatorHost(coordinator.getHost()) - .repairCommandId(commandId) - .build(segmentId)); + context.storage.updateRepairSegment(segment.with() + .coordinatorHost(coordinator.getHost()) + .repairCommandId(commandId) + .build(segmentId)); LOG.info("Repair for segment {} started, status wait will timeout in {} millis", segmentId, timeoutMillis); @@ -126,7 +122,7 @@ private void runRepair(Collection potentialCoordinators, } catch (InterruptedException e) { LOG.warn("Repair command {} on segment {} interrupted", commandId, segmentId); } finally { - RepairSegment resultingSegment = storage.getRepairSegment(segmentId).get(); + RepairSegment resultingSegment = context.storage.getRepairSegment(segmentId).get(); LOG.info("Repair command {} on segment {} returned with state {}", commandId, segmentId, resultingSegment.getState()); if (resultingSegment.getState().equals(RepairSegment.State.RUNNING)) { @@ -149,12 +145,12 @@ private void runRepair(Collection potentialCoordinators, } } - boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator, - JmxConnectionFactory factory) throws ReaperException { + boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator) + throws ReaperException { Collection allHosts = coordinator.tokenRangeToEndpoint(keyspace, segment.getTokenRange()); for (String hostName : allHosts) { - try (JmxProxy hostProxy = factory.connect(hostName)) { + try (JmxProxy hostProxy = context.jmxConnectionFactory.connect(hostName)) { LOG.debug("checking host '{}' for pending compactions and other repairs (can repair?)" + " Run id '{}'", hostName, segment.getRunId()); if (hostProxy.getPendingCompactions() > MAX_PENDING_COMPACTIONS) { @@ -176,11 +172,11 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator, } private void postpone(RepairSegment segment) { - postpone(storage, segment); + postpone(context, segment); } private void abort(RepairSegment segment, JmxProxy jmxConnection) { - abort(storage, segment, jmxConnection); + abort(context, segment, jmxConnection); } /** @@ -203,15 +199,15 @@ public void handle(int repairNumber, ActiveRepairService.Status status, String m return; } - RepairSegment currentSegment = storage.getRepairSegment(segmentId).get(); + RepairSegment currentSegment = context.storage.getRepairSegment(segmentId).get(); // See status explanations from: https://wiki.apache.org/cassandra/RepairAsyncAPI switch (status) { case STARTED: DateTime now = DateTime.now(); - storage.updateRepairSegment(currentSegment.with() - .state(RepairSegment.State.RUNNING) - .startTime(now) - .build(segmentId)); + context.storage.updateRepairSegment(currentSegment.with() + .state(RepairSegment.State.RUNNING) + .startTime(now) + .build(segmentId)); break; case SESSION_FAILED: LOG.warn("repair session failed for segment with id '{}' and repair number '{}'", @@ -223,10 +219,10 @@ public void handle(int repairNumber, ActiveRepairService.Status status, String m // Do nothing, wait for FINISHED. break; case FINISHED: - storage.updateRepairSegment(currentSegment.with() - .state(RepairSegment.State.DONE) - .endTime(DateTime.now()) - .build(segmentId)); + context.storage.updateRepairSegment(currentSegment.with() + .state(RepairSegment.State.DONE) + .endTime(DateTime.now()) + .build(segmentId)); condition.signalAll(); break; } diff --git a/src/test/java/com/spotify/reaper/IntegrationTest.java b/src/test/java/com/spotify/reaper/IntegrationTest.java deleted file mode 100644 index ae72c5ae9..000000000 --- a/src/test/java/com/spotify/reaper/IntegrationTest.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.spotify.reaper; - -import org.junit.Test; - -public class IntegrationTest { - - @Test - public void testService() { - // TODO - } - -} diff --git a/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java b/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java new file mode 100644 index 000000000..006854add --- /dev/null +++ b/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java @@ -0,0 +1,59 @@ +package com.spotify.reaper.acceptance; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; + +import java.util.Map; + +import javax.ws.rs.core.Response; + +import cucumber.api.java.Before; +import cucumber.api.java.en.And; +import cucumber.api.java.en.Given; +import cucumber.api.java.en.Then; +import cucumber.api.java.en.When; + +/** + * Basic acceptance test (Cucumber) steps. + */ +public class BasicSteps { + + @Before + public static void setup() throws Exception { + ReaperTestJettyRunner.setup(); + } + + @Given("^a reaper service is running$") + public void a_reaper_service_is_running() throws Throwable { + ReaperTestJettyRunner.callAndExpect( + "GET", "/ping", Optional.>absent(), Response.Status.OK); + } + + @Given("^that we are going to use \"([^\"]*)\" as cluster seed host$") + public void that_we_are_going_to_use_as_cluster_seed_host(String seedHost) throws Throwable { + TestContext.SEED_HOST = seedHost; + } + + @And("^reaper has no cluster with name \"([^\"]*)\" in storage$") + public void reaper_has_no_cluster_with_name_in_storage(String clusterName) throws Throwable { + ReaperTestJettyRunner.callAndExpect( + "GET", "/cluster/" + clusterName, + Optional.>absent(), Response.Status.NOT_FOUND); + } + + @When("^an add-cluster request is made to reaper$") + public void an_add_cluster_request_is_made_to_reaper() throws Throwable { + Map params = Maps.newHashMap(); + params.put("seedHost", TestContext.SEED_HOST); + ReaperTestJettyRunner.callAndExpect( + "POST", "/cluster", Optional.of(params), Response.Status.CREATED); + } + + @Then("^reaper has a cluster called \"([^\"]*)\" in storage$") + public void reaper_has_a_cluster_called_in_storage(String clusterName) throws Throwable { + ReaperTestJettyRunner.callAndExpect( + "GET", "/cluster/" + clusterName, + Optional.>absent(), Response.Status.OK.getStatusCode()); + } + +} diff --git a/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java b/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java new file mode 100644 index 000000000..acc1a28da --- /dev/null +++ b/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java @@ -0,0 +1,158 @@ +package com.spotify.reaper.acceptance; + +import com.google.common.base.Optional; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; + +import com.spotify.reaper.ReaperApplication; +import com.spotify.reaper.ReaperApplicationConfiguration; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; + +import net.sourceforge.argparse4j.inf.Namespace; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.URL; +import java.util.Map; + +import javax.ws.rs.core.Response; + +import io.dropwizard.cli.ServerCommand; +import io.dropwizard.lifecycle.ServerLifecycleListener; +import io.dropwizard.setup.Bootstrap; +import io.dropwizard.setup.Environment; + +import static org.junit.Assert.assertEquals; + +/** + * Simple Reaper application runner for testing purposes. + * Starts a Jetty server that wraps Reaper application, + * and registers a shutdown hook for JVM exit event. + */ +public class ReaperTestJettyRunner { + + private static final Logger LOG = LoggerFactory.getLogger(ReaperTestJettyRunner.class); + + private static ReaperTestJettyRunner runnerInstance; + + public static void setup() throws Exception { + if (runnerInstance == null) { + String testConfigPath = Resources.getResource("cassandra-reaper-at.yaml").getPath(); + LOG.info("initializing ReaperTestJettyRunner with config in path: " + testConfigPath); + runnerInstance = new ReaperTestJettyRunner(testConfigPath); + runnerInstance.start(); + // Stop the testing Reaper after tests are finished. + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + runnerInstance.stop(); + } + }); + } + } + + public static void callAndExpect(String httpMethod, String urlPath, + Optional> params, int statusCode) { + assert runnerInstance != null : "service not initialized, call setup() first"; + String reaperBase = "http://localhost:" + runnerInstance.getLocalPort() + "/"; + URI uri; + try { + uri = new URL(new URL(reaperBase), urlPath).toURI(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + Client client = new Client(); + WebResource resource = client.resource(uri); + LOG.info("calling reaper in resource: " + resource.getURI()); + if (params.isPresent()) { + for (Map.Entry entry : params.get().entrySet()) { + resource = resource.queryParam(entry.getKey(), entry.getValue()); + } + } + ClientResponse response; + if (httpMethod.equalsIgnoreCase("GET")) { + response = resource.get(ClientResponse.class); + } else if (httpMethod.equalsIgnoreCase("POST")) { + response = resource.post(ClientResponse.class); + } else { + throw new RuntimeException("Invalid HTTP method: " + httpMethod); + } + + assertEquals(statusCode, response.getStatus()); + } + + public static void callAndExpect(String httpMethod, String url, + Optional> params, Response.Status status) { + callAndExpect(httpMethod, url, params, status.getStatusCode()); + } + + private final String configPath; + + private Server jettyServer; + + public ReaperTestJettyRunner(String configPath) { + this.configPath = configPath; + } + + public void start() { + if (jettyServer != null) { + return; + } + try { + ReaperApplication reaper = new ReaperApplication(); + + final Bootstrap bootstrap = + new Bootstrap(reaper) { + @Override + public void run(ReaperApplicationConfiguration configuration, Environment environment) + throws Exception { + environment.lifecycle().addServerLifecycleListener(new ServerLifecycleListener() { + @Override + public void serverStarted(Server server) { + jettyServer = server; + } + }); + super.run(configuration, environment); + } + }; + + reaper.initialize(bootstrap); + final ServerCommand command = new ServerCommand<>(reaper); + + ImmutableMap.Builder file = ImmutableMap.builder(); + if (!Strings.isNullOrEmpty(configPath)) { + file.put("file", configPath); + } + final Namespace namespace = new Namespace(file.build()); + + command.run(bootstrap, namespace); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void stop() { + if (null != jettyServer) { + try { + jettyServer.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + jettyServer = null; + runnerInstance = null; + } + + public int getLocalPort() { + assert jettyServer != null : "service not initialized, call setup() first"; + return ((ServerConnector) jettyServer.getConnectors()[0]).getLocalPort(); + } + +} diff --git a/src/test/java/com/spotify/reaper/acceptance/TestContext.java b/src/test/java/com/spotify/reaper/acceptance/TestContext.java new file mode 100644 index 000000000..50bcb9977 --- /dev/null +++ b/src/test/java/com/spotify/reaper/acceptance/TestContext.java @@ -0,0 +1,7 @@ +package com.spotify.reaper.acceptance; + +public class TestContext { + public static String SEED_HOST; + + +} diff --git a/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java b/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java deleted file mode 100644 index e70b0be16..000000000 --- a/src/test/java/com/spotify/reaper/service/SegmentRunnerTest.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.spotify.reaper.service; - -import com.google.common.base.Optional; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -import com.spotify.reaper.ReaperException; -import com.spotify.reaper.cassandra.JmxConnectionFactory; -import com.spotify.reaper.cassandra.JmxProxy; -import com.spotify.reaper.cassandra.RepairStatusHandler; -import com.spotify.reaper.core.RepairRun; -import com.spotify.reaper.core.RepairSegment; -import com.spotify.reaper.core.RepairUnit; -import com.spotify.reaper.storage.IStorage; -import com.spotify.reaper.storage.MemoryStorage; - -import org.apache.cassandra.repair.RepairParallelism; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.commons.lang3.mutable.MutableObject; -import org.joda.time.DateTime; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.math.BigInteger; -import java.util.Collections; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -public class SegmentRunnerTest { - // TODO: Clean up tests. There's a lot of code duplication across these tests. - - @Before - public void setUp() throws Exception { - RepairRunner.repairRunners.clear(); - SegmentRunner.segmentRunners.clear(); - } - - @Test - public void timeoutTest() throws InterruptedException, ReaperException, ExecutionException { - final IStorage storage = new MemoryStorage(); - RepairUnit cf = storage.addRepairUnit( - new RepairUnit.Builder("reaper", "reaper", Sets.newHashSet("reaper"), 1, - RepairParallelism.PARALLEL)); - RepairRun run = storage.addRepairRun( - new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5)); - storage.addRepairSegments(Collections.singleton( - new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ONE, BigInteger.ZERO), - cf.getId())), run.getId()); - final long segmentId = storage.getNextFreeSegment(run.getId()).get().getId(); - - final ExecutorService executor = Executors.newSingleThreadExecutor(); - final MutableObject> future = new MutableObject<>(); - - SegmentRunner.triggerRepair(storage, segmentId, - Collections.singleton(""), 100, new JmxConnectionFactory() { - @Override - public JmxProxy connect(final Optional handler, String host) - throws ReaperException { - JmxProxy jmx = mock(JmxProxy.class); - when(jmx.getClusterName()).thenReturn("reaper"); - when(jmx.isConnectionAlive()).thenReturn(true); - when(jmx.tokenRangeToEndpoint(anyString(), any(RingRange.class))) - .thenReturn(Lists.newArrayList("")); - when(jmx.triggerRepair(any(BigInteger.class), any(BigInteger.class), anyString(), - Matchers.any(), Sets.newHashSet(anyString()))) - .then(new Answer() { - @Override - public Integer answer(InvocationOnMock invocation) { - assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); - future.setValue(executor.submit(new Thread() { - @Override - public void run() { - handler.get().handle(1, ActiveRepairService.Status.STARTED, - "Repair command 1 has started"); - assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(segmentId).get().getState()); - } - })); - return 1; - } - }); - - return jmx; - } - }); - - future.getValue().get(); - executor.shutdown(); - - assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); - assertEquals(1, storage.getRepairSegment(segmentId).get().getFailCount()); - } - - @Test - public void successTest() throws InterruptedException, ReaperException, ExecutionException { - final IStorage storage = new MemoryStorage(); - RepairUnit cf = storage.addRepairUnit( - new RepairUnit.Builder("reaper", "reaper", Sets.newHashSet("reaper"), 1, - RepairParallelism.PARALLEL)); - RepairRun run = storage.addRepairRun( - new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5)); - storage.addRepairSegments(Collections.singleton( - new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ONE, BigInteger.ZERO), - cf.getId())), run.getId()); - final long segmentId = storage.getNextFreeSegment(run.getId()).get().getId(); - - final ExecutorService executor = Executors.newSingleThreadExecutor(); - final MutableObject> future = new MutableObject<>(); - - SegmentRunner.triggerRepair(storage, segmentId, - Collections.singleton(""), 1000, new JmxConnectionFactory() { - @Override - public JmxProxy connect(final Optional handler, String host) - throws ReaperException { - JmxProxy jmx = mock(JmxProxy.class); - when(jmx.getClusterName()).thenReturn("reaper"); - when(jmx.isConnectionAlive()).thenReturn(true); - when(jmx.tokenRangeToEndpoint(anyString(), any(RingRange.class))) - .thenReturn(Lists.newArrayList("")); - when(jmx.triggerRepair(any(BigInteger.class), any(BigInteger.class), anyString(), - Matchers.any(), Sets.newHashSet(anyString()))) - .then(new Answer() { - @Override - public Integer answer(InvocationOnMock invocation) { - assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); - future.setValue(executor.submit(new Runnable() { - @Override - public void run() { - handler.get().handle(1, ActiveRepairService.Status.STARTED, - "Repair command 1 has started"); - assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(segmentId).get().getState()); - // report about an unrelated repair. Shouldn't affect anything. - handler.get().handle(2, ActiveRepairService.Status.SESSION_FAILED, - "Repair command 2 has failed"); - handler.get().handle(1, ActiveRepairService.Status.SESSION_SUCCESS, - "Repair session succeeded in command 1"); - assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(segmentId).get().getState()); - handler.get().handle(1, ActiveRepairService.Status.FINISHED, - "Repair command 1 has finished"); - assertEquals(RepairSegment.State.DONE, - storage.getRepairSegment(segmentId).get().getState()); - } - })); - return 1; - } - }); - - return jmx; - } - }); - - future.getValue().get(); - executor.shutdown(); - - assertEquals(RepairSegment.State.DONE, storage.getRepairSegment(segmentId).get().getState()); - assertEquals(0, storage.getRepairSegment(segmentId).get().getFailCount()); - } - - @Test - public void failureTest() throws InterruptedException, ReaperException, ExecutionException { - final IStorage storage = new MemoryStorage(); - RepairUnit cf = - storage.addRepairUnit( - new RepairUnit.Builder("reaper", "reaper", Sets.newHashSet("reaper"), 1, - RepairParallelism.PARALLEL)); - RepairRun run = storage.addRepairRun( - new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5)); - storage.addRepairSegments(Collections.singleton( - new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ONE, BigInteger.ZERO), - cf.getId())), run.getId()); - final long segmentId = storage.getNextFreeSegment(run.getId()).get().getId(); - - final ExecutorService executor = Executors.newSingleThreadExecutor(); - final MutableObject> future = new MutableObject<>(); - - SegmentRunner.triggerRepair(storage, segmentId, - Collections.singleton(""), 1000, new JmxConnectionFactory() { - @Override - public JmxProxy connect(final Optional handler, String host) - throws ReaperException { - JmxProxy jmx = mock(JmxProxy.class); - when(jmx.getClusterName()).thenReturn("reaper"); - when(jmx.isConnectionAlive()).thenReturn(true); - when(jmx.tokenRangeToEndpoint(anyString(), any(RingRange.class))) - .thenReturn(Lists.newArrayList("")); - when(jmx.triggerRepair(any(BigInteger.class), any(BigInteger.class), anyString(), - Matchers.any(), Sets.newHashSet(anyString()))) - .then(new Answer() { - @Override - public Integer answer(InvocationOnMock invocation) { - assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); - future.setValue(executor.submit(new Runnable() { - @Override - public void run() { - handler.get().handle(1, ActiveRepairService.Status.STARTED, - "Repair command 1 has started"); - assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(segmentId).get().getState()); - handler.get().handle(1, ActiveRepairService.Status.SESSION_SUCCESS, - "Repair session succeeded in command 1"); - assertEquals(RepairSegment.State.RUNNING, - storage.getRepairSegment(segmentId).get().getState()); - handler.get().handle(1, ActiveRepairService.Status.SESSION_FAILED, - "Repair command 1 has failed"); - assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); - } - })); - - return 1; - } - }); - - return jmx; - } - }); - - future.getValue().get(); - executor.shutdown(); - - assertEquals(RepairSegment.State.NOT_STARTED, - storage.getRepairSegment(segmentId).get().getState()); - assertEquals(1, storage.getRepairSegment(segmentId).get().getFailCount()); - } -} diff --git a/src/test/java/com/spotify/reaper/cassandra/JmxProxyTest.java b/src/test/java/com/spotify/reaper/unit/cassandra/JmxProxyTest.java similarity index 94% rename from src/test/java/com/spotify/reaper/cassandra/JmxProxyTest.java rename to src/test/java/com/spotify/reaper/unit/cassandra/JmxProxyTest.java index 5047e41bb..add278763 100644 --- a/src/test/java/com/spotify/reaper/cassandra/JmxProxyTest.java +++ b/src/test/java/com/spotify/reaper/unit/cassandra/JmxProxyTest.java @@ -11,10 +11,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.spotify.reaper.cassandra; +package com.spotify.reaper.unit.cassandra; import com.spotify.reaper.ReaperException; -import com.spotify.reaper.core.Cluster; +import com.spotify.reaper.cassandra.JmxProxy; import org.junit.Test; diff --git a/src/test/java/com/spotify/reaper/core/ClusterTest.java b/src/test/java/com/spotify/reaper/unit/core/ClusterTest.java similarity index 91% rename from src/test/java/com/spotify/reaper/core/ClusterTest.java rename to src/test/java/com/spotify/reaper/unit/core/ClusterTest.java index fbac9f0d2..fa968fa21 100644 --- a/src/test/java/com/spotify/reaper/core/ClusterTest.java +++ b/src/test/java/com/spotify/reaper/unit/core/ClusterTest.java @@ -11,13 +11,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.spotify.reaper.core; +package com.spotify.reaper.unit.core; + +import com.spotify.reaper.core.Cluster; import org.junit.Test; import static org.junit.Assert.assertEquals; public class ClusterTest { + @Test public void testGetSymbolicName() { assertEquals("example2cluster", Cluster.toSymbolicName("Example2 Cluster")); diff --git a/src/test/java/com/spotify/reaper/resources/ClusterResourceTest.java b/src/test/java/com/spotify/reaper/unit/resources/ClusterResourceTest.java similarity index 75% rename from src/test/java/com/spotify/reaper/resources/ClusterResourceTest.java rename to src/test/java/com/spotify/reaper/unit/resources/ClusterResourceTest.java index 5e06a1da7..6a48020e2 100644 --- a/src/test/java/com/spotify/reaper/resources/ClusterResourceTest.java +++ b/src/test/java/com/spotify/reaper/unit/resources/ClusterResourceTest.java @@ -1,21 +1,25 @@ -package com.spotify.reaper.resources; +package com.spotify.reaper.unit.resources; import com.google.common.base.Optional; import com.google.common.collect.Sets; + +import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperException; import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.storage.IStorage; +import com.spotify.reaper.resources.ClusterResource; import com.spotify.reaper.storage.MemoryStorage; + import org.junit.Before; import org.junit.Test; +import java.net.URI; + import javax.ws.rs.core.Response; import javax.ws.rs.core.UriInfo; -import java.net.URI; import static org.hibernate.validator.internal.util.Contracts.assertNotNull; import static org.junit.Assert.assertEquals; @@ -30,15 +34,13 @@ public class ClusterResourceTest { String SEED_HOST = "TestHost"; URI SAMPLE_URI = URI.create("http://test"); - IStorage storage; - ReaperApplicationConfiguration config; - JmxConnectionFactory factory; + AppContext context = new AppContext(); UriInfo uriInfo; @Before public void setUp() throws Exception { - storage = new MemoryStorage(); - config = mock(ReaperApplicationConfiguration.class); + context.storage = new MemoryStorage(); + context.config = mock(ReaperApplicationConfiguration.class); uriInfo = mock(UriInfo.class); when(uriInfo.getAbsolutePath()).thenReturn(SAMPLE_URI); @@ -47,7 +49,7 @@ public void setUp() throws Exception { final JmxProxy proxy = mock(JmxProxy.class); when(proxy.getClusterName()).thenReturn(CLUSTER_NAME); when(proxy.getPartitioner()).thenReturn(PARTITIONER); - factory = new JmxConnectionFactory() { + context.jmxConnectionFactory = new JmxConnectionFactory() { @Override public JmxProxy connect(Optional handler, String host) throws ReaperException { @@ -58,15 +60,15 @@ public JmxProxy connect(Optional handler, String host) @Test public void testAddCluster() throws Exception { - ClusterResource clusterResource = new ClusterResource(storage, factory); + ClusterResource clusterResource = new ClusterResource(context); Response response = clusterResource.addCluster(uriInfo, Optional.of(SEED_HOST)); assertEquals(201, response.getStatus()); - assertEquals(1, storage.getClusters().size()); + assertEquals(1, context.storage.getClusters().size()); - Cluster cluster = storage.getCluster(CLUSTER_NAME).get(); + Cluster cluster = context.storage.getCluster(CLUSTER_NAME).get(); assertNotNull(cluster, "Did not find expected cluster"); - assertEquals(0, storage.getRepairRunsForCluster(cluster.getName()).size()); + assertEquals(0, context.storage.getRepairRunsForCluster(cluster.getName()).size()); assertEquals(CLUSTER_NAME, cluster.getName()); assertEquals(1, cluster.getSeedHosts().size()); assertEquals(SEED_HOST, cluster.getSeedHosts().iterator().next()); @@ -75,14 +77,14 @@ public void testAddCluster() throws Exception { @Test public void testAddExistingCluster() throws Exception { Cluster cluster = new Cluster(CLUSTER_NAME, PARTITIONER, Sets.newHashSet(SEED_HOST)); - storage.addCluster(cluster); + context.storage.addCluster(cluster); - ClusterResource clusterResource = new ClusterResource(storage, factory); + ClusterResource clusterResource = new ClusterResource(context); Response response = clusterResource.addCluster(uriInfo, Optional.of(SEED_HOST)); assertEquals(403, response.getStatus()); assertTrue(response.getEntity() instanceof String); String msg = response.getEntity().toString(); assertTrue(msg.contains("already exists")); - assertEquals(1, storage.getClusters().size()); + assertEquals(1, context.storage.getClusters().size()); } } diff --git a/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java b/src/test/java/com/spotify/reaper/unit/resources/RepairRunResourceTest.java similarity index 82% rename from src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java rename to src/test/java/com/spotify/reaper/unit/resources/RepairRunResourceTest.java index 79f5a52a7..bb5176032 100644 --- a/src/test/java/com/spotify/reaper/resources/RepairRunResourceTest.java +++ b/src/test/java/com/spotify/reaper/unit/resources/RepairRunResourceTest.java @@ -1,9 +1,10 @@ -package com.spotify.reaper.resources; +package com.spotify.reaper.unit.resources; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperException; import com.spotify.reaper.cassandra.JmxConnectionFactory; @@ -13,11 +14,11 @@ import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.core.RepairUnit; +import com.spotify.reaper.resources.RepairRunResource; import com.spotify.reaper.resources.view.RepairRunStatus; import com.spotify.reaper.service.RepairRunner; import com.spotify.reaper.service.RingRange; import com.spotify.reaper.service.SegmentRunner; -import com.spotify.reaper.storage.IStorage; import com.spotify.reaper.storage.MemoryStorage; import org.apache.cassandra.repair.RepairParallelism; @@ -46,15 +47,6 @@ public class RepairRunResourceTest { - int THREAD_CNT = 1; - int REPAIR_TIMEOUT_S = 60; - int RETRY_DELAY_S = 10; - - long TIME_CREATE = 42l; - long TIME_START = 43l; - - URI SAMPLE_URI = URI.create("http://test"); - static final String CLUSTER_NAME = "testcluster"; static final String PARTITIONER = "org.apache.cassandra.dht.RandomPartitioner"; static final String SEED_HOST = "TestHost"; @@ -62,30 +54,34 @@ public class RepairRunResourceTest { static final Set TABLES = Sets.newHashSet("testTable"); static final String OWNER = "test"; static final Integer SEGMENTS = 100; - + int THREAD_CNT = 1; + int REPAIR_TIMEOUT_S = 60; + int RETRY_DELAY_S = 10; + long TIME_CREATE = 42l; + long TIME_START = 43l; + URI SAMPLE_URI = URI.create("http://test"); int SEGMENT_CNT = 6; double REPAIR_INTENSITY = 0.5f; RepairParallelism REPAIR_PARALLELISM = RepairParallelism.SEQUENTIAL; List TOKENS = Lists.newArrayList(BigInteger.valueOf(0l), BigInteger.valueOf(100l), BigInteger.valueOf(200l)); - IStorage storage; - ReaperApplicationConfiguration config; + AppContext context; UriInfo uriInfo; - JmxConnectionFactory factory; @Before public void setUp() throws Exception { RepairRunner.repairRunners.clear(); SegmentRunner.segmentRunners.clear(); - storage = new MemoryStorage(); + context = new AppContext(); + context.storage = new MemoryStorage(); Cluster cluster = new Cluster(CLUSTER_NAME, PARTITIONER, Sets.newHashSet(SEED_HOST)); - storage.addCluster(cluster); + context.storage.addCluster(cluster); - config = mock(ReaperApplicationConfiguration.class); - when(config.getSegmentCount()).thenReturn(SEGMENT_CNT); - when(config.getRepairIntensity()).thenReturn(REPAIR_INTENSITY); + context.config = mock(ReaperApplicationConfiguration.class); + when(context.config.getSegmentCount()).thenReturn(SEGMENT_CNT); + when(context.config.getRepairIntensity()).thenReturn(REPAIR_INTENSITY); uriInfo = mock(UriInfo.class); when(uriInfo.getAbsolutePath()).thenReturn(SAMPLE_URI); @@ -100,7 +96,7 @@ public void setUp() throws Exception { when(proxy.isConnectionAlive()).thenReturn(Boolean.TRUE); when(proxy.tokenRangeToEndpoint(anyString(), any(RingRange.class))).thenReturn( Collections.singletonList("")); - factory = new JmxConnectionFactory() { + context.jmxConnectionFactory = new JmxConnectionFactory() { @Override public JmxProxy connect(Optional handler, String host) throws ReaperException { @@ -111,7 +107,7 @@ public JmxProxy connect(Optional handler, String host) RepairUnit.Builder repairUnitBuilder = new RepairUnit.Builder(CLUSTER_NAME, KEYSPACE, TABLES, SEGMENT_CNT, REPAIR_PARALLELISM); - storage.addRepairUnit(repairUnitBuilder); + context.storage.addRepairUnit(repairUnitBuilder); } private Response addDefaultRepairRun(RepairRunResource resource) { @@ -141,17 +137,17 @@ private Response addRepairRun(RepairRunResource resource, UriInfo uriInfo, public void testAddRepairRun() throws Exception { DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); - RepairRunResource resource = new RepairRunResource(config, storage, factory); + RepairRunResource resource = new RepairRunResource(context); Response response = addDefaultRepairRun(resource); assertEquals(201, response.getStatus()); assertTrue(response.getEntity() instanceof RepairRunStatus); - assertEquals(1, storage.getClusters().size()); - assertEquals(1, storage.getRepairRunsForCluster(CLUSTER_NAME).size()); - assertEquals(1, storage.getRepairRunIdsForCluster(CLUSTER_NAME).size()); - Long runId = storage.getRepairRunIdsForCluster(CLUSTER_NAME).iterator().next(); - RepairRun run = storage.getRepairRun(runId).get(); + assertEquals(1, context.storage.getClusters().size()); + assertEquals(1, context.storage.getRepairRunsForCluster(CLUSTER_NAME).size()); + assertEquals(1, context.storage.getRepairRunIdsForCluster(CLUSTER_NAME).size()); + Long runId = context.storage.getRepairRunIdsForCluster(CLUSTER_NAME).iterator().next(); + RepairRun run = context.storage.getRepairRun(runId).get(); assertEquals(RepairRun.RunState.NOT_STARTED, run.getRunState()); assertEquals(TIME_CREATE, run.getCreationTime().getMillis()); assertEquals(REPAIR_INTENSITY, run.getIntensity(), 0.0f); @@ -159,8 +155,8 @@ public void testAddRepairRun() throws Exception { assertNull(run.getEndTime()); // apparently, tokens [0, 100, 200] and 6 requested segments causes generating 8 RepairSegments - assertEquals(8, storage.getSegmentAmountForRepairRun(run.getId(), - RepairSegment.State.NOT_STARTED)); + assertEquals(8, context.storage.getSegmentAmountForRepairRun(run.getId(), + RepairSegment.State.NOT_STARTED)); // adding another repair run should work as well response = addDefaultRepairRun(resource); @@ -168,8 +164,8 @@ public void testAddRepairRun() throws Exception { assertEquals(201, response.getStatus()); assertTrue(response.getEntity() instanceof RepairRunStatus); - assertEquals(1, storage.getClusters().size()); - assertEquals(2, storage.getRepairRunsForCluster(CLUSTER_NAME).size()); + assertEquals(1, context.storage.getClusters().size()); + assertEquals(2, context.storage.getRepairRunsForCluster(CLUSTER_NAME).size()); } @Test @@ -177,7 +173,7 @@ public void testTriggerRepairRun() throws Exception { DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); - RepairRunResource resource = new RepairRunResource(config, storage, factory); + RepairRunResource resource = new RepairRunResource(context); Response response = addDefaultRepairRun(resource); RepairRunStatus repairRunStatus = (RepairRunStatus) response.getEntity(); long runId = repairRunStatus.getId(); @@ -201,19 +197,21 @@ public void testTriggerRepairRun() throws Exception { "sent a STARTED event from a test"); Thread.sleep(50); - RepairRun repairRun = storage.getRepairRun(runId).get(); + RepairRun repairRun = context.storage.getRepairRun(runId).get(); assertEquals(RepairRun.RunState.RUNNING, repairRun.getRunState()); assertEquals(TIME_CREATE, repairRun.getCreationTime().getMillis()); assertEquals(TIME_START, repairRun.getStartTime().getMillis()); assertNull(repairRun.getEndTime()); assertEquals(REPAIR_INTENSITY, repairRun.getIntensity(), 0.0f); - assertEquals(1, storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING)); - assertEquals(7, storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.NOT_STARTED)); + assertEquals(1, + context.storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING)); + assertEquals(7, context.storage + .getSegmentAmountForRepairRun(runId, RepairSegment.State.NOT_STARTED)); } @Test public void testTriggerNotExistingRun() { - RepairRunResource resource = new RepairRunResource(config, storage, factory); + RepairRunResource resource = new RepairRunResource(context); Optional newState = Optional.of(RepairRun.RunState.RUNNING.toString()); Response response = resource.modifyRunState(uriInfo, 42l, newState); assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus()); @@ -226,7 +224,7 @@ public void testTriggerAlreadyRunningRun() throws InterruptedException { DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); - RepairRunResource resource = new RepairRunResource(config, storage, factory); + RepairRunResource resource = new RepairRunResource(context); Response response = addDefaultRepairRun(resource); RepairRunStatus repairRunStatus = (RepairRunStatus) response.getEntity(); long runId = repairRunStatus.getId(); @@ -241,8 +239,8 @@ public void testTriggerAlreadyRunningRun() throws InterruptedException { @Test public void testAddRunClusterNotInStorage() { - storage = new MemoryStorage(); - RepairRunResource resource = new RepairRunResource(config, storage, factory); + context.storage = new MemoryStorage(); + RepairRunResource resource = new RepairRunResource(context); Response response = addDefaultRepairRun(resource); assertEquals(404, response.getStatus()); assertTrue(response.getEntity() instanceof String); @@ -250,7 +248,7 @@ public void testAddRunClusterNotInStorage() { @Test public void testAddRunMissingArgument() { - RepairRunResource resource = new RepairRunResource(config, storage, factory); + RepairRunResource resource = new RepairRunResource(context); Response response = addRepairRun(resource, uriInfo, CLUSTER_NAME, null, TABLES, OWNER, null, SEGMENTS); assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -261,7 +259,7 @@ public void testAddRunMissingArgument() { public void testTriggerRunMissingArgument() { RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); - RepairRunResource resource = new RepairRunResource(config, storage, factory); + RepairRunResource resource = new RepairRunResource(context); Response response = addRepairRun(resource, uriInfo, CLUSTER_NAME, null, TABLES, OWNER, null, SEGMENTS); assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -274,7 +272,7 @@ public void testPauseRunningRun() throws InterruptedException { DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); - RepairRunResource resource = new RepairRunResource(config, storage, factory); + RepairRunResource resource = new RepairRunResource(context); Response response = addDefaultRepairRun(resource); RepairRunStatus repairRunStatus = (RepairRunStatus) response.getEntity(); long runId = repairRunStatus.getId(); @@ -296,11 +294,12 @@ public void testPauseRunningRun() throws InterruptedException { Thread.sleep(50); assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); - RepairRun repairRun = storage.getRepairRun(runId).get(); + RepairRun repairRun = context.storage.getRepairRun(runId).get(); // the run should be paused assertEquals(RepairRun.RunState.PAUSED, repairRun.getRunState()); // but the running segment should be untouched - assertEquals(1, storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING)); + assertEquals(1, + context.storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING)); } @Test @@ -308,7 +307,7 @@ public void testPauseNotRunningRun() throws InterruptedException { DateTimeUtils.setCurrentMillisFixed(TIME_CREATE); RepairRunner.initializeThreadPool(THREAD_CNT, REPAIR_TIMEOUT_S, TimeUnit.SECONDS, RETRY_DELAY_S, TimeUnit.SECONDS); - RepairRunResource resource = new RepairRunResource(config, storage, factory); + RepairRunResource resource = new RepairRunResource(context); Response response = addDefaultRepairRun(resource); RepairRunStatus repairRunStatus = (RepairRunStatus) response.getEntity(); long runId = repairRunStatus.getId(); @@ -318,20 +317,21 @@ public void testPauseNotRunningRun() throws InterruptedException { Thread.sleep(200); assertEquals(400, response.getStatus()); - RepairRun repairRun = storage.getRepairRun(runId).get(); + RepairRun repairRun = context.storage.getRepairRun(runId).get(); // the run should be paused assertEquals(RepairRun.RunState.NOT_STARTED, repairRun.getRunState()); // but the running segment should be untouched - assertEquals(0, storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING)); + assertEquals(0, + context.storage.getSegmentAmountForRepairRun(runId, RepairSegment.State.RUNNING)); } @Test public void testPauseNotExistingRun() throws InterruptedException { - RepairRunResource resource = new RepairRunResource(config, storage, factory); + RepairRunResource resource = new RepairRunResource(context); Response response = resource.modifyRunState(uriInfo, 42l, Optional.of(RepairRun.RunState.PAUSED.toString())); assertEquals(Response.Status.NOT_FOUND.getStatusCode(), response.getStatus()); - assertEquals(0, storage.getRepairRunsWithState(RepairRun.RunState.RUNNING).size()); + assertEquals(0, context.storage.getRepairRunsWithState(RepairRun.RunState.RUNNING).size()); } } diff --git a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java similarity index 89% rename from src/test/java/com/spotify/reaper/service/RepairRunnerTest.java rename to src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java index 6e029979f..05230bdf3 100644 --- a/src/test/java/com/spotify/reaper/service/RepairRunnerTest.java +++ b/src/test/java/com/spotify/reaper/unit/service/RepairRunnerTest.java @@ -11,12 +11,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.spotify.reaper.service; +package com.spotify.reaper.unit.service; import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperException; import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; @@ -25,6 +26,9 @@ import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.core.RepairUnit; +import com.spotify.reaper.service.RepairRunner; +import com.spotify.reaper.service.RingRange; +import com.spotify.reaper.service.SegmentRunner; import com.spotify.reaper.storage.IStorage; import com.spotify.reaper.storage.MemoryStorage; @@ -54,13 +58,10 @@ public class RepairRunnerTest { - IStorage storage; - @Before public void setUp() throws Exception { RepairRunner.repairRunners.clear(); SegmentRunner.segmentRunners.clear(); - storage = new MemoryStorage(); } @Test @@ -72,37 +73,33 @@ public void noSegmentsTest() throws InterruptedException { final long TIME_START = 42l; final String TEST_CLUSTER = "testcluster"; - IStorage storage = new MemoryStorage(); + AppContext context = new AppContext(); + context.storage = new MemoryStorage(); + context.jmxConnectionFactory = new JmxConnectionFactory(); // place a dummy cluster into storage - storage.addCluster(new Cluster(TEST_CLUSTER, null, Collections.singleton(null))); + context.storage.addCluster(new Cluster(TEST_CLUSTER, null, Collections.singleton(null))); // place a dummy repair run into the storage DateTimeUtils.setCurrentMillisFixed(TIME_CREATION); RepairRun.Builder runBuilder = new RepairRun.Builder(TEST_CLUSTER, CF_ID, DateTime.now(), INTENSITY); - storage.addRepairRun(runBuilder); - storage.addRepairSegments(Collections.emptySet(), RUN_ID); + context.storage.addRepairRun(runBuilder); + context.storage.addRepairSegments(Collections.emptySet(), RUN_ID); // start the repair DateTimeUtils.setCurrentMillisFixed(TIME_START); RepairRunner.initializeThreadPool(1, 3, TimeUnit.HOURS, 30, TimeUnit.SECONDS); - RepairRunner.startRepairRun(storage, RUN_ID, new JmxConnectionFactory() { - @Override - public JmxProxy connect(Optional handler, String host) - throws ReaperException { - return null; - } - }); + RepairRunner.startRepairRun(context, RUN_ID); Thread.sleep(200); // check if the start time was properly set - DateTime startTime = storage.getRepairRun(RUN_ID).get().getStartTime(); + DateTime startTime = context.storage.getRepairRun(RUN_ID).get().getStartTime(); assertNotNull(startTime); assertEquals(TIME_START, startTime.getMillis()); // end time will also be set immediately - DateTime endTime = storage.getRepairRun(RUN_ID).get().getEndTime(); + DateTime endTime = context.storage.getRepairRun(RUN_ID).get().getEndTime(); assertNotNull(endTime); assertEquals(TIME_START, endTime.getMillis()); } @@ -134,7 +131,9 @@ public void testHangingRepair() throws ReaperException, InterruptedException { assertEquals(storage.getRepairSegment(SEGMENT_ID).get().getState(), RepairSegment.State.NOT_STARTED); - RepairRunner.startRepairRun(storage, RUN_ID, new JmxConnectionFactory() { + AppContext context = new AppContext(); + context.storage = storage; + context.jmxConnectionFactory = new JmxConnectionFactory() { final AtomicInteger repairAttempts = new AtomicInteger(0); @Override @@ -192,7 +191,8 @@ public void run() { }); return jmx; } - }); + }; + RepairRunner.startRepairRun(context, RUN_ID); // TODO: refactor so that we can properly wait for the repair runner to finish rather than // TODO: using this sleep(). @@ -209,6 +209,8 @@ public void testResumeRepair() throws InterruptedException { final double INTENSITY = 0.5f; final IStorage storage = new MemoryStorage(); + AppContext context = new AppContext(); + context.storage = storage; storage.addCluster(new Cluster(CLUSTER_NAME, null, Collections.singleton(null))); long cf = storage.addRepairUnit( @@ -230,7 +232,7 @@ public void testResumeRepair() throws InterruptedException { assertEquals(storage.getRepairSegment(SEGMENT_ID).get().getState(), RepairSegment.State.NOT_STARTED); - JmxConnectionFactory factory = new JmxConnectionFactory() { + context.jmxConnectionFactory = new JmxConnectionFactory() { @Override public JmxProxy connect(final Optional handler, String host) throws ReaperException { @@ -263,10 +265,10 @@ public void run() { }; assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRun(RUN_ID).get().getRunState()); - RepairRunner.resumeRunningRepairRuns(storage, factory); + RepairRunner.resumeRunningRepairRuns(context); assertEquals(RepairRun.RunState.NOT_STARTED, storage.getRepairRun(RUN_ID).get().getRunState()); storage.updateRepairRun(run.with().runState(RepairRun.RunState.RUNNING).build(RUN_ID)); - RepairRunner.resumeRunningRepairRuns(storage, factory); + RepairRunner.resumeRunningRepairRuns(context); Thread.sleep(100); assertEquals(RepairRun.RunState.DONE, storage.getRepairRun(RUN_ID).get().getRunState()); } diff --git a/src/test/java/com/spotify/reaper/service/RingRangeTest.java b/src/test/java/com/spotify/reaper/unit/service/RingRangeTest.java similarity index 97% rename from src/test/java/com/spotify/reaper/service/RingRangeTest.java rename to src/test/java/com/spotify/reaper/unit/service/RingRangeTest.java index 0264b76ad..e08489420 100644 --- a/src/test/java/com/spotify/reaper/service/RingRangeTest.java +++ b/src/test/java/com/spotify/reaper/unit/service/RingRangeTest.java @@ -1,4 +1,6 @@ -package com.spotify.reaper.service; +package com.spotify.reaper.unit.service; + +import com.spotify.reaper.service.RingRange; import org.junit.Test; @@ -78,7 +80,6 @@ public void testEncloses() throws Exception { assertFalse(r_190_0.encloses(r_0_0)); } - @Test public void isWrappingTest() { RingRange r_0_0 = new RingRange(BigInteger.valueOf(0l), BigInteger.valueOf(0l)); diff --git a/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java b/src/test/java/com/spotify/reaper/unit/service/SegmentGeneratorTest.java similarity index 92% rename from src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java rename to src/test/java/com/spotify/reaper/unit/service/SegmentGeneratorTest.java index 4dec454c0..9adb3ed4e 100644 --- a/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java +++ b/src/test/java/com/spotify/reaper/unit/service/SegmentGeneratorTest.java @@ -11,18 +11,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.spotify.reaper.service; +package com.spotify.reaper.unit.service; import com.google.common.base.Function; import com.google.common.collect.Lists; + import com.spotify.reaper.ReaperException; +import com.spotify.reaper.service.RingRange; +import com.spotify.reaper.service.SegmentGenerator; + import org.junit.Test; -import javax.annotation.Nullable; import java.math.BigInteger; import java.util.List; -import static org.junit.Assert.*; +import javax.annotation.Nullable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class SegmentGeneratorTest { @@ -46,14 +53,13 @@ public BigInteger apply(String s) { List segments = generator.generateSegments(10, tokens); assertEquals(15, segments.size()); assertEquals("(0,1]", - segments.get(0).toString()); + segments.get(0).toString()); assertEquals("(56713727820156410577229101238628035242,56713727820156410577229101238628035243]", - segments.get(5).toString()); + segments.get(5).toString()); assertEquals( "(113427455640312821154458202477256070484,113427455640312821154458202477256070485]", segments.get(10).toString()); - tokens = Lists.transform( Lists.newArrayList( "5", "6", @@ -71,9 +77,9 @@ public BigInteger apply(String s) { segments = generator.generateSegments(10, tokens); assertEquals(15, segments.size()); assertEquals("(5,6]", - segments.get(0).toString()); + segments.get(0).toString()); assertEquals("(56713727820156410577229101238628035242,56713727820156410577229101238628035243]", - segments.get(5).toString()); + segments.get(5).toString()); assertEquals( "(113427455640312821154458202477256070484,113427455640312821154458202477256070485]", segments.get(10).toString()); @@ -118,9 +124,9 @@ public BigInteger apply(String s) { "(113427455640312821154458202477256070484,113427455640312821154458202477256070485]", segments.get(4).toString()); assertEquals("(5,6]", - segments.get(9).toString()); + segments.get(9).toString()); assertEquals("(56713727820156410577229101238628035242,56713727820156410577229101238628035243]", - segments.get(14).toString()); + segments.get(14).toString()); } @Test(expected = ReaperException.class) diff --git a/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java b/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java new file mode 100644 index 000000000..8ee95642a --- /dev/null +++ b/src/test/java/com/spotify/reaper/unit/service/SegmentRunnerTest.java @@ -0,0 +1,264 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.spotify.reaper.unit.service; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import com.spotify.reaper.AppContext; +import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxConnectionFactory; +import com.spotify.reaper.cassandra.JmxProxy; +import com.spotify.reaper.cassandra.RepairStatusHandler; +import com.spotify.reaper.core.RepairRun; +import com.spotify.reaper.core.RepairSegment; +import com.spotify.reaper.core.RepairUnit; +import com.spotify.reaper.service.RepairRunner; +import com.spotify.reaper.service.RingRange; +import com.spotify.reaper.service.SegmentRunner; +import com.spotify.reaper.storage.IStorage; +import com.spotify.reaper.storage.MemoryStorage; + +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.commons.lang3.mutable.MutableObject; +import org.joda.time.DateTime; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SegmentRunnerTest { + // TODO: Clean up tests. There's a lot of code duplication across these tests. + + @Before + public void setUp() throws Exception { + RepairRunner.repairRunners.clear(); + SegmentRunner.segmentRunners.clear(); + } + + @Test + public void timeoutTest() throws InterruptedException, ReaperException, ExecutionException { + final AppContext context = new AppContext(); + context.storage = new MemoryStorage(); + RepairUnit cf = context.storage.addRepairUnit( + new RepairUnit.Builder("reaper", "reaper", Sets.newHashSet("reaper"), 1, + RepairParallelism.PARALLEL)); + RepairRun run = context.storage.addRepairRun( + new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5)); + context.storage.addRepairSegments(Collections.singleton( + new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ONE, BigInteger.ZERO), + cf.getId())), run.getId()); + final long segmentId = context.storage.getNextFreeSegment(run.getId()).get().getId(); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final MutableObject> future = new MutableObject<>(); + + context.jmxConnectionFactory = new JmxConnectionFactory() { + @Override + public JmxProxy connect(final Optional handler, String host) + throws ReaperException { + JmxProxy jmx = mock(JmxProxy.class); + when(jmx.getClusterName()).thenReturn("reaper"); + when(jmx.isConnectionAlive()).thenReturn(true); + when(jmx.tokenRangeToEndpoint(anyString(), any(RingRange.class))) + .thenReturn(Lists.newArrayList("")); + when(jmx.triggerRepair(any(BigInteger.class), any(BigInteger.class), anyString(), + Matchers.any(), Sets.newHashSet(anyString()))) + .then(new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) { + assertEquals(RepairSegment.State.NOT_STARTED, + context.storage.getRepairSegment(segmentId).get().getState()); + future.setValue(executor.submit(new Thread() { + @Override + public void run() { + handler.get().handle(1, ActiveRepairService.Status.STARTED, + "Repair command 1 has started"); + assertEquals(RepairSegment.State.RUNNING, + context.storage.getRepairSegment(segmentId).get().getState()); + } + })); + return 1; + } + }); + + return jmx; + } + }; + SegmentRunner.triggerRepair(context, segmentId, Collections.singleton(""), 100); + + future.getValue().get(); + executor.shutdown(); + + assertEquals(RepairSegment.State.NOT_STARTED, + context.storage.getRepairSegment(segmentId).get().getState()); + assertEquals(1, context.storage.getRepairSegment(segmentId).get().getFailCount()); + } + + @Test + public void successTest() throws InterruptedException, ReaperException, ExecutionException { + final IStorage storage = new MemoryStorage(); + RepairUnit cf = storage.addRepairUnit( + new RepairUnit.Builder("reaper", "reaper", Sets.newHashSet("reaper"), 1, + RepairParallelism.PARALLEL)); + RepairRun run = storage.addRepairRun( + new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5)); + storage.addRepairSegments(Collections.singleton( + new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ONE, BigInteger.ZERO), + cf.getId())), run.getId()); + final long segmentId = storage.getNextFreeSegment(run.getId()).get().getId(); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final MutableObject> future = new MutableObject<>(); + + AppContext context = new AppContext(); + context.storage = storage; + context.jmxConnectionFactory = new JmxConnectionFactory() { + @Override + public JmxProxy connect(final Optional handler, String host) + throws ReaperException { + JmxProxy jmx = mock(JmxProxy.class); + when(jmx.getClusterName()).thenReturn("reaper"); + when(jmx.isConnectionAlive()).thenReturn(true); + when(jmx.tokenRangeToEndpoint(anyString(), any(RingRange.class))) + .thenReturn(Lists.newArrayList("")); + when(jmx.triggerRepair(any(BigInteger.class), any(BigInteger.class), anyString(), + Matchers.any(), Sets.newHashSet(anyString()))) + .then(new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) { + assertEquals(RepairSegment.State.NOT_STARTED, + storage.getRepairSegment(segmentId).get().getState()); + future.setValue(executor.submit(new Runnable() { + @Override + public void run() { + handler.get().handle(1, ActiveRepairService.Status.STARTED, + "Repair command 1 has started"); + assertEquals(RepairSegment.State.RUNNING, + storage.getRepairSegment(segmentId).get().getState()); + // report about an unrelated repair. Shouldn't affect anything. + handler.get().handle(2, ActiveRepairService.Status.SESSION_FAILED, + "Repair command 2 has failed"); + handler.get().handle(1, ActiveRepairService.Status.SESSION_SUCCESS, + "Repair session succeeded in command 1"); + assertEquals(RepairSegment.State.RUNNING, + storage.getRepairSegment(segmentId).get().getState()); + handler.get().handle(1, ActiveRepairService.Status.FINISHED, + "Repair command 1 has finished"); + assertEquals(RepairSegment.State.DONE, + storage.getRepairSegment(segmentId).get().getState()); + } + })); + return 1; + } + }); + + return jmx; + } + }; + SegmentRunner.triggerRepair(context, segmentId, Collections.singleton(""), 1000); + + future.getValue().get(); + executor.shutdown(); + + assertEquals(RepairSegment.State.DONE, storage.getRepairSegment(segmentId).get().getState()); + assertEquals(0, storage.getRepairSegment(segmentId).get().getFailCount()); + } + + @Test + public void failureTest() throws InterruptedException, ReaperException, ExecutionException { + final IStorage storage = new MemoryStorage(); + RepairUnit cf = + storage.addRepairUnit( + new RepairUnit.Builder("reaper", "reaper", Sets.newHashSet("reaper"), 1, + RepairParallelism.PARALLEL)); + RepairRun run = storage.addRepairRun( + new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5)); + storage.addRepairSegments(Collections.singleton( + new RepairSegment.Builder(run.getId(), new RingRange(BigInteger.ONE, BigInteger.ZERO), + cf.getId())), run.getId()); + final long segmentId = storage.getNextFreeSegment(run.getId()).get().getId(); + + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final MutableObject> future = new MutableObject<>(); + + AppContext context = new AppContext(); + context.storage = storage; + context.jmxConnectionFactory = new JmxConnectionFactory() { + @Override + public JmxProxy connect(final Optional handler, String host) + throws ReaperException { + JmxProxy jmx = mock(JmxProxy.class); + when(jmx.getClusterName()).thenReturn("reaper"); + when(jmx.isConnectionAlive()).thenReturn(true); + when(jmx.tokenRangeToEndpoint(anyString(), any(RingRange.class))) + .thenReturn(Lists.newArrayList("")); + when(jmx.triggerRepair(any(BigInteger.class), any(BigInteger.class), anyString(), + Matchers.any(), Sets.newHashSet(anyString()))) + .then(new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) { + assertEquals(RepairSegment.State.NOT_STARTED, + storage.getRepairSegment(segmentId).get().getState()); + future.setValue(executor.submit(new Runnable() { + @Override + public void run() { + handler.get().handle(1, ActiveRepairService.Status.STARTED, + "Repair command 1 has started"); + assertEquals(RepairSegment.State.RUNNING, + storage.getRepairSegment(segmentId).get().getState()); + handler.get().handle(1, ActiveRepairService.Status.SESSION_SUCCESS, + "Repair session succeeded in command 1"); + assertEquals(RepairSegment.State.RUNNING, + storage.getRepairSegment(segmentId).get().getState()); + handler.get().handle(1, ActiveRepairService.Status.SESSION_FAILED, + "Repair command 1 has failed"); + assertEquals(RepairSegment.State.NOT_STARTED, + storage.getRepairSegment(segmentId).get().getState()); + } + })); + + return 1; + } + }); + + return jmx; + } + }; + SegmentRunner.triggerRepair(context, segmentId, Collections.singleton(""), 1000); + + future.getValue().get(); + executor.shutdown(); + + assertEquals(RepairSegment.State.NOT_STARTED, + storage.getRepairSegment(segmentId).get().getState()); + assertEquals(1, storage.getRepairSegment(segmentId).get().getFailCount()); + } +} diff --git a/src/test/resources/cassandra-reaper-at.yaml b/src/test/resources/cassandra-reaper-at.yaml new file mode 100644 index 000000000..704fa487f --- /dev/null +++ b/src/test/resources/cassandra-reaper-at.yaml @@ -0,0 +1,66 @@ +# Cassandra Reaper Configuration for Acceptance Tests. +# +segmentCount: 200 +repairParallelism: SEQUENTIAL +repairIntensity: 0.95 +repairRunThreadCount: 15 +hangingRepairTimeoutMins: 1 +storageType: memory + +logging: + level: DEBUG + loggers: + io.dropwizard: INFO + org.eclipse.jetty: INFO + appenders: + - type: console + +server: + type: default + applicationConnectors: + - type: http + port: 8083 + bindHost: 127.0.0.1 + adminConnectors: + - type: http + port: 8084 + bindHost: 127.0.0.1 + +# database section will be ignored if storageType is set to "memory" +database: + # the name of your JDBC driver + driverClass: org.postgresql.Driver + + # the username + user: reaper + + # the password + password: murmur3partitioner + + # the JDBC URL + url: jdbc:postgresql://awseu3-reaperdbtest-a1.bases.cloud.spotify.net/reaper_db + + # any properties specific to your JDBC driver: + properties: + charSet: UTF-8 + + # the maximum amount of time to wait on an empty pool before throwing an exception + maxWaitForConnection: 1s + + # the SQL query to run when validating a connection's liveness + validationQuery: "SELECT 1" + + # the minimum number of connections to keep open + minSize: 8 + + # the maximum number of connections to keep open + maxSize: 32 + + # whether or not idle connections should be validated + checkConnectionWhileIdle: true + + # the amount of time to sleep between runs of the idle connection validation, abandoned cleaner and idle pool resizing + evictionInterval: 10s + + # the minimum amount of time an connection must sit idle in the pool before it is eligible for eviction + minIdleTime: 1 minute diff --git a/src/test/resources/cassandra-reaper.yaml b/src/test/resources/cassandra-reaper.yaml index 307787cb0..ebbee3dfb 100644 --- a/src/test/resources/cassandra-reaper.yaml +++ b/src/test/resources/cassandra-reaper.yaml @@ -39,7 +39,8 @@ server: ## The example below is from: ## https://dropwizard.github.io/dropwizard/manual/jdbi.html#usage -## This is not used or needed, if storageType is "memory" +# database section will be ignored if storageType is set to "memory" +# database: # the name of your JDBC driver driverClass: org.postgresql.Driver diff --git a/src/test/resources/com.spotify.reaper.acceptance/basic_reaper_functionality.feature b/src/test/resources/com.spotify.reaper.acceptance/basic_reaper_functionality.feature new file mode 100644 index 000000000..e7de3c80f --- /dev/null +++ b/src/test/resources/com.spotify.reaper.acceptance/basic_reaper_functionality.feature @@ -0,0 +1,10 @@ +Feature: Using Reaper to launch repairs + + Background: + Given a reaper service is running + + Scenario: Registering a cluster + Given that we are going to use "127.0.0.1" as cluster seed host + And reaper has no cluster with name "testcluster" in storage + When an add-cluster request is made to reaper + Then reaper has a cluster called "testcluster" in storage From 20f4d43592547475c01620f4cb22946c6dc02024 Mon Sep 17 00:00:00 2001 From: Radovan Zvoncek Date: Mon, 2 Feb 2015 18:25:25 +0100 Subject: [PATCH 06/10] Include last event info in a RepairRun(Status) Conflicts: src/main/java/com/spotify/reaper/service/RepairRunner.java src/main/java/com/spotify/reaper/service/SegmentRunner.java --- .../com/spotify/reaper/core/RepairRun.java | 13 +++++++++++ .../resources/view/RepairRunStatus.java | 4 ++++ .../spotify/reaper/service/RepairRunner.java | 15 ++++++++---- .../spotify/reaper/service/SegmentRunner.java | 23 +++++++++++++++---- 4 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/main/java/com/spotify/reaper/core/RepairRun.java index b4a02fa31..691299458 100644 --- a/src/main/java/com/spotify/reaper/core/RepairRun.java +++ b/src/main/java/com/spotify/reaper/core/RepairRun.java @@ -33,6 +33,7 @@ public class RepairRun { private final DateTime endTime; private final DateTime pauseTime; private final double intensity; + private final String lastEvent; private RepairRun(Builder builder, long id) { this.id = id; @@ -46,6 +47,7 @@ private RepairRun(Builder builder, long id) { this.endTime = builder.endTime; this.pauseTime = builder.pauseTime; this.intensity = builder.intensity; + this.lastEvent = builder.lastEvent; } public long getId() { @@ -92,6 +94,10 @@ public double getIntensity() { return intensity; } + public String getLastEvent() { + return lastEvent; + } + public Builder with() { return new Builder(this); } @@ -116,6 +122,7 @@ public static class Builder { private DateTime startTime; private DateTime endTime; private DateTime pauseTime; + private String lastEvent = "Nothing happened yet"; public Builder(String clusterName, long repairUnitId, DateTime creationTime, double intensity) { @@ -137,6 +144,7 @@ private Builder(RepairRun original) { startTime = original.startTime; endTime = original.endTime; pauseTime = original.pauseTime; + lastEvent = original.lastEvent; } public Builder runState(RunState runState) { @@ -179,6 +187,11 @@ public Builder pauseTime(DateTime pauseTime) { return this; } + public Builder lastEvent(String event) { + this.lastEvent = event; + return this; + } + public RepairRun build(long id) { return new RepairRun(this, id); } diff --git a/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java b/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java index f08a45f92..557054118 100644 --- a/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java +++ b/src/main/java/com/spotify/reaper/resources/view/RepairRunStatus.java @@ -76,6 +76,9 @@ public class RepairRunStatus { @JsonProperty("segments_repaired") private int segmentsRepaired = 0; + @JsonProperty("last_event") + private final String lastEvent; + public RepairRunStatus(RepairRun repairRun, RepairUnit repairUnit) { this.id = repairRun.getId(); this.cause = repairRun.getCause(); @@ -91,6 +94,7 @@ public RepairRunStatus(RepairRun repairRun, RepairUnit repairUnit) { this.intensity = roundIntensity(repairRun.getIntensity()); this.segmentCount = repairUnit.getSegmentCount(); this.repairParallelism = repairUnit.getRepairParallelism().name().toLowerCase(); + this.lastEvent = repairRun.getLastEvent(); } @VisibleForTesting diff --git a/src/main/java/com/spotify/reaper/service/RepairRunner.java b/src/main/java/com/spotify/reaper/service/RepairRunner.java index 8b5e221c2..2e02fd091 100644 --- a/src/main/java/com/spotify/reaper/service/RepairRunner.java +++ b/src/main/java/com/spotify/reaper/service/RepairRunner.java @@ -183,9 +183,10 @@ private void end() { LOG.info("Repairs for repair run #{} done", repairRunId); RepairRun repairRun = context.storage.getRepairRun(repairRunId).get(); boolean success = context.storage.updateRepairRun(repairRun.with() - .runState(RepairRun.RunState.DONE) - .endTime(DateTime.now()) - .build(repairRun.getId())); + .runState(RepairRun.RunState.DONE) + .endTime(DateTime.now()) + .lastEvent("All done") + .build(repairRun.getId())); if (!success) { LOG.error("failed updating repair run " + repairRun.getId()); } @@ -264,7 +265,13 @@ private void handleResult(long segmentId) { break; case DONE: // Successful repair - executor.schedule(this, intensityBasedDelayMillis(segment), TimeUnit.MILLISECONDS); + long delay = intensityBasedDelayMillis(segment); + executor.schedule(this, delay, TimeUnit.MILLISECONDS); + String event = String.format("Waiting %ds because of intensity based delay", delay / 1000); + RepairRun updatedRepairRun = + context.storage.getRepairRun(repairRunId).get().with().lastEvent(event) + .build(repairRunId); + context.storage.updateRepairRun(updatedRepairRun); break; default: // Another thread has started a new repair on this segment already diff --git a/src/main/java/com/spotify/reaper/service/SegmentRunner.java b/src/main/java/com/spotify/reaper/service/SegmentRunner.java index 3c995aae3..79de75296 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentRunner.java +++ b/src/main/java/com/spotify/reaper/service/SegmentRunner.java @@ -21,6 +21,7 @@ import com.spotify.reaper.ReaperException; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.cassandra.RepairStatusHandler; +import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.core.RepairUnit; @@ -95,6 +96,7 @@ public int getCurrentCommandId() { private void runRepair(Collection potentialCoordinators, long timeoutMillis) { final RepairSegment segment = context.storage.getRepairSegment(segmentId).get(); + final RepairRun repairRun = context.storage.getRepairRun(segment.getRunId()).get(); try (JmxProxy coordinator = context.jmxConnectionFactory .connectAny(Optional.of(this), potentialCoordinators)) { RepairUnit repairUnit = context.storage.getRepairUnit(segment.getRepairUnitId()).get(); @@ -114,9 +116,12 @@ private void runRepair(Collection potentialCoordinators, long timeoutMil .coordinatorHost(coordinator.getHost()) .repairCommandId(commandId) .build(segmentId)); - LOG.info("Repair for segment {} started, status wait will timeout in {} millis", - segmentId, timeoutMillis); - + String eventMsg = String.format("Triggered repair of segment %d via host %s", + segment.getId(), coordinator.getHost()); + context.storage.updateRepairRun( + repairRun.with().lastEvent(eventMsg).build(repairRun.getId())); + LOG.info("Repair for segment {} started, status wait will timeout in {} millis", segmentId, + timeoutMillis); try { condition.await(timeoutMillis, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -141,6 +146,8 @@ private void runRepair(Collection potentialCoordinators, long timeoutMil } } catch (ReaperException e) { LOG.warn("Failed to connect to a coordinator node for segment {}", segmentId); + String msg = String.format("Postponed because couldn't any of the coordinators"); + context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId())); postpone(segment); } } @@ -153,15 +160,23 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator) try (JmxProxy hostProxy = context.jmxConnectionFactory.connect(hostName)) { LOG.debug("checking host '{}' for pending compactions and other repairs (can repair?)" + " Run id '{}'", hostName, segment.getRunId()); - if (hostProxy.getPendingCompactions() > MAX_PENDING_COMPACTIONS) { + int pendingCompactions = hostProxy.getPendingCompactions(); + if (pendingCompactions > MAX_PENDING_COMPACTIONS) { LOG.warn("SegmentRunner declined to repair segment {} because of too many pending " + "compactions (> {}) on host \"{}\"", segmentId, MAX_PENDING_COMPACTIONS, hostProxy.getHost()); + String msg = String.format("Postponed due to pending compactions (%d)", + pendingCompactions); + RepairRun repairRun = context.storage.getRepairRun(segment.getRunId()).get(); + context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId())); return false; } if (hostProxy.isRepairRunning()) { LOG.warn("SegmentRunner declined to repair segment {} because one of the hosts ({}) was " + "already involved in a repair", segmentId, hostProxy.getHost()); + String msg = String.format("Postponed due to affected hosts already doing repairs"); + RepairRun repairRun = context.storage.getRepairRun(segment.getRunId()).get(); + context.storage.updateRepairRun(repairRun.with().lastEvent(msg).build(repairRun.getId())); return false; } } From 991682ac60035a02ee73a6e24406d53c2f07e1d1 Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Tue, 3 Feb 2015 17:04:23 +0100 Subject: [PATCH 07/10] add last_event to database model from repair run --- src/main/db/reaper_db.sql | 3 ++- .../reaper/storage/postgresql/IStoragePostgreSQL.java | 6 +++--- .../spotify/reaper/storage/postgresql/RepairRunMapper.java | 1 + src/test/resources/cassandra-reaper-at.yaml | 6 +++--- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/db/reaper_db.sql b/src/main/db/reaper_db.sql index 7985f6086..cd111da4a 100644 --- a/src/main/db/reaper_db.sql +++ b/src/main/db/reaper_db.sql @@ -46,7 +46,8 @@ CREATE TABLE IF NOT EXISTS "repair_run" ( "start_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL, "end_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL, "pause_time" TIMESTAMP WITH TIME ZONE DEFAULT NULL, - "intensity" REAL NOT NULL + "intensity" REAL NOT NULL, + "last_event" TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS "repair_segment" ( diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java index c305d5eb5..dde990a32 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java @@ -53,16 +53,16 @@ public interface IStoragePostgreSQL { // static final String SQL_REPAIR_RUN_ALL_FIELDS_NO_ID = "cluster_name, repair_unit_id, cause, owner, state, creation_time, " - + "start_time, end_time, pause_time, intensity"; + + "start_time, end_time, pause_time, intensity, last_event"; static final String SQL_REPAIR_RUN_ALL_FIELDS = "id, " + SQL_REPAIR_RUN_ALL_FIELDS_NO_ID; static final String SQL_INSERT_REPAIR_RUN = "INSERT INTO repair_run (" + SQL_REPAIR_RUN_ALL_FIELDS_NO_ID + ") VALUES " + "(:clusterName, :repairUnitId, :cause, :owner, :runState, :creationTime, " - + ":startTime, :endTime, :pauseTime, :intensity)"; + + ":startTime, :endTime, :pauseTime, :intensity, :lastEvent)"; static final String SQL_UPDATE_REPAIR_RUN = "UPDATE repair_run SET cause = :cause, owner = :owner, state = :runState, " + "start_time = :startTime, end_time = :endTime, pause_time = :pauseTime, " - + "intensity = :intensity WHERE id = :id"; + + "intensity = :intensity, last_event = :lastEvent WHERE id = :id"; static final String SQL_GET_REPAIR_RUN = "SELECT " + SQL_REPAIR_RUN_ALL_FIELDS + " FROM repair_run WHERE id = :id"; static final String SQL_GET_REPAIR_RUNS_FOR_CLUSTER = diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java index 524746101..7a3d11724 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java @@ -48,6 +48,7 @@ public RepairRun map(int index, ResultSet r, StatementContext ctx) throws SQLExc .startTime(getDateTimeOrNull(r, "start_time")) .endTime(getDateTimeOrNull(r, "end_time")) .pauseTime(getDateTimeOrNull(r, "pause_time")) + .lastEvent(r.getString("last_event")) .build(r.getLong("id")); } diff --git a/src/test/resources/cassandra-reaper-at.yaml b/src/test/resources/cassandra-reaper-at.yaml index 704fa487f..93de358db 100644 --- a/src/test/resources/cassandra-reaper-at.yaml +++ b/src/test/resources/cassandra-reaper-at.yaml @@ -32,13 +32,13 @@ database: driverClass: org.postgresql.Driver # the username - user: reaper + user: reaper-user # the password - password: murmur3partitioner + password: reaper-secret # the JDBC URL - url: jdbc:postgresql://awseu3-reaperdbtest-a1.bases.cloud.spotify.net/reaper_db + url: jdbc:postgresql://127.0.0.1:5432/reaper_db # any properties specific to your JDBC driver: properties: From 81f12625ceb539e09254f80394da1536eb77d92e Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Wed, 4 Feb 2015 13:52:52 +0100 Subject: [PATCH 08/10] some refactoring on acceptance tests --- pom.xml | 2 +- src/main/db/reaper_db.sql | 2 +- .../com/spotify/reaper/ReaperApplication.java | 6 ++++ .../spotify/reaper/acceptance/BasicSteps.java | 30 ++++++++++------- .../acceptance/ReaperTestJettyRunner.java | 32 +++++++++---------- .../reaper/acceptance/TestContext.java | 2 +- 6 files changed, 43 insertions(+), 31 deletions(-) diff --git a/pom.xml b/pom.xml index b5cff04ad..e90cebb29 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ UTF-8 0.7.1 2.0.12 - 1.1.3 + 1.1.5 diff --git a/src/main/db/reaper_db.sql b/src/main/db/reaper_db.sql index cd111da4a..b14770bda 100644 --- a/src/main/db/reaper_db.sql +++ b/src/main/db/reaper_db.sql @@ -18,7 +18,7 @@ CREATE TABLE IF NOT EXISTS "cluster" ( "partitioner" TEXT NOT NULL, "seed_hosts" TEXT[] NOT NULL ); -§ + -- Repair unit is basically a keyspace with a set of column families. -- Cassandra supports repairing multiple column families in one go. -- diff --git a/src/main/java/com/spotify/reaper/ReaperApplication.java b/src/main/java/com/spotify/reaper/ReaperApplication.java index d069fb361..9fd14cdae 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplication.java +++ b/src/main/java/com/spotify/reaper/ReaperApplication.java @@ -15,6 +15,7 @@ import com.google.common.annotations.VisibleForTesting; +import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.resources.ClusterResource; import com.spotify.reaper.resources.PingResource; import com.spotify.reaper.resources.ReaperHealthCheck; @@ -98,6 +99,11 @@ public void run(ReaperApplicationConfiguration config, LOG.info("storage already given in context, not initializing a new one"); } + if (context.jmxConnectionFactory == null) { + LOG.info("no JMX connection factory given in context, creating default"); + context.jmxConnectionFactory = new JmxConnectionFactory(); + } + LOG.info("creating and registering health checks"); // Notice that health checks are registered under the admin application on /healthcheck final ReaperHealthCheck healthCheck = new ReaperHealthCheck(context); diff --git a/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java b/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java index 006854add..bcf556006 100644 --- a/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java +++ b/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java @@ -3,6 +3,9 @@ import com.google.common.base.Optional; import com.google.common.collect.Maps; +import com.spotify.reaper.AppContext; +import com.sun.jersey.api.client.ClientResponse; + import java.util.Map; import javax.ws.rs.core.Response; @@ -13,6 +16,8 @@ import cucumber.api.java.en.Then; import cucumber.api.java.en.When; +import static org.junit.Assert.assertEquals; + /** * Basic acceptance test (Cucumber) steps. */ @@ -20,13 +25,19 @@ public class BasicSteps { @Before public static void setup() throws Exception { - ReaperTestJettyRunner.setup(); + AppContext context = new AppContext(); + ReaperTestJettyRunner.setup(context); + } + + public void callAndExpect(String httpMethod, String callPath, + Optional> params, Response.Status expectedStatus) { + ClientResponse response = ReaperTestJettyRunner.callReaper(httpMethod, callPath, params); + assertEquals(expectedStatus.getStatusCode(), response.getStatus()); } @Given("^a reaper service is running$") public void a_reaper_service_is_running() throws Throwable { - ReaperTestJettyRunner.callAndExpect( - "GET", "/ping", Optional.>absent(), Response.Status.OK); + callAndExpect("GET", "/ping", Optional.>absent(), Response.Status.OK); } @Given("^that we are going to use \"([^\"]*)\" as cluster seed host$") @@ -36,24 +47,21 @@ public void that_we_are_going_to_use_as_cluster_seed_host(String seedHost) throw @And("^reaper has no cluster with name \"([^\"]*)\" in storage$") public void reaper_has_no_cluster_with_name_in_storage(String clusterName) throws Throwable { - ReaperTestJettyRunner.callAndExpect( - "GET", "/cluster/" + clusterName, - Optional.>absent(), Response.Status.NOT_FOUND); + callAndExpect("GET", "/cluster/" + clusterName, + Optional.>absent(), Response.Status.NOT_FOUND); } @When("^an add-cluster request is made to reaper$") public void an_add_cluster_request_is_made_to_reaper() throws Throwable { Map params = Maps.newHashMap(); params.put("seedHost", TestContext.SEED_HOST); - ReaperTestJettyRunner.callAndExpect( - "POST", "/cluster", Optional.of(params), Response.Status.CREATED); + callAndExpect("POST", "/cluster", Optional.of(params), Response.Status.CREATED); } @Then("^reaper has a cluster called \"([^\"]*)\" in storage$") public void reaper_has_a_cluster_called_in_storage(String clusterName) throws Throwable { - ReaperTestJettyRunner.callAndExpect( - "GET", "/cluster/" + clusterName, - Optional.>absent(), Response.Status.OK.getStatusCode()); + callAndExpect("GET", "/cluster/" + clusterName, + Optional.>absent(), Response.Status.OK); } } diff --git a/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java b/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java index acc1a28da..e0f670ff0 100644 --- a/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java +++ b/src/test/java/com/spotify/reaper/acceptance/ReaperTestJettyRunner.java @@ -5,6 +5,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.Resources; +import com.spotify.reaper.AppContext; import com.spotify.reaper.ReaperApplication; import com.spotify.reaper.ReaperApplicationConfiguration; import com.sun.jersey.api.client.Client; @@ -22,15 +23,11 @@ import java.net.URL; import java.util.Map; -import javax.ws.rs.core.Response; - import io.dropwizard.cli.ServerCommand; import io.dropwizard.lifecycle.ServerLifecycleListener; import io.dropwizard.setup.Bootstrap; import io.dropwizard.setup.Environment; -import static org.junit.Assert.assertEquals; - /** * Simple Reaper application runner for testing purposes. * Starts a Jetty server that wraps Reaper application, @@ -42,11 +39,11 @@ public class ReaperTestJettyRunner { private static ReaperTestJettyRunner runnerInstance; - public static void setup() throws Exception { + public static void setup(AppContext testContext) throws Exception { if (runnerInstance == null) { String testConfigPath = Resources.getResource("cassandra-reaper-at.yaml").getPath(); LOG.info("initializing ReaperTestJettyRunner with config in path: " + testConfigPath); - runnerInstance = new ReaperTestJettyRunner(testConfigPath); + runnerInstance = new ReaperTestJettyRunner(testConfigPath, testContext); runnerInstance.start(); // Stop the testing Reaper after tests are finished. Runtime.getRuntime().addShutdownHook(new Thread() { @@ -58,8 +55,8 @@ public void run() { } } - public static void callAndExpect(String httpMethod, String urlPath, - Optional> params, int statusCode) { + public static ClientResponse callReaper(String httpMethod, String urlPath, + Optional> params) { assert runnerInstance != null : "service not initialized, call setup() first"; String reaperBase = "http://localhost:" + runnerInstance.getLocalPort() + "/"; URI uri; @@ -84,21 +81,17 @@ public static void callAndExpect(String httpMethod, String urlPath, } else { throw new RuntimeException("Invalid HTTP method: " + httpMethod); } - - assertEquals(statusCode, response.getStatus()); - } - - public static void callAndExpect(String httpMethod, String url, - Optional> params, Response.Status status) { - callAndExpect(httpMethod, url, params, status.getStatusCode()); + return response; } private final String configPath; private Server jettyServer; + private AppContext context; - public ReaperTestJettyRunner(String configPath) { + public ReaperTestJettyRunner(String configPath, AppContext context) { this.configPath = configPath; + this.context = context; } public void start() { @@ -106,7 +99,12 @@ public void start() { return; } try { - ReaperApplication reaper = new ReaperApplication(); + ReaperApplication reaper; + if (context != null) { + reaper = new ReaperApplication(context); + } else { + reaper = new ReaperApplication(); + } final Bootstrap bootstrap = new Bootstrap(reaper) { diff --git a/src/test/java/com/spotify/reaper/acceptance/TestContext.java b/src/test/java/com/spotify/reaper/acceptance/TestContext.java index 50bcb9977..6d61e0b93 100644 --- a/src/test/java/com/spotify/reaper/acceptance/TestContext.java +++ b/src/test/java/com/spotify/reaper/acceptance/TestContext.java @@ -1,7 +1,7 @@ package com.spotify.reaper.acceptance; public class TestContext { - public static String SEED_HOST; + public static String SEED_HOST; } From d5e7204d09454e61a4aadc2328d55fd1de71fb9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Hegerfors?= Date: Wed, 4 Feb 2015 16:33:06 +0100 Subject: [PATCH 09/10] Mock a JmxProxy that can report cluster name in acceptance test --- .../com/spotify/reaper/acceptance/BasicSteps.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java b/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java index bcf556006..e6e51d08d 100644 --- a/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java +++ b/src/test/java/com/spotify/reaper/acceptance/BasicSteps.java @@ -4,6 +4,10 @@ import com.google.common.collect.Maps; import com.spotify.reaper.AppContext; +import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxConnectionFactory; +import com.spotify.reaper.cassandra.JmxProxy; +import com.spotify.reaper.cassandra.RepairStatusHandler; import com.sun.jersey.api.client.ClientResponse; import java.util.Map; @@ -17,6 +21,8 @@ import cucumber.api.java.en.When; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Basic acceptance test (Cucumber) steps. @@ -26,6 +32,15 @@ public class BasicSteps { @Before public static void setup() throws Exception { AppContext context = new AppContext(); + context.jmxConnectionFactory = new JmxConnectionFactory() { + @Override + public JmxProxy connect(Optional handler, String host) + throws ReaperException { + JmxProxy jmx = mock(JmxProxy.class); + when(jmx.getClusterName()).thenReturn("testcluster"); + return jmx; + } + }; ReaperTestJettyRunner.setup(context); } From 0f24d15dca2b8a01a9778a06211d2102fd4e97cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Hegerfors?= Date: Wed, 4 Feb 2015 17:26:21 +0100 Subject: [PATCH 10/10] Include cucumber test in junit tests --- pom.xml | 2 +- .../reaper/acceptance/AddClusterTest.java | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 src/test/java/com/spotify/reaper/acceptance/AddClusterTest.java diff --git a/pom.xml b/pom.xml index e90cebb29..83139c321 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ junit junit - 4.8.1 + 4.11 test diff --git a/src/test/java/com/spotify/reaper/acceptance/AddClusterTest.java b/src/test/java/com/spotify/reaper/acceptance/AddClusterTest.java new file mode 100644 index 000000000..4555d380f --- /dev/null +++ b/src/test/java/com/spotify/reaper/acceptance/AddClusterTest.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.spotify.reaper.acceptance; + +import org.junit.runner.RunWith; + +import cucumber.api.CucumberOptions; +import cucumber.api.junit.Cucumber; + +@RunWith(Cucumber.class) +@CucumberOptions( + features = "classpath:com.spotify.reaper.acceptance/basic_reaper_functionality.feature" +) +public class AddClusterTest { +}