Skip to content

Commit

Permalink
Add support for Moving Function aggregation (#3419)
Browse files Browse the repository at this point in the history
See elastic/elasticsearch#29594

(cherry picked from commit 6c4f69f)
  • Loading branch information
Mpdreamz authored and russcam committed Oct 26, 2018
1 parent 81950a4 commit 4e62181
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 5 deletions.
12 changes: 12 additions & 0 deletions src/Nest/Aggregations/AggregationContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ public interface IAggregationContainer
[JsonProperty("moving_avg")]
IMovingAverageAggregation MovingAverage { get; set; }

[JsonProperty("moving_fn")]
IMovingFunctionAggregation MovingFunction { get; set; }

[JsonProperty("cumulative_sum")]
ICumulativeSumAggregation CumulativeSum { get; set; }

Expand Down Expand Up @@ -295,6 +298,8 @@ public class AggregationContainer : IAggregationContainer

public IMovingAverageAggregation MovingAverage { get; set; }

public IMovingFunctionAggregation MovingFunction { get; set; }

public ICumulativeSumAggregation CumulativeSum { get; set; }

public ISerialDifferencingAggregation SerialDifferencing { get; set; }
Expand Down Expand Up @@ -430,6 +435,8 @@ public class AggregationContainerDescriptor<T> : DescriptorBase<AggregationConta

IMovingAverageAggregation IAggregationContainer.MovingAverage { get; set; }

IMovingFunctionAggregation IAggregationContainer.MovingFunction { get; set; }

ICumulativeSumAggregation IAggregationContainer.CumulativeSum { get; set; }

ISerialDifferencingAggregation IAggregationContainer.SerialDifferencing { get; set; }
Expand Down Expand Up @@ -606,6 +613,10 @@ public AggregationContainerDescriptor<T> MovingAverage(string name,
Func<MovingAverageAggregationDescriptor, IMovingAverageAggregation> selector) =>
_SetInnerAggregation(name, selector, (a, d) => a.MovingAverage = d);

public AggregationContainerDescriptor<T> MovingFunction(string name,
Func<MovingFunctionAggregationDescriptor, IMovingFunctionAggregation> selector) =>
_SetInnerAggregation(name, selector, (a, d) => a.MovingFunction = d);

public AggregationContainerDescriptor<T> CumulativeSum(string name,
Func<CumulativeSumAggregationDescriptor, ICumulativeSumAggregation> selector) =>
_SetInnerAggregation(name, selector, (a, d) => a.CumulativeSum = d);
Expand Down Expand Up @@ -715,5 +726,6 @@ public void Accept(IAggregationVisitor visitor)
((IAggregationContainer)d).Aggregations = ((IAggregationContainer)left).Aggregations;
return d;
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using Newtonsoft.Json;

namespace Nest
{
[JsonObject(MemberSerialization = MemberSerialization.OptIn)]
[ContractJsonConverter(typeof(AggregationJsonConverter<MovingFunctionAggregation>))]
public interface IMovingFunctionAggregation : IPipelineAggregation
{
[JsonProperty("window")]
int? Window { get; set; }

[JsonProperty("script")]
string Script { get; set; }
}

public class MovingFunctionAggregation
: PipelineAggregationBase, IMovingFunctionAggregation
{
internal MovingFunctionAggregation () { }

public MovingFunctionAggregation(string name, SingleBucketsPath bucketsPath)
: base(name, bucketsPath) { }

internal override void WrapInContainer(AggregationContainer c) => c.MovingFunction = this;

public int? Window { get; set; }
public string Script { get; set; }
}

public class MovingFunctionAggregationDescriptor
: PipelineAggregationDescriptorBase<MovingFunctionAggregationDescriptor, IMovingFunctionAggregation, SingleBucketsPath>
, IMovingFunctionAggregation
{
int? IMovingFunctionAggregation.Window { get; set; }
string IMovingFunctionAggregation.Script { get; set; }

public MovingFunctionAggregationDescriptor Window(int? windowSize) => Assign(a => a.Window = windowSize);

public MovingFunctionAggregationDescriptor Script(string script) => Assign(a => a.Script = script);
}
}
5 changes: 1 addition & 4 deletions src/Nest/Aggregations/Pipeline/PipelineAggregationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ public abstract class PipelineAggregationBase : AggregationBase, IPipelineAggreg
{
internal PipelineAggregationBase() { }

public PipelineAggregationBase(string name, IBucketsPath bucketsPath) : base(name)
{
this.BucketsPath = bucketsPath;
}
public PipelineAggregationBase(string name, IBucketsPath bucketsPath) : base(name) => this.BucketsPath = bucketsPath;

public IBucketsPath BucketsPath { get; set; }
public string Format { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected override void ExpectResponse(ISearchResponse<Project> response)
projectsPerMonth.Buckets.Should().NotBeNull();
projectsPerMonth.Buckets.Count.Should().BeGreaterThan(0);

// average not calculated for the first bucket
// average not calculated for the first bucket so movingAvg.Value is expected to be null there
foreach(var item in projectsPerMonth.Buckets.Skip(1))
{
var movingAvg = item.Sum("commits_moving_avg");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
using System;
using System.Linq;
using Elastic.Xunit.XunitPlumbing;
using FluentAssertions;
using Nest;
using Tests.Core.Extensions;
using Tests.Core.ManagedElasticsearch.Clusters;
using Tests.Domain;
using Tests.Framework;
using Tests.Framework.Integration;

namespace Tests.Aggregations.Pipeline.MovingFunction
{
public class MovingFunctionAggregationUsageTests : AggregationUsageTestBase
{
public MovingFunctionAggregationUsageTests(ReadOnlyCluster cluster, EndpointUsage usage) : base(cluster, usage) { }

protected override object AggregationJson => new
{
projects_started_per_month = new
{
date_histogram = new
{
field = "startedOn",
interval = "month",
},
aggs = new
{
commits = new
{
sum = new
{
field = "numberOfCommits"
}
},
commits_moving_avg = new
{
moving_fn = new
{
buckets_path = "commits",
window = 30,
script = "MovingFunctions.unweightedAvg(values)"
}
}
}
}
};

protected override Func<AggregationContainerDescriptor<Project>, IAggregationContainer> FluentAggs => a => a
.DateHistogram("projects_started_per_month", dh => dh
.Field(p => p.StartedOn)
.Interval(DateInterval.Month)
.Aggregations(aa => aa
.Sum("commits", sm => sm
.Field(p => p.NumberOfCommits)
)
.MovingFunction("commits_moving_avg", mv => mv
.BucketsPath("commits")
.Window(30)
.Script("MovingFunctions.unweightedAvg(values)")
)
)
);

protected override AggregationDictionary InitializerAggs =>
new DateHistogramAggregation("projects_started_per_month")
{
Field = "startedOn",
Interval = DateInterval.Month,
Aggregations =
new SumAggregation("commits", "numberOfCommits")
&& new MovingFunctionAggregation("commits_moving_avg", "commits")
{
Window = 30,
Script = "MovingFunctions.unweightedAvg(values)"
}
};

protected override void ExpectResponse(ISearchResponse<Project> response)
{
response.ShouldBeValid();

var projectsPerMonth = response.Aggregations.DateHistogram("projects_started_per_month");
projectsPerMonth.Should().NotBeNull();
projectsPerMonth.Buckets.Should().NotBeNull();
projectsPerMonth.Buckets.Count.Should().BeGreaterThan(0);

// average not calculated for the first bucket
foreach(var item in projectsPerMonth.Buckets.Skip(1))
{
var movingAvg = item.Sum("commits_moving_avg");
movingAvg.Should().NotBeNull();
movingAvg.Value.Should().BeGreaterThan(0);
}
}
}
}

0 comments on commit 4e62181

Please sign in to comment.