Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Min/Max for Timestamp #3299

Merged
merged 4 commits into from
Nov 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions api/src/main/java/io/druid/data/input/impl/TimestampSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,19 @@ public DateTime getMissingValue()

public DateTime extractTimestamp(Map<String, Object> input)
{
final Object o = input.get(timestampColumn);
return parseDateTime(input.get(timestampColumn));
}

public DateTime parseDateTime(Object input)
{
DateTime extracted = missingValue;
if (o != null) {
if (o.equals(parseCtx.lastTimeObject)) {
if (input != null) {
if (input.equals(parseCtx.lastTimeObject)) {
extracted = parseCtx.lastDateTime;
} else {
ParseCtx newCtx = new ParseCtx();
newCtx.lastTimeObject = o;
extracted = timestampConverter.apply(o);
newCtx.lastTimeObject = input;
extracted = timestampConverter.apply(input);
newCtx.lastDateTime = extracted;
parseCtx = newCtx;
}
Expand Down
85 changes: 85 additions & 0 deletions docs/content/development/extensions-contrib/time-min-max.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
---
layout: doc_page
---

# Timestamp Min/Max aggregators

To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-time-min-max`.

These aggregators enable more precise calculation of min and max time of given events than `__time` column whose granularity is sparse, the same as query granularity.
To use this feature, a "timeMin" or "timeMax" aggregator must be included at indexing time.
They can apply to any columns that can be converted to timestamp, which include Long, DateTime, Timestamp, and String types.

For example, when a data set consists of timestamp, dimension, and metric value like followings.

```
2015-07-28T01:00:00.000Z A 1
2015-07-28T02:00:00.000Z A 1
2015-07-28T03:00:00.000Z A 1
2015-07-28T04:00:00.000Z B 1
2015-07-28T05:00:00.000Z A 1
2015-07-28T06:00:00.000Z B 1
2015-07-29T01:00:00.000Z C 1
2015-07-29T02:00:00.000Z C 1
2015-07-29T03:00:00.000Z A 1
2015-07-29T04:00:00.000Z A 1
```

At ingestion time, timeMin and timeMax aggregator can be included as other aggregators.

```json
{
"type": "timeMin",
"name": "tmin",
"fieldName": "<field_name, typically column specified in timestamp spec>"
}
```

```json
{
"type": "timeMax",
"name": "tmax",
"fieldName": "<field_name, typically column specified in timestamp spec>"
}
```

`name` is output name of aggregator and can be any string. `fieldName` is typically column specified in timestamp spec but can be any column that can be converted to timestamp.

To query for results, the same aggregators "timeMin" and "timeMax" is used.

```json
{
"queryType": "groupBy",
"dataSource": "timeMinMax",
"granularity": "DAY",
"dimensions": ["product"],
"aggregations": [
{
"type": "count",
"name": "count"
}
{
"type": "timeMin",
"name": "<output_name of timeMin>",
"fieldName": "tmin"
},
{
"type": "timeMax",
"name": "<output_name of timeMax>",
"fieldName": "tmax"
}
],
"intervals": [
"2010-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"
]
}
```

Then, result has min and max of timestamp, which is finer than query granularity.

```
2015-07-28T00:00:00.000Z A 4 2015-07-28T01:00:00.000Z 2015-07-28T05:00:00.000Z
2015-07-28T00:00:00.000Z B 2 2015-07-28T04:00:00.000Z 2015-07-28T06:00:00.000Z
2015-07-29T00:00:00.000Z A 2 2015-07-29T03:00:00.000Z 2015-07-29T04:00:00.000Z
2015-07-29T00:00:00.000Z C 2 2015-07-29T01:00:00.000Z 2015-07-29T02:00:00.000Z
```
1 change: 1 addition & 0 deletions docs/content/development/extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-contrib/parquet.html)|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|druid-time-min-max|Min/Max aggregator for timestamp.|[link](../development/extensions-contrib/time-min-max.html)|
|sqlserver-metadata-storage|Microsoft SqlServer deep storage.|[link](../development/extensions-contrib/sqlserver.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|statsd-emitter|StatsD metrics emitter|[link](../development/extensions-contrib/statsd.html)|
Expand Down
79 changes: 79 additions & 0 deletions extensions-contrib/time-min-max/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.9.3-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<groupId>io.druid.extensions.contrib</groupId>
<artifactId>druid-time-min-max</artifactId>
<name>druid-time-min-max</name>
<description>Min/Max of timestamp</description>

<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.aggregation;

import io.druid.data.input.impl.TimestampSpec;
import io.druid.segment.ObjectColumnSelector;

import java.util.Comparator;

public class TimestampAggregator implements Aggregator
{
static final Comparator COMPARATOR = LongMaxAggregator.COMPARATOR;

static long combineValues(Object lhs, Object rhs)
{
return Math.max(((Number)lhs).longValue(), ((Number)rhs).longValue());
}

private final ObjectColumnSelector selector;
private final String name;
private final TimestampSpec timestampSpec;
private final Comparator<Long> comparator;
private final Long initValue;

private long most;

public TimestampAggregator(
String name,
ObjectColumnSelector selector,
TimestampSpec timestampSpec,
Comparator<Long> comparator,
Long initValue
)
{
this.name = name;
this.selector = selector;
this.timestampSpec = timestampSpec;
this.comparator = comparator;
this.initValue = initValue;

reset();
}

@Override
public void aggregate() {
Long value = TimestampAggregatorFactory.convertLong(timestampSpec, selector.get());

if (value != null) {
most = comparator.compare(most, value) > 0 ? most : value;
}
}

@Override
public void reset()
{
most = initValue;
}

@Override
public Object get()
{
return most;
}

@Override
public float getFloat()
{
return (float) most;
}

@Override
public String getName()
{
return name;
}

@Override
public void close()
{
// no resource to cleanup
}

@Override
public long getLong()
{
return most;
}

@Override
public Aggregator clone()
{
return new TimestampAggregator(name, selector, timestampSpec, comparator, initValue);
}
}
Loading