Fix large partitions in the metrics v2 table for the sidecar
Introduce a new metrics table to reduce disk usage and partition sizes
adejanovski committed Mar 2, 2020
1 parent 05e5f32 commit 7774e4e
Showing 6 changed files with 172 additions and 51 deletions.
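In short, the patch replaces the hourly time_bucket of node_metrics_v2 (pattern yyyyMMddHH) with a 10-minute bucket in the new node_metrics_v3 table (pattern yyyyMMddHHmm, minute floored down to a multiple of 10), so each partition holds roughly a sixth of the rows. A minimal standalone sketch of the new bucketing, using the same Joda-Time API as the patch (class and method names here are illustrative, not part of the commit):

```java
import org.joda.time.DateTime;

public class TimeBucketSketch {

  // Floor a timestamp to its 10-minute partition and render the bucket key.
  static String bucketOf(DateTime ts) {
    return ts
        .withMinuteOfHour((ts.getMinuteOfHour() / 10) * 10)
        .withSecondOfMinute(0)
        .withMillisOfSecond(0)
        .toString("yyyyMMddHHmm");
  }

  public static void main(String[] args) {
    // 10:37:42 falls into the 10:30 bucket.
    System.out.println(bucketOf(new DateTime(2020, 3, 2, 10, 37, 42))); // 202003021030
  }
}
```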
13 changes: 10 additions & 3 deletions src/server/src/main/java/io/cassandrareaper/jmx/ClusterFacade.java
@@ -73,6 +73,12 @@


public final class ClusterFacade {

/**
 * Size of node_metrics_v3 time partitions, in minutes.
 */
private static final int METRICS_PARTITIONING_TIME_MINS = 10;

private static final Logger LOG = LoggerFactory.getLogger(ClusterFacade.class);

private static final long CLUSTER_VERSIONS_TTL_SECONDS
@@ -455,7 +461,7 @@ public List<Compaction> listActiveCompactions(Node node)
return listActiveCompactionsDirect(node);
} else {
// We don't have access to the node through JMX, so we'll get data from the database
LOG.info("Node {} in DC {} is not accessible through JMX", node.getHostname(), nodeDc);
LOG.debug("Node {} in DC {} is not accessible through JMX", node.getHostname(), nodeDc);

String compactionsJson = ((IDistributedStorage)context.storage)
.listOperations(node.getClusterName(), OpType.OP_COMPACTION, node.getHostname());
@@ -530,13 +536,14 @@ public List<MetricsHistogram> getClientRequestLatencies(Node node) throws ReaperException
return convertToMetricsHistogram(
MetricsProxy.convertToGenericMetrics(metricsProxy.collectLatencyMetrics(), node));
} else {
// We look for metrics in the last two time-based partitions to make sure we get a result
return convertToMetricsHistogram(((IDistributedStorage)context.storage)
.getMetrics(
node.getClusterName(),
Optional.of(node.getHostname()),
"org.apache.cassandra.metrics",
"ClientRequest",
DateTime.now().minusMinutes(1).getMillis()));
DateTime.now().minusMinutes(METRICS_PARTITIONING_TIME_MINS + 1).getMillis()));
}
} catch (JMException | InterruptedException | IOException e) {
LOG.error("Failed collecting tpstats for host {}", node, e);
@@ -722,7 +729,7 @@ public List<StreamSession> listActiveStreams(Node node)
return listStreamsDirect(node);
} else {
// We don't have access to the node through JMX, so we'll get data from the database
LOG.info("Node {} in DC {} is not accessible through JMX", node.getHostname(), nodeDc);
LOG.debug("Node {} in DC {} is not accessible through JMX", node.getHostname(), nodeDc);

String streamsJson = ((IDistributedStorage) context.storage)
.listOperations(node.getClusterName(), OpType.OP_STREAMING, node.getHostname());
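The lookback change above is the reason the new constant exists: with 10-minute partitions, reading from now minus METRICS_PARTITIONING_TIME_MINS + 1 minutes guarantees the query spans both the current (possibly nearly empty) bucket and the full previous one. A hedged illustration of which buckets the query loop visits, mirroring the 600 000 ms stride used in CassandraStorage.getMetrics() below (sketch only, not project code):

```java
import org.joda.time.DateTime;

public class LookbackSketch {

  public static void main(String[] args) {
    DateTime now = new DateTime(2020, 3, 2, 10, 5, 0, 0);
    long start = now.minusMinutes(10 + 1).getMillis(); // 09:54

    // One step per 10-minute bucket, like the loop in getMetrics().
    for (long t = start; t < now.getMillis(); t += 600_000L) {
      DateTime ts = new DateTime(t);
      System.out.println(ts
          .withMinuteOfHour((ts.getMinuteOfHour() / 10) * 10)
          .withSecondOfMinute(0)
          .withMillisOfSecond(0)
          .toString("yyyyMMddHHmm")); // prints 202003020950, then 202003021000
    }
  }
}
```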
10 changes: 9 additions & 1 deletion src/server/src/main/java/io/cassandrareaper/service/Heart.java
@@ -54,6 +54,7 @@ final class Heart implements AutoCloseable {
private static final long DEFAULT_MAX_FREQUENCY = TimeUnit.SECONDS.toMillis(30);

private final AtomicLong lastBeat = new AtomicLong(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1));
private final AtomicLong lastMetricBeat = new AtomicLong(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1));
private final ForkJoinPool forkJoinPool = new ForkJoinPool(64);
private final AppContext context;
private final MetricsService metricsService;
@@ -149,7 +150,14 @@ private void updateRequestedNodeMetrics() {

if (context.config.getDatacenterAvailability() == DatacenterAvailability.SIDECAR) {
// In sidecar mode we store metrics in the db on a regular basis
metricsService.grabAndStoreGenericMetrics();
if (lastMetricBeat.get() + maxBeatFrequencyMillis <= System.currentTimeMillis()) {
metricsService.grabAndStoreGenericMetrics();
lastMetricBeat.set(System.currentTimeMillis());
} else {
LOG.trace("Not storing metrics yet... Last beat was {} and now is {}",
lastMetricBeat.get(),
System.currentTimeMillis());
}
metricsService.grabAndStoreActiveCompactions();
metricsService.grabAndStoreActiveStreams();
}
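The new lastMetricBeat guard throttles grabAndStoreGenericMetrics() to at most once per beat period, while active compactions and streams are still stored on every pass. A compact restatement of the pattern with hypothetical names (the real code lives in Heart above). Note that the committed get()-then-set() pair is not atomic, which is fine when beats come from a single scheduler thread; the compareAndSet variant sketched here would also tolerate concurrent callers:

```java
import java.util.concurrent.atomic.AtomicLong;

public class MetricBeatThrottle {

  private final AtomicLong lastBeat = new AtomicLong(0);
  private final long periodMillis;

  MetricBeatThrottle(long periodMillis) {
    this.periodMillis = periodMillis;
  }

  // Returns true at most once per period, even under concurrent callers.
  boolean tryBeat() {
    long last = lastBeat.get();
    long now = System.currentTimeMillis();
    return last + periodMillis <= now && lastBeat.compareAndSet(last, now);
  }
}
```

With a 30-second period this matches DEFAULT_MAX_FREQUENCY above, so sidecar metrics are written to the database at most twice a minute.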
src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java
@@ -41,6 +41,7 @@
import io.cassandrareaper.storage.cassandra.DateTimeCodec;
import io.cassandrareaper.storage.cassandra.Migration016;
import io.cassandrareaper.storage.cassandra.Migration021;
import io.cassandrareaper.storage.cassandra.Migration024;

import java.io.IOException;
import java.math.BigInteger;
@@ -111,14 +112,15 @@

public final class CassandraStorage implements IStorage, IDistributedStorage {

private static final int METRICS_PARTITIONING_TIME_MINS = 10;
private static final int LEAD_DURATION = 600;
/* Simple stmts */
private static final String SELECT_CLUSTER = "SELECT * FROM cluster";
private static final String SELECT_REPAIR_SCHEDULE = "SELECT * FROM repair_schedule_v1";
private static final String SELECT_REPAIR_UNIT = "SELECT * FROM repair_unit_v1";
private static final String SELECT_LEADERS = "SELECT * FROM leader";
private static final String SELECT_RUNNING_REAPERS = "SELECT reaper_instance_id FROM running_reapers";
private static final DateTimeFormatter HOURLY_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHH");
private static final DateTimeFormatter TIME_BUCKET_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmm");
private static final Logger LOG = LoggerFactory.getLogger(CassandraStorage.class);
private static final AtomicBoolean UNINITIALISED = new AtomicBoolean(true);

@@ -270,6 +272,8 @@ private static void initializeAndUpgradeSchema(
Migration016.migrate(session, config.getCassandraFactory().getKeyspace());
// Switch metrics table to TWCS if possible, this is intentionally executed every startup
Migration021.migrate(session, config.getCassandraFactory().getKeyspace());
// Create the new v3 metrics table and switch it to TWCS if possible, also intentionally executed every startup
Migration024.migrate(session, config.getCassandraFactory().getKeyspace());
} else {
LOG.info(
String.format("Keyspace %s already at schema version %d", session.getLoggedKeyspace(), currentVersion));
@@ -480,23 +484,16 @@ private void prepareMetricStatements() {
storeMetricsPrepStmt
= session
.prepare(
"INSERT INTO node_metrics_v2 (cluster, metric_domain, metric_type, time_bucket, "
"INSERT INTO node_metrics_v3 (cluster, metric_domain, metric_type, time_bucket, "
+ "host, metric_scope, metric_name, ts, metric_attribute, value) "
+ "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
getMetricsForHostPrepStmt
= session
.prepare(
"SELECT cluster, metric_domain, metric_type, time_bucket, host, "
+ "metric_scope, metric_name, ts, metric_attribute, value "
+ "FROM node_metrics_v2 "
+ "FROM node_metrics_v3 "
+ "WHERE metric_domain = ? and metric_type = ? and cluster = ? and time_bucket = ? and host = ?");
getMetricsForClusterPrepStmt
= session
.prepare(
"SELECT cluster, metric_domain, metric_type, time_bucket, host, "
+ "metric_scope, metric_name, ts, metric_attribute, value "
+ "FROM node_metrics_v2 "
+ "WHERE metric_domain = ? and metric_type = ? and cluster = ? and time_bucket = ?");
}

private void prepareOperationsStatements() {
@@ -1778,8 +1775,8 @@ public List<GenericMetric> getMetrics(

// Compute the 10-minute buckets since the requested lower bound timestamp
while (startTime < now) {
timeBuckets.add(DateTime.now().withMillis(startTime).toString(HOURLY_FORMATTER));
startTime += 3600000;
timeBuckets.add(DateTime.now().withMillis(startTime).toString(TIME_BUCKET_FORMATTER).substring(0, 11) + "0");
startTime += 600000;
}

for (String timeBucket:timeBuckets) {
@@ -1792,30 +1789,22 @@
clusterName,
timeBucket,
host.get())));
} else {
futures.add(
session.executeAsync(
getMetricsForClusterPrepStmt.bind(
metricDomain, metricType, clusterName, timeBucket)));
}
}

for (ResultSetFuture future : futures) {
for (Row row : future.getUninterruptibly()) {
// Filtering on the timestamp lower bound since it's not filtered in cluster wide metrics requests
if (row.getTimestamp("ts").getTime() >= since) {
metrics.add(
GenericMetric.builder()
.withClusterName(row.getString("cluster"))
.withHost(row.getString("host"))
.withMetricType(row.getString("metric_type"))
.withMetricScope(row.getString("metric_scope"))
.withMetricName(row.getString("metric_name"))
.withMetricAttribute(row.getString("metric_attribute"))
.withTs(new DateTime(row.getTimestamp("ts")))
.withValue(row.getDouble("value"))
.build());
}
metrics.add(
GenericMetric.builder()
.withClusterName(row.getString("cluster"))
.withHost(row.getString("host"))
.withMetricType(row.getString("metric_type"))
.withMetricScope(row.getString("metric_scope"))
.withMetricName(row.getString("metric_name"))
.withMetricAttribute(row.getString("metric_attribute"))
.withTs(new DateTime(row.getTimestamp("ts")))
.withValue(row.getDouble("value"))
.build());
}
}

@@ -1830,15 +1819,29 @@ public void storeMetric(GenericMetric metric) {
metric.getClusterName(),
metric.getMetricDomain(),
metric.getMetricType(),
metric.getTs().toString(HOURLY_FORMATTER),
computeMetricsPartition(metric.getTs()).toString(TIME_BUCKET_FORMATTER),
metric.getHost(),
metric.getMetricScope(),
metric.getMetricName(),
metric.getTs().toDate(),
computeMetricsPartition(metric.getTs()),
metric.getMetricAttribute(),
metric.getValue()));
}

/**
 * Truncates a metric timestamp to the start of its partition, based on the defined partition size.
 * @param metricTime the time of the metric
 * @return the time truncated to the closest partition
 */
private DateTime computeMetricsPartition(DateTime metricTime) {
return metricTime
.withMinuteOfHour(
(metricTime.getMinuteOfHour() / METRICS_PARTITIONING_TIME_MINS)
* METRICS_PARTITIONING_TIME_MINS)
.withSecondOfMinute(0)
.withMillisOfSecond(0);
}

@Override
public void purgeMetrics() {}

@@ -1848,7 +1851,7 @@ public void storeOperations(String clusterName, OpType operationType, String host
insertOperationsPrepStmt.bind(
clusterName,
operationType.getName(),
DateTime.now().toString(HOURLY_FORMATTER),
DateTime.now().toString(TIME_BUCKET_FORMATTER),
host,
DateTime.now().toDate(),
operationsJson));
@@ -1859,7 +1862,7 @@ public String listOperations(String clusterName, OpType operationType, String host
ResultSet operations
= session.execute(
listOperationsForNodePrepStmt.bind(
clusterName, operationType.getName(), DateTime.now().toString(HOURLY_FORMATTER), host));
clusterName, operationType.getName(), DateTime.now().toString(TIME_BUCKET_FORMATTER), host));
return operations.isExhausted()
? "[]"
: operations.one().getString("data");
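Two different truncations produce the same bucket key in the storage layer above: the read path formats the timestamp as yyyyMMddHHmm and rewrites the final minute digit to '0' via substring(0, 11) + "0", while the write path floors the minute arithmetically in computeMetricsPartition(). A small sketch checking the two agree (illustrative only, reusing the patch's constants by name):

```java
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class BucketEquivalenceSketch {

  private static final DateTimeFormatter TIME_BUCKET_FORMATTER = DateTimeFormat.forPattern("yyyyMMddHHmm");
  private static final int METRICS_PARTITIONING_TIME_MINS = 10;

  public static void main(String[] args) {
    DateTime ts = new DateTime(2020, 3, 2, 10, 37, 42);

    // Read path: string truncation, e.g. "202003021037" -> "20200302103" + "0".
    String viaString = ts.toString(TIME_BUCKET_FORMATTER).substring(0, 11) + "0";

    // Write path: arithmetic truncation, as in computeMetricsPartition().
    String viaMath = ts
        .withMinuteOfHour((ts.getMinuteOfHour() / METRICS_PARTITIONING_TIME_MINS)
            * METRICS_PARTITIONING_TIME_MINS)
        .withSecondOfMinute(0)
        .withMillisOfSecond(0)
        .toString(TIME_BUCKET_FORMATTER);

    System.out.println(viaString.equals(viaMath)); // true: both yield 202003021030
  }
}
```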
src/server/src/main/java/io/cassandrareaper/storage/cassandra/Migration021.java
@@ -27,7 +27,6 @@ public final class Migration021 {

private static final Logger LOG = LoggerFactory.getLogger(Migration021.class);
private static final String METRICS_V1_TABLE = "node_metrics_v1";
private static final String METRICS_V2_TABLE = "node_metrics_v2";
private static final String OPERATIONS_TABLE = "node_operations";

private Migration021() {
@@ -56,21 +55,12 @@ public static void migrate(Session session, String keyspace) {
+ "'compaction_window_size': '2', "
+ "'compaction_window_unit': 'MINUTES'}");

LOG.info("Altering {} to use TWCS...", METRICS_V2_TABLE);
session.execute(
"ALTER TABLE " + METRICS_V2_TABLE + " WITH compaction = {'class': 'TimeWindowCompactionStrategy', "
+ "'unchecked_tombstone_compaction': 'true', "
+ "'compaction_window_size': '1', "
+ "'compaction_window_unit': 'HOURS'}");

LOG.info("{} was successfully altered to use TWCS.", METRICS_V2_TABLE);

LOG.info("Altering {} to use TWCS...", OPERATIONS_TABLE);
session.execute(
"ALTER TABLE " + OPERATIONS_TABLE + " WITH compaction = {'class': 'TimeWindowCompactionStrategy', "
+ "'unchecked_tombstone_compaction': 'true', "
+ "'compaction_window_size': '1', "
+ "'compaction_window_unit': 'HOURS'}");
+ "'compaction_window_size': '30', "
+ "'compaction_window_unit': 'MINUTES'}");

LOG.info("{} was successfully altered to use TWCS.", OPERATIONS_TABLE);
}
78 changes: 78 additions & 0 deletions src/server/src/main/java/io/cassandrareaper/storage/cassandra/Migration024.java
@@ -0,0 +1,78 @@
/*
* Copyright 2020-2020 The Last Pickle Ltd
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cassandrareaper.storage.cassandra;


import com.datastax.driver.core.Session;
import com.datastax.driver.core.VersionNumber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Migration024 {

private static final Logger LOG = LoggerFactory.getLogger(Migration024.class);
private static final String METRICS_V3_TABLE = "node_metrics_v3";

private Migration024() {
}

/**
* Apply TWCS for metrics tables if the Cassandra version allows it.
*/
public static void migrate(Session session, String keyspace) {

VersionNumber lowestNodeVersion = session.getCluster().getMetadata().getAllHosts()
.stream()
.map(host -> host.getCassandraVersion())
.min(VersionNumber::compareTo)
.get();

if ((VersionNumber.parse("3.0.8").compareTo(lowestNodeVersion) <= 0
&& VersionNumber.parse("3.0.99").compareTo(lowestNodeVersion) >= 0)
|| VersionNumber.parse("3.8").compareTo(lowestNodeVersion) <= 0) {
try {
if (!isUsingTwcs(session, keyspace)) {
LOG.info("Altering {} to use TWCS...", METRICS_V3_TABLE);
session.execute(
"ALTER TABLE " + METRICS_V3_TABLE + " WITH compaction = {'class': 'TimeWindowCompactionStrategy', "
+ "'unchecked_tombstone_compaction': 'true', "
+ "'compaction_window_size': '10', "
+ "'compaction_window_unit': 'MINUTES'}");

LOG.info("{} was successfully altered to use TWCS.", METRICS_V3_TABLE);

}
} catch (RuntimeException e) {
LOG.error("Failed altering metrics tables to TWCS", e);
}
}

}

private static boolean isUsingTwcs(Session session, String keyspace) {
return session
.getCluster()
.getMetadata()
.getKeyspace(keyspace)
.getTable(METRICS_V3_TABLE)
.getOptions()
.getCompaction()
.get("class")
.contains("TimeWindowCompactionStrategy");
}
}
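The version guard in Migration024 mirrors the availability of TimeWindowCompactionStrategy, which first shipped in Cassandra 3.0.8 on the 3.0 line and in 3.8 for everything newer. Restated as a standalone predicate (a reading of the check above, not new behavior):

```java
import com.datastax.driver.core.VersionNumber;

public final class TwcsSupportCheck {

  // TWCS is available on 3.0.8 through 3.0.x, and on 3.8 or later.
  static boolean supportsTwcs(VersionNumber lowestNodeVersion) {
    boolean in30Line = VersionNumber.parse("3.0.8").compareTo(lowestNodeVersion) <= 0
        && VersionNumber.parse("3.0.99").compareTo(lowestNodeVersion) >= 0;
    boolean is38OrLater = VersionNumber.parse("3.8").compareTo(lowestNodeVersion) <= 0;
    return in30Line || is38OrLater;
  }
}
```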
35 changes: 35 additions & 0 deletions src/server/src/main/resources/db/cassandra/… (new migration 024 CQL script)
@@ -0,0 +1,35 @@
--
-- Copyright 2020-2020 The Last Pickle Ltd
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
-- Upgrade to reduce metrics partition sizes (replaces node_metrics_v2 with node_metrics_v3)

DROP TABLE IF EXISTS node_metrics_v2;

CREATE TABLE IF NOT EXISTS node_metrics_v3 (
cluster text,
metric_domain text,
metric_type text,
time_bucket text,
host text,
ts timestamp,
metric_scope text,
metric_name text,
metric_attribute text,
value double,
PRIMARY KEY ((cluster, metric_domain, metric_type, time_bucket, host), ts, metric_scope, metric_name, metric_attribute)
) WITH CLUSTERING ORDER BY (ts DESC, metric_scope ASC, metric_name ASC, metric_attribute ASC)
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4', 'unchecked_tombstone_compaction': 'true'}
AND default_time_to_live = 3600
AND gc_grace_seconds = 300;
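Since time_bucket is part of the partition key, shrinking the bucket from one hour to ten minutes divides the rows per partition by six; the one-hour default_time_to_live and the 10-minute TWCS windows applied by Migration024 then let whole SSTables expire quickly. A back-of-the-envelope sizing sketch with an assumed write rate (the 500 rows/minute figure is illustrative, not a measurement):

```java
public class PartitionSizingSketch {

  public static void main(String[] args) {
    long rowsPerMinute = 500; // assumption: metric rows written per node per minute

    long v2PartitionRows = rowsPerMinute * 60; // node_metrics_v2: hourly buckets
    long v3PartitionRows = rowsPerMinute * 10; // node_metrics_v3: 10-minute buckets

    System.out.printf("v2: %d rows/partition, v3: %d rows/partition (%dx smaller)%n",
        v2PartitionRows, v3PartitionRows, v2PartitionRows / v3PartitionRows);
  }
}
```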
