Skip to content

Commit

Permalink
feat: add DEVIATION_TYPE_ANY to check for any type of change
Browse files Browse the repository at this point in the history
  • Loading branch information
agparadiso committed Nov 14, 2024
1 parent 95dae33 commit ca56369
Show file tree
Hide file tree
Showing 2 changed files with 556 additions and 30 deletions.
55 changes: 41 additions & 14 deletions pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ import (
const (
AGGREGATION_METHOD_MEDIAN = "median"
AGGREGATION_METHOD_MODE = "mode"
DEVIATION_TYPE_NONE = "none"
DEVIATION_TYPE_PERCENT = "percent"
DEVIATION_TYPE_ABSOLUTE = "absolute"
REPORT_FORMAT_MAP = "map"
REPORT_FORMAT_ARRAY = "array"
REPORT_FORMAT_VALUE = "value"
// DEVIATION_TYPE_NONE is no deviation check
DEVIATION_TYPE_NONE = "none"
// DEVIATION_TYPE_ANY is any difference from the previous value to the next value
DEVIATION_TYPE_ANY = "any"
// DEVIATION_TYPE_PERCENT is a numeric percentage difference
DEVIATION_TYPE_PERCENT = "percent"
// DEVIATION_TYPE_ABSOLUTE is a numeric absolute difference
DEVIATION_TYPE_ABSOLUTE = "absolute"
REPORT_FORMAT_MAP = "map"
REPORT_FORMAT_ARRAY = "array"
REPORT_FORMAT_VALUE = "value"

DEFAULT_REPORT_FORMAT = REPORT_FORMAT_MAP
DEFAULT_OUTPUT_FIELD_NAME = "Reports"
Expand All @@ -57,7 +62,9 @@ type AggregationField struct {
// The format of the deviation being provided
// * percent - a percentage deviation
// * absolute - an unsigned numeric difference
DeviationType string `mapstructure:"deviationType" json:"deviationType,omitempty" jsonschema:"enum=percent,enum=absolute,enum=none"`
// * none - no deviation check
// * any - any difference from the previous value to the next value
DeviationType string `mapstructure:"deviationType" json:"deviationType,omitempty" jsonschema:"enum=percent,enum=absolute,enum=none,enum=any"`
// The key to find a data point within the input data
// If omitted, the entire input will be used
InputKey string `mapstructure:"inputKey" json:"inputKey"`
Expand Down Expand Up @@ -111,10 +118,14 @@ func (a *reduceAggregator) Aggregate(lggr logger.Logger, previousOutcome *types.
return nil, fmt.Errorf("unable to determine if should report, err: %s", err.Error())
}

if shouldReportField || field.DeviationType == DEVIATION_TYPE_NONE {
(*currentState)[field.OutputKey] = singleValue
}

if shouldReportField {
shouldReport = true
(*currentState)[field.OutputKey] = singleValue
}

if len(field.OutputKey) > 0 {
report[field.OutputKey] = singleValue
} else {
Expand Down Expand Up @@ -183,14 +194,17 @@ func (a *reduceAggregator) Aggregate(lggr logger.Logger, previousOutcome *types.
}

func (a *reduceAggregator) shouldReport(lggr logger.Logger, field AggregationField, singleValue values.Value, currentState *map[string]values.Value) (bool, error) {
oldValue := (*currentState)[field.OutputKey]
if field.DeviationType == DEVIATION_TYPE_NONE {
return false, nil
}

oldValue := (*currentState)[field.OutputKey]
// this means its the first round and the field has not been initialised
if oldValue == nil {
return true, nil
}

if field.DeviationType == DEVIATION_TYPE_NONE {
if field.DeviationType == DEVIATION_TYPE_ANY {
unwrappedOldValue, err := oldValue.Unwrap()
if err != nil {
return false, err
Expand All @@ -207,6 +221,19 @@ func (a *reduceAggregator) shouldReport(lggr logger.Logger, field AggregationFie
if !bytes.Equal(v, unwrappedSingleValue.([]byte)) {
return true, nil
}
case map[string]interface{}, []any:
marshalledOldValue, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(oldValue))
if err != nil {
return false, err
}

marshalledSingleValue, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(singleValue))
if err != nil {
return false, err
}
if !bytes.Equal(marshalledOldValue, marshalledSingleValue) {
return true, nil
}
default:
if unwrappedOldValue != unwrappedSingleValue {
return true, nil
Expand Down Expand Up @@ -518,10 +545,10 @@ func ParseConfigReduceAggregator(config values.Map) (ReduceAggConfig, error) {
field.DeviationType = DEVIATION_TYPE_NONE
parsedConfig.Fields[i].DeviationType = DEVIATION_TYPE_NONE
}
if !isOneOf(field.DeviationType, []string{DEVIATION_TYPE_ABSOLUTE, DEVIATION_TYPE_PERCENT, DEVIATION_TYPE_NONE}) {
if !isOneOf(field.DeviationType, []string{DEVIATION_TYPE_ABSOLUTE, DEVIATION_TYPE_PERCENT, DEVIATION_TYPE_NONE, DEVIATION_TYPE_ANY}) {
return ReduceAggConfig{}, fmt.Errorf("invalid config DeviationType. received: %s. options: [%s, %s, %s]", field.DeviationType, DEVIATION_TYPE_ABSOLUTE, DEVIATION_TYPE_PERCENT, DEVIATION_TYPE_NONE)
}
if field.DeviationType != DEVIATION_TYPE_NONE && len(field.DeviationString) == 0 {
if !isOneOf(field.DeviationType, []string{DEVIATION_TYPE_NONE, DEVIATION_TYPE_ANY}) && len(field.DeviationString) == 0 {
return ReduceAggConfig{}, errors.New("aggregation field deviation must contain DeviationString amount")
}
if field.DeviationType != DEVIATION_TYPE_NONE && len(field.DeviationString) > 0 {
Expand All @@ -534,8 +561,8 @@ func ParseConfigReduceAggregator(config values.Map) (ReduceAggConfig, error) {
if len(field.Method) == 0 || !isOneOf(field.Method, []string{AGGREGATION_METHOD_MEDIAN, AGGREGATION_METHOD_MODE}) {
return ReduceAggConfig{}, fmt.Errorf("aggregation field must contain a method. options: [%s, %s]", AGGREGATION_METHOD_MEDIAN, AGGREGATION_METHOD_MODE)
}
if len(field.DeviationString) > 0 && field.DeviationType == DEVIATION_TYPE_NONE {
return ReduceAggConfig{}, fmt.Errorf("aggregation field cannot have deviation with a deviation type of %s", DEVIATION_TYPE_NONE)
if len(field.DeviationString) > 0 && isOneOf(field.DeviationType, []string{DEVIATION_TYPE_NONE, DEVIATION_TYPE_ANY}) {
return ReduceAggConfig{}, fmt.Errorf("aggregation field cannot have deviation with a deviation type of %s", field.DeviationType)
}
if field.SubMapField {
hasSubMapField = true
Expand Down
Loading

0 comments on commit ca56369

Please sign in to comment.