Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support Alertmanager API v2 #1982

Merged
merged 2 commits into from
Jan 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#1969](https://github.com/thanos-io/thanos/pull/1969) Sidecar: allow setting http connection pool size via flags
- [#1967](https://github.com/thanos-io/thanos/issues/1967) Receive: Allow local TSDB compaction
- [#1970](https://github.com/thanos-io/thanos/issues/1970) *breaking* Receive: Use gRPC for forwarding requests between peers. Note that existing values for the `--receive.local-endpoint` flag and the endpoints in the hashring configuration file must now specify the receive gRPC port and must be updated to be a simple `host:port` combination, e.g. `127.0.0.1:10901`, rather than a full HTTP URL, e.g. `http://127.0.0.1:10902/api/v1/receive`.
- [#1982](https://github.com/thanos-io/thanos/pull/1982) Ruler: Add support for Alertmanager v2 API endpoints.

## [v0.10.0](https://github.com/thanos-io/thanos/releases/tag/v0.10.0) - 2020.01.13

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ME ?= $(shell whoami)
PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0
PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0

ALERTMANAGER_VERSION ?= v0.15.2
ALERTMANAGER_VERSION ?= v0.20.0
ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION)

MINIO_SERVER_VERSION ?= RELEASE.2018-10-06T00-15-16Z
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func runRule(
// Discover and resolve Alertmanager addresses.
addDiscoveryGroups(g, amClient, alertmgrsDNSSDInterval)

alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout)))
alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion))
}

// Run rule evaluation and alert notifications.
Expand Down
3 changes: 3 additions & 0 deletions docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,11 @@ alertmanagers:
scheme: http
path_prefix: ""
timeout: 10s
api_version: v1
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
```

Supported values for `api_version` are `v1` or `v2`.

### Query API

The `--query.config` and `--query.config-file` flags allow specifying multiple query endpoints. Those entries are treated as a single HA group. This means that query failure is claimed only if the Ruler fails to query all instances.
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.4.7
github.com/go-kit/kit v0.9.0
github.com/go-openapi/strfmt v0.19.2
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect
github.com/golang/snappy v0.0.1
Expand Down Expand Up @@ -66,6 +67,7 @@ require (
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.1.0
github.com/pkg/errors v0.8.1
github.com/prometheus/alertmanager v0.20.0
github.com/prometheus/client_golang v1.2.1
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.7.0
Expand Down
66 changes: 66 additions & 0 deletions go.sum

Large diffs are not rendered by default.

76 changes: 64 additions & 12 deletions pkg/alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-openapi/strfmt"
"github.com/pkg/errors"
"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"

Expand All @@ -26,7 +28,6 @@ import (

const (
defaultAlertmanagerPort = 9093
alertPushEndpoint = "/api/v1/alerts"
contentTypeJSON = "application/json"
)

Expand Down Expand Up @@ -255,6 +256,7 @@ func (q *Queue) Push(alerts []*Alert) {
type Sender struct {
logger log.Logger
alertmanagers []*Alertmanager
versions []APIVersion

sent *prometheus.CounterVec
errs *prometheus.CounterVec
Expand All @@ -272,9 +274,20 @@ func NewSender(
if logger == nil {
logger = log.NewNopLogger()
}
var (
versions []APIVersion
versionPresent map[APIVersion]struct{}
)
for _, am := range alertmanagers {
if _, found := versionPresent[am.version]; found {
continue
}
versions = append(versions, am.version)
}
s := &Sender{
logger: logger,
alertmanagers: alertmanagers,
versions: versions,

sent: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "thanos_alert_sender_alerts_sent_total",
Expand Down Expand Up @@ -302,6 +315,15 @@ func NewSender(
return s
}

func toAPILabels(labels labels.Labels) models.LabelSet {
apiLabels := make(models.LabelSet, len(labels))
for _, label := range labels {
apiLabels[label.Name] = label.Value
}

return apiLabels
}

// Send an alert batch to all given Alertmanager clients.
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
Expand All @@ -310,10 +332,38 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
if len(alerts) == 0 {
return
}
b, err := json.Marshal(alerts)
if err != nil {
level.Warn(s.logger).Log("msg", "sending alerts failed", "err", err)
return

payload := make(map[APIVersion][]byte)
for _, version := range s.versions {
var (
b []byte
err error
)
switch version {
case APIv1:
if b, err = json.Marshal(alerts); err != nil {
level.Warn(s.logger).Log("msg", "encoding alerts for v1 API failed", "err", err)
return
}
case APIv2:
apiAlerts := make(models.PostableAlerts, 0, len(alerts))
for _, a := range alerts {
apiAlerts = append(apiAlerts, &models.PostableAlert{
Annotations: toAPILabels(a.Annotations),
EndsAt: strfmt.DateTime(a.EndsAt),
StartsAt: strfmt.DateTime(a.StartsAt),
Alert: models.Alert{
GeneratorURL: strfmt.URI(a.GeneratorURL),
Labels: toAPILabels(a.Labels),
},
})
}
if b, err = json.Marshal(apiAlerts); err != nil {
level.Warn(s.logger).Log("msg", "encoding alerts for v2 API failed", "err", err)
return
}
}
payload[version] = b
}

var (
Expand All @@ -323,18 +373,19 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
for _, am := range s.alertmanagers {
for _, u := range am.dispatcher.Endpoints() {
wg.Add(1)
go func(am *Alertmanager, u *url.URL) {
go func(am *Alertmanager, u url.URL) {
defer wg.Done()

level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts))
start := time.Now()
u.Path = path.Join(u.Path, fmt.Sprintf("/api/%s/alerts", string(am.version)))
span, ctx := tracing.StartSpan(ctx, "post_alerts HTTP[client]")
defer span.Finish()
if err := am.postAlerts(ctx, *u, bytes.NewReader(b)); err != nil {
if err := am.postAlerts(ctx, u, bytes.NewReader(payload[am.version])); err != nil {
level.Warn(s.logger).Log(
"msg", "sending alerts failed",
"alertmanager", u.Host,
"numAlerts", len(alerts),
"alerts", string(payload[am.version]),
"err", err,
)
s.errs.WithLabelValues(u.Host).Inc()
Expand All @@ -344,7 +395,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
s.sent.WithLabelValues(u.Host).Add(float64(len(alerts)))

atomic.AddUint64(&numSuccess, 1)
}(am, u)
}(am, *u)
}
}
wg.Wait()
Expand All @@ -354,7 +405,7 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
}

s.dropped.Add(float64(len(alerts)))
level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "alerts", string(b))
level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "numAlerts", len(alerts))
}

type Dispatcher interface {
Expand All @@ -369,10 +420,11 @@ type Alertmanager struct {
logger log.Logger
dispatcher Dispatcher
timeout time.Duration
version APIVersion
}

// NewAlertmanager returns a new Alertmanager client.
func NewAlertmanager(logger log.Logger, dispatcher Dispatcher, timeout time.Duration) *Alertmanager {
func NewAlertmanager(logger log.Logger, dispatcher Dispatcher, timeout time.Duration, version APIVersion) *Alertmanager {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -381,11 +433,11 @@ func NewAlertmanager(logger log.Logger, dispatcher Dispatcher, timeout time.Dura
logger: logger,
dispatcher: dispatcher,
timeout: timeout,
version: version,
}
}

func (a *Alertmanager) postAlerts(ctx context.Context, u url.URL, r io.Reader) error {
u.Path = path.Join(u.Path, alertPushEndpoint)
req, err := http.NewRequest("POST", u.String(), r)
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/alert/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestSenderSendsOk(t *testing.T) {
poster := &fakeClient{
urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}},
}
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute)})
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute, APIv1)})

s.Send(context.Background(), []*Alert{{}, {}})

Expand All @@ -104,7 +104,7 @@ func TestSenderSendsOneFails(t *testing.T) {
return rec.Result(), nil
},
}
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute)})
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute, APIv1)})

s.Send(context.Background(), []*Alert{{}, {}})

Expand All @@ -125,7 +125,7 @@ func TestSenderSendsAllFail(t *testing.T) {
return nil, errors.New("no such host")
},
}
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute)})
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute, APIv1)})

s.Send(context.Background(), []*Alert{{}, {}})

Expand Down
34 changes: 32 additions & 2 deletions pkg/alert/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"

Expand All @@ -19,11 +20,39 @@ type AlertingConfig struct {
}

// AlertmanagerConfig represents a client to a cluster of Alertmanager endpoints.
// TODO(simonpasquier): add support for API version (v1 or v2).
type AlertmanagerConfig struct {
HTTPClientConfig http_util.ClientConfig `yaml:"http_config"`
EndpointsConfig http_util.EndpointsConfig `yaml:",inline"`
Timeout model.Duration `yaml:"timeout"`
APIVersion APIVersion `yaml:"api_version"`
}

// APIVersion represents the API version of the Alertmanager endpoint.
type APIVersion string

const (
APIv1 APIVersion = "v1"
APIv2 APIVersion = "v2"
)

var supportedAPIVersions = []APIVersion{
APIv1, APIv2,
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (v *APIVersion) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
if err := unmarshal(&s); err != nil {
return errors.Wrap(err, "invalid Alertmanager API version")
}

for _, ver := range supportedAPIVersions {
if APIVersion(s) == ver {
*v = ver
return nil
}
}
return errors.Errorf("expected Alertmanager API version to be one of %v but got %q", supportedAPIVersions, s)
}

func DefaultAlertmanagerConfig() AlertmanagerConfig {
Expand All @@ -33,7 +62,8 @@ func DefaultAlertmanagerConfig() AlertmanagerConfig {
StaticAddresses: []string{},
FileSDConfigs: []http_util.FileSDConfig{},
},
Timeout: model.Duration(time.Second * 10),
Timeout: model.Duration(time.Second * 10),
APIVersion: APIv1,
}
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/alert/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,43 @@ import (
"testing"
"time"

"gopkg.in/yaml.v2"

"github.com/thanos-io/thanos/pkg/http"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestUnmarshalAPIVersion(t *testing.T) {
for _, tc := range []struct {
v string

err bool
expected APIVersion
}{
{
v: "v1",
expected: APIv1,
},
{
v: "v3",
err: true,
},
{
v: "{}",
err: true,
},
} {
var got APIVersion
err := yaml.Unmarshal([]byte(tc.v), &got)
if tc.err {
testutil.NotOk(t, err)
continue
}
testutil.Ok(t, err)
testutil.Equals(t, tc.expected, got)
}
}

func TestBuildAlertmanagerConfiguration(t *testing.T) {
for _, tc := range []struct {
address string
Expand Down
2 changes: 1 addition & 1 deletion pkg/testutil/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

const (
defaultPrometheusVersion = "v2.13.0"
defaultAlertmanagerVersion = "v0.15.2"
defaultAlertmanagerVersion = "v0.20.0"
defaultMinioVersion = "RELEASE.2018-10-06T00-15-16Z"

// Space delimited list of versions.
Expand Down
Loading