From 7877d6ad5a859d7a3cfcb8b6010c54d3e377e64a Mon Sep 17 00:00:00 2001 From: Jeff Sabin Date: Wed, 25 Apr 2018 09:08:36 -0600 Subject: [PATCH] Added filter aggregator support. --- pom.xml | 6 +- .../client/builder/AggregatorFactory.java | 19 +++ .../client/ClientIntegrationTest.java | 138 ++++++++---------- .../client/TestDataPointListener.java | 19 +-- .../client/builder/AggregatorFactoryTest.java | 10 ++ 5 files changed, 103 insertions(+), 89 deletions(-) diff --git a/pom.xml b/pom.xml index 1bfa689..d6e9a57 100644 --- a/pom.xml +++ b/pom.xml @@ -43,7 +43,7 @@ com.google.guava guava - 14.0 + 19.0 org.hamcrest @@ -64,7 +64,7 @@ commons-io commons-io - 2.2 + 2.5 com.google.code.findbugs @@ -86,7 +86,7 @@ org.kairosdb kairosdb - 1.1.3-1 + 1.2.1-1 test diff --git a/src/main/java/org/kairosdb/client/builder/AggregatorFactory.java b/src/main/java/org/kairosdb/client/builder/AggregatorFactory.java index e74a4df..1a3091c 100644 --- a/src/main/java/org/kairosdb/client/builder/AggregatorFactory.java +++ b/src/main/java/org/kairosdb/client/builder/AggregatorFactory.java @@ -46,6 +46,11 @@ public String toString() } } + public enum FilterOperation + { + LTE, LT, GTE, GT, EQUAL + } + /** * Creates an aggregator that returns the minimum values for each time period as specified. * For example, "5 minutes" would returns the minimum value for each 5 minute period. @@ -304,4 +309,18 @@ public static CustomAggregator createTrimAggregator(Trim trim) checkNotNull(trim, "trim cannot be null"); return new CustomAggregator("trim", "\"trim\":\"" + trim + "\""); } + + /** + * Creates an aggregator that filters datapoints according to the filter operation. + * + * @param operation what to filter on + * @param threshold the value the operation is performed on. If the operation is lt, then a null data point is returned if the data point is less than the threshold. + * @return filter aggregator + */ + public static CustomAggregator createFilterAggregator(FilterOperation operation, double threshold) + { + checkNotNull(operation, "operation cannot be null"); + checkArgument(threshold >= 0.0, "threshold must be greater than or equal to zero"); + return new CustomAggregator("filter", "\"filter_op\":\"" + operation + "\", \"threshold\":" + threshold); + } } \ No newline at end of file diff --git a/src/test/java/org/kairosdb/client/ClientIntegrationTest.java b/src/test/java/org/kairosdb/client/ClientIntegrationTest.java index 25298d5..deb05df 100644 --- a/src/test/java/org/kairosdb/client/ClientIntegrationTest.java +++ b/src/test/java/org/kairosdb/client/ClientIntegrationTest.java @@ -1,31 +1,10 @@ package org.kairosdb.client; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.hasItems; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.verify; - -import java.io.File; -import java.io.IOException; -import java.net.URISyntaxException; -import java.util.List; - -import org.junit.After; +import com.google.common.base.Stopwatch; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.kairosdb.client.builder.AggregatorFactory; -import org.kairosdb.client.builder.DataFormatException; -import org.kairosdb.client.builder.DataPoint; -import org.kairosdb.client.builder.Metric; -import org.kairosdb.client.builder.MetricBuilder; -import org.kairosdb.client.builder.QueryBuilder; -import org.kairosdb.client.builder.QueryMetric; -import org.kairosdb.client.builder.RelativeTime; -import org.kairosdb.client.builder.TimeUnit; +import org.kairosdb.client.builder.*; import org.kairosdb.client.builder.grouper.BinGrouper; import org.kairosdb.client.builder.grouper.TagGrouper; import org.kairosdb.client.builder.grouper.TimeGrouper; @@ -35,31 +14,40 @@ import org.kairosdb.client.response.Response; import org.kairosdb.core.exception.DatastoreException; +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.List; + +import static java.lang.Thread.sleep; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.assertThat; + public class ClientIntegrationTest { - public static final String HTTP_METRIC_NAME_1 = "httpMetric1"; - public static final String HTTP_METRIC_NAME_2 = "httpMetric2"; - public static final String HTTP_TAG_NAME_1 = "httpTag1"; - public static final String HTTP_TAG_NAME_2 = "httpTag2"; - public static final String HTTP_TAG_VALUE_1 = "httpTagValue1"; - public static final String HTTP_TAG_VALUE_2 = "httpTagValue2"; - public static final String HTTP_TAG_VALUE_3 = "httpTagValue3"; - - public static final String TELNET_METRIC_NAME_1 = "telnetMetric1"; - public static final String TELNET_METRIC_NAME_2 = "telnetMetric2"; - public static final String TELNET_METRIC_NAME_3 = "telnetMetric3"; - public static final String TELNET_METRIC_NAME_4 = "telnetMetric4"; - public static final String TELNET_TAG_NAME_1 = "telnetTag1"; - public static final String TELNET_TAG_NAME_2 = "telnetTag2"; - public static final String TELNET_TAG_VALUE_1 = "telnetTag1"; - public static final String TELNET_TAG_VALUE_2 = "telnetTag2"; - - public static final String SSL_METRIC_NAME_1 = "sslMetric1"; - public static final String SSL_METRIC_NAME_2 = "sslMetric2"; - public static final String SSL_TAG_NAME_1 = "sslTag1"; - public static final String SSL_TAG_NAME_2 = "sslTag2"; - public static final String SSL_TAG_VALUE_1 = "sslTag1"; - public static final String SSL_TAG_VALUE_2 = "sslTag2"; + private static final String HTTP_METRIC_NAME_1 = "httpMetric1"; + private static final String HTTP_METRIC_NAME_2 = "httpMetric2"; + private static final String HTTP_TAG_NAME_1 = "httpTag1"; + private static final String HTTP_TAG_NAME_2 = "httpTag2"; + private static final String HTTP_TAG_VALUE_1 = "httpTagValue1"; + private static final String HTTP_TAG_VALUE_2 = "httpTagValue2"; + private static final String HTTP_TAG_VALUE_3 = "httpTagValue3"; + + private static final String TELNET_METRIC_NAME_1 = "telnetMetric1"; + private static final String TELNET_METRIC_NAME_2 = "telnetMetric2"; + private static final String TELNET_METRIC_NAME_3 = "telnetMetric3"; + private static final String TELNET_METRIC_NAME_4 = "telnetMetric4"; + private static final String TELNET_TAG_NAME_1 = "telnetTag1"; + private static final String TELNET_TAG_NAME_2 = "telnetTag2"; + private static final String TELNET_TAG_VALUE_1 = "telnetTag1"; + private static final String TELNET_TAG_VALUE_2 = "telnetTag2"; + + private static final String SSL_METRIC_NAME_1 = "sslMetric1"; + private static final String SSL_METRIC_NAME_2 = "sslMetric2"; + private static final String SSL_TAG_NAME_1 = "sslTag1"; + private static final String SSL_TAG_NAME_2 = "sslTag2"; + private static final String SSL_TAG_VALUE_1 = "sslTag1"; + private static final String SSL_TAG_VALUE_2 = "sslTag2"; private static InMemoryKairosServer kairos; @@ -71,7 +59,7 @@ public static void setupClass() throws InterruptedException while (!kairos.isStarted()) { - Thread.sleep(5); + sleep(5); } } @@ -81,19 +69,10 @@ public static void tearDownClass() throws DatastoreException, InterruptedExcepti kairos.shutdown(); } - @After - public void tearDown() - { - kairos.getDataPointListener().setEvent(null); - } - @SuppressWarnings("deprecation") @Test - public void test_telnetClient() throws IOException, URISyntaxException, DataFormatException + public void test_telnetClient() throws IOException, URISyntaxException, DataFormatException, InterruptedException { - DataPointEvent dataPointEvent = mock(DataPointEvent.class); - kairos.getDataPointListener().setEvent(dataPointEvent); - TelnetClient client = new TelnetClient("localhost", 4244); try @@ -116,8 +95,7 @@ public void test_telnetClient() throws IOException, URISyntaxException, DataForm // Because Telnet is Asynchronous, it takes some time before the datapoints get written. // Wait for Kairos to notify us that they have been written. - verify(dataPointEvent, timeout(5000).times(1)).datapoint(TELNET_METRIC_NAME_1); - verify(dataPointEvent, timeout(5000).times(1)).datapoint(TELNET_METRIC_NAME_2); + watiForEvent(); // Query metrics QueryBuilder builder = QueryBuilder.getInstance(); @@ -148,13 +126,9 @@ public void test_telnetClient() throws IOException, URISyntaxException, DataForm } } - @Test - public void test_telnetClientPutMetrics() throws IOException, URISyntaxException, DataFormatException + public void test_telnetClientPutMetrics() throws IOException, URISyntaxException, DataFormatException, InterruptedException { - DataPointEvent dataPointEvent = mock(DataPointEvent.class); - kairos.getDataPointListener().setEvent(dataPointEvent); - TelnetClient client = new TelnetClient("localhost", 4244); try @@ -177,8 +151,7 @@ public void test_telnetClientPutMetrics() throws IOException, URISyntaxException // Because Telnet is Asynchronous, it takes some time before the datapoints get written. // Wait for Kairos to notify us that they have been written. - verify(dataPointEvent, timeout(5000).times(1)).datapoint(TELNET_METRIC_NAME_3); - verify(dataPointEvent, timeout(5000).times(1)).datapoint(TELNET_METRIC_NAME_4); + watiForEvent(); // Query metrics QueryBuilder builder = QueryBuilder.getInstance(); @@ -211,7 +184,7 @@ public void test_telnetClientPutMetrics() throws IOException, URISyntaxException @Test public void test_httpClient_no_results_from_query() - throws InterruptedException, IOException, URISyntaxException, DataFormatException + throws IOException, URISyntaxException { HttpClient client = new HttpClient("http://localhost:8081"); @@ -256,7 +229,7 @@ public void test_httpClient_no_results_from_query() } @Test - public void test_httpClient() throws InterruptedException, IOException, URISyntaxException, DataFormatException + public void test_httpClient() throws IOException, URISyntaxException, DataFormatException { HttpClient client = new HttpClient("http://localhost:8081"); @@ -299,7 +272,7 @@ public void test_httpClient() throws InterruptedException, IOException, URISynta assertThat(tagValues.getResults(), hasItems(HTTP_TAG_VALUE_1, HTTP_TAG_VALUE_2)); // Check Status - GetResponse status = client.getStatus(); + client.getStatus(); assertThat(tagValues.getStatusCode(), equalTo(200)); @@ -326,7 +299,7 @@ public void test_httpClient() throws InterruptedException, IOException, URISynta } @Test - public void test_httpClient_multiTagValues() throws InterruptedException, IOException, URISyntaxException, DataFormatException + public void test_httpClient_multiTagValues() throws IOException, URISyntaxException, DataFormatException { HttpClient client = new HttpClient("http://localhost:8081"); @@ -388,7 +361,7 @@ public void test_httpClient_multiTagValues() throws InterruptedException, IOExce * return any errors which means that the aggregators and groupBys are all valid. */ @Test - public void test_aggregatorsAndGroupBy() throws InterruptedException, IOException, URISyntaxException + public void test_aggregatorsAndGroupBy() throws IOException, URISyntaxException { HttpClient client = new HttpClient("http://localhost:8081"); @@ -429,7 +402,6 @@ public void test_aggregatorsAndGroupBy() throws InterruptedException, IOExceptio metric.addAggregator(AggregatorFactory.createPercentileAggregator(0.3, 1, TimeUnit.SECONDS)); metric.addAggregator(AggregatorFactory.createLastAggregator(1, TimeUnit.SECONDS)); metric.addAggregator(AggregatorFactory.createFirstAggregator(1, TimeUnit.SECONDS)); - metric.addAggregator(AggregatorFactory.createDataGapsMarkingAggregator(1, TimeUnit.SECONDS)); metric.addAggregator(AggregatorFactory.createDiffAggregator()); metric.addAggregator(AggregatorFactory.createLeastSquaresAggregator(1, TimeUnit.SECONDS)); metric.addAggregator(AggregatorFactory.createSamplerAggregator()); @@ -437,6 +409,8 @@ public void test_aggregatorsAndGroupBy() throws InterruptedException, IOExceptio metric.addAggregator(AggregatorFactory.createSaveAsAggregator("newMetricName")); metric.addAggregator(AggregatorFactory.createTrimAggregator(AggregatorFactory.Trim.BOTH)); metric.addAggregator(AggregatorFactory.createSimpleMovingAverage(2)); + metric.addAggregator(AggregatorFactory.createFilterAggregator(AggregatorFactory.FilterOperation.GTE, 2.0)); + metric.addAggregator(AggregatorFactory.createDataGapsMarkingAggregator(1, TimeUnit.SECONDS)); metric.addGrouper(new TagGrouper(HTTP_TAG_NAME_1, HTTP_TAG_NAME_2)); metric.addGrouper(new TimeGrouper(new RelativeTime(1, TimeUnit.MILLISECONDS), 3)); @@ -522,7 +496,7 @@ public void test_ssl() throws IOException, URISyntaxException, DataFormatExcepti @SuppressWarnings("PointlessArithmeticExpression") @Test - public void test_limit() throws InterruptedException, IOException, URISyntaxException, DataFormatException + public void test_limit() throws IOException, URISyntaxException, DataFormatException { HttpClient client = new HttpClient("http://localhost:8081"); @@ -576,7 +550,7 @@ public void test_limit() throws InterruptedException, IOException, URISyntaxExce @SuppressWarnings("PointlessArithmeticExpression") @Test - public void test_Order() throws InterruptedException, IOException, URISyntaxException, DataFormatException + public void test_Order() throws IOException, URISyntaxException, DataFormatException { HttpClient client = new HttpClient("http://localhost:8081"); @@ -625,7 +599,7 @@ public void test_Order() throws InterruptedException, IOException, URISyntaxExce } @Test - public void test_customDataType() throws IOException, URISyntaxException, InterruptedException + public void test_customDataType() throws IOException, URISyntaxException { HttpClient client = new HttpClient("http://localhost:8081"); client.registerCustomDataType("complex", Complex.class); @@ -664,6 +638,20 @@ public void test_customDataType() throws IOException, URISyntaxException, Interr } + private void watiForEvent() throws InterruptedException + { + boolean done = false; + Stopwatch stopwatch = Stopwatch.createStarted(); + while(!done) + { + sleep(200); + if (kairos.getDataPointListener().getEvent() != null || stopwatch.elapsed(java.util.concurrent.TimeUnit.SECONDS) > 1) + { + done = true; + } + } + } + private class Complex { private long real; diff --git a/src/test/java/org/kairosdb/client/TestDataPointListener.java b/src/test/java/org/kairosdb/client/TestDataPointListener.java index b44d451..e5b720c 100644 --- a/src/test/java/org/kairosdb/client/TestDataPointListener.java +++ b/src/test/java/org/kairosdb/client/TestDataPointListener.java @@ -1,23 +1,20 @@ package org.kairosdb.client; -import org.kairosdb.core.DataPoint; -import org.kairosdb.core.DataPointListener; +import org.kairosdb.eventbus.Subscribe; +import org.kairosdb.events.DataPointEvent; -import java.util.SortedMap; - -public class TestDataPointListener implements DataPointListener +public class TestDataPointListener { - private DataPointEvent event; + private org.kairosdb.events.DataPointEvent event; - public void setEvent(DataPointEvent event) + @Subscribe + public void dataPoint(org.kairosdb.events.DataPointEvent event) { this.event = event; } - @Override - public void dataPoint(String metricName, SortedMap tags, DataPoint dataPoint) + public DataPointEvent getEvent() { - if (event != null) - event.datapoint(metricName); + return event; } } diff --git a/src/test/java/org/kairosdb/client/builder/AggregatorFactoryTest.java b/src/test/java/org/kairosdb/client/builder/AggregatorFactoryTest.java index 25e161f..63a6265 100644 --- a/src/test/java/org/kairosdb/client/builder/AggregatorFactoryTest.java +++ b/src/test/java/org/kairosdb/client/builder/AggregatorFactoryTest.java @@ -205,4 +205,14 @@ public void test_createSimpleMovingAggregator() assertThat(aggregator.getName(), equalTo("sma")); assertThat(aggregator.toJson(), equalTo("{\"name\":\"sma\",'size':5}")); } + + @Test + public void test_createFilterAggregator() + { + CustomAggregator aggregator = AggregatorFactory.createFilterAggregator(AggregatorFactory.FilterOperation.GT, 3.0); + + assertThat(aggregator.getName(), equalTo("filter")); + assertThat(aggregator.toJson(), equalTo("{\"name\":\"filter\",\"filter_op\":\"GT\", \"threshold\":3.0}")); + } + }