Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STREAMPIPES-616] refactor cumsum processor #132

Merged
merged 5 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumController;
import org.apache.streampipes.processors.changedetection.jvm.welford.WelfordChangeDetection;

public class ChangeDetectionJvmInit extends StandaloneModelSubmitter {

Expand All @@ -43,7 +44,10 @@ public SpServiceDefinition provideServiceDefinition() {
"Processors Change Detection JVM",
"",
8090)
.registerPipelineElements(new CusumController())
.registerPipelineElements(
new CusumController(),
new WelfordChangeDetection()
)
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.runtime.EventProcessor;

@Deprecated(since="0.70.0", forRemoval = true)
public class Cusum implements EventProcessor<CusumParameters> {

private String selectedNumberMapping;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import java.util.Arrays;

@Deprecated(since = "0.70.0", forRemoval = true)
public class CusumController extends StandaloneEventProcessingDeclarer<CusumParameters> {

private static final String NUMBER_MAPPING = "number-mapping";
Expand All @@ -44,6 +45,7 @@ public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.cusum")
.category(DataProcessorType.VALUE_OBSERVER)
.withAssets(Assets.DOCUMENTATION)
.withAssets(Assets.ICON)
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder
.create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.streampipes.processors.changedetection.jvm.cusum;


@Deprecated(since = "0.70.0", forRemoval = true)
public class CusumEventFields {

public static final String VAL_LOW = "cusumLow";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;

@Deprecated(since = "0.70.0", forRemoval = true)
public class CusumParameters extends EventProcessorBindingParams {

private String selectedNumberMapping;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.streampipes.processors.changedetection.jvm.cusum;

@Deprecated(since = "0.70.0", forRemoval = true)
public class WelfordAggregate {
private Integer count;
private Double mean;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.changedetection.jvm.welford;

public class WelfordAggregate {
private Integer count;
private Double mean;
private Double m2;

public WelfordAggregate() {
count = 0;
mean = 0.0;
m2 = 0.0;
}

public void update(Double newValue) {
count++;
Double delta = mean != null ? newValue - mean : 0.0;
mean += delta / count;
Double delta2 = newValue - mean;
m2 += delta * delta2;
}

public Double getMean() {
return mean;
}

public Double getPopulationVariance() {
return m2 / count;
}

public Double getSampleVariance() {
return m2 / (count - 1);
}

public Double getSampleStd() {
return Math.sqrt(getSampleVariance());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.streampipes.processors.changedetection.jvm.welford;

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.changedetection.jvm.cusum.CusumEventFields;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

import java.util.Arrays;
import java.util.List;

public class WelfordChangeDetection extends StreamPipesDataProcessor {

private static final String NUMBER_MAPPING = "number-mapping";
private static final String PARAM_K = "param-k";
private static final String PARAM_H = "param-h";

private String selectedNumberMapping;
private Double k;
private Double h;
private Double cuSumLow;
private Double cuSumHigh;
private WelfordAggregate welfordAggregate;

@Override
public DataProcessorDescription declareModel() {
return ProcessingElementBuilder.create("org.apache.streampipes.processors.changedetection.jvm.welford")
.category(DataProcessorType.VALUE_OBSERVER)
.withAssets(Assets.DOCUMENTATION)
.withAssets(Assets.ICON)
.withLocales(Locales.EN)
.requiredStream(StreamRequirementsBuilder
.create()
.requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
Labels.withId(NUMBER_MAPPING),
PropertyScope.NONE).build())
.requiredFloatParameter(Labels.withId(PARAM_K), 0.0f, 0.0f, 100.0f, 0.01f)
.requiredFloatParameter(Labels.withId(PARAM_H), 0.0f, 0.0f, 100.0f, 0.01f)
.outputStrategy(
OutputStrategies.append(
Arrays.asList(
EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_LOW.label, SO.Number),
EpProperties.numberEp(Labels.empty(), WelfordEventFields.VAL_HIGH.label, SO.Number),
EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_LOW.label, SO.Boolean),
EpProperties.booleanEp(Labels.empty(), WelfordEventFields.DECISION_HIGH.label, SO.Boolean)
)
))
.build();
}

@Override
public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {

ProcessingElementParameterExtractor extractor = parameters.extractor();
this.selectedNumberMapping = extractor.mappingPropertyValue(NUMBER_MAPPING);
this.k = extractor.singleValueParameter(PARAM_K, Double.class);
this.h = extractor.singleValueParameter(PARAM_H, Double.class);
this.cuSumLow = 0.0;
this.cuSumHigh = 0.0;
this.welfordAggregate = new WelfordAggregate();

}

@Override
public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {

Double number = event.getFieldBySelector(selectedNumberMapping).getAsPrimitive().getAsDouble();
welfordAggregate.update(number); // update mean and standard deviation
Double normalized = getZScoreNormalizedValue(number);
updateStatistics(normalized);

bossenti marked this conversation as resolved.
Show resolved Hide resolved
Boolean isChangeHigh = getTestResult(this.cuSumHigh, h);
Boolean isChangeLow = getTestResult(this.cuSumLow, h);

Event updatedEvent = updateEvent(event, this.cuSumLow, this.cuSumHigh, isChangeLow, isChangeHigh);
collector.collect(updatedEvent);

if (isChangeHigh || isChangeLow) {
resetAfterChange();
}

}

@Override
public void onDetach() throws SpRuntimeException {
this.cuSumLow = 0.0;
this.cuSumHigh = 0.0;
}

private Double getZScoreNormalizedValue(Double value) {
Double mean = welfordAggregate.getMean();
Double std = welfordAggregate.getSampleStd();
return (value - mean) / std;
}

private void updateStatistics(Double newValue) {
if (newValue.isNaN()) {
return;
}
this.cuSumHigh = Math.max(0, this.cuSumHigh + newValue - k);
this.cuSumLow = Math.min(0, this.cuSumLow + newValue + k);
}

private Boolean getTestResult(Double cusum, Double h) {
return Math.abs(cusum) > this.h;
}

private Event updateEvent(Event event, Double cusumLow, Double cusumHigh, Boolean decisionLow, Boolean decisionHigh) {
event.addField(WelfordEventFields.VAL_LOW.label, cusumLow);
event.addField(WelfordEventFields.VAL_HIGH.label, cusumHigh);
event.addField(WelfordEventFields.DECISION_LOW.label, decisionLow);
event.addField(WelfordEventFields.DECISION_HIGH.label, decisionHigh);
return event;
}

private void resetAfterChange() {
this.cuSumHigh = 0.0;
this.cuSumLow = 0.0;
welfordAggregate = new WelfordAggregate();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.changedetection.jvm.welford;

public enum WelfordEventFields {
VAL_LOW("cumSumLow"),
VAL_HIGH("cumSumHigh"),
DECISION_LOW("changeDetectedLow"),
DECISION_HIGH("changeDetectedHigh");

public final String label;

WelfordEventFields(String label) {
this.label = label;
}

@Override
public String toString() {
return this.label;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
~
-->

## Cusum (Cumulative Sum)
## Cusum (Cumulative Sum) - Deprecated

<!--
<p align="center">
Expand All @@ -28,6 +28,8 @@

## Description

**This processing element is deprecated. Please use the `WelfordChangeDetection` processing element instead.**

Performs change detection on a single dimension of the incoming data stream. A change is detected if the cumulative deviation from the mean exceeds a certain threshold. This implementation tracks the mean and the standard deviation using Welford's algorithm, which is well suited for data streams.

***
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
org.apache.streampipes.processors.changedetection.jvm.cusum.title=Cusum
org.apache.streampipes.processors.changedetection.jvm.cusum.description=
org.apache.streampipes.processors.changedetection.jvm.cusum.title=Cusum (Deprecated)
org.apache.streampipes.processors.changedetection.jvm.cusum.description=This processor is deprecated. Please use the processing element `WelfordChangeDetection` instead.

number-mapping.title=Value to observe
number-mapping.description=Specifies the monitored dimension.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

## Welford Change Detection

<!--
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
</p>
-->

***

## Description

Performs change detection on a single dimension of the incoming data stream. This implementation tracks the mean and the standard deviation using Welford's algorithm, which is well suited for data streams. A change is detected if the cumulative deviation from the mean exceeds a certain threshold.

***

## Required input

The welford change dectection processor requires a data stream that has at least one field containing a numerical value.

***

## Configuration

### Value to observe
Specify the dimension of the data stream (e.g. the temperature) on which to perform change detection.

### Parameter `k`
`k` controls the sensitivity of the change detector. Its unit are standard deviations. For an observation `x_n`, the Cusum value is `S_n = max(0, S_{n-1} - z-score(x_n) - k)`. Thus, the cusum-score `S` icnreases if `S_{n-1} - z-score(x_n) > k`.

### Parameter `h`
The alarm theshold in standard deviations. An alarm occurs if `S_n > h`

## Output

This processor outputs the original data stream plus

- `cumSumLow`: The cumulative sum value for negative changes
- `cumSumHigh`: The cumulative sum value for positive changes
- `changeDetectedLow`: Boolean indicating if a negative change was detected
- `changeDetectedHigh`: Boolean indicating if a positive change was detected
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading