diff --git a/.gitignore b/.gitignore index 7ef77d321a..26a705f498 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,7 @@ !/.promu.yml !/api/v2/openapi.yaml !.github/workflows/*.yml + +# Editor +.vscode +.DS_Store diff --git a/api/v2/api.go b/api/v2/api.go index 6f034d2596..80babb460b 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -33,7 +33,6 @@ import ( "github.com/prometheus/common/version" "github.com/rs/cors" - "github.com/prometheus/alertmanager/api/metrics" open_api_models "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/api/v2/restapi" "github.com/prometheus/alertmanager/api/v2/restapi/operations" @@ -42,6 +41,8 @@ import ( general_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/general" receiver_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/receiver" silence_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/silence" + + "github.com/prometheus/alertmanager/api/metrics" "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/dispatch" @@ -225,8 +226,9 @@ func (api *API) getReceiversHandler(params receiver_ops.GetReceiversParams) midd defer api.mtx.RUnlock() receivers := make([]*open_api_models.Receiver, 0, len(api.alertmanagerConfig.Receivers)) - for i := range api.alertmanagerConfig.Receivers { - receivers = append(receivers, &open_api_models.Receiver{Name: &api.alertmanagerConfig.Receivers[i].Name}) + for _, r := range api.alertmanagerConfig.Receivers { + name := r.Name + receivers = append(receivers, &open_api_models.Receiver{Name: &name}) } return receiver_ops.NewGetReceiversOK().WithPayload(receivers) @@ -660,8 +662,7 @@ func (api *API) postSilencesHandler(params silence_ops.PostSilencesParams) middl return silence_ops.NewPostSilencesBadRequest().WithPayload(msg) } - sid, err := api.silences.Set(sil) - if err != nil { + if err = api.silences.Set(sil); err != nil { 
level.Error(logger).Log("msg", "Failed to create silence", "err", err) if errors.Is(err, silence.ErrNotFound) { return silence_ops.NewPostSilencesNotFound().WithPayload(err.Error()) @@ -670,7 +671,7 @@ func (api *API) postSilencesHandler(params silence_ops.PostSilencesParams) middl } return silence_ops.NewPostSilencesOK().WithPayload(&silence_ops.PostSilencesOKBody{ - SilenceID: sid, + SilenceID: sil.Id, }) } diff --git a/api/v2/api_test.go b/api/v2/api_test.go index d520dbde13..438e6f44d4 100644 --- a/api/v2/api_test.go +++ b/api/v2/api_test.go @@ -15,6 +15,7 @@ package v2 import ( "bytes" + "encoding/json" "fmt" "io" "net/http" @@ -24,6 +25,7 @@ import ( "time" "github.com/go-openapi/runtime" + "github.com/go-openapi/runtime/middleware" "github.com/go-openapi/strfmt" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -159,8 +161,7 @@ func TestDeleteSilenceHandler(t *testing.T) { EndsAt: now.Add(time.Hour), UpdatedAt: now, } - unexpiredSid, err := silences.Set(unexpiredSil) - require.NoError(t, err) + require.NoError(t, silences.Set(unexpiredSil)) expiredSil := &silencepb.Silence{ Matchers: []*silencepb.Matcher{m}, @@ -168,9 +169,8 @@ func TestDeleteSilenceHandler(t *testing.T) { EndsAt: now.Add(time.Hour), UpdatedAt: now, } - expiredSid, err := silences.Set(expiredSil) - require.NoError(t, err) - require.NoError(t, silences.Expire(expiredSid)) + require.NoError(t, silences.Set(expiredSil)) + require.NoError(t, silences.Expire(expiredSil.Id)) for i, tc := range []struct { sid string @@ -181,11 +181,11 @@ func TestDeleteSilenceHandler(t *testing.T) { 404, }, { - unexpiredSid, + unexpiredSil.Id, 200, }, { - expiredSid, + expiredSil.Id, 200, }, } { @@ -223,8 +223,7 @@ func TestPostSilencesHandler(t *testing.T) { EndsAt: now.Add(time.Hour), UpdatedAt: now, } - unexpiredSid, err := silences.Set(unexpiredSil) - require.NoError(t, err) + require.NoError(t, silences.Set(unexpiredSil)) expiredSil := &silencepb.Silence{ Matchers: 
[]*silencepb.Matcher{m}, @@ -232,9 +231,8 @@ func TestPostSilencesHandler(t *testing.T) { EndsAt: now.Add(time.Hour), UpdatedAt: now, } - expiredSid, err := silences.Set(expiredSil) - require.NoError(t, err) - require.NoError(t, silences.Expire(expiredSid)) + require.NoError(t, silences.Set(expiredSil)) + require.NoError(t, silences.Expire(expiredSil.Id)) t.Run("Silences CRUD", func(t *testing.T) { for i, tc := range []struct { @@ -259,46 +257,122 @@ func TestPostSilencesHandler(t *testing.T) { }, { "with an active silence ID - it extends the silence", - unexpiredSid, + unexpiredSil.Id, now.Add(time.Hour), now.Add(time.Hour * 2), 200, }, { "with an expired silence ID - it re-creates the silence", - expiredSid, + expiredSil.Id, now.Add(time.Hour), now.Add(time.Hour * 2), 200, }, } { t.Run(tc.name, func(t *testing.T) { - silence, silenceBytes := createSilence(t, tc.sid, "silenceCreator", tc.start, tc.end) - api := API{ uptime: time.Now(), silences: silences, logger: log.NewNopLogger(), } - r, err := http.NewRequest("POST", "/api/v2/silence/${tc.sid}", bytes.NewReader(silenceBytes)) - require.NoError(t, err) - + sil := createSilence(t, tc.sid, "silenceCreator", tc.start, tc.end) w := httptest.NewRecorder() - p := runtime.TextProducer() - responder := api.postSilencesHandler(silence_ops.PostSilencesParams{ - HTTPRequest: r, - Silence: &silence, - }) - responder.WriteResponse(w, p) + postSilences(t, w, api.postSilencesHandler, sil) body, _ := io.ReadAll(w.Result().Body) - require.Equal(t, tc.expectedCode, w.Code, fmt.Sprintf("test case: %d, response: %s", i, string(body))) }) } }) } +func TestPostSilencesHandlerMissingIdCreatesSilence(t *testing.T) { + now := time.Now() + silences := newSilences(t) + api := API{ + uptime: time.Now(), + silences: silences, + logger: log.NewNopLogger(), + } + + // Create a new silence. It should be assigned a random UUID. 
+ sil := createSilence(t, "", "silenceCreator", now.Add(time.Hour), now.Add(time.Hour*2)) + w := httptest.NewRecorder() + postSilences(t, w, api.postSilencesHandler, sil) + require.Equal(t, http.StatusOK, w.Code) + + // Get the silences from the API. + w = httptest.NewRecorder() + getSilences(t, w, api.getSilencesHandler) + require.Equal(t, http.StatusOK, w.Code) + var resp []open_api_models.GettableSilence + require.NoError(t, json.NewDecoder(w.Body).Decode(&resp)) + require.Len(t, resp, 1) + + // Change the ID. It should return 404 Not Found. + sil = open_api_models.PostableSilence{ + ID: "unknownID", + Silence: resp[0].Silence, + } + w = httptest.NewRecorder() + postSilences(t, w, api.postSilencesHandler, sil) + require.Equal(t, http.StatusNotFound, w.Code) + + // Remove the ID. It should duplicate the silence with a different UUID. + sil = open_api_models.PostableSilence{ + ID: "", + Silence: resp[0].Silence, + } + w = httptest.NewRecorder() + postSilences(t, w, api.postSilencesHandler, sil) + require.Equal(t, http.StatusOK, w.Code) + + // Get the silences from the API. There should now be 2 silences. 
+ w = httptest.NewRecorder() + getSilences(t, w, api.getSilencesHandler) + require.Equal(t, http.StatusOK, w.Code) + require.NoError(t, json.NewDecoder(w.Body).Decode(&resp)) + require.Len(t, resp, 2) + require.NotEqual(t, resp[0].ID, resp[1].ID) +} + +func getSilences( + t *testing.T, + w *httptest.ResponseRecorder, + handlerFunc func(params silence_ops.GetSilencesParams) middleware.Responder, +) { + r, err := http.NewRequest("GET", "/api/v2/silences", nil) + require.NoError(t, err) + + p := runtime.TextProducer() + responder := handlerFunc(silence_ops.GetSilencesParams{ + HTTPRequest: r, + Filter: nil, + }) + responder.WriteResponse(w, p) +} + +func postSilences( + t *testing.T, + w *httptest.ResponseRecorder, + handlerFunc func(params silence_ops.PostSilencesParams) middleware.Responder, + sil open_api_models.PostableSilence, +) { + b, err := json.Marshal(sil) + require.NoError(t, err) + + r, err := http.NewRequest("POST", "/api/v2/silences", bytes.NewReader(b)) + require.NoError(t, err) + + p := runtime.TextProducer() + responder := handlerFunc(silence_ops.PostSilencesParams{ + HTTPRequest: r, + Silence: &sil, + }) + responder.WriteResponse(w, p) +} + func TestCheckSilenceMatchesFilterLabels(t *testing.T) { type test struct { silenceMatchers []*silencepb.Matcher diff --git a/api/v2/testing.go b/api/v2/testing.go index c813d350d3..7665a19f67 100644 --- a/api/v2/testing.go +++ b/api/v2/testing.go @@ -14,19 +14,17 @@ package v2 import ( - "encoding/json" "testing" "time" "github.com/go-openapi/strfmt" - "github.com/stretchr/testify/require" open_api_models "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/pkg/labels" "github.com/prometheus/alertmanager/silence/silencepb" ) -func createSilence(t *testing.T, ID, creator string, start, ends time.Time) (open_api_models.PostableSilence, []byte) { +func createSilence(t *testing.T, ID, creator string, start, ends time.Time) open_api_models.PostableSilence { t.Helper() comment := 
"test" @@ -46,10 +44,7 @@ func createSilence(t *testing.T, ID, creator string, start, ends time.Time) (ope Comment: &comment, }, } - b, err := json.Marshal(&sil) - require.NoError(t, err) - - return sil, b + return sil } func createSilenceMatcher(t *testing.T, name, pattern string, matcherType silencepb.Matcher_Type) *silencepb.Matcher { diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index c3e9d1b239..613e26b334 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -145,6 +145,8 @@ func run() int { dataDir = kingpin.Flag("storage.path", "Base path for data storage.").Default("data/").String() retention = kingpin.Flag("data.retention", "How long to keep data for.").Default("120h").Duration() maintenanceInterval = kingpin.Flag("data.maintenance-interval", "Interval between garbage collection and snapshotting to disk of the silences and the notification logs.").Default("15m").Duration() + maxSilences = kingpin.Flag("silences.max-silences", "Maximum number of silences, including expired silences. If negative or zero, no limit is set.").Default("0").Int() + maxSilenceSizeBytes = kingpin.Flag("silences.max-silence-size-bytes", "Maximum silence size in bytes. 
If negative or zero, no limit is set.").Default("0").Int() alertGCInterval = kingpin.Flag("alerts.gc-interval", "Interval between alert GC.").Default("30m").Duration() webConfig = webflag.AddFlags(kingpin.CommandLine, ":9093") @@ -258,8 +260,12 @@ func run() int { silenceOpts := silence.Options{ SnapshotFile: filepath.Join(*dataDir, "silences"), Retention: *retention, - Logger: log.With(logger, "component", "silences"), - Metrics: prometheus.DefaultRegisterer, + Limits: silence.Limits{ + MaxSilences: func() int { return *maxSilences }, + MaxSilenceSizeBytes: func() int { return *maxSilenceSizeBytes }, + }, + Logger: log.With(logger, "component", "silences"), + Metrics: prometheus.DefaultRegisterer, } silences, err := silence.New(silenceOpts) @@ -383,16 +389,16 @@ func run() int { // Build the routing tree and record which receivers are used. routes := dispatch.NewRoute(conf.Route, nil) - activeReceivers := make(map[string]struct{}) + activeReceiversMap := make(map[string]struct{}) routes.Walk(func(r *dispatch.Route) { - activeReceivers[r.RouteOpts.Receiver] = struct{}{} + activeReceiversMap[r.RouteOpts.Receiver] = struct{}{} }) // Build the map of receiver to integrations. - receivers := make(map[string][]notify.Integration, len(activeReceivers)) + receivers := make(map[string][]*notify.Integration, len(activeReceiversMap)) var integrationsNum int for _, rcv := range conf.Receivers { - if _, found := activeReceivers[rcv.Name]; !found { + if _, found := activeReceiversMap[rcv.Name]; !found { // No need to build a receiver if no route is using it. 
level.Info(configLogger).Log("msg", "skipping creation of receiver not referenced by any route", "receiver", rcv.Name) continue @@ -442,7 +448,7 @@ func run() int { pipelinePeer, ) - configuredReceivers.Set(float64(len(activeReceivers))) + configuredReceivers.Set(float64(len(activeReceiversMap))) configuredIntegrations.Set(float64(integrationsNum)) configuredInhibitionRules.Set(float64(len(conf.InhibitRules))) diff --git a/config/receiver/receiver.go b/config/receiver/receiver.go index 9bb039ef05..fdbf35e698 100644 --- a/config/receiver/receiver.go +++ b/config/receiver/receiver.go @@ -39,10 +39,10 @@ import ( // BuildReceiverIntegrations builds a list of integration notifiers off of a // receiver config. -func BuildReceiverIntegrations(nc config.Receiver, tmpl *template.Template, logger log.Logger, httpOpts ...commoncfg.HTTPClientOption) ([]notify.Integration, error) { +func BuildReceiverIntegrations(nc config.Receiver, tmpl *template.Template, logger log.Logger, httpOpts ...commoncfg.HTTPClientOption) ([]*notify.Integration, error) { var ( errs types.MultiError - integrations []notify.Integration + integrations []*notify.Integration add = func(name string, i int, rs notify.ResolvedSender, f func(l log.Logger) (notify.Notifier, error)) { n, err := f(log.With(logger, "integration", name)) if err != nil { diff --git a/config/receiver/receiver_test.go b/config/receiver/receiver_test.go index 3d146a98d0..e19820e99c 100644 --- a/config/receiver/receiver_test.go +++ b/config/receiver/receiver_test.go @@ -31,7 +31,7 @@ func TestBuildReceiverIntegrations(t *testing.T) { for _, tc := range []struct { receiver config.Receiver err bool - exp []notify.Integration + exp []*notify.Integration }{ { receiver: config.Receiver{ @@ -48,7 +48,7 @@ func TestBuildReceiverIntegrations(t *testing.T) { }, }, }, - exp: []notify.Integration{ + exp: []*notify.Integration{ notify.NewIntegration(nil, sendResolved(false), "webhook", 0, "foo"), notify.NewIntegration(nil, sendResolved(true), 
"webhook", 1, "foo"), }, diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 640b22abe2..ffb87dad16 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -25,6 +25,10 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider" @@ -32,6 +36,8 @@ import ( "github.com/prometheus/alertmanager/types" ) +var tracer = otel.Tracer("github.com/prometheus/alertmanager/dispatch") + // DispatcherMetrics represents metrics associated to a dispatcher. type DispatcherMetrics struct { aggrGroups prometheus.Gauge @@ -161,20 +167,44 @@ func (d *Dispatcher) run(it provider.AlertIterator) { return } - level.Debug(d.logger).Log("msg", "Received alert", "alert", alert) + // this block is wrapped in a function to make sure that the span + // is ended before the next alert is processed + func() { + traceCtx, span := tracer.Start(d.ctx, "dispatch.Dispatcher.handleAlert", + trace.WithAttributes( + attribute.String("alert.name", alert.Name()), + attribute.String("alert.fingerprint", alert.Fingerprint().String()), + attribute.String("alert.status", string(alert.Status())), + attribute.String("receiver", d.route.RouteOpts.Receiver), + ), + // we'll use producer here since the alert is not processed + // synchronously + trace.WithSpanKind(trace.SpanKindProducer), + ) + defer span.End() + + // make a link to this span - we can't make the processAlert + // span a child of this, because it would make it long-lived + dispatchLink := trace.LinkFromContext(traceCtx) + + level.Debug(d.logger).Log("msg", "Received alert", "alert", alert) + + // Log errors but keep trying. 
+ if err := it.Err(); err != nil { + level.Error(d.logger).Log("msg", "Error on alert update", "err", err) - // Log errors but keep trying. - if err := it.Err(); err != nil { - level.Error(d.logger).Log("msg", "Error on alert update", "err", err) - continue - } + span.RecordError(fmt.Errorf("error on alert update: %w", err)) + span.SetStatus(codes.Error, err.Error()) - now := time.Now() - for _, r := range d.route.Match(alert.Labels) { - d.processAlert(alert, r) - } - d.metrics.processingDuration.Observe(time.Since(now).Seconds()) + return + } + now := time.Now() + for _, r := range d.route.Match(alert.Labels) { + d.processAlert(dispatchLink, alert, r) + } + d.metrics.processingDuration.Observe(time.Since(now).Seconds()) + }() case <-cleanup.C: d.mtx.Lock() @@ -303,7 +333,7 @@ type notifyFunc func(context.Context, ...*types.Alert) bool // processAlert determines in which aggregation group the alert falls // and inserts it. -func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { +func (d *Dispatcher) processAlert(dispatchLink trace.Link, alert *types.Alert, route *Route) { groupLabels := getGroupLabels(alert, route) fp := groupLabels.Fingerprint() @@ -341,6 +371,13 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { ag.insert(alert) go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { + ctx, span := tracer.Start(ctx, "dispatch.Dispatch.notify", + trace.WithAttributes(attribute.Int("alerts.count", len(alerts))), + trace.WithLinks(dispatchLink), + trace.WithSpanKind(trace.SpanKindConsumer), + ) + defer span.End() + _, _, err := d.stage.Exec(ctx, d.logger, alerts...) 
if err != nil { lvl := level.Error(d.logger) @@ -351,6 +388,9 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { lvl = level.Debug(d.logger) } lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err) + + span.RecordError(fmt.Errorf("notify for alerts failed: %w", err)) + span.SetStatus(codes.Error, err.Error()) } return err == nil }) diff --git a/docs/configuration.md b/docs/configuration.md index fa9878399c..6da631572e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -22,6 +22,17 @@ is not well-formed, the changes will not be applied and an error is logged. A configuration reload is triggered by sending a `SIGHUP` to the process or sending an HTTP POST request to the `/-/reload` endpoint. +## Limits + +Alertmanager supports a number of configurable limits via command-line flags. + +To limit the maximum number of silences, including expired ones, +use the `--silences.max-silences` flag. +You can limit the maximum size of individual silences with `--silences.max-silence-size-bytes`, +where the unit is in bytes. + +Both limits are disabled by default. + ## Configuration file introduction To specify which configuration file to load, use the `--config.file` flag. 
diff --git a/go.mod b/go.mod index 4a2c59cf03..3c7bc333de 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/prometheus/alertmanager -go 1.21 +go 1.22 + +toolchain go1.22.0 require ( github.com/alecthomas/kingpin/v2 v2.4.0 @@ -36,8 +38,12 @@ require ( github.com/rs/cors v1.10.1 github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 github.com/xlab/treeprint v1.2.0 + go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.55.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 + go.opentelemetry.io/otel v1.30.0 + go.opentelemetry.io/otel/trace v1.30.0 go.uber.org/atomic v1.11.0 golang.org/x/mod v0.14.0 golang.org/x/net v0.20.0 @@ -54,8 +60,9 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect - github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.20.2 // indirect github.com/go-openapi/jsonreference v0.20.4 // indirect @@ -82,9 +89,7 @@ require ( github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect go.mongodb.org/mongo-driver v1.13.1 // indirect - go.opentelemetry.io/otel v1.17.0 // indirect - go.opentelemetry.io/otel/metric v1.17.0 // indirect - go.opentelemetry.io/otel/trace v1.17.0 // indirect + go.opentelemetry.io/otel/metric v1.30.0 // indirect golang.org/x/crypto v0.18.0 // indirect golang.org/x/oauth2 v0.16.0 // indirect golang.org/x/sync v0.6.0 // indirect diff --git a/go.sum b/go.sum index 4aa19a8876..c9c56ad48f 100644 --- a/go.sum +++ b/go.sum @@ -133,6 +133,8 @@ 
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -150,8 +152,8 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= -github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/analysis v0.22.2 h1:ZBmNoP2h5omLKr/srIC9bfqrUGzT6g6gNv03HE9Vpj0= @@ -483,8 +485,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.5/go.mod 
h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= @@ -514,14 +516,18 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM= -go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0= -go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc= -go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.55.0 h1:sqmsIQ75l6lfZjjpnXXT9DFVtYEDg6CH0/Cn4/3A1Wg= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.55.0/go.mod h1:rsg1EO8LXSs2po50PB5CeY/MSVlhghuKBgXlKnqm6ks= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/aQ7AfKqdwp7ECpOK6vHqquXXuyTjIO8ZdmPs= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= 
+go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= +go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= +go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= +go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= go.opentelemetry.io/otel/sdk v1.17.0 h1:FLN2X66Ke/k5Sg3V623Q7h7nt3cHXaW1FOvKKrW0IpE= go.opentelemetry.io/otel/sdk v1.17.0/go.mod h1:U87sE0f5vQB7hwUoW98pW5Rz4ZDuCFBZFNUBlSgmDFQ= -go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ= -go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY= +go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= +go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= diff --git a/notify/msteams/adaptive_cards.go b/notify/msteams/adaptive_cards.go new file mode 100644 index 0000000000..f50d755ae6 --- /dev/null +++ b/notify/msteams/adaptive_cards.go @@ -0,0 +1,155 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package msteams + +import "encoding/json" + +// AdaptiveCardsMessage represents a message for adaptive cards. +type AdaptiveCardsMessage struct { + Attachments []AdaptiveCardsAttachment `json:"attachments"` + Summary string `json:"summary,omitempty"` // Summary is the text shown in notifications + Type string `json:"type"` +} + +// NewAdaptiveCardsMessage returns a message prepared for adaptive cards. +// https://docs.microsoft.com/en-us/microsoftteams/platform/webhooks-and-connectors/how-to/connectors-using#send-adaptive-cards-using-an-incoming-webhook +// more info https://learn.microsoft.com/en-us/connectors/teams/?tabs=text1#microsoft-teams-webhook +func NewAdaptiveCardsMessage(card AdaptiveCard) AdaptiveCardsMessage { + return AdaptiveCardsMessage{ + Attachments: []AdaptiveCardsAttachment{{ + ContentType: "application/vnd.microsoft.card.adaptive", + Content: card, + }}, + Type: "message", + } +} + +// AdaptiveCardsAttachment contains an adaptive card. +type AdaptiveCardsAttachment struct { + Content AdaptiveCard `json:"content"` + ContentType string `json:"contentType"` + ContentURL string `json:"contentUrl,omitempty"` +} + +// AdaptiveCard represents an Adaptive Card. +// https://adaptivecards.io/explorer/AdaptiveCard.html +type AdaptiveCard struct { + Body []AdaptiveCardItem + Schema string + Type string + Version string +} + +// NewAdaptiveCard returns a prepared Adaptive Card. 
+func NewAdaptiveCard() AdaptiveCard { + return AdaptiveCard{ + Body: make([]AdaptiveCardItem, 0), + Schema: "http://adaptivecards.io/schemas/adaptive-card.json", + Type: "AdaptiveCard", + Version: "1.4", + } +} + +func (c *AdaptiveCard) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Body []AdaptiveCardItem `json:"body"` + Schema string `json:"$schema"` + Type string `json:"type"` + Version string `json:"version"` + MsTeams map[string]interface{} `json:"msTeams,omitempty"` + }{ + Body: c.Body, + Schema: c.Schema, + Type: c.Type, + Version: c.Version, + MsTeams: map[string]interface{}{"width": "Full"}, + }) +} + +// AppendItem appends an item, such as text or an image, to the Adaptive Card. +func (c *AdaptiveCard) AppendItem(i AdaptiveCardItem) { + c.Body = append(c.Body, i) +} + +// AdaptiveCardItem is an interface for adaptive card items such as containers, elements and inputs. +type AdaptiveCardItem interface { + MarshalJSON() ([]byte, error) +} + +// AdaptiveCardTextBlockItem is a TextBlock. +type AdaptiveCardTextBlockItem struct { + Color string + Size string + Text string + Weight string + Wrap bool +} + +func (i AdaptiveCardTextBlockItem) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Type string `json:"type"` + Text string `json:"text"` + Color string `json:"color,omitempty"` + Size string `json:"size,omitempty"` + Weight string `json:"weight,omitempty"` + Wrap bool `json:"wrap,omitempty"` + }{ + Type: "TextBlock", + Text: i.Text, + Color: i.Color, + Size: i.Size, + Weight: i.Weight, + Wrap: i.Wrap, + }) +} + +// AdaptiveCardActionSetItem is an ActionSet. 
+type AdaptiveCardActionSetItem struct { + Actions []AdaptiveCardActionItem +} + +func (i AdaptiveCardActionSetItem) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Type string `json:"type"` + Actions []AdaptiveCardActionItem `json:"actions"` + }{ + Type: "ActionSet", + Actions: i.Actions, + }) +} + +type AdaptiveCardActionItem interface { + MarshalJSON() ([]byte, error) +} + +// AdaptiveCardOpenURLActionItem is an Action.OpenUrl action. +type AdaptiveCardOpenURLActionItem struct { + IconURL string + Title string + URL string +} + +func (i AdaptiveCardOpenURLActionItem) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Type string `json:"type"` + Title string `json:"title"` + URL string `json:"url"` + IconURL string `json:"iconUrl,omitempty"` + }{ + Type: "Action.OpenUrl", + Title: i.Title, + URL: i.URL, + IconURL: i.IconURL, + }) +} diff --git a/notify/msteams/adaptive_cards_test.go b/notify/msteams/adaptive_cards_test.go new file mode 100644 index 0000000000..378b474ce0 --- /dev/null +++ b/notify/msteams/adaptive_cards_test.go @@ -0,0 +1,128 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package msteams + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewAdaptiveCard(t *testing.T) { + card := NewAdaptiveCard() + + require.NotNil(t, card) + require.Equal(t, "http://adaptivecards.io/schemas/adaptive-card.json", card.Schema) + require.Equal(t, "AdaptiveCard", card.Type) + require.Equal(t, "1.4", card.Version) + require.Empty(t, card.Body) +} + +func TestAdaptiveCard_MarshalJSON(t *testing.T) { + card := NewAdaptiveCard() + card.AppendItem(AdaptiveCardTextBlockItem{Text: "Text"}) + + bytes, err := card.MarshalJSON() + require.NoError(t, err) + + expectedJSON := ` + { + "body":[ + {"type":"TextBlock","text":"Text"} + ], + "msTeams":{"width":"Full"}, + "$schema":"http://adaptivecards.io/schemas/adaptive-card.json", + "type":"AdaptiveCard", + "version":"1.4" + }` + require.JSONEq(t, expectedJSON, string(bytes)) +} + +func TestNewAdaptiveCardsMessage(t *testing.T) { + card := NewAdaptiveCard() + message := NewAdaptiveCardsMessage(card) + + require.Equal(t, "message", message.Type) + require.Len(t, message.Attachments, 1) + require.Equal(t, "application/vnd.microsoft.card.adaptive", message.Attachments[0].ContentType) + require.Equal(t, card, message.Attachments[0].Content) +} + +func TestAdaptiveCardTextBlockItem_MarshalJSON(t *testing.T) { + item := AdaptiveCardTextBlockItem{ + Text: "hello world", + Color: "test-color", + Size: "medium", + Weight: "bold", + Wrap: true, + } + + bytes, err := item.MarshalJSON() + require.NoError(t, err) + + expectedJSON := `{ + "type": "TextBlock", + "text": "hello world", + "color": "test-color", + "size": "medium", + "weight": "bold", + "wrap": true + }` + require.JSONEq(t, expectedJSON, string(bytes)) +} + +func AdaptiveCardActionSetItemMarshalJSON(t *testing.T) { + item := AdaptiveCardActionSetItem{ + Actions: []AdaptiveCardActionItem{ + AdaptiveCardOpenURLActionItem{ + Title: "View URL", + URL: "https://example.com", + }, + }, + } + + bytes, err := item.MarshalJSON() + 
require.NoError(t, err) + + expectedJSON := `{ + "type":"ActionSet", + "actions":[ + { + "type":"Action.OpenUrl", + "title":"View URL", + "url":"https://example.com" + } + ] + }` + require.JSONEq(t, expectedJSON, string(bytes)) +} + +func AdaptiveCardOpenURLActionItemMarshalJSON(t *testing.T) { + item := AdaptiveCardOpenURLActionItem{ + IconURL: "https://example.com/icon.png", + Title: "View URL", + URL: "https://example.com", + } + + bytes, err := item.MarshalJSON() + require.NoError(t, err) + + expectedJSON := `{ + "type":"Action.OpenUrl", + "title":"View URL", + "url":"https://example.com", + "iconUrl":"https://example.com/icon.png" + }` + require.JSONEq(t, expectedJSON, string(bytes)) +} diff --git a/notify/msteams/msteams.go b/notify/msteams/msteams.go index d71e108144..ca080badd5 100644 --- a/notify/msteams/msteams.go +++ b/notify/msteams/msteams.go @@ -35,9 +35,12 @@ import ( ) const ( - colorRed = "8C1A1A" - colorGreen = "2DC72D" - colorGrey = "808080" + TextColorAttention = "attention" + TextColorGood = "good" + + TextSizeLarge = "large" + + TextWeightBolder = "bolder" ) type Notifier struct { @@ -50,16 +53,6 @@ type Notifier struct { postJSONFunc func(ctx context.Context, client *http.Client, url string, body io.Reader) (*http.Response, error) } -// Message card reference can be found at https://learn.microsoft.com/en-us/outlook/actionable-messages/message-card-reference. -type teamsMessage struct { - Context string `json:"@context"` - Type string `json:"type"` - Title string `json:"title"` - Summary string `json:"summary"` - Text string `json:"text"` - ThemeColor string `json:"themeColor"` -} - // New returns a new notifier that uses the Microsoft Teams Webhook API. func New(c *config.MSTeamsConfig, t *template.Template, l log.Logger, httpOpts ...commoncfg.HTTPClientOption) (*Notifier, error) { client, err := commoncfg.NewClientFromConfig(*c.HTTPConfig, "msteams", httpOpts...) 
@@ -107,14 +100,30 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) return false, err } - alerts := types.Alerts(as...) - color := colorGrey - switch alerts.Status() { - case model.AlertFiring: - color = colorRed - case model.AlertResolved: - color = colorGreen - } + card := NewAdaptiveCard() + card.AppendItem(AdaptiveCardTextBlockItem{ + Color: getTeamsTextColor(types.Alerts(as...)), + Text: title, + Size: TextSizeLarge, + Weight: TextWeightBolder, + Wrap: true, + }) + card.AppendItem(AdaptiveCardTextBlockItem{ + Text: text, + Wrap: true, + }) + + card.AppendItem(AdaptiveCardActionSetItem{ + Actions: []AdaptiveCardActionItem{ + AdaptiveCardOpenURLActionItem{ + Title: "View URL", + URL: n.tmpl.ExternalURL.String(), + }, + }, + }) + + msg := NewAdaptiveCardsMessage(card) + msg.Summary = summary var url string if n.conf.WebhookURL != nil { @@ -127,17 +136,8 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) url = strings.TrimSpace(string(content)) } - t := teamsMessage{ - Context: "http://schema.org/extensions", - Type: "MessageCard", - Title: title, - Summary: summary, - Text: text, - ThemeColor: color, - } - var payload bytes.Buffer - if err = json.NewEncoder(&payload).Encode(t); err != nil { + if err = json.NewEncoder(&payload).Encode(msg); err != nil { return false, err } @@ -154,3 +154,11 @@ func (n *Notifier) Notify(ctx context.Context, as ...*types.Alert) (bool, error) } return shouldRetry, err } + +// getTeamsTextColor returns the text color for the message title. 
+func getTeamsTextColor(alerts model.Alerts) string { + if alerts.Status() == model.AlertFiring { + return TextColorAttention + } + return TextColorGood +} diff --git a/notify/msteams/msteams_test.go b/notify/msteams/msteams_test.go index 80f9439173..91f52abfbf 100644 --- a/notify/msteams/msteams_test.go +++ b/notify/msteams/msteams_test.go @@ -14,6 +14,7 @@ package msteams import ( + "bytes" "context" "encoding/json" "fmt" @@ -146,13 +147,120 @@ func TestNotifier_Notify_WithReason(t *testing.T) { statusCode int responseContent string expectedReason notify.Reason + expectedBody string noError bool + isResolved bool + expectRetry bool }{ { - name: "with a 2xx status code and response 1", + name: "Simple alerting message", statusCode: http.StatusOK, - responseContent: "1", - noError: true, + responseContent: "", + expectedBody: `{ + "attachments": [ + { + "content": { + "body": [ + { + "color":"attention", + "size":"large", + "text":"", + "type":"TextBlock", + "weight":"bolder", + "wrap":true + }, + { + "text":"", + "type":"TextBlock", + "wrap":true + }, + { + "actions":[ + { + "title":"View URL", + "type":"Action.OpenUrl", + "url":"http://am" + } + ], + "type":"ActionSet" + } + ], + "$schema":"http://adaptivecards.io/schemas/adaptive-card.json", + "msTeams": { + "width": "Full" + }, + "type":"AdaptiveCard", + "version":"1.4" + }, + "contentType":"application/vnd.microsoft.card.adaptive" + } + ], + "type":"message" + }`, + noError: true, + }, + { + name: "Resolved message", + statusCode: http.StatusOK, + isResolved: true, + responseContent: "", + expectedBody: `{ + "attachments": [ + { + "content": { + "body": [ + { + "color":"good", + "size":"large", + "text":"", + "type":"TextBlock", + "weight":"bolder", + "wrap":true + }, + { + "text":"", + "type":"TextBlock", + "wrap":true + }, + { + "actions":[ + { + "title":"View URL", + "type":"Action.OpenUrl", + "url":"http://am" + } + ], + "type":"ActionSet" + } + ], + 
"$schema":"http://adaptivecards.io/schemas/adaptive-card.json", + "msTeams": { + "width": "Full" + }, + "type":"AdaptiveCard", + "version":"1.4" + }, + "contentType":"application/vnd.microsoft.card.adaptive" + } + ], + "type":"message" + }`, + noError: true, + }, + { + name: "Error response 400", + statusCode: http.StatusBadRequest, + responseContent: "error", + expectedReason: notify.ClientErrorReason, + noError: false, + }, + { + name: "Error response 500", + statusCode: http.StatusInternalServerError, + responseContent: "error", + expectedReason: notify.ServerErrorReason, + noError: false, + expectRetry: true, }, } for _, tt := range tests { @@ -167,11 +275,19 @@ func TestNotifier_Notify_WithReason(t *testing.T) { ) require.NoError(t, err) + var requestBody bytes.Buffer + notifier.postJSONFunc = func(ctx context.Context, client *http.Client, url string, body io.Reader) (*http.Response, error) { + _, err := io.Copy(&requestBody, body) + require.NoError(t, err) + resp := httptest.NewRecorder() resp.WriteString(tt.responseContent) - resp.WriteHeader(tt.statusCode) - return resp.Result(), nil + + result := resp.Result() + result.StatusCode = tt.statusCode + + return result, nil } ctx := context.Background() ctx = notify.WithGroupKey(ctx, "1") @@ -179,13 +295,23 @@ func TestNotifier_Notify_WithReason(t *testing.T) { alert1 := &types.Alert{ Alert: model.Alert{ StartsAt: time.Now(), - EndsAt: time.Now().Add(time.Hour), }, } - _, err = notifier.Notify(ctx, alert1) + if tt.isResolved { + alert1.Alert.EndsAt = time.Now() + } else { + alert1.Alert.EndsAt = time.Now().Add(time.Hour) + } + + shouldRetry, err := notifier.Notify(ctx, alert1) + + require.Equal(t, tt.expectRetry, shouldRetry) + if tt.noError { require.NoError(t, err) + require.JSONEq(t, tt.expectedBody, requestBody.String()) } else { + require.Error(t, err) var reasonError *notify.ErrorWithReason require.ErrorAs(t, err, &reasonError) require.Equal(t, tt.expectedReason, reasonError.Reason) diff --git 
a/notify/notify.go b/notify/notify.go index bd4b4faea5..cc0e90d032 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -27,6 +27,9 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/inhibit" @@ -37,6 +40,8 @@ import ( "github.com/prometheus/alertmanager/types" ) +var tracer = otel.Tracer("github.com/prometheus/alertmanager/notify") + // ResolvedSender returns true if resolved notifications should be sent. type ResolvedSender interface { SendResolved() bool @@ -70,8 +75,8 @@ type Integration struct { } // NewIntegration returns a new integration. -func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int, receiverName string) Integration { - return Integration{ +func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int, receiverName string) *Integration { + return &Integration{ notifier: notifier, rs: rs, name: name, @@ -82,6 +87,14 @@ func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int, // Notify implements the Notifier interface. func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) { + ctx, span := tracer.Start(ctx, "notify.Integration.Notify", + trace.WithAttributes( + attribute.String("integration", i.name), + attribute.Int("alerts", len(alerts)), + ), + ) + defer span.End() + return i.notifier.Notify(ctx, alerts...) } @@ -311,7 +324,7 @@ func NewMetrics(r prometheus.Registerer, ff featurecontrol.Flagger) *Metrics { return m } -func (m *Metrics) InitializeFor(receiver map[string][]Integration) { +func (m *Metrics) InitializeFor(receivers map[string][]*Integration) { if m.ff.EnableReceiverNamesInMetrics() { // Reset the vectors to take into account receiver names changing after hot reloads. 
@@ -321,9 +334,8 @@ func (m *Metrics) InitializeFor(receiver map[string][]Integration) { m.notificationLatencySeconds.Reset() m.numTotalFailedNotifications.Reset() - for name, integrations := range receiver { + for name, integrations := range receivers { for _, integration := range integrations { - m.numNotifications.WithLabelValues(integration.Name(), name) m.numNotificationRequestsTotal.WithLabelValues(integration.Name(), name) m.numNotificationRequestsFailedTotal.WithLabelValues(integration.Name(), name) @@ -379,7 +391,7 @@ func NewPipelineBuilder(r prometheus.Registerer, ff featurecontrol.Flagger) *Pip // New returns a map of receivers to Stages. func (pb *PipelineBuilder) New( - receivers map[string][]Integration, + receivers map[string][]*Integration, wait func() time.Duration, inhibitor *inhibit.Inhibitor, silencer *silence.Silencer, @@ -408,7 +420,7 @@ func (pb *PipelineBuilder) New( // createReceiverStage creates a pipeline of stages for a receiver. func createReceiverStage( name string, - integrations []Integration, + integrations []*Integration, wait func() time.Duration, notificationLog NotificationLog, metrics *Metrics, @@ -422,7 +434,7 @@ func createReceiverStage( } var s MultiStage s = append(s, NewWaitStage(wait)) - s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) + s = append(s, NewDedupStage(integrations[i], notificationLog, recv)) s = append(s, NewRetryStage(integrations[i], name, metrics)) s = append(s, NewSetNotifiesStage(notificationLog, recv)) @@ -736,14 +748,14 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al // RetryStage notifies via passed integration with exponential backoff until it // succeeds. It aborts if the context is canceled or timed out. type RetryStage struct { - integration Integration + integration *Integration groupName string metrics *Metrics labelValues []string } // NewRetryStage returns a new instance of a RetryStage. 
-func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage { +func NewRetryStage(i *Integration, groupName string, metrics *Metrics) *RetryStage { labelValues := []string{i.Name()} if metrics.ff.EnableReceiverNamesInMetrics() { @@ -837,9 +849,11 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale case <-tick.C: now := time.Now() retry, err := r.integration.Notify(ctx, sent...) - dur := time.Since(now) - r.metrics.notificationLatencySeconds.WithLabelValues(r.labelValues...).Observe(dur.Seconds()) + + duration := time.Since(now) + r.metrics.notificationLatencySeconds.WithLabelValues(r.labelValues...).Observe(duration.Seconds()) r.metrics.numNotificationRequestsTotal.WithLabelValues(r.labelValues...).Inc() + if err != nil { r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.labelValues...).Inc() if !retry { @@ -860,7 +874,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale lvl = level.Debug(log.With(l, "alerts", fmt.Sprintf("%v", alerts))) } - lvl.Log("msg", "Notify success", "attempts", i, "duration", dur) + lvl.Log("msg", "Notify success", "attempts", i, "duration", duration) return ctx, alerts, nil } case <-ctx.Done(): diff --git a/notify/notify_test.go b/notify/notify_test.go index 5077c6d666..ae65079936 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -381,7 +381,7 @@ func TestRoutingStage(t *testing.T) { func TestRetryStageWithError(t *testing.T) { fail, retry := true, true sent := []*types.Alert{} - i := Integration{ + i := &Integration{ notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { if fail { fail = false @@ -435,7 +435,7 @@ func TestRetryStageWithErrorCode(t *testing.T) { for _, testData := range testcases { retry := false testData := testData - i := Integration{ + i := &Integration{ name: "test", notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { if 
!testData.isNewErrorWithReason { @@ -472,7 +472,7 @@ func TestRetryStageWithErrorCode(t *testing.T) { func TestRetryStageWithContextCanceled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - i := Integration{ + i := &Integration{ name: "test", notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { cancel() @@ -504,7 +504,7 @@ func TestRetryStageWithContextCanceled(t *testing.T) { func TestRetryStageNoResolved(t *testing.T) { sent := []*types.Alert{} - i := Integration{ + i := &Integration{ notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { sent = append(sent, alerts...) return false, nil @@ -555,7 +555,7 @@ func TestRetryStageNoResolved(t *testing.T) { func TestRetryStageSendResolved(t *testing.T) { sent := []*types.Alert{} - i := Integration{ + i := &Integration{ notifier: notifierFunc(func(ctx context.Context, alerts ...*types.Alert) (bool, error) { sent = append(sent, alerts...) return false, nil @@ -717,11 +717,11 @@ func TestMuteStageWithSilences(t *testing.T) { if err != nil { t.Fatal(err) } - silID, err := silences.Set(&silencepb.Silence{ + sil := &silencepb.Silence{ EndsAt: utcNow().Add(time.Hour), Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}}, - }) - if err != nil { + } + if err = silences.Set(sil); err != nil { t.Fatal(err) } @@ -798,7 +798,7 @@ func TestMuteStageWithSilences(t *testing.T) { } // Expire the silence and verify that no alerts are silenced now. 
- if err := silences.Expire(silID); err != nil { + if err := silences.Expire(sil.Id); err != nil { t.Fatal(err) } diff --git a/notify/webhook/webhook.go b/notify/webhook/webhook.go index ce0eef3feb..ccee63d154 100644 --- a/notify/webhook/webhook.go +++ b/notify/webhook/webhook.go @@ -25,13 +25,19 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" commoncfg "github.com/prometheus/common/config" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/template" + "github.com/prometheus/alertmanager/tracing" "github.com/prometheus/alertmanager/types" ) +var tracer = otel.Tracer("github.com/prometheus/alertmanager/notify/webhook") + // Notifier implements a Notifier for generic webhooks. type Notifier struct { conf *config.WebhookConfig @@ -47,6 +53,10 @@ func New(conf *config.WebhookConfig, t *template.Template, l log.Logger, httpOpt if err != nil { return nil, err } + + // instrument for tracing + client.Transport = tracing.Transport(client.Transport, "webhook") + return &Notifier{ conf: conf, tmpl: t, @@ -78,6 +88,11 @@ func truncateAlerts(maxAlerts uint64, alerts []*types.Alert) ([]*types.Alert, ui // Notify implements the Notifier interface. func (n *Notifier) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) { + ctx, span := tracer.Start(ctx, "webhook.Notifier.Notify", trace.WithAttributes( + attribute.Int("alerts", len(alerts)), + )) + defer span.End() + alerts, numTruncated := truncateAlerts(n.conf.MaxAlerts, alerts) data := notify.GetTemplateData(ctx, n.tmpl, alerts, n.logger) diff --git a/silence/silence.go b/silence/silence.go index c87ab76e4d..df471820ed 100644 --- a/silence/silence.go +++ b/silence/silence.go @@ -44,10 +44,10 @@ import ( ) // ErrNotFound is returned if a silence was not found. 
-var ErrNotFound = fmt.Errorf("silence not found") +var ErrNotFound = errors.New("silence not found") // ErrInvalidState is returned if the state isn't valid. -var ErrInvalidState = fmt.Errorf("invalid state") +var ErrInvalidState = errors.New("invalid state") type matcherCache map[*pb.Silence]labels.Matchers @@ -193,6 +193,7 @@ type Silences struct { logger log.Logger metrics *metrics retention time.Duration + limits Limits mtx sync.RWMutex st state @@ -201,6 +202,16 @@ type Silences struct { mc matcherCache } +// Limits contains the limits for silences. +type Limits struct { + // MaxSilences limits the maximum number of silences, including expired + // silences. + MaxSilences func() int + // MaxSilenceSizeBytes is the maximum size of an individual silence as + // stored on disk. + MaxSilenceSizeBytes func() int +} + // MaintenanceFunc represents the function to run as part of the periodic maintenance for silences. // It returns the size of the snapshot taken or an error if it failed. type MaintenanceFunc func() (int64, error) @@ -318,6 +329,7 @@ type Options struct { // Retention time for newly created Silences. Silences may be // garbage collected after the given duration after they ended. Retention time.Duration + Limits Limits // A logger used by background processing. 
Logger log.Logger @@ -326,7 +338,7 @@ type Options struct { func (o *Options) validate() error { if o.SnapshotFile != "" && o.SnapshotReader != nil { - return fmt.Errorf("only one of SnapshotFile and SnapshotReader must be set") + return errors.New("only one of SnapshotFile and SnapshotReader must be set") } return nil } @@ -342,6 +354,7 @@ func New(o Options) (*Silences, error) { mc: matcherCache{}, logger: log.NewNopLogger(), retention: o.Retention, + limits: o.Limits, broadcast: func([]byte) {}, st: state{}, } @@ -505,9 +518,6 @@ func matchesEmpty(m *pb.Matcher) bool { } func validateSilence(s *pb.Silence) error { - if s.Id == "" { - return errors.New("ID missing") - } if len(s.Matchers) == 0 { return errors.New("at least one matcher required") } @@ -531,9 +541,6 @@ func validateSilence(s *pb.Silence) error { if s.EndsAt.Before(s.StartsAt) { return errors.New("end time must not be before start time") } - if s.UpdatedAt.IsZero() { - return errors.New("invalid zero update timestamp") - } return nil } @@ -543,6 +550,16 @@ func cloneSilence(sil *pb.Silence) *pb.Silence { return &s } +func (s *Silences) checkSizeLimits(msil *pb.MeshSilence) error { + if s.limits.MaxSilenceSizeBytes != nil { + n := msil.Size() + if m := s.limits.MaxSilenceSizeBytes(); m > 0 && n > m { + return fmt.Errorf("silence exceeded maximum size: %d bytes (limit: %d bytes)", n, m) + } + } + return nil +} + func (s *Silences) getSilence(id string) (*pb.Silence, bool) { msil, ok := s.st[id] if !ok { @@ -551,67 +568,111 @@ func (s *Silences) getSilence(id string) (*pb.Silence, bool) { return msil.Silence, true } -func (s *Silences) setSilence(sil *pb.Silence, now time.Time, skipValidate bool) error { - sil.UpdatedAt = now - - if !skipValidate { - if err := validateSilence(sil); err != nil { - return fmt.Errorf("silence invalid: %w", err) - } - } - - msil := &pb.MeshSilence{ +func (s *Silences) toMeshSilence(sil *pb.Silence) *pb.MeshSilence { + return &pb.MeshSilence{ Silence: sil, ExpiresAt: 
sil.EndsAt.Add(s.retention), } +} + +func (s *Silences) setSilence(msil *pb.MeshSilence, now time.Time) error { b, err := marshalMeshSilence(msil) if err != nil { return err } - if s.st.merge(msil, now) { s.version++ } s.broadcast(b) - return nil } +// Upsert allows creating silences with a predefined ID. +func (s *Silences) Upsert(sil *pb.Silence) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + if err := s.set(sil); !errors.Is(err, ErrNotFound) { + return err + } + + // If the silence was not found, create it with the given ID. + now := s.nowUTC() + if sil.StartsAt.Before(now) { + sil.StartsAt = now + } + sil.UpdatedAt = now + + return s.setSilence(s.toMeshSilence(sil), now) +} + // Set the specified silence. If a silence with the ID already exists and the modification // modifies history, the old silence gets expired and a new one is created. -func (s *Silences) Set(sil *pb.Silence) (string, error) { +func (s *Silences) Set(sil *pb.Silence) error { s.mtx.Lock() defer s.mtx.Unlock() + return s.set(sil) +} +// set assumes a lock is being held in the calling method. +func (s *Silences) set(sil *pb.Silence) error { now := s.nowUTC() - prev, ok := s.getSilence(sil.Id) + if sil.StartsAt.IsZero() { + sil.StartsAt = now + } + + if err := validateSilence(sil); err != nil { + return fmt.Errorf("invalid silence: %w", err) + } + prev, ok := s.getSilence(sil.Id) if sil.Id != "" && !ok { - return "", ErrNotFound + return ErrNotFound } - if ok { - if canUpdate(prev, sil, now) { - return sil.Id, s.setSilence(sil, now, false) + + if ok && canUpdate(prev, sil, now) { + sil.UpdatedAt = now + msil := s.toMeshSilence(sil) + if err := s.checkSizeLimits(msil); err != nil { + return err } - if getState(prev, s.nowUTC()) != types.SilenceStateExpired { - // We cannot update the silence, expire the old one. 
- if err := s.expire(prev.Id); err != nil { - return "", fmt.Errorf("expire previous silence: %w", err) - } + return s.setSilence(msil, now) + } + + // If we got here it's either a new silence or a replacing one (which would + // also create a new silence) so we need to make sure we have capacity for + // the new silence. + if s.limits.MaxSilences != nil { + if m := s.limits.MaxSilences(); m > 0 && len(s.st)+1 > m { + return fmt.Errorf("exceeded maximum number of silences: %d (limit: %d)", len(s.st), m) } } - // If we got here it's either a new silence or a replacing one. + uid, err := uuid.NewV4() if err != nil { - return "", fmt.Errorf("generate uuid: %w", err) + return fmt.Errorf("generate uuid: %w", err) } sil.Id = uid.String() if sil.StartsAt.Before(now) { sil.StartsAt = now } + sil.UpdatedAt = now + + msil := s.toMeshSilence(sil) + if err := s.checkSizeLimits(msil); err != nil { + return err + } - return sil.Id, s.setSilence(sil, now, false) + if ok && getState(prev, s.nowUTC()) != types.SilenceStateExpired { + // We cannot update the silence, expire the old one to leave a history of + // the silence before modification. + if err := s.expire(prev.Id); err != nil { + return fmt.Errorf("expire previous silence: %w", err) + } + } + + return s.setSilence(msil, now) } // canUpdate returns true if silence a can be updated to b without @@ -669,10 +730,8 @@ func (s *Silences) expire(id string) error { sil.StartsAt = now sil.EndsAt = now } - - // Skip validation of the silence when expiring it. Without this, silences created - // with valid UTF-8 matchers cannot be expired when Alertmanager is run in classic mode. - return s.setSilence(sil, now, true) + sil.UpdatedAt = now + return s.setSilence(s.toMeshSilence(sil), now) } // QueryParam expresses parameters along which silences are queried. @@ -755,6 +814,9 @@ func (s *Silences) QueryOne(params ...QueryParam) (*pb.Silence, error) { // Query for silences based on the given query parameters. 
It returns the // resulting silences and the state version the result is based on. func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, int, error) { + s.mtx.Lock() + defer s.mtx.Unlock() + s.metrics.queriesTotal.Inc() defer prometheus.NewTimer(s.metrics.queryDuration).ObserveDuration() @@ -794,9 +856,6 @@ func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, int, error) { // the use of post-filter functions is the trivial solution for now. var res []*pb.Silence - s.mtx.Lock() - defer s.mtx.Unlock() - if q.ids != nil { for _, id := range q.ids { if s, ok := s.st[id]; ok { diff --git a/silence/silence_bench_test.go b/silence/silence_bench_test.go index 146316e3f3..e1d39a9418 100644 --- a/silence/silence_bench_test.go +++ b/silence/silence_bench_test.go @@ -58,8 +58,7 @@ func benchmarkMutes(b *testing.B, n int) { var silenceIDs []string for i := 0; i < n; i++ { - var silenceID string - silenceID, err = silences.Set(&silencepb.Silence{ + s := &silencepb.Silence{ Matchers: []*silencepb.Matcher{{ Type: silencepb.Matcher_EQUAL, Name: "foo", @@ -67,9 +66,10 @@ func benchmarkMutes(b *testing.B, n int) { }}, StartsAt: now, EndsAt: now.Add(time.Minute), - }) + } + require.NoError(b, silences.Set(s)) require.NoError(b, err) - silenceIDs = append(silenceIDs, silenceID) + silenceIDs = append(silenceIDs, s.Id) } require.Len(b, silenceIDs, n) diff --git a/silence/silence_test.go b/silence/silence_test.go index 89bd6ac6f6..2ff01db104 100644 --- a/silence/silence_test.go +++ b/silence/silence_test.go @@ -15,9 +15,11 @@ package silence import ( "bytes" + "fmt" "os" "runtime" "sort" + "strings" "sync" "testing" "time" @@ -293,7 +295,7 @@ func TestSilencesSetSilence(t *testing.T) { func() { s.mtx.Lock() defer s.mtx.Unlock() - require.NoError(t, s.setSilence(sil, nowpb, false)) + require.NoError(t, s.setSilence(s.toMeshSilence(sil), nowpb)) }() // Ensure broadcast was called. 
@@ -320,14 +322,13 @@ func TestSilenceSet(t *testing.T) { StartsAt: start1.Add(2 * time.Minute), EndsAt: start1.Add(5 * time.Minute), } - id1, err := s.Set(sil1) - require.NoError(t, err) - require.NotEqual(t, "", id1) + require.NoError(t, s.Set(sil1)) + require.NotEqual(t, "", sil1.Id) want := state{ - id1: &pb.MeshSilence{ + sil1.Id: &pb.MeshSilence{ Silence: &pb.Silence{ - Id: id1, + Id: sil1.Id, Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, StartsAt: start1.Add(2 * time.Minute), EndsAt: start1.Add(5 * time.Minute), @@ -346,15 +347,14 @@ func TestSilenceSet(t *testing.T) { Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, EndsAt: start2.Add(1 * time.Minute), } - id2, err := s.Set(sil2) - require.NoError(t, err) - require.NotEqual(t, "", id2) + require.NoError(t, s.Set(sil2)) + require.NotEqual(t, "", sil2.Id) want = state{ - id1: want[id1], - id2: &pb.MeshSilence{ + sil1.Id: want[sil1.Id], + sil2.Id: &pb.MeshSilence{ Silence: &pb.Silence{ - Id: id2, + Id: sil2.Id, Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, StartsAt: start2, EndsAt: start2.Add(1 * time.Minute), @@ -365,97 +365,390 @@ func TestSilenceSet(t *testing.T) { } require.Equal(t, want, s.st, "unexpected state after silence creation") - // Overwrite silence 2 with new end time. - clock.Add(time.Minute) - start3 := s.nowUTC() - + // Should be able to update silence without modifications. It is expected to + // keep the same ID. sil3 := cloneSilence(sil2) - sil3.EndsAt = start3.Add(100 * time.Minute) + require.NoError(t, s.Set(sil3)) + require.Equal(t, sil2.Id, sil3.Id) - id3, err := s.Set(sil3) - require.NoError(t, err) - require.Equal(t, id2, id3) + // Should be able to update silence with comment. It is also expected to + // keep the same ID. + sil4 := cloneSilence(sil3) + sil4.Comment = "c" + require.NoError(t, s.Set(sil4)) + require.Equal(t, sil3.Id, sil4.Id) + // Extend sil4 to expire at a later time. 
This should not expire the + // existing silence, and so should also keep the same ID. + clock.Add(time.Minute) + start5 := s.nowUTC() + sil5 := cloneSilence(sil4) + sil5.EndsAt = start5.Add(100 * time.Minute) + require.NoError(t, s.Set(sil5)) + require.Equal(t, sil4.Id, sil5.Id) want = state{ - id1: want[id1], - id2: &pb.MeshSilence{ + sil1.Id: want[sil1.Id], + sil2.Id: &pb.MeshSilence{ Silence: &pb.Silence{ - Id: id2, + Id: sil2.Id, Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, StartsAt: start2, - EndsAt: start3.Add(100 * time.Minute), - UpdatedAt: start3, + EndsAt: start5.Add(100 * time.Minute), + UpdatedAt: start5, + Comment: "c", }, - ExpiresAt: start3.Add(100*time.Minute + s.retention), + ExpiresAt: start5.Add(100*time.Minute + s.retention), }, } require.Equal(t, want, s.st, "unexpected state after silence creation") - // Update this silence again with new matcher. This expires it and creates a new one. + // Replace the silence sil5 with another silence with different matchers. + // Unlike previous updates, changing the matchers for an existing silence + // will expire the existing silence and create a new silence. The new + // silence is expected to have a different ID to preserve the history of + // the previous silence. clock.Add(time.Minute) - start4 := s.nowUTC() - - sil4 := cloneSilence(sil3) - sil4.Matchers = []*pb.Matcher{{Name: "a", Pattern: "c"}} - - id4, err := s.Set(sil4) - require.NoError(t, err) - // This new silence gets a new id. 
- require.NotEqual(t, id2, id4) + start6 := s.nowUTC() + sil6 := cloneSilence(sil5) + sil6.Matchers = []*pb.Matcher{{Name: "a", Pattern: "c"}} + require.NoError(t, s.Set(sil6)) + require.NotEqual(t, sil5.Id, sil6.Id) want = state{ - id1: want[id1], - id2: &pb.MeshSilence{ + sil1.Id: want[sil1.Id], + sil2.Id: &pb.MeshSilence{ Silence: &pb.Silence{ - Id: id2, + Id: sil2.Id, Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, StartsAt: start2, - EndsAt: start4, // Expired - UpdatedAt: start4, + EndsAt: start6, // Expired + UpdatedAt: start6, + Comment: "c", }, - ExpiresAt: start4.Add(s.retention), + ExpiresAt: start6.Add(s.retention), }, - id4: &pb.MeshSilence{ + sil6.Id: &pb.MeshSilence{ Silence: &pb.Silence{ - Id: id4, + Id: sil6.Id, Matchers: []*pb.Matcher{{Name: "a", Pattern: "c"}}, - StartsAt: start4, - EndsAt: start3.Add(100 * time.Minute), - UpdatedAt: start4, + StartsAt: start6, + EndsAt: start5.Add(100 * time.Minute), + UpdatedAt: start6, + Comment: "c", }, - ExpiresAt: start3.Add(100*time.Minute + s.retention), + ExpiresAt: start5.Add(100*time.Minute + s.retention), }, } require.Equal(t, want, s.st, "unexpected state after silence creation") - // Re-create the silence that just expired. + // Re-create the silence that we just replaced. Changing the start time, + // just like changing the matchers, creates a new silence with a different + // ID. This is again to preserve the history of the original silence. clock.Add(time.Minute) - start5 := s.nowUTC() + start7 := s.nowUTC() + sil7 := cloneSilence(sil5) + sil7.StartsAt = start1 + sil7.EndsAt = start1.Add(5 * time.Minute) + require.NoError(t, s.Set(sil7)) + require.NotEqual(t, sil2.Id, sil7.Id) + want = state{ + sil1.Id: want[sil1.Id], + sil2.Id: want[sil2.Id], + sil6.Id: want[sil6.Id], + sil7.Id: &pb.MeshSilence{ + Silence: &pb.Silence{ + Id: sil7.Id, + Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, + StartsAt: start7, // New silences have their start time set to "now" when created. 
+ EndsAt: start1.Add(5 * time.Minute), + UpdatedAt: start7, + Comment: "c", + }, + ExpiresAt: start1.Add(5*time.Minute + s.retention), + }, + } + require.Equal(t, want, s.st, "unexpected state after silence creation") + + // Updating an existing silence with an invalid silence should not expire + // the original silence. + clock.Add(time.Millisecond) + sil8 := cloneSilence(sil7) + sil8.EndsAt = time.Time{} + require.EqualError(t, s.Set(sil8), "invalid silence: invalid zero end timestamp") + + // sil7 should not be expired because the update failed. + clock.Add(time.Millisecond) + sil7, err = s.QueryOne(QIDs(sil7.Id)) + require.NoError(t, err) + require.Equal(t, types.SilenceStateActive, getState(sil7, s.nowUTC())) +} + +func TestSilenceLimits(t *testing.T) { + s, err := New(Options{ + Limits: Limits{ + MaxSilences: func() int { return 1 }, + MaxSilenceSizeBytes: func() int { return 2 << 11 }, // 4KB + }, + }) + require.NoError(t, err) + + // Insert sil1 should succeed without error. + sil1 := &pb.Silence{ + Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, + StartsAt: time.Now(), + EndsAt: time.Now().Add(5 * time.Minute), + } + require.NoError(t, s.Set(sil1)) + + // Insert sil2 should fail because maximum number of silences has been + // exceeded. + sil2 := &pb.Silence{ + Matchers: []*pb.Matcher{{Name: "c", Pattern: "d"}}, + StartsAt: time.Now(), + EndsAt: time.Now().Add(5 * time.Minute), + } + require.EqualError(t, s.Set(sil2), "exceeded maximum number of silences: 1 (limit: 1)") + + // Expire sil1 and run the GC. This should allow sil2 to be inserted. + require.NoError(t, s.Expire(sil1.Id)) + n, err := s.GC() + require.NoError(t, err) + require.Equal(t, 1, n) + require.NoError(t, s.Set(sil2)) + + // Expire sil2 and run the GC. + require.NoError(t, s.Expire(sil2.Id)) + n, err = s.GC() + require.NoError(t, err) + require.Equal(t, 1, n) + + // Insert sil3 should fail because it exceeds maximum size. 
+ sil3 := &pb.Silence{ + Matchers: []*pb.Matcher{ + { + Name: strings.Repeat("e", 2<<9), + Pattern: strings.Repeat("f", 2<<9), + }, + { + Name: strings.Repeat("g", 2<<9), + Pattern: strings.Repeat("h", 2<<9), + }, + }, + CreatedBy: strings.Repeat("i", 2<<9), + Comment: strings.Repeat("j", 2<<9), + StartsAt: time.Now(), + EndsAt: time.Now().Add(5 * time.Minute), + } + require.EqualError(t, s.Set(sil3), fmt.Sprintf("silence exceeded maximum size: %d bytes (limit: 4096 bytes)", s.toMeshSilence(sil3).Size())) + + // Should be able to insert sil4. + sil4 := &pb.Silence{ + Matchers: []*pb.Matcher{{Name: "k", Pattern: "l"}}, + StartsAt: time.Now(), + EndsAt: time.Now().Add(5 * time.Minute), + } + require.NoError(t, s.Set(sil4)) + + // Should be able to update sil4 without modifications. It is expected to + // keep the same ID. + sil5 := cloneSilence(sil4) + require.NoError(t, s.Set(sil5)) + require.Equal(t, sil4.Id, sil5.Id) + + // Should be able to update the comment. It is also expected to keep the + // same ID. + sil6 := cloneSilence(sil5) + sil6.Comment = "m" + require.NoError(t, s.Set(sil6)) + require.Equal(t, sil5.Id, sil6.Id) + + // Should not be able to update the start and end time as this requires + // sil6 to be expired and a new silence to be created. However, this would + // exceed the maximum number of silences, which counts both active and + // expired silences. + sil7 := cloneSilence(sil6) + sil7.StartsAt = time.Now().Add(5 * time.Minute) + sil7.EndsAt = time.Now().Add(10 * time.Minute) + require.EqualError(t, s.Set(sil7), "exceeded maximum number of silences: 1 (limit: 1)") + + // sil6 should not be expired because the update failed. + sil6, err = s.QueryOne(QIDs(sil6.Id)) + require.NoError(t, err) + require.Equal(t, types.SilenceStateActive, getState(sil6, s.nowUTC())) + + // Should not be able to update with a comment that exceeds maximum size. + // Need to increase the maximum number of silences to test this. 
+ s.limits.MaxSilences = func() int { return 2 } + sil8 := cloneSilence(sil6) + sil8.Comment = strings.Repeat("m", 2<<11) + require.EqualError(t, s.Set(sil8), fmt.Sprintf("silence exceeded maximum size: %d bytes (limit: 4096 bytes)", s.toMeshSilence(sil8).Size())) + + // sil6 should not be expired because the update failed. + sil6, err = s.QueryOne(QIDs(sil6.Id)) + require.NoError(t, err) + require.Equal(t, types.SilenceStateActive, getState(sil6, s.nowUTC())) + + // Should not be able to replace with a silence that exceeds maximum size. + // This is different from the previous assertion as unlike when adding or + // updating a comment, changing the matchers for a silence should expire + // the existing silence, unless the silence that is replacing it exceeds + // limits, in which case the operation should fail and the existing silence + // should still be active. + sil9 := cloneSilence(sil8) + sil9.Matchers = []*pb.Matcher{{Name: "n", Pattern: "o"}} + require.EqualError(t, s.Set(sil9), fmt.Sprintf("silence exceeded maximum size: %d bytes (limit: 4096 bytes)", s.toMeshSilence(sil9).Size())) + + // sil6 should not be expired because the update failed. + sil6, err = s.QueryOne(QIDs(sil6.Id)) + require.NoError(t, err) + require.Equal(t, types.SilenceStateActive, getState(sil6, s.nowUTC())) +} + +func TestSilenceNoLimits(t *testing.T) { + s, err := New(Options{ + Limits: Limits{}, + }) + require.NoError(t, err) + + // Insert sil should succeed without error. + sil := &pb.Silence{ + Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, + StartsAt: time.Now(), + EndsAt: time.Now().Add(5 * time.Minute), + Comment: strings.Repeat("c", 2<<9), + } + require.NoError(t, s.Set(sil)) + require.NotEqual(t, "", sil.Id) +} + +func TestSilenceUpsert(t *testing.T) { + s, err := New(Options{ + Retention: time.Hour, + }) + require.NoError(t, err) + + clock := clock.NewMock() + s.clock = clock + + // Inserting an invalid silence should fail. 
+ checkErr(t, "invalid silence", s.Upsert(&pb.Silence{})) - sil5 := cloneSilence(sil3) - sil5.StartsAt = start1 - sil5.EndsAt = start1.Add(5 * time.Minute) + // Insert a silence with the id "foo". + clock.Add(time.Minute) + start1 := s.nowUTC() + sil1 := &pb.Silence{ + Id: "foo", + Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, + StartsAt: start1, + EndsAt: start1.Add(5 * time.Minute), + } + require.NoError(t, s.Upsert(sil1)) + require.Equal(t, "foo", sil1.Id) + + want := state{ + sil1.Id: &pb.MeshSilence{ + Silence: &pb.Silence{ + Id: "foo", + Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, + StartsAt: start1, + EndsAt: start1.Add(5 * time.Minute), + UpdatedAt: start1, + }, + ExpiresAt: start1.Add(5*time.Minute + s.retention), + }, + } + require.Equal(t, want, s.st, "unexpected state after silence creation") + + // Updating the silence should fail because the new silence + // is invalid. The original silence should not be expired. + sil2 := cloneSilence(sil1) + sil2.Matchers = nil + require.EqualError(t, s.Upsert(sil2), "invalid silence: at least one matcher required") + sil1, err = s.QueryOne(QIDs(sil1.Id)) - id5, err := s.Set(sil5) require.NoError(t, err) - require.NotEqual(t, id2, id4) + require.Equal(t, types.SilenceStateActive, getState(sil1, s.nowUTC())) + require.Equal(t, want, s.st, "unexpected state after silence creation") + + // Adding a comment should not expire the original silence. + clock.Add(time.Minute) + start3 := s.nowUTC() + sil3 := cloneSilence(sil1) + sil3.Comment = "c" + require.NoError(t, s.Upsert(sil3)) + require.Equal(t, sil1.Id, sil3.Id) + + want[sil1.Id].Silence.Comment = "c" + want[sil1.Id].Silence.UpdatedAt = start3 + require.Equal(t, want, s.st, "unexpected state after silence creation") + + // Changing the matchers should expire the original silence and + // create a new silence. 
+ clock.Add(time.Minute) + start4 := s.nowUTC() + sil4 := cloneSilence(sil3) + sil4.Matchers = []*pb.Matcher{{Name: "c", Pattern: "d"}} + require.NoError(t, s.Upsert(sil4)) + require.NotEqual(t, sil1.Id, sil4.Id) + + clock.Add(time.Millisecond) + sil1, err = s.QueryOne(QIDs(sil1.Id)) + require.NoError(t, err) + require.Equal(t, types.SilenceStateExpired, getState(sil1, s.nowUTC())) want = state{ - id1: want[id1], - id2: want[id2], - id4: want[id4], - id5: &pb.MeshSilence{ + sil1.Id: &pb.MeshSilence{ Silence: &pb.Silence{ - Id: id5, + Id: "foo", Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, - StartsAt: start5, // New silences have their start time set to "now" when created. + StartsAt: start1, + EndsAt: start4, + UpdatedAt: start4, + Comment: "c", + }, + ExpiresAt: start4.Add(s.retention), + }, + sil4.Id: &pb.MeshSilence{ + Silence: &pb.Silence{ + Id: sil4.Id, + Matchers: []*pb.Matcher{{Name: "c", Pattern: "d"}}, + StartsAt: start4, EndsAt: start1.Add(5 * time.Minute), - UpdatedAt: start5, + UpdatedAt: start4, + Comment: "c", }, ExpiresAt: start1.Add(5*time.Minute + s.retention), }, } require.Equal(t, want, s.st, "unexpected state after silence creation") + + // Changing the ID of the silence should upsert a new silence. + clock.Add(time.Minute) + start5 := s.nowUTC() + sil5 := cloneSilence(sil4) + sil5.Id = "bar" + require.NoError(t, s.Upsert(sil5)) + require.NotEqual(t, sil4.Id, sil5.Id) + + want[sil5.Id] = &pb.MeshSilence{ + Silence: &pb.Silence{ + Id: sil5.Id, + Matchers: []*pb.Matcher{{Name: "c", Pattern: "d"}}, + StartsAt: start5, + EndsAt: start1.Add(5 * time.Minute), + UpdatedAt: start5, + Comment: "c", + }, + ExpiresAt: start1.Add(5*time.Minute + s.retention), + } + require.Equal(t, want, s.st, "unexpected state after silence creation") + + // Changing the ID of the silence should fail when it is invalid. 
+ clock.Add(time.Minute) + sil6 := cloneSilence(sil5) + sil6.Id = "baz" + sil6.EndsAt = time.Time{} + require.EqualError(t, s.Upsert(sil6), "invalid silence: invalid zero end timestamp") } func TestSetActiveSilence(t *testing.T) { @@ -476,7 +769,7 @@ func TestSetActiveSilence(t *testing.T) { StartsAt: startsAt, EndsAt: endsAt, } - id1, _ := s.Set(sil1) + require.NoError(t, s.Set(sil1)) // Update silence with 2 extra nanoseconds so the "seconds" part should not change @@ -484,20 +777,19 @@ func TestSetActiveSilence(t *testing.T) { newEndsAt := endsAt.Add(2 * time.Minute) sil2 := cloneSilence(sil1) - sil2.Id = id1 + sil2.Id = sil1.Id sil2.StartsAt = newStartsAt sil2.EndsAt = newEndsAt clock.Add(time.Minute) now = s.nowUTC() - id2, err := s.Set(sil2) - require.NoError(t, err) - require.Equal(t, id1, id2) + require.NoError(t, s.Set(sil2)) + require.Equal(t, sil1.Id, sil2.Id) want := state{ - id2: &pb.MeshSilence{ + sil2.Id: &pb.MeshSilence{ Silence: &pb.Silence{ - Id: id1, + Id: sil1.Id, Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, StartsAt: newStartsAt, EndsAt: newEndsAt, @@ -521,16 +813,19 @@ func TestSilencesSetFail(t *testing.T) { err string }{ { - s: &pb.Silence{Id: "some_id"}, + s: &pb.Silence{ + Id: "some_id", + Matchers: []*pb.Matcher{{Name: "a", Pattern: "b"}}, + EndsAt: clock.Now().Add(5 * time.Minute), + }, err: ErrNotFound.Error(), }, { s: &pb.Silence{}, // Silence without matcher. 
- err: "silence invalid", + err: "invalid silence", }, } for _, c := range cases { - _, err := s.Set(c.s) - checkErr(t, c.err, err) + checkErr(t, c.err, s.Set(c.s)) } } @@ -1095,22 +1390,22 @@ func TestSilencer(t *testing.T) { require.False(t, s.Mutes(model.LabelSet{"foo": "bar"}), "expected alert not silenced without any silences") - _, err = ss.Set(&pb.Silence{ + sil1 := &pb.Silence{ Matchers: []*pb.Matcher{{Name: "foo", Pattern: "baz"}}, StartsAt: now.Add(-time.Hour), EndsAt: now.Add(5 * time.Minute), - }) - require.NoError(t, err) + } + require.NoError(t, ss.Set(sil1)) require.False(t, s.Mutes(model.LabelSet{"foo": "bar"}), "expected alert not silenced by non-matching silence") - id, err := ss.Set(&pb.Silence{ + sil2 := &pb.Silence{ Matchers: []*pb.Matcher{{Name: "foo", Pattern: "bar"}}, StartsAt: now.Add(-time.Hour), EndsAt: now.Add(5 * time.Minute), - }) - require.NoError(t, err) - require.NotEmpty(t, id) + } + require.NoError(t, ss.Set(sil2)) + require.NotEmpty(t, sil2.Id) require.True(t, s.Mutes(model.LabelSet{"foo": "bar"}), "expected alert silenced by matching silence") @@ -1121,8 +1416,8 @@ func TestSilencer(t *testing.T) { require.False(t, s.Mutes(model.LabelSet{"foo": "bar"}), "expected alert not silenced by expired silence") // Update silence to start in the future. - _, err = ss.Set(&pb.Silence{ - Id: id, + err = ss.Set(&pb.Silence{ + Id: sil2.Id, Matchers: []*pb.Matcher{{Name: "foo", Pattern: "bar"}}, StartsAt: now.Add(time.Hour), EndsAt: now.Add(3 * time.Hour), @@ -1138,7 +1433,7 @@ func TestSilencer(t *testing.T) { // Exposes issue #2426. 
require.True(t, s.Mutes(model.LabelSet{"foo": "bar"}), "expected alert silenced by activated silence") - _, err = ss.Set(&pb.Silence{ + err = ss.Set(&pb.Silence{ Matchers: []*pb.Matcher{{Name: "foo", Pattern: "b..", Type: pb.Matcher_REGEXP}}, StartsAt: now.Add(time.Hour), EndsAt: now.Add(3 * time.Hour), @@ -1367,18 +1662,6 @@ func TestValidateSilence(t *testing.T) { }, err: "", }, - { - s: &pb.Silence{ - Id: "", - Matchers: []*pb.Matcher{ - {Name: "a", Pattern: "b"}, - }, - StartsAt: validTimestamp, - EndsAt: validTimestamp, - UpdatedAt: validTimestamp, - }, - err: "ID missing", - }, { s: &pb.Silence{ Id: "some_id", @@ -1451,18 +1734,6 @@ func TestValidateSilence(t *testing.T) { }, err: "invalid zero end timestamp", }, - { - s: &pb.Silence{ - Id: "some_id", - Matchers: []*pb.Matcher{ - {Name: "a", Pattern: "b"}, - }, - StartsAt: validTimestamp, - EndsAt: validTimestamp, - UpdatedAt: zeroTimestamp, - }, - err: "invalid zero update timestamp", - }, } for _, c := range cases { checkErr(t, c.err, validateSilence(c.s)) diff --git a/test/with_api_v2/acceptance/utf8_test.go b/test/with_api_v2/acceptance/utf8_test.go index 6c12b2f762..c0c1dce256 100644 --- a/test/with_api_v2/acceptance/utf8_test.go +++ b/test/with_api_v2/acceptance/utf8_test.go @@ -267,7 +267,7 @@ receivers: _, err := am.Client().Silence.PostSilences(silenceParams) require.Error(t, err) - require.True(t, strings.Contains(err.Error(), "silence invalid: invalid label matcher")) + require.True(t, strings.Contains(err.Error(), "invalid silence: invalid label matcher")) } func TestSendAlertsToUTF8Route(t *testing.T) { diff --git a/tracing/http.go b/tracing/http.go new file mode 100644 index 0000000000..00305173c7 --- /dev/null +++ b/tracing/http.go @@ -0,0 +1,51 @@ +// Copyright 2024 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "context" + "fmt" + "net/http" + "net/http/httptrace" + + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" +) + +// Transport wraps the provided http.RoundTripper with one that starts a span +// and injects the span context into the outbound request headers. If the +// provided http.RoundTripper is nil, http.DefaultTransport will be used as the +// base http.RoundTripper. +func Transport(rt http.RoundTripper, name string) http.RoundTripper { + rt = otelhttp.NewTransport(rt, + otelhttp.WithClientTrace(func(ctx context.Context) *httptrace.ClientTrace { + return otelhttptrace.NewClientTrace(ctx) + }), + otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string { + return name + "/HTTP " + r.Method + }), + ) + + return rt +} + +// Middleware returns a new HTTP handler that will trace all requests with the +// HTTP method and path as the span name. +func Middleware(handler http.Handler) http.Handler { + return otelhttp.NewHandler(handler, "", + otelhttp.WithSpanNameFormatter(func(_ string, r *http.Request) string { + return fmt.Sprintf("%s %s", r.Method, r.URL.Path) + }), + ) +}