diff --git a/src/main/db/reaper_db.sql b/src/main/db/reaper_db.sql index f4a203c0c..47621508f 100644 --- a/src/main/db/reaper_db.sql +++ b/src/main/db/reaper_db.sql @@ -30,6 +30,9 @@ CREATE TABLE IF NOT EXISTS "repair_unit" ( "segment_count" INT NOT NULL, "snapshot_repair" BOOLEAN NOT NULL ); +-- Using GIN index to make @> (contains) type of array operations faster +CREATE INDEX repair_unit_column_families_gin_idx + ON repair_unit USING GIN (column_families); CREATE TABLE IF NOT EXISTS "repair_run" ( "id" SERIAL PRIMARY KEY, diff --git a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java index 617a9a629..254ab6c58 100644 --- a/src/main/java/com/spotify/reaper/resources/RepairRunResource.java +++ b/src/main/java/com/spotify/reaper/resources/RepairRunResource.java @@ -23,11 +23,10 @@ import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.cassandra.JmxProxy; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.resources.view.RepairRunStatus; -import com.spotify.reaper.cassandra.JmxConnectionFactory; import com.spotify.reaper.service.RepairRunner; import com.spotify.reaper.service.RingRange; import com.spotify.reaper.service.SegmentGenerator; @@ -414,10 +413,7 @@ private void storeNewRepairSegments(List tokenSegments, RepairRun rep repairUnit.getId()); repairSegmentBuilders.add(repairSegment); } - boolean success = storage.addRepairSegments(repairSegmentBuilders, repairRun.getId()); - if (!success) { - throw new ReaperException("failed adding repair segments to storage"); - } + storage.addRepairSegments(repairSegmentBuilders, repairRun.getId()); if (repairUnit.getSegmentCount() != tokenSegments.size()) { LOG.debug("created segment amount differs from expected default {} != 
{}", repairUnit.getSegmentCount(), tokenSegments.size()); diff --git a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java index efb80276b..47c28dc50 100644 --- a/src/main/java/com/spotify/reaper/storage/PostgresStorage.java +++ b/src/main/java/com/spotify/reaper/storage/PostgresStorage.java @@ -13,33 +13,27 @@ */ package com.spotify.reaper.storage; +import com.google.common.base.Optional; import com.spotify.reaper.ReaperApplicationConfiguration; import com.spotify.reaper.ReaperException; import com.spotify.reaper.core.Cluster; -import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.core.RepairRun; import com.spotify.reaper.core.RepairSegment; +import com.spotify.reaper.core.RepairUnit; import com.spotify.reaper.service.RingRange; -import com.spotify.reaper.storage.postgresql.BigIntegerArgumentFactory; -import com.spotify.reaper.storage.postgresql.IStoragePostgreSQL; -import com.spotify.reaper.storage.postgresql.PostgresArrayArgumentFactory; -import com.spotify.reaper.storage.postgresql.RunStateArgumentFactory; -import com.spotify.reaper.storage.postgresql.StateArgumentFactory; - +import com.spotify.reaper.storage.postgresql.*; +import io.dropwizard.jdbi.DBIFactory; +import io.dropwizard.setup.Environment; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import javax.annotation.Nullable; - -import io.dropwizard.jdbi.DBIFactory; -import io.dropwizard.setup.Environment; - /** * Implements the StorageAPI using PostgreSQL database. 
*/ @@ -69,12 +64,12 @@ private static IStoragePostgreSQL getPostgresStorage(Handle h) { } @Override - public Cluster getCluster(String clusterName) { + public Optional getCluster(String clusterName) { Cluster result; try (Handle h = jdbi.open()) { result = getPostgresStorage(h).getCluster(clusterName); } - return result; + return result == null ? Optional.absent() : Optional.of(result); } @Override @@ -99,7 +94,7 @@ public Collection getClusters() { } @Override - public Cluster addCluster(Cluster newCluster) { + public boolean addCluster(Cluster newCluster) { Cluster result = null; try (Handle h = jdbi.open()) { int rowsAdded = getPostgresStorage(h).insertCluster(newCluster); @@ -109,7 +104,7 @@ public Cluster addCluster(Cluster newCluster) { result = newCluster; // no created id, as cluster name used for primary key } } - return result; + return result != null; } @Override @@ -127,12 +122,12 @@ public boolean updateCluster(Cluster cluster) { } @Override - public RepairRun getRepairRun(long id) { + public Optional getRepairRun(long id) { RepairRun result; try (Handle h = jdbi.open()) { result = getPostgresStorage(h).getRepairRun(id); } - return result; + return result == null ? 
Optional.absent() : Optional.of(result); } @Override @@ -178,32 +173,32 @@ public boolean updateRepairRun(RepairRun repairRun) { } @Override - public RepairUnit addColumnFamily(RepairUnit.Builder newColumnFamily) { - RepairUnit result; + public RepairUnit addRepairUnit(RepairUnit.Builder newRepairUnit) { + long insertedId; try (Handle h = jdbi.open()) { - long insertedId = getPostgresStorage(h).insertColumnFamily(newColumnFamily.build(-1)); - result = newColumnFamily.build(insertedId); + insertedId = getPostgresStorage(h).insertRepairUnit(newRepairUnit.build(-1)); } - return result; + return newRepairUnit.build(insertedId); } @Override - public RepairUnit getColumnFamily(long id) { + public Optional getRepairUnit(long id) { RepairUnit result; try (Handle h = jdbi.open()) { - result = getPostgresStorage(h).getColumnFamily(id); + result = getPostgresStorage(h).getRepairUnit(id); } - return result; + return result == null ? Optional.absent() : Optional.of(result); } @Override - public RepairUnit getColumnFamily(String clusterName, String keyspaceName, String tableName) { + public Optional getRepairUnit(String clusterName, String keyspaceName, + Collection columnFamilies) { RepairUnit result; try (Handle h = jdbi.open()) { IStoragePostgreSQL storage = getPostgresStorage(h); - result = storage.getColumnFamilyByClusterAndName(clusterName, keyspaceName, tableName); + result = storage.getRepairUnitByClusterAndTables(clusterName, keyspaceName, columnFamilies); } - return result; + return result == null ? Optional.absent() : Optional.of(result); } @Override @@ -232,48 +227,36 @@ public boolean updateRepairSegment(RepairSegment repairSegment) { } @Override - public RepairSegment getRepairSegment(long id) { + public Optional getRepairSegment(long id) { RepairSegment result; try (Handle h = jdbi.open()) { result = getPostgresStorage(h).getRepairSegment(id); } - return result; + return result == null ? 
Optional.absent() : Optional.of(result); } @Override - public RepairSegment getNextFreeSegment(long runId) { + public Optional getNextFreeSegment(long runId) { RepairSegment result; try (Handle h = jdbi.open()) { result = getPostgresStorage(h).getNextFreeRepairSegment(runId); } - return result; + return result == null ? Optional.absent() : Optional.of(result); } @Override - public RepairSegment getNextFreeSegmentInRange(long runId, RingRange range) { + public Optional getNextFreeSegmentInRange(long runId, RingRange range) { RepairSegment result; try (Handle h = jdbi.open()) { IStoragePostgreSQL storage = getPostgresStorage(h); result = storage.getNextFreeRepairSegmentOnRange(runId, range.getStart(), range.getEnd()); } - return result; + return result == null ? Optional.absent() : Optional.of(result); } - @Nullable - @Override - public RepairSegment getTheRunningSegment(long runId) { - RepairSegment result = null; - try (Handle h = jdbi.open()) { - Collection segments = getPostgresStorage(h) - .getRepairSegmentForRunWithState(runId, RepairSegment.State.RUNNING); - if (null != segments) { - assert segments.size() < 2 : "there are more than one RUNNING segment on run: " + runId; - if (segments.size() == 1) { - result = segments.iterator().next(); - } - } - } - return result; + @Override public Collection getSegmentsWithStateForRun(long runId, + RepairSegment.State segmentState) { + try (Handle h = jdbi.open()) { return getPostgresStorage(h).getRepairSegmentForRunWithState(runId, segmentState); } } @Override diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java index c3febb4fa..5db6c5e02 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/IStoragePostgreSQL.java @@ -33,7 +33,7 @@ /** * JDBI based PostgreSQL interface. 
- * + * * See following specification for more info: http://jdbi.org/sql_object_api_dml/ */ public interface IStoragePostgreSQL { @@ -51,7 +51,7 @@ public interface IStoragePostgreSQL { "SELECT " + SQL_CLUSTER_ALL_FIELDS + " FROM cluster WHERE name = :name"; static final String SQL_INSERT_CLUSTER = "INSERT INTO cluster (" + SQL_CLUSTER_ALL_FIELDS + - ") VALUES (:name, :partitioner, :seedHosts)"; + ") VALUES (:name, :partitioner, :seedHosts)"; static final String SQL_UPDATE_CLUSTER = "UPDATE cluster SET partitioner = :partitioner, seed_hosts = :seedHosts WHERE name = :name"; @@ -74,18 +74,19 @@ public interface IStoragePostgreSQL { // static final String SQL_REPAIR_RUN_ALL_FIELDS_NO_ID = "cluster_name, column_family_id, cause, owner, state, creation_time, " - + "start_time, end_time, intensity"; + + "start_time, end_time, pause_time, intensity"; static final String SQL_REPAIR_RUN_ALL_FIELDS = "id, " + SQL_REPAIR_RUN_ALL_FIELDS_NO_ID; static final String SQL_INSERT_REPAIR_RUN = "INSERT INTO repair_run (" + SQL_REPAIR_RUN_ALL_FIELDS_NO_ID + ") VALUES " - + "(:clusterName, :columnFamilyId, :cause, :owner, :runState, :creationTime, " - + ":startTime, :endTime, :intensity)"; + + "(:clusterName, :columnFamilyId, :cause, :owner, :runState, :creationTime, " + + ":startTime, :endTime, :pauseTime, :intensity)"; static final String SQL_UPDATE_REPAIR_RUN = "UPDATE repair_run SET cause = :cause, owner = :owner, state = :runState, " - + "start_time = :startTime, end_time = :endTime, intensity = :intensity WHERE id = :id"; + + "start_time = :startTime, end_time = :endTime, pause_time = :pauseTime, " + + "intensity = :intensity WHERE id = :id"; static final String SQL_GET_REPAIR_RUN = "SELECT " + SQL_REPAIR_RUN_ALL_FIELDS + " FROM repair_run WHERE id = :id"; @@ -117,35 +118,36 @@ public interface IStoragePostgreSQL { // RepairUnit // - static final String SQL_COLUMN_FAMILY_ALL_FIELDS_NO_ID = - "cluster_name, keyspace_name, name, segment_count, snapshot_repair"; + static final 
String SQL_REPAIR_UNIT_ALL_FIELDS_NO_ID = + "cluster_name, keyspace_name, column_families, segment_count, snapshot_repair"; - static final String SQL_COLUMN_FAMILY_ALL_FIELDS = "id, " + SQL_COLUMN_FAMILY_ALL_FIELDS_NO_ID; + static final String SQL_REPAIR_UNIT_ALL_FIELDS = "id, " + SQL_REPAIR_UNIT_ALL_FIELDS_NO_ID; - static final String SQL_INSERT_COLUMN_FAMILY = - "INSERT INTO column_family (" + SQL_COLUMN_FAMILY_ALL_FIELDS_NO_ID + ") VALUES " - + "(:clusterName, :keyspaceName, :name, :segmentCount, :snapshotRepair)"; + static final String SQL_INSERT_REPAIR_UNIT = + "INSERT INTO column_family (" + SQL_REPAIR_UNIT_ALL_FIELDS_NO_ID + ") VALUES " + + "(:clusterName, :keyspaceName, :columnFamilies, :segmentCount, :snapshotRepair)"; - static final String SQL_GET_COLUMN_FAMILY = - "SELECT " + SQL_COLUMN_FAMILY_ALL_FIELDS + " FROM column_family WHERE id = :id"; + static final String SQL_GET_REPAIR_UNIT = + "SELECT " + SQL_REPAIR_UNIT_ALL_FIELDS + " FROM column_family WHERE id = :id"; - static final String SQL_GET_COLUMN_FAMILY_BY_CLUSTER_AND_NAME = - "SELECT " + SQL_COLUMN_FAMILY_ALL_FIELDS + " FROM column_family " - + "WHERE cluster_name = :clusterName AND keyspace_name = :keyspaceName AND name = :name"; + static final String SQL_GET_REPAIR_UNIT_BY_CLUSTER_AND_TABLES = + "SELECT " + SQL_REPAIR_UNIT_ALL_FIELDS + " FROM column_family " + + "WHERE cluster_name = :clusterName AND keyspace_name = :keyspaceName " + + "AND column_families @> :columnFamilies AND column_families <@ :columnFamilies"; - @SqlQuery(SQL_GET_COLUMN_FAMILY) - @Mapper(ColumnFamilyMapper.class) - public RepairUnit getColumnFamily(@Bind("id") long columnFamilyId); + @SqlQuery(SQL_GET_REPAIR_UNIT) + @Mapper(RepairUnitMapper.class) + public RepairUnit getRepairUnit(@Bind("id") long columnFamilyId); - @SqlQuery(SQL_GET_COLUMN_FAMILY_BY_CLUSTER_AND_NAME) - @Mapper(ColumnFamilyMapper.class) - public RepairUnit getColumnFamilyByClusterAndName(@Bind("clusterName") String clusterName, - @Bind("keyspaceName") 
String keyspaceName, - @Bind("name") String tableName); + @SqlQuery(SQL_GET_REPAIR_UNIT_BY_CLUSTER_AND_TABLES) + @Mapper(RepairUnitMapper.class) + public RepairUnit getRepairUnitByClusterAndTables(@Bind("clusterName") String clusterName, + @Bind("keyspaceName") String keyspaceName, + @Bind("columnFamilies") Collection columnFamilies); - @SqlUpdate(SQL_INSERT_COLUMN_FAMILY) + @SqlUpdate(SQL_INSERT_REPAIR_UNIT) @GetGeneratedKeys - public long insertColumnFamily(@BindBean RepairUnit newRepairUnit); + public long insertRepairUnit(@BindBean RepairUnit newRepairUnit); // RepairSegment // @@ -156,29 +158,29 @@ public RepairUnit getColumnFamilyByClusterAndName(@Bind("clusterName") String cl static final String SQL_INSERT_REPAIR_SEGMENT = "INSERT INTO repair_segment (" + SQL_REPAIR_SEGMENT_ALL_FIELDS_NO_ID + ") VALUES " - + "(:columnFamilyId, :runId, :startToken, :endToken, :state, :startTime, :endTime, " - + ":failCount)"; + + "(:columnFamilyId, :runId, :startToken, :endToken, :state, :startTime, :endTime, " + + ":failCount)"; static final String SQL_UPDATE_REPAIR_SEGMENT = "UPDATE repair_segment SET column_family_id = :columnFamilyId, run_id = :runId, " - + "start_token = :startToken, end_token = :endToken, state = :state, " - + "start_time = :startTime, end_time = :endTime, fail_count = :failCount WHERE id = :id"; + + "start_token = :startToken, end_token = :endToken, state = :state, " + + "start_time = :startTime, end_time = :endTime, fail_count = :failCount WHERE id = :id"; static final String SQL_GET_REPAIR_SEGMENT = "SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE id = :id"; static final String SQL_GET_REPAIR_SEGMENT_FOR_RUN_WITH_STATE = "SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE " - + "run_id = :runId AND state = :state"; + + "run_id = :runId AND state = :state"; static final String SQL_GET_NEXT_FREE_REPAIR_SEGMENT = "SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE run_id = :runId " - + "AND 
state = 0 ORDER BY fail_count ASC, start_token ASC LIMIT 1"; + + "AND state = 0 ORDER BY fail_count ASC, start_token ASC LIMIT 1"; static final String SQL_GET_NEXT_FREE_REPAIR_SEGMENT_ON_RANGE = "SELECT " + SQL_REPAIR_SEGMENT_ALL_FIELDS + " FROM repair_segment WHERE " - + "run_id = :runId AND state = 0 AND start_token >= :startToken " - + "AND end_token < :endToken ORDER BY fail_count ASC, start_token ASC LIMIT 1"; + + "run_id = :runId AND state = 0 AND start_token >= :startToken " + + "AND end_token < :endToken ORDER BY fail_count ASC, start_token ASC LIMIT 1"; @SqlBatch(SQL_INSERT_REPAIR_SEGMENT) @BatchChunkSize(500) @@ -194,8 +196,8 @@ public RepairUnit getColumnFamilyByClusterAndName(@Bind("clusterName") String cl @SqlQuery(SQL_GET_REPAIR_SEGMENT_FOR_RUN_WITH_STATE) @Mapper(RepairSegmentMapper.class) public Collection getRepairSegmentForRunWithState(@Bind("runId") long runId, - @Bind("state") - RepairSegment.State state); + @Bind("state") + RepairSegment.State state); @SqlQuery(SQL_GET_NEXT_FREE_REPAIR_SEGMENT) @Mapper(RepairSegmentMapper.class) @@ -204,8 +206,8 @@ public Collection getRepairSegmentForRunWithState(@Bind("runId") @SqlQuery(SQL_GET_NEXT_FREE_REPAIR_SEGMENT_ON_RANGE) @Mapper(RepairSegmentMapper.class) public RepairSegment getNextFreeRepairSegmentOnRange(@Bind("runId") long runId, - @Bind("startToken") BigInteger startToken, - @Bind("endToken") BigInteger endToken); + @Bind("startToken") BigInteger startToken, + @Bind("endToken") BigInteger endToken); // Utility methods // @@ -215,11 +217,11 @@ public RepairSegment getNextFreeRepairSegmentOnRange(@Bind("runId") long runId, static final String SQL_SEGMENTS_AMOUNT_FOR_REPAIR_RUN = "SELECT count(*) FROM repair_segment WHERE run_id = :runId AND state = :state"; - @SqlQuery(SQL_GET_REPAIR_RUN_IDS_FOR_CLUSTER) - Collection getRepairRunIdsForCluster(@Bind("clusterName") String clusterName); + @SqlQuery(SQL_GET_REPAIR_RUN_IDS_FOR_CLUSTER) Collection getRepairRunIdsForCluster( + @Bind("clusterName") String 
clusterName); - @SqlQuery(SQL_SEGMENTS_AMOUNT_FOR_REPAIR_RUN) - int getSegmentAmountForRepairRun(@Bind("runId") long runId, - @Bind("state") RepairSegment.State state); + @SqlQuery(SQL_SEGMENTS_AMOUNT_FOR_REPAIR_RUN) int getSegmentAmountForRepairRun( + @Bind("runId") long runId, + @Bind("state") RepairSegment.State state); } diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java index 6ac6b47bf..67bebbfe3 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RepairRunMapper.java @@ -36,6 +36,7 @@ public RepairRun map(int index, ResultSet r, StatementContext ctx) throws SQLExc .cause(r.getString("cause")) .startTime(getDateTimeOrNull(r, "start_time")) .endTime(getDateTimeOrNull(r, "end_time")) + .pauseTime(getDateTimeOrNull(r, "pause_time")) .build(r.getLong("id")); } diff --git a/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java b/src/main/java/com/spotify/reaper/storage/postgresql/RepairUnitMapper.java similarity index 78% rename from src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java rename to src/main/java/com/spotify/reaper/storage/postgresql/RepairUnitMapper.java index 1fdcee194..2597676e7 100644 --- a/src/main/java/com/spotify/reaper/storage/postgresql/ColumnFamilyMapper.java +++ b/src/main/java/com/spotify/reaper/storage/postgresql/RepairUnitMapper.java @@ -13,6 +13,7 @@ */ package com.spotify.reaper.storage.postgresql; +import com.google.common.collect.Sets; import com.spotify.reaper.core.RepairUnit; import org.skife.jdbi.v2.StatementContext; @@ -21,11 +22,12 @@ import java.sql.ResultSet; import java.sql.SQLException; -public class ColumnFamilyMapper implements ResultSetMapper { +public class RepairUnitMapper implements ResultSetMapper { public RepairUnit map(int index, ResultSet r, StatementContext ctx) throws 
SQLException { + String[] columnFamilies = (String[]) r.getArray("column_families").getArray(); RepairUnit.Builder builder = new RepairUnit.Builder(r.getString("cluster_name"), - r.getString("keyspace_name"), r.getString("name"), r.getInt("segment_count"), + r.getString("keyspace_name"), Sets.newHashSet(columnFamilies), r.getInt("segment_count"), r.getBoolean("snapshot_repair")); return builder.build(r.getLong("id")); } diff --git a/src/test/resources/cassandra-reaper.yaml b/src/test/resources/cassandra-reaper.yaml index 52b105972..4f0080d87 100644 --- a/src/test/resources/cassandra-reaper.yaml +++ b/src/test/resources/cassandra-reaper.yaml @@ -3,6 +3,9 @@ snapshotRepair: false repairIntensity: 0.75 repairRunThreadCount: 15 +# storageType is either "database" or "memory" +storageType: database + logging: level: DEBUG loggers: @@ -11,9 +14,6 @@ logging: appenders: - type: console -# storageType is either "database" or "memory" -storageType: memory - server: type: default applicationConnectors: