Skip to content

Commit

Permalink
Added filter aggregator support.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeff Sabin committed Apr 25, 2018
1 parent 5debb3b commit 7877d6a
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 89 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0</version>
<version>19.0</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
Expand All @@ -64,7 +64,7 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.2</version>
<version>2.5</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
Expand All @@ -86,7 +86,7 @@
<dependency>
<groupId>org.kairosdb</groupId>
<artifactId>kairosdb</artifactId>
<version>1.1.3-1</version>
<version>1.2.1-1</version>
<scope>test</scope>
</dependency>

Expand Down
19 changes: 19 additions & 0 deletions src/main/java/org/kairosdb/client/builder/AggregatorFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public String toString()
}
}

public enum FilterOperation
{
LTE, LT, GTE, GT, EQUAL
}

/**
* Creates an aggregator that returns the minimum values for each time period as specified.
* For example, "5 minutes" would returns the minimum value for each 5 minute period.
Expand Down Expand Up @@ -304,4 +309,18 @@ public static CustomAggregator createTrimAggregator(Trim trim)
checkNotNull(trim, "trim cannot be null");
return new CustomAggregator("trim", "\"trim\":\"" + trim + "\"");
}

/**
* Creates an aggregator that filters datapoints according to the filter operation.
*
* @param operation what to filter on
* @param threshold the value the operation is performed on. If the operation is lt, then a null data point is returned if the data point is less than the threshold.
* @return filter aggregator
*/
public static CustomAggregator createFilterAggregator(FilterOperation operation, double threshold)
{
checkNotNull(operation, "operation cannot be null");
checkArgument(threshold >= 0.0, "threshold must be greater than or equal to zero");
return new CustomAggregator("filter", "\"filter_op\":\"" + operation + "\", \"threshold\":" + threshold);
}
}
138 changes: 63 additions & 75 deletions src/test/java/org/kairosdb/client/ClientIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,10 @@
package org.kairosdb.client;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;

import org.junit.After;
import com.google.common.base.Stopwatch;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.kairosdb.client.builder.AggregatorFactory;
import org.kairosdb.client.builder.DataFormatException;
import org.kairosdb.client.builder.DataPoint;
import org.kairosdb.client.builder.Metric;
import org.kairosdb.client.builder.MetricBuilder;
import org.kairosdb.client.builder.QueryBuilder;
import org.kairosdb.client.builder.QueryMetric;
import org.kairosdb.client.builder.RelativeTime;
import org.kairosdb.client.builder.TimeUnit;
import org.kairosdb.client.builder.*;
import org.kairosdb.client.builder.grouper.BinGrouper;
import org.kairosdb.client.builder.grouper.TagGrouper;
import org.kairosdb.client.builder.grouper.TimeGrouper;
Expand All @@ -35,31 +14,40 @@
import org.kairosdb.client.response.Response;
import org.kairosdb.core.exception.DatastoreException;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;

import static java.lang.Thread.sleep;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertThat;

public class ClientIntegrationTest
{
public static final String HTTP_METRIC_NAME_1 = "httpMetric1";
public static final String HTTP_METRIC_NAME_2 = "httpMetric2";
public static final String HTTP_TAG_NAME_1 = "httpTag1";
public static final String HTTP_TAG_NAME_2 = "httpTag2";
public static final String HTTP_TAG_VALUE_1 = "httpTagValue1";
public static final String HTTP_TAG_VALUE_2 = "httpTagValue2";
public static final String HTTP_TAG_VALUE_3 = "httpTagValue3";

public static final String TELNET_METRIC_NAME_1 = "telnetMetric1";
public static final String TELNET_METRIC_NAME_2 = "telnetMetric2";
public static final String TELNET_METRIC_NAME_3 = "telnetMetric3";
public static final String TELNET_METRIC_NAME_4 = "telnetMetric4";
public static final String TELNET_TAG_NAME_1 = "telnetTag1";
public static final String TELNET_TAG_NAME_2 = "telnetTag2";
public static final String TELNET_TAG_VALUE_1 = "telnetTag1";
public static final String TELNET_TAG_VALUE_2 = "telnetTag2";

public static final String SSL_METRIC_NAME_1 = "sslMetric1";
public static final String SSL_METRIC_NAME_2 = "sslMetric2";
public static final String SSL_TAG_NAME_1 = "sslTag1";
public static final String SSL_TAG_NAME_2 = "sslTag2";
public static final String SSL_TAG_VALUE_1 = "sslTag1";
public static final String SSL_TAG_VALUE_2 = "sslTag2";
private static final String HTTP_METRIC_NAME_1 = "httpMetric1";
private static final String HTTP_METRIC_NAME_2 = "httpMetric2";
private static final String HTTP_TAG_NAME_1 = "httpTag1";
private static final String HTTP_TAG_NAME_2 = "httpTag2";
private static final String HTTP_TAG_VALUE_1 = "httpTagValue1";
private static final String HTTP_TAG_VALUE_2 = "httpTagValue2";
private static final String HTTP_TAG_VALUE_3 = "httpTagValue3";

private static final String TELNET_METRIC_NAME_1 = "telnetMetric1";
private static final String TELNET_METRIC_NAME_2 = "telnetMetric2";
private static final String TELNET_METRIC_NAME_3 = "telnetMetric3";
private static final String TELNET_METRIC_NAME_4 = "telnetMetric4";
private static final String TELNET_TAG_NAME_1 = "telnetTag1";
private static final String TELNET_TAG_NAME_2 = "telnetTag2";
private static final String TELNET_TAG_VALUE_1 = "telnetTag1";
private static final String TELNET_TAG_VALUE_2 = "telnetTag2";

private static final String SSL_METRIC_NAME_1 = "sslMetric1";
private static final String SSL_METRIC_NAME_2 = "sslMetric2";
private static final String SSL_TAG_NAME_1 = "sslTag1";
private static final String SSL_TAG_NAME_2 = "sslTag2";
private static final String SSL_TAG_VALUE_1 = "sslTag1";
private static final String SSL_TAG_VALUE_2 = "sslTag2";

private static InMemoryKairosServer kairos;

Expand All @@ -71,7 +59,7 @@ public static void setupClass() throws InterruptedException

while (!kairos.isStarted())
{
Thread.sleep(5);
sleep(5);
}
}

Expand All @@ -81,19 +69,10 @@ public static void tearDownClass() throws DatastoreException, InterruptedExcepti
kairos.shutdown();
}

@After
public void tearDown()
{
kairos.getDataPointListener().setEvent(null);
}

@SuppressWarnings("deprecation")
@Test
public void test_telnetClient() throws IOException, URISyntaxException, DataFormatException
public void test_telnetClient() throws IOException, URISyntaxException, DataFormatException, InterruptedException
{
DataPointEvent dataPointEvent = mock(DataPointEvent.class);
kairos.getDataPointListener().setEvent(dataPointEvent);

TelnetClient client = new TelnetClient("localhost", 4244);

try
Expand All @@ -116,8 +95,7 @@ public void test_telnetClient() throws IOException, URISyntaxException, DataForm

// Because Telnet is Asynchronous, it takes some time before the datapoints get written.
// Wait for Kairos to notify us that they have been written.
verify(dataPointEvent, timeout(5000).times(1)).datapoint(TELNET_METRIC_NAME_1);
verify(dataPointEvent, timeout(5000).times(1)).datapoint(TELNET_METRIC_NAME_2);
watiForEvent();

// Query metrics
QueryBuilder builder = QueryBuilder.getInstance();
Expand Down Expand Up @@ -148,13 +126,9 @@ public void test_telnetClient() throws IOException, URISyntaxException, DataForm
}
}


@Test
public void test_telnetClientPutMetrics() throws IOException, URISyntaxException, DataFormatException
public void test_telnetClientPutMetrics() throws IOException, URISyntaxException, DataFormatException, InterruptedException
{
DataPointEvent dataPointEvent = mock(DataPointEvent.class);
kairos.getDataPointListener().setEvent(dataPointEvent);

TelnetClient client = new TelnetClient("localhost", 4244);

try
Expand All @@ -177,8 +151,7 @@ public void test_telnetClientPutMetrics() throws IOException, URISyntaxException

// Because Telnet is Asynchronous, it takes some time before the datapoints get written.
// Wait for Kairos to notify us that they have been written.
verify(dataPointEvent, timeout(5000).times(1)).datapoint(TELNET_METRIC_NAME_3);
verify(dataPointEvent, timeout(5000).times(1)).datapoint(TELNET_METRIC_NAME_4);
watiForEvent();

// Query metrics
QueryBuilder builder = QueryBuilder.getInstance();
Expand Down Expand Up @@ -211,7 +184,7 @@ public void test_telnetClientPutMetrics() throws IOException, URISyntaxException

@Test
public void test_httpClient_no_results_from_query()
throws InterruptedException, IOException, URISyntaxException, DataFormatException
throws IOException, URISyntaxException
{
HttpClient client = new HttpClient("http://localhost:8081");

Expand Down Expand Up @@ -256,7 +229,7 @@ public void test_httpClient_no_results_from_query()
}

@Test
public void test_httpClient() throws InterruptedException, IOException, URISyntaxException, DataFormatException
public void test_httpClient() throws IOException, URISyntaxException, DataFormatException
{
HttpClient client = new HttpClient("http://localhost:8081");

Expand Down Expand Up @@ -299,7 +272,7 @@ public void test_httpClient() throws InterruptedException, IOException, URISynta
assertThat(tagValues.getResults(), hasItems(HTTP_TAG_VALUE_1, HTTP_TAG_VALUE_2));

// Check Status
GetResponse status = client.getStatus();
client.getStatus();

assertThat(tagValues.getStatusCode(), equalTo(200));

Expand All @@ -326,7 +299,7 @@ public void test_httpClient() throws InterruptedException, IOException, URISynta
}

@Test
public void test_httpClient_multiTagValues() throws InterruptedException, IOException, URISyntaxException, DataFormatException
public void test_httpClient_multiTagValues() throws IOException, URISyntaxException, DataFormatException
{
HttpClient client = new HttpClient("http://localhost:8081");

Expand Down Expand Up @@ -388,7 +361,7 @@ public void test_httpClient_multiTagValues() throws InterruptedException, IOExce
* return any errors which means that the aggregators and groupBys are all valid.
*/
@Test
public void test_aggregatorsAndGroupBy() throws InterruptedException, IOException, URISyntaxException
public void test_aggregatorsAndGroupBy() throws IOException, URISyntaxException
{
HttpClient client = new HttpClient("http://localhost:8081");

Expand Down Expand Up @@ -429,14 +402,15 @@ public void test_aggregatorsAndGroupBy() throws InterruptedException, IOExceptio
metric.addAggregator(AggregatorFactory.createPercentileAggregator(0.3, 1, TimeUnit.SECONDS));
metric.addAggregator(AggregatorFactory.createLastAggregator(1, TimeUnit.SECONDS));
metric.addAggregator(AggregatorFactory.createFirstAggregator(1, TimeUnit.SECONDS));
metric.addAggregator(AggregatorFactory.createDataGapsMarkingAggregator(1, TimeUnit.SECONDS));
metric.addAggregator(AggregatorFactory.createDiffAggregator());
metric.addAggregator(AggregatorFactory.createLeastSquaresAggregator(1, TimeUnit.SECONDS));
metric.addAggregator(AggregatorFactory.createSamplerAggregator());
metric.addAggregator(AggregatorFactory.createScaleAggregator(.05));
metric.addAggregator(AggregatorFactory.createSaveAsAggregator("newMetricName"));
metric.addAggregator(AggregatorFactory.createTrimAggregator(AggregatorFactory.Trim.BOTH));
metric.addAggregator(AggregatorFactory.createSimpleMovingAverage(2));
metric.addAggregator(AggregatorFactory.createFilterAggregator(AggregatorFactory.FilterOperation.GTE, 2.0));
metric.addAggregator(AggregatorFactory.createDataGapsMarkingAggregator(1, TimeUnit.SECONDS));

metric.addGrouper(new TagGrouper(HTTP_TAG_NAME_1, HTTP_TAG_NAME_2));
metric.addGrouper(new TimeGrouper(new RelativeTime(1, TimeUnit.MILLISECONDS), 3));
Expand Down Expand Up @@ -522,7 +496,7 @@ public void test_ssl() throws IOException, URISyntaxException, DataFormatExcepti

@SuppressWarnings("PointlessArithmeticExpression")
@Test
public void test_limit() throws InterruptedException, IOException, URISyntaxException, DataFormatException
public void test_limit() throws IOException, URISyntaxException, DataFormatException
{
HttpClient client = new HttpClient("http://localhost:8081");

Expand Down Expand Up @@ -576,7 +550,7 @@ public void test_limit() throws InterruptedException, IOException, URISyntaxExce

@SuppressWarnings("PointlessArithmeticExpression")
@Test
public void test_Order() throws InterruptedException, IOException, URISyntaxException, DataFormatException
public void test_Order() throws IOException, URISyntaxException, DataFormatException
{
HttpClient client = new HttpClient("http://localhost:8081");

Expand Down Expand Up @@ -625,7 +599,7 @@ public void test_Order() throws InterruptedException, IOException, URISyntaxExce
}

@Test
public void test_customDataType() throws IOException, URISyntaxException, InterruptedException
public void test_customDataType() throws IOException, URISyntaxException
{
HttpClient client = new HttpClient("http://localhost:8081");
client.registerCustomDataType("complex", Complex.class);
Expand Down Expand Up @@ -664,6 +638,20 @@ public void test_customDataType() throws IOException, URISyntaxException, Interr

}

private void watiForEvent() throws InterruptedException
{
boolean done = false;
Stopwatch stopwatch = Stopwatch.createStarted();
while(!done)
{
sleep(200);
if (kairos.getDataPointListener().getEvent() != null || stopwatch.elapsed(java.util.concurrent.TimeUnit.SECONDS) > 1)
{
done = true;
}
}
}

private class Complex
{
private long real;
Expand Down
19 changes: 8 additions & 11 deletions src/test/java/org/kairosdb/client/TestDataPointListener.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
package org.kairosdb.client;

import org.kairosdb.core.DataPoint;
import org.kairosdb.core.DataPointListener;
import org.kairosdb.eventbus.Subscribe;
import org.kairosdb.events.DataPointEvent;

import java.util.SortedMap;

public class TestDataPointListener implements DataPointListener
public class TestDataPointListener
{
private DataPointEvent event;
private org.kairosdb.events.DataPointEvent event;

public void setEvent(DataPointEvent event)
@Subscribe
public void dataPoint(org.kairosdb.events.DataPointEvent event)
{
this.event = event;
}

@Override
public void dataPoint(String metricName, SortedMap<String, String> tags, DataPoint dataPoint)
public DataPointEvent getEvent()
{
if (event != null)
event.datapoint(metricName);
return event;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,14 @@ public void test_createSimpleMovingAggregator()
assertThat(aggregator.getName(), equalTo("sma"));
assertThat(aggregator.toJson(), equalTo("{\"name\":\"sma\",'size':5}"));
}

@Test
public void test_createFilterAggregator()
{
CustomAggregator aggregator = AggregatorFactory.createFilterAggregator(AggregatorFactory.FilterOperation.GT, 3.0);

assertThat(aggregator.getName(), equalTo("filter"));
assertThat(aggregator.toJson(), equalTo("{\"name\":\"filter\",\"filter_op\":\"GT\", \"threshold\":3.0}"));
}

}

0 comments on commit 7877d6a

Please sign in to comment.