From a84ee3fbc049bfcce169158ff1ca35329ff4f609 Mon Sep 17 00:00:00 2001 From: Hannu Varjoranta Date: Wed, 3 Dec 2014 16:55:42 +0100 Subject: [PATCH] add first implementation of table resource, removed strategy --- src/main/db/reaper_db.sql | 1 - .../com/spotify/reaper/ReaperApplication.java | 2 +- .../ReaperApplicationConfiguration.java | 15 --- .../java/com/spotify/reaper/core/Cluster.java | 21 ++-- .../com/spotify/reaper/core/ColumnFamily.java | 19 +--- .../com/spotify/reaper/core/RepairRun.java | 5 + .../reaper/resources/ClusterResource.java | 49 +++++---- .../reaper/resources/RepairRunResource.java | 41 +++++++ .../reaper/resources/TableResource.java | 103 +++++++++++++++++- .../reaper/service/IRepairStrategy.java | 11 -- .../reaper/service/SegmentGenerator.java | 2 - .../service/SequentialRepairStrategy.java | 11 -- .../com/spotify/reaper/storage/IStorage.java | 12 ++ .../spotify/reaper/storage/MemoryStorage.java | 28 ++++- .../reaper/storage/PostgresStorage.java | 35 ++++++ .../storage/postgresql/ClusterMapper.java | 3 +- src/test/resources/cassandra-reaper.yaml | 1 - 17 files changed, 259 insertions(+), 100 deletions(-) create mode 100644 src/main/java/com/spotify/reaper/resources/RepairRunResource.java delete mode 100644 src/main/java/com/spotify/reaper/service/IRepairStrategy.java delete mode 100644 src/main/java/com/spotify/reaper/service/SequentialRepairStrategy.java diff --git a/src/main/db/reaper_db.sql b/src/main/db/reaper_db.sql index b17bcdde3..91bf45169 100644 --- a/src/main/db/reaper_db.sql +++ b/src/main/db/reaper_db.sql @@ -18,7 +18,6 @@ CREATE TABLE IF NOT EXISTS "column_family" ( "cluster_name" TEXT NOT NULL REFERENCES "cluster" ("name"), "keyspace_name" TEXT NOT NULL, "name" TEXT NOT NULL, - "strategy" TEXT NOT NULL, "segment_count" INT NOT NULL, "snapshot_repair" BOOLEAN NOT NULL ); diff --git a/src/main/java/com/spotify/reaper/ReaperApplication.java b/src/main/java/com/spotify/reaper/ReaperApplication.java index f7dc956bb..894478901 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplication.java +++ b/src/main/java/com/spotify/reaper/ReaperApplication.java @@ -40,7 +40,7 @@ public void run(ReaperApplicationConfiguration config, final PingResource pingResource = new PingResource(); final ClusterResource addClusterResource = new ClusterResource(storage); - final TableResource addTableResource = new TableResource(storage); + final TableResource addTableResource = new TableResource(config, storage); environment.jersey().register(pingResource); environment.jersey().register(addClusterResource); diff --git a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java index 64210168d..f98f122fc 100644 --- a/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java +++ b/src/main/java/com/spotify/reaper/ReaperApplicationConfiguration.java @@ -14,9 +14,6 @@ public class ReaperApplicationConfiguration extends Configuration { private int segmentCount; - @NotEmpty - private String repairStrategy; - private boolean snapshotRepair; private double repairIntensity; @@ -39,18 +36,6 @@ public void setSegmentCount(int segmentCount) { this.segmentCount = segmentCount; } - - @JsonProperty - public String getRepairStrategy() { - return repairStrategy; - } - - @JsonProperty - public void setRepairStrategy(String repairStrategy) { - this.repairStrategy = repairStrategy; - } - - @JsonProperty public boolean getSnapshotRepair() { return snapshotRepair; diff --git a/src/main/java/com/spotify/reaper/core/Cluster.java b/src/main/java/com/spotify/reaper/core/Cluster.java index 711107d30..3d54979ee 100644 --- a/src/main/java/com/spotify/reaper/core/Cluster.java +++ b/src/main/java/com/spotify/reaper/core/Cluster.java @@ -8,37 +8,36 @@ public class Cluster { private final String partitioner; // Name of the partitioner class private final Set seedHosts; - public String getPartitioner() { - return partitioner; - } - public String getName() { return name; } + public String getPartitioner() { + return partitioner; + } + public Set getSeedHosts() { return seedHosts; } private Cluster(Builder builder) { - this.partitioner = builder.partitioner; this.name = builder.name; + this.partitioner = builder.partitioner; this.seedHosts = builder.seedHosts; } public static class Builder { + private final String name; private String partitioner; - private String name; private Set seedHosts; - public Builder partitioner(String partitioner) { - this.partitioner = partitioner; - return this; + public Builder(String name) { + this.name = name; } - public Builder name(String name) { - this.name = name; + public Builder partitioner(String partitioner) { + this.partitioner = partitioner; return this; } diff --git a/src/main/java/com/spotify/reaper/core/ColumnFamily.java b/src/main/java/com/spotify/reaper/core/ColumnFamily.java index 37434a5f2..7a881e813 100644 --- a/src/main/java/com/spotify/reaper/core/ColumnFamily.java +++ b/src/main/java/com/spotify/reaper/core/ColumnFamily.java @@ -1,13 +1,11 @@ package com.spotify.reaper.core; -import com.spotify.reaper.service.IRepairStrategy; - public class ColumnFamily { + private Long id; private final Cluster cluster; private final String keyspaceName; private final String name; - private final IRepairStrategy strategy; private final int segmentCount; // int/long/BigInteger? private final boolean snapshotRepair; @@ -31,10 +29,6 @@ public String getName() { return name; } - public IRepairStrategy getStrategy() { - return strategy; - } - public int getSegmentCount() { return segmentCount; } @@ -43,24 +37,22 @@ public boolean isSnapshotRepair() { return snapshotRepair; } - private ColumnFamily(Builder builder) - { + private ColumnFamily(Builder builder) { this.id = builder.id; this.cluster = builder.cluster; this.keyspaceName = builder.keyspaceName; this.name = builder.name; - this.strategy = builder.strategy; this.segmentCount = builder.segmentCount; this.snapshotRepair = builder.snapshotRepair; } public static class Builder { + private Long id; private Cluster cluster; private String keyspaceName; private String name; - private IRepairStrategy strategy; private int segmentCount; private boolean snapshotRepair; @@ -84,11 +76,6 @@ public Builder name(String name) { return this; } - public Builder strategy(IRepairStrategy strategy) { - this.strategy = strategy; - return this; - } - public Builder segmentCount(int segmentCount) { this.segmentCount = segmentCount; return this; diff --git a/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/main/java/com/spotify/reaper/core/RepairRun.java index 345c99428..c27267303 100644 --- a/src/main/java/com/spotify/reaper/core/RepairRun.java +++ b/src/main/java/com/spotify/reaper/core/RepairRun.java @@ -7,6 +7,11 @@ public class RepairRun { private Long id; + + // IDEA: maybe we want to have start and stop token for parallel runners on same repair run? + //private final long startToken; + //private final long endToken; + private final String cause; private final String owner; private final State state; diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java index 389b1ffda..3e1eeefdf 100644 --- a/src/main/java/com/spotify/reaper/resources/ClusterResource.java +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -51,40 +51,49 @@ public Response addCluster(@Context UriInfo uriInfo, @QueryParam("host") Optiona LOG.error("POST on cluster resource called without host"); return Response.status(400).entity("query parameter \"host\" required").build(); } - LOG.info("add cluster called with host: {}", host); + LOG.info("add cluster called with host: {}", host.get()); - String clusterName; - String partitioner; + Cluster newCluster; try { - JmxProxy jmxProxy = JmxProxy.connect(host.get()); - clusterName = jmxProxy.getClusterName(); - partitioner = jmxProxy.getPartitioner(); - jmxProxy.close(); + newCluster = createClusterWithSeedHost(host.get()); } catch (ReaperException e) { - String errMsg = "failed to get cluster info from seed host: " + host.get(); - LOG.error(errMsg); - e.printStackTrace(); - return Response.status(400).entity(errMsg).build(); + return Response.status(400) + .entity("failed to create cluster with seed host: " + host.get()).build(); } + storage.addCluster(newCluster); + URI createdURI = null; try { - createdURI = (new URL(uriInfo.getAbsolutePath().toURL(), clusterName)).toURI(); + createdURI = (new URL(uriInfo.getAbsolutePath().toURL(), newCluster.getName())).toURI(); } catch (Exception e) { - String errMsg = "failed creating target URI for cluster: " + clusterName; + String errMsg = "failed creating target URI for cluster: " + newCluster.getName(); LOG.error(errMsg); e.printStackTrace(); return Response.status(400).entity(errMsg).build(); } - storage.addCluster(new Cluster.Builder() - .name(clusterName) - .seedHosts(Collections.singleton(host.get())) - .partitioner(partitioner) - .build()); + return Response.created(createdURI).entity(newCluster).build(); + } - String replyMsg = "cluster with name \"" + clusterName + "\" created"; - return Response.created(createdURI).entity(replyMsg).build(); + public static Cluster createClusterWithSeedHost(String seedHost) + throws ReaperException { + String clusterName; + String partitioner; + try { + JmxProxy jmxProxy = JmxProxy.connect(seedHost); + clusterName = jmxProxy.getClusterName(); + partitioner = jmxProxy.getPartitioner(); + jmxProxy.close(); + } catch (ReaperException e) { + LOG.error("failed to create cluster with seed host: " + seedHost); + e.printStackTrace(); + throw e; + } + Cluster newCluster = new Cluster.Builder(clusterName) + .seedHosts(Collections.singleton(seedHost)) + .partitioner(partitioner).build(); + return newCluster; } } diff --git a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java new file mode 100644 index 000000000..7b73ef5af --- /dev/null +++ b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java @@ -0,0 +1,41 @@ +package com.spotify.reaper.resources; + +import com.google.common.base.Optional; + +import com.spotify.reaper.storage.IStorage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Path("/repair_run") +@Produces(MediaType.APPLICATION_JSON) +public class RepairRunResource { + + private static final Logger LOG = LoggerFactory.getLogger(RepairRunResource.class); + + private final IStorage storage; + + public RepairRunResource(IStorage storage) { + this.storage = storage; + } + + @GET + @Path("/{id}") + public Response getCluster(@PathParam("id") Long repairRunId) { + LOG.info("get repair_run called with: id = {}", repairRunId); + return Response.ok().entity("not implemented yet").build(); + } + + // We probably don't want to create repair runs with this resource, + // but actually only by posting the cluster resource. + // Get here is used only for providing visibility to what is going on with the run. +} diff --git a/src/main/java/com/spotify/reaper/resources/TableResource.java b/src/main/java/com/spotify/reaper/resources/TableResource.java index 426487a70..972b14313 100644 --- a/src/main/java/com/spotify/reaper/resources/TableResource.java +++ b/src/main/java/com/spotify/reaper/resources/TableResource.java @@ -2,20 +2,29 @@ import com.google.common.base.Optional; +import com.spotify.reaper.ReaperApplicationConfiguration; +import com.spotify.reaper.ReaperException; +import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.core.Cluster; +import com.spotify.reaper.core.ColumnFamily; import com.spotify.reaper.storage.IStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.URI; +import java.net.URL; + import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriInfo; @Path("/table") @Produces(MediaType.APPLICATION_JSON) @@ -24,9 +33,11 @@ public class TableResource { private static final Logger LOG = LoggerFactory.getLogger(TableResource.class); private final IStorage storage; + private final ReaperApplicationConfiguration config; - public TableResource(IStorage storage) { + public TableResource(ReaperApplicationConfiguration config, IStorage storage) { this.storage = storage; + this.config = config; } @GET @@ -40,13 +51,93 @@ public Response getCluster(@PathParam("clusterName") String clusterName, } @POST - public Response addTable(@QueryParam("clusterName") Optional clusterName, - @QueryParam("seedHost") Optional seedHost, - @QueryParam("keyspace") Optional keyspace, - @QueryParam("table") Optional table) { + public Response addTable(@Context UriInfo uriInfo, + @QueryParam("clusterName") Optional clusterName, + @QueryParam("seedHost") Optional seedHost, + @QueryParam("keyspace") Optional keyspace, + @QueryParam("table") Optional table, + @QueryParam("startRepair") Optional startRepair) { LOG.info("add table called with: clusterName = {}, seedHost = {}, keyspace = {}, table = {}", clusterName, seedHost, keyspace, table); - return Response.ok().entity("not implemented yet").build(); + + if (!keyspace.isPresent()) { + return Response.status(400) + .entity("Query parameter \"keyspace\" required").build(); + } + if (!table.isPresent()) { + return Response.status(400) + .entity("Query parameter \"table\" required").build(); + } + + Cluster targetCluster; + if (seedHost.isPresent()) { + try { + targetCluster = ClusterResource.createClusterWithSeedHost(seedHost.get()); + } catch (ReaperException e) { + e.printStackTrace(); + return Response.status(400) + .entity("failed creating cluster with seed host: " + seedHost.get()).build(); + } + Cluster existingCluster = storage.getCluster(targetCluster.getName()); + if (null == existingCluster) { + LOG.info("creating new cluster based on given seed host: {}", seedHost); + storage.addCluster(targetCluster); + } + if (!existingCluster.equals(targetCluster)) { + LOG.info("cluster information has changed for cluster: {}", targetCluster.getName()); + storage.updateCluster(targetCluster); + } + } else if (clusterName.isPresent()) { + targetCluster = storage.getCluster(JmxProxy.toSymbolicName(clusterName.get())); + if (null == targetCluster) { + return Response.status(404) + .entity("cluster \"" + clusterName + "\" does not exist").build(); + } + } else { + return Response.status(400) + .entity("Query parameter \"clusterName\" or \"seedHost\" required").build(); + } + + ColumnFamily newTable = new ColumnFamily.Builder() + .cluster(targetCluster) + .keyspaceName(keyspace.get()) + .name(table.get()) + .keyspaceName(keyspace.get()) + .snapshotRepair(config.getSnapshotRepair()) + .segmentCount(config.getSegmentCount()) + .build(); + + String newTablePathPart = newTable.getCluster().getName() + "/" + newTable + .getKeyspaceName() + "/" + newTable.getName(); + if (!storage.addColumnFamily(newTable)) { + return Response.status(500) + .entity("failed creating table: " + newTablePathPart).build(); + } + + URI createdURI = null; + try { + createdURI = (new URL(uriInfo.getAbsolutePath().toURL(), newTablePathPart)).toURI(); + } catch (Exception e) { + String errMsg = "failed creating target URI for new table: " + newTablePathPart; + LOG.error(errMsg); + e.printStackTrace(); + return Response.status(400).entity(errMsg).build(); + } + + // If startRepair query parameter is given at all, i.e. value not checked. + if (startRepair.isPresent()) { + // TODO: + // create repair run + // create and store segments + // initialize segment states + // store repair run + // create new runner for the run + // start the runner and return pointer to new RepairRun + // runner holds open jmx proxy to update segment states + // runner checks storage after every segment, if run state has changed (paused etc.) + } + + return Response.created(createdURI).entity(newTable).build(); } } diff --git a/src/main/java/com/spotify/reaper/service/IRepairStrategy.java b/src/main/java/com/spotify/reaper/service/IRepairStrategy.java deleted file mode 100644 index 6788b3132..000000000 --- a/src/main/java/com/spotify/reaper/service/IRepairStrategy.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.spotify.reaper.service; - -/** - * Interface for implementing a repair strategy for cassandra-reaper. - * - * Repair strategy is basically a sorting function for repair segments - * selected from a column family. - */ -public interface IRepairStrategy { - -} diff --git a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java index 6fc16235d..db217a8bf 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java +++ b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java @@ -13,8 +13,6 @@ /** * Splits given Cassandra table's (column family's) token range into RepairSegments. - * - * The run order of RepairSegments in RepairRun defines the RepairStrategy. */ public class SegmentGenerator { diff --git a/src/main/java/com/spotify/reaper/service/SequentialRepairStrategy.java b/src/main/java/com/spotify/reaper/service/SequentialRepairStrategy.java deleted file mode 100644 index c1a6e8ebd..000000000 --- a/src/main/java/com/spotify/reaper/service/SequentialRepairStrategy.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.spotify.reaper.service; - -/** - * Implements a sequential repair strategy for cassandra-reaper. - * - * Sequential repair strategy will pick the repair segments in sequential - * (round-robin) order one-by-one. - */ -public class SequentialRepairStrategy { - -} diff --git a/src/main/java/com/spotify/reaper/storage/IStorage.java b/src/main/java/com/spotify/reaper/storage/IStorage.java index b4e01944b..88c005581 100644 --- a/src/main/java/com/spotify/reaper/storage/IStorage.java +++ b/src/main/java/com/spotify/reaper/storage/IStorage.java @@ -1,7 +1,9 @@ package com.spotify.reaper.storage; import com.spotify.reaper.core.Cluster; +import com.spotify.reaper.core.ColumnFamily; import com.spotify.reaper.core.RepairRun; +import com.spotify.reaper.core.RepairSegment; /** * API definition for cassandra-reaper. @@ -10,10 +12,20 @@ public interface IStorage { public Cluster addCluster(Cluster cluster); + public Cluster updateCluster(Cluster cluster); + public Cluster getCluster(String clusterName); public RepairRun addRepairRun(RepairRun repairRun); public RepairRun getRepairRun(long id); + public boolean addColumnFamily(ColumnFamily newTable); + + public ColumnFamily getColumnFamily(long id); + + public RepairSegment getNextFreeSegment(long runId); + + // IDEA: should we have something like this for parallel runners on same ring? + //public RepairSegment getNextFreeSegmentInRange(long runId, long start, long end); } diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index 797643b18..1bf01cf8f 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -3,10 +3,10 @@ import com.google.common.collect.Maps; import com.spotify.reaper.core.Cluster; +import com.spotify.reaper.core.ColumnFamily; import com.spotify.reaper.core.RepairRun; +import com.spotify.reaper.core.RepairSegment; -import java.util.Collection; -import java.util.HashMap; import java.util.Map; /** @@ -16,13 +16,18 @@ public class MemoryStorage implements IStorage { private Map clusters = Maps.newHashMap(); private Map repairRuns = Maps.newHashMap(); - + private Map columnFamilies = Maps.newHashMap(); @Override public Cluster addCluster(Cluster cluster) { return clusters.put(cluster.getName(), cluster); } + @Override + public Cluster updateCluster(Cluster cluster) { + return addCluster(cluster); + } + @Override public Cluster getCluster(String clusterName) { return clusters.get(clusterName); @@ -37,4 +42,21 @@ public RepairRun addRepairRun(RepairRun repairRun) { public RepairRun getRepairRun(long id) { return repairRuns.get(id); } + + @Override + public boolean addColumnFamily(ColumnFamily columnFamily) { + columnFamilies.put(columnFamily.getId(), columnFamily); + return true; + } + + @Override + public ColumnFamily getColumnFamily(long id) { + return columnFamilies.get(id); + } + + @Override + public RepairSegment getNextFreeSegment(long runId) { + // TODO: + return null; + } } diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index 0a542b59a..be1e70acd 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -3,7 +3,9 @@ import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperException; import com.spotify.reaper.core.Cluster; +import com.spotify.reaper.core.ColumnFamily; import com.spotify.reaper.core.RepairRun; +import com.spotify.reaper.core.RepairSegment; import com.spotify.reaper.storage.postgresql.IStoragePostgreSQL; import org.skife.jdbi.v2.DBI; @@ -60,6 +62,23 @@ public Cluster addCluster(Cluster newCluster) { return result; } + @Override + public Cluster updateCluster(Cluster cluster) { + Handle h = jdbi.open(); + IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); + int rowsAdded = postgres.updateCluster(cluster); + Cluster result; + if (rowsAdded < 1) { + LOG.warn("failed updating cluster with name: {}", cluster.getName()); + result = null; + } + else { + result = postgres.getCluster(cluster.getName()); + } + h.close(); + return result; + } + @Override public RepairRun addRepairRun(RepairRun repairRun) { return null; @@ -69,4 +88,20 @@ public RepairRun addRepairRun(RepairRun repairRun) { public RepairRun getRepairRun(long id) { return null; } + + @Override + public boolean addColumnFamily(ColumnFamily newTable) { + return false; + } + + @Override + public ColumnFamily getColumnFamily(long id) { + return null; + } + + @Override + public RepairSegment getNextFreeSegment(long runId) { + // TODO: implementation + return null; + } } diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java index 694c6a248..20f33fc0a 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java @@ -14,8 +14,7 @@ public class ClusterMapper implements ResultSetMapper { public Cluster map(int index, ResultSet r, StatementContext ctx) throws SQLException { String[] seedHosts = (String[]) r.getArray("seed_hosts").getArray(); - return new Cluster.Builder() - .name(r.getString("name")) + return new Cluster.Builder(r.getString("name")) .partitioner(r.getString("partitioner")) .seedHosts(new HashSet(Arrays.asList(seedHosts))) .build(); diff --git a/src/test/resources/cassandra-reaper.yaml b/src/test/resources/cassandra-reaper.yaml index a66682c60..3a36170cf 100644 --- a/src/test/resources/cassandra-reaper.yaml +++ b/src/test/resources/cassandra-reaper.yaml @@ -1,5 +1,4 @@ segmentCount: 100 -repairStrategy: com.spotify.reaper.service.SequentialRepairStrategy snapshotRepair: false repairIntensity: 0.75