Skip to content

Commit

Permalink
change the ID for groups
Browse files Browse the repository at this point in the history
Signed-off-by: Yijie Qin <[email protected]>
  • Loading branch information
qinxx108 committed Jun 2, 2023
1 parent 07f2957 commit e8d049c
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 92 deletions.
37 changes: 21 additions & 16 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,20 +458,15 @@ func (api *API) getAlertGroupInfosHandler(params alertgroupinfos_ops.GetAlertGro
}
}(receiverFilter)

var previousAgFp *prometheus_model.Fingerprint
var nextTokenParam *prometheus_model.Fingerprint
var previousAgID *string

if params.NextToken != nil && *params.NextToken != "" {
parseResult, err := prometheus_model.ParseFingerprint(*params.NextToken)
if err != nil {
level.Error(logger).Log("msg", "Failed to parse NextToken parameter", "err", err)
return alertgroupinfos_ops.
NewGetAlertGroupInfosBadRequest().
WithPayload(
fmt.Sprintf("failed to parse NextToken param: %v", *params.NextToken),
)
}
nextTokenParam = &parseResult
if err = validateNextToken(params.NextToken); err != nil {
level.Error(logger).Log("msg", "Failed to parse NextToken parameter", "err", err)
return alertgroupinfos_ops.
NewGetAlertGroupInfosBadRequest().
WithPayload(
fmt.Sprintf("failed to parse NextToken param: %v", *params.NextToken),
)
}

if err = validateMaxResult(params.MaxResults); err != nil {
Expand All @@ -489,7 +484,7 @@ func (api *API) getAlertGroupInfosHandler(params alertgroupinfos_ops.GetAlertGro
for _, alertGroup := range ags {

// Skip the aggregation group if the next token is set and hasn't arrived the nextToken item yet.
if nextTokenParam != nil && *nextTokenParam >= alertGroup.Fingerprint {
if params.NextToken != nil && *params.NextToken != "" && *params.NextToken >= alertGroup.ID {
continue
}

Expand All @@ -500,15 +495,15 @@ func (api *API) getAlertGroupInfosHandler(params alertgroupinfos_ops.GetAlertGro
Labels: ModelLabelSetToAPILabelSet(alertGroup.Labels),
}

previousAgFp = &alertGroup.Fingerprint
previousAgID = &alertGroup.ID
alertGroupInfos = append(alertGroupInfos, ag)
resultNumber++
continue
}

// Return the pagination token if there is more aggregation group
if resultNumber == int(*params.MaxResults) {
returnPaginationToken = previousAgFp.String()
returnPaginationToken = *previousAgID
break
}
}
Expand Down Expand Up @@ -827,3 +822,13 @@ func validateMaxResult(maxItem *int64) error {
}
return nil
}

func validateNextToken(nextToken *string) error {
if nextToken != nil && *nextToken != "" {
match, _ := regexp.MatchString("^[a-fA-F0-9]{40}$", *nextToken)
if !match {
return fmt.Errorf("invalid nextToken: %s", *nextToken)
}
}
return nil
}
49 changes: 18 additions & 31 deletions api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,29 +129,13 @@ func convertIntToPointerInt64(x int64) *int64 {

func TestGetAlertGroupInfosHandler(t *testing.T) {
aginfos := dispatch.AlertGroupInfos{
&dispatch.AlertGroupInfo{
Labels: model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
"cluster": "aa",
},
Receiver: "prod",
Fingerprint: model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
"cluster": "aa",
}.Fingerprint(),
},
&dispatch.AlertGroupInfo{
Labels: model.LabelSet{
"alertname": "TestingAlert",
"service": "api",
},
Receiver: "testing",
Fingerprint: model.LabelSet{
"alertname": "TestingAlert",
"service": "api",
}.Fingerprint(),
ID: "478b4114226224a35910d449fdba8186ebfb441f",
},
&dispatch.AlertGroupInfo{
Labels: model.LabelSet{
Expand All @@ -160,20 +144,23 @@ func TestGetAlertGroupInfosHandler(t *testing.T) {
"cluster": "bb",
},
Receiver: "prod",
Fingerprint: model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
"cluster": "bb",
}.Fingerprint(),
ID: "7f4084a078a3fe29d6de82fad15af8f1411e803f",
},
&dispatch.AlertGroupInfo{
Labels: model.LabelSet{
"alertname": "OtherAlert",
},
Receiver: "prod",
Fingerprint: model.LabelSet{
"alertname": "OtherAlert",
}.Fingerprint(),
ID: "d525244929240cbdb75a497913c1890ab8de1962",
},
&dispatch.AlertGroupInfo{
Labels: model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
"cluster": "aa",
},
Receiver: "prod",
ID: "d73984d43949112ae1ea59dcc5af4af7b630a5b1",
},
}
for _, tc := range []struct {
Expand Down Expand Up @@ -207,28 +194,28 @@ func TestGetAlertGroupInfosHandler(t *testing.T) {
{
convertIntToPointerInt64(int64(1)),
"",
`{"alertGroupInfos":[{"labels":{"alertname":"HighErrorRate","cluster":"aa","service":"api"},"receiver":{"name":"prod"}}],"nextToken":"0e758306edce4595"}`,
`{"alertGroupInfos":[{"labels":{"alertname":"TestingAlert","service":"api"},"receiver":{"name":"testing"}}],"nextToken":"478b4114226224a35910d449fdba8186ebfb441f"}`,
200,
},
// One item to return, has next token.
{
convertIntToPointerInt64(int64(1)),
"0e758306edce4595",
`{"alertGroupInfos":[{"labels":{"alertname":"TestingAlert","service":"api"},"receiver":{"name":"testing"}}],"nextToken":"1ea9baf838dfe7bb"}`,
"478b4114226224a35910d449fdba8186ebfb441f",
`{"alertGroupInfos":[{"labels":{"alertname":"HighErrorRate","cluster":"bb","service":"api"},"receiver":{"name":"prod"}}],"nextToken":"7f4084a078a3fe29d6de82fad15af8f1411e803f"}`,
200,
},
// Five item to return, has next token.
{
convertIntToPointerInt64(int64(5)),
"1ea9baf838dfe7bb",
`{"alertGroupInfos":[{"labels":{"alertname":"HighErrorRate","cluster":"bb","service":"api"},"receiver":{"name":"prod"}},{"labels":{"alertname":"OtherAlert"},"receiver":{"name":"prod"}}]}`,
"7f4084a078a3fe29d6de82fad15af8f1411e803f",
`{"alertGroupInfos":[{"labels":{"alertname":"OtherAlert"},"receiver":{"name":"prod"}},{"labels":{"alertname":"HighErrorRate","cluster":"aa","service":"api"},"receiver":{"name":"prod"}}]}`,
200,
},
// Return all results.
{
nil,
"",
`{"alertGroupInfos":[{"labels":{"alertname":"HighErrorRate","cluster":"aa","service":"api"},"receiver":{"name":"prod"}},{"labels":{"alertname":"TestingAlert","service":"api"},"receiver":{"name":"testing"}},{"labels":{"alertname":"HighErrorRate","cluster":"bb","service":"api"},"receiver":{"name":"prod"}},{"labels":{"alertname":"OtherAlert"},"receiver":{"name":"prod"}}]}`,
`{"alertGroupInfos":[{"labels":{"alertname":"TestingAlert","service":"api"},"receiver":{"name":"testing"}},{"labels":{"alertname":"HighErrorRate","cluster":"bb","service":"api"},"receiver":{"name":"prod"}},{"labels":{"alertname":"OtherAlert"},"receiver":{"name":"prod"}},{"labels":{"alertname":"HighErrorRate","cluster":"aa","service":"api"},"receiver":{"name":"prod"}}]}`,
200,
},
} {
Expand Down
23 changes: 16 additions & 7 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package dispatch

import (
"context"
"crypto/sha1"
"fmt"
"sort"
"sync"
Expand Down Expand Up @@ -280,16 +281,16 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ

// AlertGroupInfo represents the aggrGroup information.
type AlertGroupInfo struct {
Labels model.LabelSet
Receiver string
Fingerprint model.Fingerprint
Labels model.LabelSet
Receiver string
ID string
}

type AlertGroupInfos []*AlertGroupInfo

func (ag AlertGroupInfos) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
func (ag AlertGroupInfos) Less(i, j int) bool {
return ag[i].Fingerprint < ag[j].Fingerprint
return ag[i].ID < ag[j].ID
}
func (ag AlertGroupInfos) Len() int { return len(ag) }

Expand All @@ -307,9 +308,9 @@ func (d *Dispatcher) GroupInfos(routeFilter func(*Route) bool) AlertGroupInfos {
for _, ag := range ags {
receiver := route.RouteOpts.Receiver
alertGroup := &AlertGroupInfo{
Labels: ag.labels,
Receiver: receiver,
Fingerprint: ag.fingerprint(),
Labels: ag.labels,
Receiver: receiver,
ID: ag.GroupID(),
}

groups = append(groups, alertGroup)
Expand Down Expand Up @@ -416,6 +417,7 @@ type aggrGroup struct {
opts *RouteOpts
logger log.Logger
routeKey string
routeID string

alerts *store.Alerts
ctx context.Context
Expand All @@ -436,6 +438,7 @@ func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(
ag := &aggrGroup{
labels: labels,
routeKey: r.Key(),
routeID: r.ID(),
opts: &r.RouteOpts,
timeout: to,
alerts: store.NewAlerts(),
Expand All @@ -456,6 +459,12 @@ func (ag *aggrGroup) fingerprint() model.Fingerprint {
return ag.labels.Fingerprint()
}

func (ag *aggrGroup) GroupID() string {
h := sha1.New()
h.Write([]byte(fmt.Sprintf("%s:%s", ag.routeID, ag.labels)))
return fmt.Sprintf("%x", h.Sum(nil))
}

func (ag *aggrGroup) GroupKey() string {
return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels)
}
Expand Down
82 changes: 44 additions & 38 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ route:
}
require.Equal(t, 7, len(recorder.Alerts()))

alertGroups := dispatcher.GroupInfos(
alertGroupInfos := dispatcher.GroupInfos(
func(*Route) bool {
return true
},
Expand All @@ -644,27 +644,29 @@ route:
require.Equal(t, AlertGroupInfos{
&AlertGroupInfo{
Labels: model.LabelSet{
"alertname": "HighErrorRate",
"alertname": "TestingAlert",
"service": "api",
"cluster": "aa",
},
Receiver: "prod",
Fingerprint: model.LabelSet{
"alertname": "HighErrorRate",
Receiver: "testing",
// Matches the first sub-route.
ID: dispatcher.aggrGroupsPerRoute[dispatcher.route.Routes[0]][model.LabelSet{
"alertname": "TestingAlert",
"service": "api",
"cluster": "aa",
}.Fingerprint(),
}.Fingerprint()].GroupID(),
},
&AlertGroupInfo{
Labels: model.LabelSet{
"alertname": "TestingAlert",
"service": "api",
"alertname": "HighLatency",
"service": "db",
"cluster": "bb",
},
Receiver: "testing",
Fingerprint: model.LabelSet{
"alertname": "TestingAlert",
"service": "api",
}.Fingerprint(),
Receiver: "kafka",
// Matches the third sub-route.
ID: dispatcher.aggrGroupsPerRoute[dispatcher.route.Routes[2]][model.LabelSet{
"alertname": "HighLatency",
"service": "db",
"cluster": "bb",
}.Fingerprint()].GroupID(),
},
&AlertGroupInfo{
Labels: model.LabelSet{
Expand All @@ -673,20 +675,12 @@ route:
"cluster": "bb",
},
Receiver: "prod",
Fingerprint: model.LabelSet{
// Matches the second sub-route.
ID: dispatcher.aggrGroupsPerRoute[dispatcher.route.Routes[1]][model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
"cluster": "bb",
}.Fingerprint(),
},
&AlertGroupInfo{
Labels: model.LabelSet{
"alertname": "OtherAlert",
},
Receiver: "prod",
Fingerprint: model.LabelSet{
"alertname": "OtherAlert",
}.Fingerprint(),
}.Fingerprint()].GroupID(),
},
&AlertGroupInfo{
Labels: model.LabelSet{
Expand All @@ -695,26 +689,38 @@ route:
"cluster": "bb",
},
Receiver: "prod",
Fingerprint: model.LabelSet{
// Matches the second sub-route.
ID: dispatcher.aggrGroupsPerRoute[dispatcher.route.Routes[1]][model.LabelSet{
"alertname": "HighLatency",
"service": "db",
"cluster": "bb",
}.Fingerprint(),
}.Fingerprint()].GroupID(),
},
&AlertGroupInfo{
Labels: model.LabelSet{
"alertname": "HighLatency",
"service": "db",
"cluster": "bb",
"alertname": "OtherAlert",
},
Receiver: "kafka",
Fingerprint: model.LabelSet{
"alertname": "HighLatency",
"service": "db",
"cluster": "bb",
}.Fingerprint(),
Receiver: "prod",
// Matches the parent route.
ID: dispatcher.aggrGroupsPerRoute[dispatcher.route][model.LabelSet{
"alertname": "OtherAlert",
}.Fingerprint()].GroupID(),
},
}, alertGroups)
&AlertGroupInfo{
Labels: model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
"cluster": "aa",
},
Receiver: "prod",
// Matches the second sub-route.
ID: dispatcher.aggrGroupsPerRoute[dispatcher.route.Routes[1]][model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
"cluster": "aa",
}.Fingerprint()].GroupID(),
},
}, alertGroupInfos)
}

type recordStage struct {
Expand Down
24 changes: 24 additions & 0 deletions dispatch/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,30 @@ func (r *Route) Key() string {
return b.String()
}

// ID returns a key for the route. It should uniquely identify the route in general,
// it is different than Key() as it adds the route's position on its parent's children.
func (r *Route) ID() string {
b := strings.Builder{}

var position *int
if r.parent != nil {
// Find the position in the same level leaf.
for i, cr := range r.parent.Routes {
if cr == r {
position = &i
break
}
}
}
b.WriteString(r.Key())

if position != nil {
b.WriteRune('/')
b.WriteString(fmt.Sprint(*position))
}
return b.String()
}

// Walk traverses the route tree in depth-first order.
func (r *Route) Walk(visit func(*Route)) {
visit(r)
Expand Down
Loading

0 comments on commit e8d049c

Please sign in to comment.