diff --git a/src/main/java/com/spotify/reaper/core/Cluster.java b/src/main/java/com/spotify/reaper/core/Cluster.java index 3d54979ee..b27b99e20 100644 --- a/src/main/java/com/spotify/reaper/core/Cluster.java +++ b/src/main/java/com/spotify/reaper/core/Cluster.java @@ -28,21 +28,18 @@ private Cluster(Builder builder) { public static class Builder { - private final String name; - private String partitioner; - private Set seedHosts; + public final String name; + public final String partitioner; + private final Set seedHosts; - public Builder(String name) { + public Builder(String name, String partitioner, Set seedHosts) { this.name = name; - } - - public Builder partitioner(String partitioner) { this.partitioner = partitioner; - return this; + this.seedHosts = seedHosts; } - public Builder seedHosts(Set seedHosts) { - this.seedHosts = seedHosts; + public Builder addSeedHost(String seedHost) { + seedHosts.add(seedHost); return this; } diff --git a/src/main/java/com/spotify/reaper/core/ColumnFamily.java b/src/main/java/com/spotify/reaper/core/ColumnFamily.java index 87fd78ec6..81cf7732c 100644 --- a/src/main/java/com/spotify/reaper/core/ColumnFamily.java +++ b/src/main/java/com/spotify/reaper/core/ColumnFamily.java @@ -2,22 +2,17 @@ public class ColumnFamily { - private Long id; + private final long id; private final Cluster cluster; private final String keyspaceName; private final String name; private final int segmentCount; // int/long/BigInteger? private final boolean snapshotRepair; - public Long getId() { + public long getId() { return id; } - public void setId(long id) { - assert this.id == null : "cannot reset id after once set"; - this.id = id; - } - public Cluster getCluster() { return cluster; } @@ -38,8 +33,8 @@ public boolean isSnapshotRepair() { return snapshotRepair; } - private ColumnFamily(Builder builder) { - this.id = builder.id; + private ColumnFamily(Builder builder, long id) { + this.id = id; this.cluster = builder.cluster; this.keyspaceName = builder.keyspaceName; this.name = builder.name; @@ -50,46 +45,23 @@ private ColumnFamily(Builder builder) { public static class Builder { - private Long id; - private Cluster cluster; - private String keyspaceName; - private String name; - private int segmentCount; - private boolean snapshotRepair; + public final Cluster cluster; + public final String keyspaceName; + public final String name; + public final int segmentCount; + public final boolean snapshotRepair; - public Builder id(long id) { - this.id = id; - return this; - } - - public Builder cluster(Cluster cluster) { + public Builder(Cluster cluster, String keyspaceName, String name, int segmentCount, + boolean snapshotRepair) { this.cluster = cluster; - return this; - } - - public Builder keyspaceName(String keyspaceName) { this.keyspaceName = keyspaceName; - return this; - } - - public Builder name(String name) { this.name = name; - return this; - } - - public Builder segmentCount(int segmentCount) { this.segmentCount = segmentCount; - return this; - } - - public Builder snapshotRepair(boolean snapshotRepair) { this.snapshotRepair = snapshotRepair; - return this; } - - public ColumnFamily build() { - return new ColumnFamily(this); + public ColumnFamily build(long id) { + return new ColumnFamily(this, id); } } } diff --git a/src/main/java/com/spotify/reaper/core/RepairRun.java b/src/main/java/com/spotify/reaper/core/RepairRun.java index 95e4cabcf..3fe50f786 100644 --- a/src/main/java/com/spotify/reaper/core/RepairRun.java +++ b/src/main/java/com/spotify/reaper/core/RepairRun.java @@ -4,7 +4,7 @@ public class RepairRun { - private Long id; + private final long id; // IDEA: maybe we want to have start and stop token for parallel runners on same repair run? //private final long startToken; @@ -18,15 +18,10 @@ public class RepairRun { private final DateTime endTime; private final double intensity; - public Long getId() { + public long getId() { return id; } - public void setId(long id) { - assert this.id == null : "cannot reset id after once set"; - this.id = id; - } - public String getCause() { return cause; } @@ -62,8 +57,8 @@ public enum State { PAUSED } - private RepairRun(Builder builder) { - this.id = builder.id; + private RepairRun(Builder builder, long id) { + this.id = id; this.cause = builder.cause; this.owner = builder.owner; this.state = builder.state; @@ -75,18 +70,18 @@ private RepairRun(Builder builder) { public static class Builder { - private Long id; + public final State state; + public final DateTime creationTime; + public final double intensity; private String cause; private String owner; - private State state; - private DateTime creationTime; private DateTime startTime; private DateTime endTime; - private double intensity; - public Builder id(long id) { - this.id = id; - return this; + public Builder(State state, DateTime creationTime, double intensity) { + this.state = state; + this.creationTime = creationTime; + this.intensity = intensity; } public Builder cause(String cause) { @@ -99,16 +94,6 @@ public Builder owner(String owner) { return this; } - public Builder state(State state) { - this.state = state; - return this; - } - - public Builder creationTime(DateTime creationTime) { - this.creationTime = creationTime; - return this; - } - public Builder startTime(DateTime startTime) { this.startTime = startTime; return this; @@ -119,13 +104,8 @@ public Builder endTime(DateTime endTime) { return this; } - public Builder intensity(double intensity) { - this.intensity = intensity; - return this; - } - - public RepairRun build() { - return new RepairRun(this); + public RepairRun build(long id) { + return new RepairRun(this, id); } } } diff --git a/src/main/java/com/spotify/reaper/core/RepairSegment.java b/src/main/java/com/spotify/reaper/core/RepairSegment.java index ec99dc628..dce6fc9c8 100644 --- a/src/main/java/com/spotify/reaper/core/RepairSegment.java +++ b/src/main/java/com/spotify/reaper/core/RepairSegment.java @@ -6,7 +6,7 @@ public class RepairSegment { - private Long id; + private final long id; private final ColumnFamily columnFamily; private final long runID; private final BigInteger startToken; // open/exclusive @@ -15,15 +15,10 @@ public class RepairSegment { private final DateTime startTime; private final DateTime endTime; - public Long getId() { + public long getId() { return id; } - public void setId(long id) { - assert this.id == null : "cannot reset id after once set"; - this.id = id; - } - public ColumnFamily getColumnFamily() { return columnFamily; } @@ -58,8 +53,8 @@ public enum State { DONE } - private RepairSegment(Builder builder) { - this.id = builder.id; + private RepairSegment(Builder builder, long id) { + this.id = id; this.columnFamily = builder.columnFamily; this.runID = builder.runID; this.startToken = builder.startToken; @@ -71,43 +66,20 @@ private RepairSegment(Builder builder) { public static class Builder { - private Long id; - private ColumnFamily columnFamily; - private long runID; - private BigInteger startToken; - private BigInteger endToken; - private RepairSegment.State state; + public final ColumnFamily columnFamily; + public final long runID; + public final BigInteger startToken; + public final BigInteger endToken; + public final State state; private DateTime startTime; private DateTime endTime; - public Builder id(long id) { - this.id = id; - return this; - } - - public Builder columnFamily(ColumnFamily columnFamily) { + public Builder(ColumnFamily columnFamily, long runID, BigInteger startToken, BigInteger endToken, State state) { this.columnFamily = columnFamily; - return this; - } - - public Builder runID(long runID) { this.runID = runID; - return this; - } - - public Builder startToken(BigInteger startToken) { this.startToken = startToken; - return this; - } - - public Builder endToken(BigInteger endToken) { - this.endToken = endToken; - return this; - } - - public Builder state(RepairSegment.State state) { + this. endToken = endToken; this.state = state; - return this; } public Builder startTime(DateTime startTime) { @@ -120,14 +92,13 @@ public Builder endTime(DateTime endTime) { return this; } - - public RepairSegment build() { - return new RepairSegment(this); + public RepairSegment build(long id) { + return new RepairSegment(this, id); } - } - @Override - public String toString() { - return String.format("(%s,%s]", startToken.toString(), endToken.toString()); + @Override + public String toString() { + return String.format("(%s,%s]", startToken.toString(), endToken.toString()); + } } } diff --git a/src/main/java/com/spotify/reaper/resources/ClusterResource.java b/src/main/java/com/spotify/reaper/resources/ClusterResource.java index 3fe440533..5be3423c5 100644 --- a/src/main/java/com/spotify/reaper/resources/ClusterResource.java +++ b/src/main/java/com/spotify/reaper/resources/ClusterResource.java @@ -99,9 +99,9 @@ public static Cluster createClusterWithSeedHost(String seedHost) e.printStackTrace(); throw e; } - Cluster newCluster = new Cluster.Builder(clusterName) - .seedHosts(Collections.singleton(seedHost)) - .partitioner(partitioner).build(); + Cluster newCluster = + new Cluster.Builder(clusterName, partitioner, Collections.singleton(seedHost)) + .build(); return newCluster; } diff --git a/src/main/java/com/spotify/reaper/resources/TableResource.java b/src/main/java/com/spotify/reaper/resources/TableResource.java index e5be47eb6..308801028 100644 --- a/src/main/java/com/spotify/reaper/resources/TableResource.java +++ b/src/main/java/com/spotify/reaper/resources/TableResource.java @@ -115,18 +115,13 @@ public Response addTable(@Context UriInfo uriInfo, } // TODO: verify that the table exists in the cluster. - ColumnFamily newTable = new ColumnFamily.Builder() - .cluster(targetCluster) - .keyspaceName(keyspace.get()) - .name(table.get()) - .keyspaceName(keyspace.get()) - .snapshotRepair(config.getSnapshotRepair()) - .segmentCount(config.getSegmentCount()) - .build(); + ColumnFamily newTable = storage.addColumnFamily( + new ColumnFamily.Builder(targetCluster, keyspace.get(), table.get(), + config.getSegmentCount(), config.getSnapshotRepair())); String newTablePathPart = newTable.getCluster().getName() + "/" + newTable.getKeyspaceName() + "/" + newTable.getName(); - if (!storage.addColumnFamily(newTable)) { + if (newTable == null) { return Response.status(500) .entity("failed creating table into Reaper storage: " + newTablePathPart).build(); } @@ -144,21 +139,19 @@ public Response addTable(@Context UriInfo uriInfo, // Start repairing the table if the startRepair query parameter is given at all, // i.e. possible value not checked, and not required. if (startRepair.isPresent()) { - RepairRun newRepairRun = new RepairRun.Builder() - .cause(cause.isPresent() ? cause.get() : "no cause specified") - .owner(owner.get()) - .intensity(config.getRepairIntensity()) - .state(RepairRun.State.NOT_STARTED) - .creationTime(DateTime.now()) - .build(); - if (!storage.addRepairRun(newRepairRun)) { + RepairRun newRepairRun = + storage.addRepairRun(new RepairRun.Builder(RepairRun.State.NOT_STARTED, DateTime.now(), + config.getRepairIntensity()) + .cause(cause.isPresent() ? cause.get() : "no cause specified") + .owner(owner.get())); + if (newRepairRun == null) { return Response.status(500) .entity("failed creating repair run into Reaper storage for owner: " + owner.get()) .build(); } // create segments - List segments = null; + List segments = null; try { SegmentGenerator sg = new SegmentGenerator(targetCluster.getPartitioner()); Set seedHosts = targetCluster.getSeedHosts(); diff --git a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java index 560129fc4..dec0bbbb8 100644 --- a/src/main/java/com/spotify/reaper/service/SegmentGenerator.java +++ b/src/main/java/com/spotify/reaper/service/SegmentGenerator.java @@ -48,12 +48,13 @@ public SegmentGenerator(String partitioner) throws ReaperException { * @param ringTokens list of all start tokens in a cluster. They have to be in ring order. * @return a list containing at least {@code totalSegmentCount} repair segments. */ - public List generateSegments(int totalSegmentCount, List ringTokens, - long runId, ColumnFamily table) + public List generateSegments(int totalSegmentCount, + List ringTokens, + long runId, ColumnFamily table) throws ReaperException { int tokenRangeCount = ringTokens.size(); - List repairSegments = Lists.newArrayList(); + List repairSegments = Lists.newArrayList(); for (int i = 0; i < tokenRangeCount; i++) { BigInteger start = ringTokens.get(i); BigInteger stop = ringTokens.get((i + 1) % tokenRangeCount); @@ -97,13 +98,10 @@ public List generateSegments(int totalSegmentCount, List generateSegments(int totalSegmentCount, List newSegments); + boolean addRepairSegments(Collection newSegments); - public boolean updateRepairSegment(RepairSegment newRepairSegment); + boolean updateRepairSegment(RepairSegment newRepairSegment); - public RepairSegment getNextFreeSegment(long runId); + RepairSegment getNextFreeSegment(long runId); - public RepairSegment getNextFreeSegmentInRange(long runId, long start, long end); + RepairSegment getNextFreeSegmentInRange(long runId, long start, long end); } diff --git a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java index 1cf9185e2..0624e402f 100644 --- a/src/main/java/com/spotify/reaper/storage/MemoryStorage.java +++ b/src/main/java/com/spotify/reaper/storage/MemoryStorage.java @@ -22,16 +22,47 @@ public class MemoryStorage implements IStorage { private Map clusters = Maps.newHashMap(); private Map repairRuns = Maps.newHashMap(); private Map columnFamilies = Maps.newHashMap(); + private Map columnFamiliesByName = Maps.newHashMap(); + + public static class TableName { + public final String cluster; + public final String keyspace; + public final String table; + + public TableName(String cluster, String keyspace, String table) { + this.cluster = cluster; + this.keyspace = keyspace; + this.table = table; + } + + @Override + public boolean equals(Object other) { + if (other instanceof TableName) { + return + cluster.equals(((TableName) other).cluster) && + keyspace.equals(((TableName) other).keyspace) && + table.equals(((TableName) other).table); + } else { + return false; + } + } + + @Override + public int hashCode() { + return (cluster + keyspace + table).hashCode(); + } + } @Override - public boolean addCluster(Cluster cluster) { - clusters.put(cluster.getName(), cluster); - return true; + public Cluster addCluster(Cluster cluster) { + Cluster existing = clusters.putIfAbsent(cluster.getName(), cluster); + return existing == null ? cluster : null; } @Override - public boolean updateCluster(Cluster cluster) { - return addCluster(cluster); + public Cluster updateCluster(Cluster newCluster) { + clusters.put(newCluster.getName(), newCluster); + return newCluster; } @Override @@ -40,11 +71,10 @@ public Cluster getCluster(String clusterName) { } @Override - public boolean addRepairRun(RepairRun newRepairRun) { - assert newRepairRun.getId() == null : "new RepairRun instance must NOT have ID set"; - newRepairRun.setId(REPAIR_RUN_ID.incrementAndGet()); + public RepairRun addRepairRun(RepairRun.Builder repairRun) { + RepairRun newRepairRun = repairRun.build(REPAIR_RUN_ID.incrementAndGet()); repairRuns.put(newRepairRun.getId(), newRepairRun); - return true; + return newRepairRun; } @Override @@ -53,11 +83,22 @@ public RepairRun getRepairRun(long id) { } @Override - public boolean addColumnFamily(ColumnFamily newColumnFamily) { - assert newColumnFamily.getId() == null : "new ColumnFamily instance must NOT have ID set"; - newColumnFamily.setId(COLUMN_FAMILY_ID.incrementAndGet()); - columnFamilies.put(newColumnFamily.getId(), newColumnFamily); - return true; + public ColumnFamily addColumnFamily(ColumnFamily.Builder columnFamily) { + ColumnFamily + existing = + getColumnFamily(columnFamily.cluster.getName(), columnFamily.keyspaceName, + columnFamily.name); + if (existing == null) { + ColumnFamily newColumnFamily = columnFamily.build(COLUMN_FAMILY_ID.incrementAndGet()); + columnFamilies.put(newColumnFamily.getId(), newColumnFamily); + columnFamiliesByName + .put(new TableName(newColumnFamily.getCluster().getName(), + newColumnFamily.getKeyspaceName(), + newColumnFamily.getName()), newColumnFamily); + return newColumnFamily; + } else { + return null; + } } @Override @@ -65,6 +106,11 @@ public ColumnFamily getColumnFamily(long id) { return columnFamilies.get(id); } + @Override + public ColumnFamily getColumnFamily(String cluster, String keyspace, String table) { + return columnFamiliesByName.get(new TableName(cluster, keyspace, table)); + } + @Override public boolean addRepairSegments(Collection newSegments) { // TODO: diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index d43857cb7..81fb31269 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -49,35 +49,35 @@ public Cluster getCluster(String clusterName) { } @Override - public boolean addRepairRun(RepairRun newRepairRun) { + public RepairRun addRepairRun(RepairRun.Builder newRepairRun) { // TODO: implementation - return false; + return null; } @Override - public boolean addCluster(Cluster newCluster) { + public Cluster addCluster(Cluster newCluster) { Handle h = jdbi.open(); IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); int rowsAdded = postgres.insertCluster(newCluster); h.close(); if (rowsAdded < 1) { LOG.warn("failed inserting cluster with name: {}", newCluster.getName()); - return false; + return null; } - return true; + return newCluster; } @Override - public boolean updateCluster(Cluster cluster) { + public Cluster updateCluster(Cluster cluster) { Handle h = jdbi.open(); IStoragePostgreSQL postgres = h.attach(IStoragePostgreSQL.class); int rowsAdded = postgres.updateCluster(cluster); h.close(); if (rowsAdded < 1) { LOG.warn("failed updating cluster with name: {}", cluster.getName()); - return false; + return null; } - return true; + return cluster; } @Override @@ -87,9 +87,9 @@ public RepairRun getRepairRun(long id) { } @Override - public boolean addColumnFamily(ColumnFamily newTable) { + public ColumnFamily addColumnFamily(ColumnFamily.Builder newTable) { // TODO: implementation - return false; + return null; } @Override @@ -98,6 +98,11 @@ public ColumnFamily getColumnFamily(long id) { return null; } + @Override + public ColumnFamily getColumnFamily(String cluster, String keyspace, String table) { + return null; + } + @Override public boolean addRepairSegments(Collection newSegments) { // TODO: implementation diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java index 20f33fc0a..e3b04f979 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/ClusterMapper.java @@ -1,5 +1,7 @@ package com.spotify.reaper.storage.postgresql; +import com.google.common.collect.Sets; + import com.spotify.reaper.core.Cluster; import org.skife.jdbi.v2.StatementContext; @@ -14,10 +16,8 @@ public class ClusterMapper implements ResultSetMapper { public Cluster map(int index, ResultSet r, StatementContext ctx) throws SQLException { String[] seedHosts = (String[]) r.getArray("seed_hosts").getArray(); - return new Cluster.Builder(r.getString("name")) - .partitioner(r.getString("partitioner")) - .seedHosts(new HashSet(Arrays.asList(seedHosts))) - .build(); + return new Cluster.Builder(r.getString("name"), r.getString("partitioner"), + Sets.newHashSet(Arrays.asList(seedHosts))).build(); } } diff --git a/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java b/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java index d7362fc03..8d6f592cc 100644 --- a/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java +++ b/src/test/java/com/spotify/reaper/service/SegmentGeneratorTest.java @@ -34,7 +34,7 @@ public BigInteger apply(@Nullable String s) { ); SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner"); - List segments = generator.generateSegments(10, tokens, -1, null); + List segments = generator.generateSegments(10, tokens, -1, null); assertEquals(15, segments.size()); assertEquals("(0,1]", segments.get(0).toString()); @@ -101,7 +101,7 @@ public BigInteger apply(@Nullable String s) { }); SegmentGenerator generator = new SegmentGenerator("foo.bar.RandomPartitioner"); - List segments = generator.generateSegments(10, tokens, -1, null); + List segments = generator.generateSegments(10, tokens, -1, null); assertEquals(15, segments.size()); assertEquals("(113427455640312821154458202477256070484,113427455640312821154458202477256070485]", segments.get(4).toString());