From ca56369912e3e8c51d0b71442df5785c35be6a1c Mon Sep 17 00:00:00 2001 From: Gabriel Paradiso Date: Thu, 14 Nov 2024 18:31:34 -0300 Subject: [PATCH] feat: add DEVIATION_TYPE_ANY to check for any type of change --- .../ocr3/aggregators/reduce_aggregator.go | 55 +- .../consensus/ocr3/aggregators/reduce_test.go | 531 +++++++++++++++++- 2 files changed, 556 insertions(+), 30 deletions(-) diff --git a/pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go b/pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go index 0d7d81209..2a3fb2020 100644 --- a/pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go +++ b/pkg/capabilities/consensus/ocr3/aggregators/reduce_aggregator.go @@ -25,12 +25,17 @@ import ( const ( AGGREGATION_METHOD_MEDIAN = "median" AGGREGATION_METHOD_MODE = "mode" - DEVIATION_TYPE_NONE = "none" - DEVIATION_TYPE_PERCENT = "percent" - DEVIATION_TYPE_ABSOLUTE = "absolute" - REPORT_FORMAT_MAP = "map" - REPORT_FORMAT_ARRAY = "array" - REPORT_FORMAT_VALUE = "value" + // DEVIATION_TYPE_NONE is no deviation check + DEVIATION_TYPE_NONE = "none" + // DEVIATION_TYPE_ANY is any difference from the previous value to the next value + DEVIATION_TYPE_ANY = "any" + // DEVIATION_TYPE_PERCENT is a numeric percentage difference + DEVIATION_TYPE_PERCENT = "percent" + // DEVIATION_TYPE_ABSOLUTE is a numeric absolute difference + DEVIATION_TYPE_ABSOLUTE = "absolute" + REPORT_FORMAT_MAP = "map" + REPORT_FORMAT_ARRAY = "array" + REPORT_FORMAT_VALUE = "value" DEFAULT_REPORT_FORMAT = REPORT_FORMAT_MAP DEFAULT_OUTPUT_FIELD_NAME = "Reports" @@ -57,7 +62,9 @@ type AggregationField struct { // The format of the deviation being provided // * percent - a percentage deviation // * absolute - an unsigned numeric difference - DeviationType string `mapstructure:"deviationType" json:"deviationType,omitempty" jsonschema:"enum=percent,enum=absolute,enum=none"` + // * none - no deviation check + // * any - any difference from the previous value to the next value + DeviationType string `mapstructure:"deviationType" json:"deviationType,omitempty" jsonschema:"enum=percent,enum=absolute,enum=none,enum=any"` // The key to find a data point within the input data // If omitted, the entire input will be used InputKey string `mapstructure:"inputKey" json:"inputKey"` @@ -111,10 +118,14 @@ func (a *reduceAggregator) Aggregate(lggr logger.Logger, previousOutcome *types. return nil, fmt.Errorf("unable to determine if should report, err: %s", err.Error()) } + if shouldReportField || field.DeviationType == DEVIATION_TYPE_NONE { + (*currentState)[field.OutputKey] = singleValue + } + if shouldReportField { shouldReport = true - (*currentState)[field.OutputKey] = singleValue } + if len(field.OutputKey) > 0 { report[field.OutputKey] = singleValue } else { @@ -183,14 +194,17 @@ func (a *reduceAggregator) Aggregate(lggr logger.Logger, previousOutcome *types. } func (a *reduceAggregator) shouldReport(lggr logger.Logger, field AggregationField, singleValue values.Value, currentState *map[string]values.Value) (bool, error) { - oldValue := (*currentState)[field.OutputKey] + if field.DeviationType == DEVIATION_TYPE_NONE { + return false, nil + } + oldValue := (*currentState)[field.OutputKey] // this means its the first round and the field has not been initialised if oldValue == nil { return true, nil } - if field.DeviationType == DEVIATION_TYPE_NONE { + if field.DeviationType == DEVIATION_TYPE_ANY { unwrappedOldValue, err := oldValue.Unwrap() if err != nil { return false, err @@ -207,6 +221,19 @@ func (a *reduceAggregator) shouldReport(lggr logger.Logger, field AggregationFie if !bytes.Equal(v, unwrappedSingleValue.([]byte)) { return true, nil } + case map[string]interface{}, []any: + marshalledOldValue, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(oldValue)) + if err != nil { + return false, err + } + + marshalledSingleValue, err := proto.MarshalOptions{Deterministic: true}.Marshal(values.Proto(singleValue)) + if err != nil { + return false, err + } + if !bytes.Equal(marshalledOldValue, marshalledSingleValue) { + return true, nil + } default: if unwrappedOldValue != unwrappedSingleValue { return true, nil @@ -518,10 +545,10 @@ func ParseConfigReduceAggregator(config values.Map) (ReduceAggConfig, error) { field.DeviationType = DEVIATION_TYPE_NONE parsedConfig.Fields[i].DeviationType = DEVIATION_TYPE_NONE } - if !isOneOf(field.DeviationType, []string{DEVIATION_TYPE_ABSOLUTE, DEVIATION_TYPE_PERCENT, DEVIATION_TYPE_NONE}) { + if !isOneOf(field.DeviationType, []string{DEVIATION_TYPE_ABSOLUTE, DEVIATION_TYPE_PERCENT, DEVIATION_TYPE_NONE, DEVIATION_TYPE_ANY}) { return ReduceAggConfig{}, fmt.Errorf("invalid config DeviationType. received: %s. options: [%s, %s, %s]", field.DeviationType, DEVIATION_TYPE_ABSOLUTE, DEVIATION_TYPE_PERCENT, DEVIATION_TYPE_NONE) } - if field.DeviationType != DEVIATION_TYPE_NONE && len(field.DeviationString) == 0 { + if !isOneOf(field.DeviationType, []string{DEVIATION_TYPE_NONE, DEVIATION_TYPE_ANY}) && len(field.DeviationString) == 0 { return ReduceAggConfig{}, errors.New("aggregation field deviation must contain DeviationString amount") } if field.DeviationType != DEVIATION_TYPE_NONE && len(field.DeviationString) > 0 { @@ -534,8 +561,8 @@ func ParseConfigReduceAggregator(config values.Map) (ReduceAggConfig, error) { if len(field.Method) == 0 || !isOneOf(field.Method, []string{AGGREGATION_METHOD_MEDIAN, AGGREGATION_METHOD_MODE}) { return ReduceAggConfig{}, fmt.Errorf("aggregation field must contain a method. options: [%s, %s]", AGGREGATION_METHOD_MEDIAN, AGGREGATION_METHOD_MODE) } - if len(field.DeviationString) > 0 && field.DeviationType == DEVIATION_TYPE_NONE { - return ReduceAggConfig{}, fmt.Errorf("aggregation field cannot have deviation with a deviation type of %s", DEVIATION_TYPE_NONE) + if len(field.DeviationString) > 0 && isOneOf(field.DeviationType, []string{DEVIATION_TYPE_NONE, DEVIATION_TYPE_ANY}) { + return ReduceAggConfig{}, fmt.Errorf("aggregation field cannot have deviation with a deviation type of %s", field.DeviationType) } if field.SubMapField { hasSubMapField = true diff --git a/pkg/capabilities/consensus/ocr3/aggregators/reduce_test.go b/pkg/capabilities/consensus/ocr3/aggregators/reduce_test.go index ac61d1c37..33a1eba8c 100644 --- a/pkg/capabilities/consensus/ocr3/aggregators/reduce_test.go +++ b/pkg/capabilities/consensus/ocr3/aggregators/reduce_test.go @@ -1243,12 +1243,13 @@ func TestAggregateShouldReport(t *testing.T) { require.Equal(t, map[string]interface{}(map[string]interface{}{"Time": decimal.NewFromInt(45)}), state) }) - t.Run("NOK-do_not_report_if_deviation_type_none_byte_field_does_not_change", func(t *testing.T) { + t.Run("NOK-do_not_report_if_deviation_type_any_byte_field_does_not_change", func(t *testing.T) { fields := []aggregators.AggregationField{ { - InputKey: "FeedID", - OutputKey: "FeedID", - Method: "mode", + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", }, { InputKey: "Timestamp", @@ -1304,12 +1305,13 @@ func TestAggregateShouldReport(t *testing.T) { require.Equal(t, false, secondOutcome.ShouldReport) }) - t.Run("NOK-do_not_report_if_deviation_type_none_bool_field_does_not_change", func(t *testing.T) { + t.Run("NOK-do_not_report_if_deviation_type_any_bool_field_does_not_change", func(t *testing.T) { fields := []aggregators.AggregationField{ { - InputKey: "BoolField", - OutputKey: "BoolField", - Method: "mode", + InputKey: "BoolField", + OutputKey: "BoolField", + Method: "mode", + DeviationType: "any", }, { InputKey: "Timestamp", @@ -1365,12 +1367,13 @@ func TestAggregateShouldReport(t *testing.T) { require.Equal(t, false, secondOutcome.ShouldReport) }) - t.Run("OK-report_if_deviation_type_none_byte_field_is_changed", func(t *testing.T) { + t.Run("OK-report_if_deviation_type_any_byte_field_is_changed", func(t *testing.T) { fields := []aggregators.AggregationField{ { - InputKey: "FeedID", - OutputKey: "FeedID", - Method: "mode", + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", }, { InputKey: "Timestamp", @@ -1426,12 +1429,13 @@ func TestAggregateShouldReport(t *testing.T) { require.Equal(t, true, secondOutcome.ShouldReport) }) - t.Run("OK-report_if_deviation_type_none_bool_field_is_changed", func(t *testing.T) { + t.Run("OK-report_if_deviation_type_any_bool_field_is_changed", func(t *testing.T) { fields := []aggregators.AggregationField{ { - InputKey: "BoolField", - OutputKey: "BoolField", - Method: "mode", + InputKey: "BoolField", + OutputKey: "BoolField", + Method: "mode", + DeviationType: "any", }, { InputKey: "Timestamp", @@ -1487,6 +1491,501 @@ func TestAggregateShouldReport(t *testing.T) { require.Equal(t, true, secondOutcome.ShouldReport) }) + t.Run("OK-report_if_deviation_type_any_string_field_is_changed", func(t *testing.T) { + fields := []aggregators.AggregationField{ + { + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", + }, + { + InputKey: "Timestamp", + OutputKey: "Time", + Method: "median", + DeviationString: "30", + DeviationType: "absolute", + }, + } + extraConfig := map[string]any{ + "reportFormat": "array", + } + + config := getConfigReduceAggregator(t, fields, extraConfig) + agg, err := aggregators.NewReduceAggregator(*config) + require.NoError(t, err) + + pb := &pb.Map{} + + // 1st round + mockValueFirstRound, err := values.WrapMap(map[string]any{ + "FeedID": "A", + "Timestamp": decimal.NewFromInt(10), + }) + require.NoError(t, err) + + firstOutcome, err := agg.Aggregate(logger.Nop(), nil, map[commontypes.OracleID][]values.Value{1: {mockValueFirstRound}, 2: {mockValueFirstRound}, 3: {mockValueFirstRound}}, 1) + require.NoError(t, err) + require.Equal(t, true, firstOutcome.ShouldReport) + + // validate metadata + proto.Unmarshal(firstOutcome.Metadata, pb) + vmap, err := values.FromMapValueProto(pb) + require.NoError(t, err) + state, err := vmap.Unwrap() + require.NoError(t, err) + require.Equal(t, map[string]interface{}(map[string]interface{}{ + "FeedID": "A", + "Time": decimal.NewFromInt(10), + }), state) + + // 2nd round + mockValueSecondRound, err := values.WrapMap(map[string]any{ + "FeedID": "B", + "Timestamp": decimal.NewFromInt(20), + }) + require.NoError(t, err) + + secondOutcome, err := agg.Aggregate(logger.Nop(), firstOutcome, map[commontypes.OracleID][]values.Value{1: {mockValueSecondRound}, 2: {mockValueSecondRound}, 3: {mockValueSecondRound}}, 1) + require.NoError(t, err) + + // This should report, given the value has changed from A to B + require.Equal(t, true, secondOutcome.ShouldReport) + }) + + t.Run("NOK-do_not_report_if_deviation_type_any_string_field_does_not_change", func(t *testing.T) { + fields := []aggregators.AggregationField{ + { + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", + }, + { + InputKey: "Timestamp", + OutputKey: "Time", + Method: "median", + DeviationString: "30", + DeviationType: "absolute", + }, + } + extraConfig := map[string]any{ + "reportFormat": "array", + } + + config := getConfigReduceAggregator(t, fields, extraConfig) + agg, err := aggregators.NewReduceAggregator(*config) + require.NoError(t, err) + + pb := &pb.Map{} + + // 1st round + mockValueFirstRound, err := values.WrapMap(map[string]any{ + "FeedID": "A", + "Timestamp": decimal.NewFromInt(10), + }) + require.NoError(t, err) + + firstOutcome, err := agg.Aggregate(logger.Nop(), nil, map[commontypes.OracleID][]values.Value{1: {mockValueFirstRound}, 2: {mockValueFirstRound}, 3: {mockValueFirstRound}}, 1) + require.NoError(t, err) + require.Equal(t, true, firstOutcome.ShouldReport) + + // validate metadata + proto.Unmarshal(firstOutcome.Metadata, pb) + vmap, err := values.FromMapValueProto(pb) + require.NoError(t, err) + state, err := vmap.Unwrap() + require.NoError(t, err) + require.Equal(t, map[string]interface{}(map[string]interface{}{ + "FeedID": "A", + "Time": decimal.NewFromInt(10), + }), state) + + // 2nd round + mockValueSecondRound, err := values.WrapMap(map[string]any{ + "FeedID": "A", + "Timestamp": decimal.NewFromInt(20), + }) + require.NoError(t, err) + + secondOutcome, err := agg.Aggregate(logger.Nop(), firstOutcome, map[commontypes.OracleID][]values.Value{1: {mockValueSecondRound}, 2: {mockValueSecondRound}, 3: {mockValueSecondRound}}, 1) + require.NoError(t, err) + + // This should not report, given the value has not changed + require.Equal(t, false, secondOutcome.ShouldReport) + }) + + t.Run("OK-report_if_deviation_type_any_map_field_is_changed", func(t *testing.T) { + fields := []aggregators.AggregationField{ + { + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", + }, + { + InputKey: "Timestamp", + OutputKey: "Time", + Method: "median", + DeviationString: "30", + DeviationType: "absolute", + }, + } + extraConfig := map[string]any{ + "reportFormat": "array", + } + + config := getConfigReduceAggregator(t, fields, extraConfig) + agg, err := aggregators.NewReduceAggregator(*config) + require.NoError(t, err) + + pb := &pb.Map{} + + // 1st round + mockValueFirstRound, err := values.WrapMap(map[string]any{ + "FeedID": map[string]any{"A": "A"}, + "Timestamp": decimal.NewFromInt(10), + }) + require.NoError(t, err) + + firstOutcome, err := agg.Aggregate(logger.Nop(), nil, map[commontypes.OracleID][]values.Value{1: {mockValueFirstRound}, 2: {mockValueFirstRound}, 3: {mockValueFirstRound}}, 1) + require.NoError(t, err) + require.Equal(t, true, firstOutcome.ShouldReport) + + // validate metadata + proto.Unmarshal(firstOutcome.Metadata, pb) + vmap, err := values.FromMapValueProto(pb) + require.NoError(t, err) + state, err := vmap.Unwrap() + require.NoError(t, err) + require.Equal(t, map[string]interface{}(map[string]interface{}{ + "FeedID": map[string]any{"A": "A"}, + "Time": decimal.NewFromInt(10), + }), state) + + // 2nd round + mockValueSecondRound, err := values.WrapMap(map[string]any{ + "FeedID": map[string]any{"A": "B"}, + "Timestamp": decimal.NewFromInt(20), + }) + require.NoError(t, err) + + secondOutcome, err := agg.Aggregate(logger.Nop(), firstOutcome, map[commontypes.OracleID][]values.Value{1: {mockValueSecondRound}, 2: {mockValueSecondRound}, 3: {mockValueSecondRound}}, 1) + require.NoError(t, err) + + // This should report, given the value of A has changed from A to B + require.Equal(t, true, secondOutcome.ShouldReport) + }) + + t.Run("NOK-do_not_report_if_deviation_type_any_map_field_does_not_change", func(t *testing.T) { + fields := []aggregators.AggregationField{ + { + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", + }, + { + InputKey: "Timestamp", + OutputKey: "Time", + Method: "median", + DeviationString: "30", + DeviationType: "absolute", + }, + } + extraConfig := map[string]any{ + "reportFormat": "array", + } + + config := getConfigReduceAggregator(t, fields, extraConfig) + agg, err := aggregators.NewReduceAggregator(*config) + require.NoError(t, err) + + pb := &pb.Map{} + + // 1st round + mockValueFirstRound, err := values.WrapMap(map[string]any{ + "FeedID": map[string]any{"A": "A"}, + "Timestamp": decimal.NewFromInt(10), + }) + require.NoError(t, err) + + firstOutcome, err := agg.Aggregate(logger.Nop(), nil, map[commontypes.OracleID][]values.Value{1: {mockValueFirstRound}, 2: {mockValueFirstRound}, 3: {mockValueFirstRound}}, 1) + require.NoError(t, err) + require.Equal(t, true, firstOutcome.ShouldReport) + + // validate metadata + proto.Unmarshal(firstOutcome.Metadata, pb) + vmap, err := values.FromMapValueProto(pb) + require.NoError(t, err) + state, err := vmap.Unwrap() + require.NoError(t, err) + require.Equal(t, map[string]interface{}(map[string]interface{}{ + "FeedID": map[string]any{"A": "A"}, + "Time": decimal.NewFromInt(10), + }), state) + + // 2nd round + mockValueSecondRound, err := values.WrapMap(map[string]any{ + "FeedID": map[string]any{"A": "A"}, + "Timestamp": decimal.NewFromInt(20), + }) + require.NoError(t, err) + + secondOutcome, err := agg.Aggregate(logger.Nop(), firstOutcome, map[commontypes.OracleID][]values.Value{1: {mockValueSecondRound}, 2: {mockValueSecondRound}, 3: {mockValueSecondRound}}, 1) + require.NoError(t, err) + + // This should not report, given the value has not changed + require.Equal(t, false, secondOutcome.ShouldReport) + }) + + t.Run("OK-report_if_deviation_type_any_slice_field_is_changed", func(t *testing.T) { + fields := []aggregators.AggregationField{ + { + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", + }, + { + InputKey: "Timestamp", + OutputKey: "Time", + Method: "median", + DeviationString: "30", + DeviationType: "absolute", + }, + } + extraConfig := map[string]any{ + "reportFormat": "array", + } + + config := getConfigReduceAggregator(t, fields, extraConfig) + agg, err := aggregators.NewReduceAggregator(*config) + require.NoError(t, err) + + pb := &pb.Map{} + + // 1st round + mockValueFirstRound, err := values.WrapMap(map[string]any{ + "FeedID": []any{"A"}, + "Timestamp": decimal.NewFromInt(10), + }) + require.NoError(t, err) + + firstOutcome, err := agg.Aggregate(logger.Nop(), nil, map[commontypes.OracleID][]values.Value{1: {mockValueFirstRound}, 2: {mockValueFirstRound}, 3: {mockValueFirstRound}}, 1) + require.NoError(t, err) + require.Equal(t, true, firstOutcome.ShouldReport) + + // validate metadata + proto.Unmarshal(firstOutcome.Metadata, pb) + vmap, err := values.FromMapValueProto(pb) + require.NoError(t, err) + state, err := vmap.Unwrap() + require.NoError(t, err) + require.Equal(t, map[string]interface{}(map[string]interface{}{ + "FeedID": []any{"A"}, + "Time": decimal.NewFromInt(10), + }), state) + + // 2nd round + mockValueSecondRound, err := values.WrapMap(map[string]any{ + "FeedID": []any{"B"}, + "Timestamp": decimal.NewFromInt(20), + }) + require.NoError(t, err) + + secondOutcome, err := agg.Aggregate(logger.Nop(), firstOutcome, map[commontypes.OracleID][]values.Value{1: {mockValueSecondRound}, 2: {mockValueSecondRound}, 3: {mockValueSecondRound}}, 1) + require.NoError(t, err) + + // This should report, given the value has changed from A to B + require.Equal(t, true, secondOutcome.ShouldReport) + }) + + t.Run("NOK-do_not_report_if_deviation_type_any_slice_field_does_not_change", func(t *testing.T) { + fields := []aggregators.AggregationField{ + { + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", + }, + { + InputKey: "Timestamp", + OutputKey: "Time", + Method: "median", + DeviationString: "30", + DeviationType: "absolute", + }, + } + extraConfig := map[string]any{ + "reportFormat": "array", + } + + config := getConfigReduceAggregator(t, fields, extraConfig) + agg, err := aggregators.NewReduceAggregator(*config) + require.NoError(t, err) + + pb := &pb.Map{} + + // 1st round + mockValueFirstRound, err := values.WrapMap(map[string]any{ + "FeedID": []any{"A"}, + "Timestamp": decimal.NewFromInt(10), + }) + require.NoError(t, err) + + firstOutcome, err := agg.Aggregate(logger.Nop(), nil, map[commontypes.OracleID][]values.Value{1: {mockValueFirstRound}, 2: {mockValueFirstRound}, 3: {mockValueFirstRound}}, 1) + require.NoError(t, err) + require.Equal(t, true, firstOutcome.ShouldReport) + + // validate metadata + proto.Unmarshal(firstOutcome.Metadata, pb) + vmap, err := values.FromMapValueProto(pb) + require.NoError(t, err) + state, err := vmap.Unwrap() + require.NoError(t, err) + require.Equal(t, map[string]interface{}(map[string]interface{}{ + "FeedID": []any{"A"}, + "Time": decimal.NewFromInt(10), + }), state) + + // 2nd round + mockValueSecondRound, err := values.WrapMap(map[string]any{ + "FeedID": []any{"A"}, + "Timestamp": decimal.NewFromInt(20), + }) + require.NoError(t, err) + + secondOutcome, err := agg.Aggregate(logger.Nop(), firstOutcome, map[commontypes.OracleID][]values.Value{1: {mockValueSecondRound}, 2: {mockValueSecondRound}, 3: {mockValueSecondRound}}, 1) + require.NoError(t, err) + + // This should not report, given the value has not changed + require.Equal(t, false, secondOutcome.ShouldReport) + }) + + t.Run("OK-report_if_deviation_type_any_numeric_field_is_changed", func(t *testing.T) { + fields := []aggregators.AggregationField{ + { + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", + }, + { + InputKey: "Timestamp", + OutputKey: "Time", + Method: "median", + DeviationString: "30", + DeviationType: "absolute", + }, + } + extraConfig := map[string]any{ + "reportFormat": "array", + } + + config := getConfigReduceAggregator(t, fields, extraConfig) + agg, err := aggregators.NewReduceAggregator(*config) + require.NoError(t, err) + + pb := &pb.Map{} + + // 1st round + mockValueFirstRound, err := values.WrapMap(map[string]any{ + "FeedID": int64(1), + "Timestamp": decimal.NewFromInt(10), + }) + require.NoError(t, err) + + firstOutcome, err := agg.Aggregate(logger.Nop(), nil, map[commontypes.OracleID][]values.Value{1: {mockValueFirstRound}, 2: {mockValueFirstRound}, 3: {mockValueFirstRound}}, 1) + require.NoError(t, err) + require.Equal(t, true, firstOutcome.ShouldReport) + + // validate metadata + proto.Unmarshal(firstOutcome.Metadata, pb) + vmap, err := values.FromMapValueProto(pb) + require.NoError(t, err) + state, err := vmap.Unwrap() + require.NoError(t, err) + require.Equal(t, map[string]interface{}(map[string]interface{}{ + "FeedID": int64(1), + "Time": decimal.NewFromInt(10), + }), state) + + // 2nd round + mockValueSecondRound, err := values.WrapMap(map[string]any{ + "FeedID": int64(2), + "Timestamp": decimal.NewFromInt(20), + }) + require.NoError(t, err) + + secondOutcome, err := agg.Aggregate(logger.Nop(), firstOutcome, map[commontypes.OracleID][]values.Value{1: {mockValueSecondRound}, 2: {mockValueSecondRound}, 3: {mockValueSecondRound}}, 1) + require.NoError(t, err) + + // This should report, given the value has changed from 1 to 2 + require.Equal(t, true, secondOutcome.ShouldReport) + }) + + t.Run("NOK-do_not_report_if_deviation_type_any_numeric_field_does_not_change", func(t *testing.T) { + fields := []aggregators.AggregationField{ + { + InputKey: "FeedID", + OutputKey: "FeedID", + Method: "mode", + DeviationType: "any", + }, + { + InputKey: "Timestamp", + OutputKey: "Time", + Method: "median", + DeviationString: "30", + DeviationType: "absolute", + }, + } + extraConfig := map[string]any{ + "reportFormat": "array", + } + + config := getConfigReduceAggregator(t, fields, extraConfig) + agg, err := aggregators.NewReduceAggregator(*config) + require.NoError(t, err) + + pb := &pb.Map{} + + // 1st round + mockValueFirstRound, err := values.WrapMap(map[string]any{ + "FeedID": int64(1), + "Timestamp": decimal.NewFromInt(10), + }) + require.NoError(t, err) + + firstOutcome, err := agg.Aggregate(logger.Nop(), nil, map[commontypes.OracleID][]values.Value{1: {mockValueFirstRound}, 2: {mockValueFirstRound}, 3: {mockValueFirstRound}}, 1) + require.NoError(t, err) + require.Equal(t, true, firstOutcome.ShouldReport) + + // validate metadata + proto.Unmarshal(firstOutcome.Metadata, pb) + vmap, err := values.FromMapValueProto(pb) + require.NoError(t, err) + state, err := vmap.Unwrap() + require.NoError(t, err) + require.Equal(t, map[string]interface{}(map[string]interface{}{ + "FeedID": int64(1), + "Time": decimal.NewFromInt(10), + }), state) + + // 2nd round + mockValueSecondRound, err := values.WrapMap(map[string]any{ + "FeedID": int64(1), + "Timestamp": decimal.NewFromInt(20), + }) + require.NoError(t, err) + + secondOutcome, err := agg.Aggregate(logger.Nop(), firstOutcome, map[commontypes.OracleID][]values.Value{1: {mockValueSecondRound}, 2: {mockValueSecondRound}, 3: {mockValueSecondRound}}, 1) + require.NoError(t, err) + + // This should not report, given the value has not changed + require.Equal(t, false, secondOutcome.ShouldReport) + }) } func getConfigReduceAggregator(t *testing.T, fields []aggregators.AggregationField, override map[string]any) *values.Map {