diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 115be52ff8..50db86336b 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "net" "net/http" "net/url" "os" @@ -13,7 +12,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "syscall" "time" @@ -33,6 +31,7 @@ import ( "github.com/prometheus/prometheus/storage/tsdb" "github.com/prometheus/prometheus/util/strutil" "github.com/thanos-io/thanos/pkg/alert" + "github.com/thanos-io/thanos/pkg/alertmanager" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/discovery/cache" @@ -288,7 +287,7 @@ func runRule( // Run rule evaluation and alert notifications. var ( - alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver)) + alertmgrs = alertmanager.NewAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver)) alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) ruleMgr = thanosrule.NewManager(dataDir) ) @@ -353,7 +352,7 @@ func runRule( } { // TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660. - sdr := alert.NewSender(logger, reg, alertmgrs.get, nil, alertmgrsTimeout) + sdr := alert.NewSender(logger, reg, alertmgrs.Get, nil, alertmgrsTimeout) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { @@ -375,7 +374,7 @@ func runRule( g.Add(func() error { return runutil.Repeat(30*time.Second, ctx.Done(), func() error { - if err := alertmgrs.update(ctx); err != nil { + if err := alertmgrs.Update(ctx); err != nil { level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err) alertMngrAddrResolutionErrors.Inc() } @@ -615,90 +614,6 @@ func runRule( return nil } -type alertmanagerSet struct { - resolver dns.Resolver - addrs []string - mtx sync.Mutex - current []*url.URL -} - -func newAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet { - return &alertmanagerSet{ - resolver: dns.NewResolver(dnsSDResolver.ToResolver(logger)), - addrs: addrs, - } -} - -func (s *alertmanagerSet) get() []*url.URL { - s.mtx.Lock() - defer s.mtx.Unlock() - return s.current -} - -const defaultAlertmanagerPort = 9093 - -func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) { - qType = "" - parsedUrl, err = url.Parse(addr) - if err != nil { - return qType, nil, err - } - // The Scheme might contain DNS resolver type separated by + so we split it a part. - if schemeParts := strings.Split(parsedUrl.Scheme, "+"); len(schemeParts) > 1 { - parsedUrl.Scheme = schemeParts[len(schemeParts)-1] - qType = dns.QType(strings.Join(schemeParts[:len(schemeParts)-1], "+")) - } - return qType, parsedUrl, err -} - -func (s *alertmanagerSet) update(ctx context.Context) error { - var result []*url.URL - for _, addr := range s.addrs { - var ( - qtype dns.QType - resolvedDomain []string - ) - - qtype, u, err := parseAlertmanagerAddress(addr) - if err != nil { - return errors.Wrapf(err, "parse URL %q", addr) - } - - // Get only the host and resolve it if needed. - host := u.Host - if qtype != "" { - if qtype == dns.A { - _, _, err = net.SplitHostPort(host) - if err != nil { - // The host could be missing a port. Append the defaultAlertmanagerPort. - host = host + ":" + strconv.Itoa(defaultAlertmanagerPort) - } - } - resolvedDomain, err = s.resolver.Resolve(ctx, host, qtype) - if err != nil { - return errors.Wrap(err, "alertmanager resolve") - } - } else { - resolvedDomain = []string{host} - } - - for _, resolved := range resolvedDomain { - result = append(result, &url.URL{ - Scheme: u.Scheme, - Host: resolved, - Path: u.Path, - User: u.User, - }) - } - } - - s.mtx.Lock() - s.current = result - s.mtx.Unlock() - - return nil -} - func parseFlagLabels(s []string) (labels.Labels, error) { var lset labels.Labels for _, l := range s { diff --git a/cmd/thanos/rule_test.go b/cmd/thanos/rule_test.go index f4d801d747..2abd38cf6a 100644 --- a/cmd/thanos/rule_test.go +++ b/cmd/thanos/rule_test.go @@ -1,12 +1,8 @@ package main import ( - "context" - "net/url" "testing" - "github.com/pkg/errors" - "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -49,97 +45,3 @@ func Test_parseFlagLabels(t *testing.T) { testutil.Equals(t, err != nil, td.expectErr) } } - -func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) { - mockResolver := mockResolver{ - resultIPs: map[string][]string{ - "alertmanager.com:9093": {"1.1.1.1:9300"}, - }, - } - am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}} - - ctx := context.TODO() - err := am.update(ctx) - testutil.Ok(t, err) - - expected := []*url.URL{ - { - Scheme: "http", - Host: "1.1.1.1:9300", - }, - } - gotURLs := am.get() - testutil.Equals(t, expected, gotURLs) -} - -func TestRule_AlertmanagerResolveWithPort(t *testing.T) { - mockResolver := mockResolver{ - resultIPs: map[string][]string{ - "alertmanager.com:19093": {"1.1.1.1:9300"}, - }, - } - am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}} - - ctx := context.TODO() - err := am.update(ctx) - testutil.Ok(t, err) - - expected := []*url.URL{ - { - Scheme: "http", - Host: "1.1.1.1:9300", - }, - } - gotURLs := am.get() - testutil.Equals(t, expected, gotURLs) -} - -type mockResolver struct { - resultIPs map[string][]string - err error -} - -func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) { - if m.err != nil { - return nil, m.err - } - if res, ok := m.resultIPs[name]; ok { - return res, nil - } - return nil, errors.Errorf("mockResolver not found response for name: %s", name) -} - -func Test_ParseAlertmanagerAddress(t *testing.T) { - var tData = []struct { - address string - expectQueryType dns.QType - expectUrl *url.URL - expectError error - }{ - { - address: "http://user:pass+word@foo.bar:3289", - expectQueryType: dns.QType(""), - expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")}, - expectError: nil, - }, - { - address: "dnssrvnoa+http://user:pass+word@foo.bar:3289", - expectQueryType: dns.QType("dnssrvnoa"), - expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")}, - expectError: nil, - }, - { - address: "foo+bar+http://foo.bar:3289", - expectQueryType: dns.QType("foo+bar"), - expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http"}, - expectError: nil, - }, - } - - for _, d := range tData { - q, u, e := parseAlertmanagerAddress(d.address) - testutil.Equals(t, d.expectError, e) - testutil.Equals(t, d.expectUrl, u) - testutil.Equals(t, d.expectQueryType, q) - } -} diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go new file mode 100644 index 0000000000..6769cd4fed --- /dev/null +++ b/pkg/alertmanager/alertmanager.go @@ -0,0 +1,127 @@ +package alertmanager + +import ( + "context" + "net" + "net/url" + "strconv" + "strings" + "sync" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/discovery/dns" +) + +const ( + defaultAlertmanagerPort = 9093 +) + +// Alertmanager replica URLs to push firing alerts. Ruler claims success if +// push to at least one alertmanager from discovered succeeds. The scheme +//should not be empty e.g `http` might be used. The scheme may be prefixed +//with 'dns+' or 'dnssrv+' to detect Alertmanager IPs through respective +//DNS lookups. The port defaults to 9093 or the SRV record's value. +//The URL path is used as a prefix for the regular Alertmanager API path. +type AlertManager interface { + // Gets the address of the configured alertmanager + Get() []*url.URL + + // Update and parse the raw url + Update(ctx context.Context) error +} + +type alertmanagerSet struct { + resolver dns.Resolver + addrs []string + mtx sync.Mutex + current []*url.URL +} + +func NewAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet { + return &alertmanagerSet{ + resolver: dns.NewResolver(dnsSDResolver.ToResolver(logger)), + addrs: addrs, + } +} + +// Gets the address of the configured alertmanager +func (s *alertmanagerSet) Get() []*url.URL { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.current +} + +// Update and parse the raw url +func (s *alertmanagerSet) Update(ctx context.Context) error { + var result []*url.URL + for _, addr := range s.addrs { + var ( + qtype dns.QType + resolvedHosts []string + ) + + qtype, u, err := parseAlertmanagerAddress(addr) + if err != nil { + return errors.Wrapf(err, "parse URL %q", addr) + } + + // Get only the host and resolve it if needed. + host := u.Host + if qtype != "" { + if qtype == dns.A { + _, _, err = net.SplitHostPort(host) + if err != nil { + // The host could be missing a port. Append the defaultAlertmanagerPort. + host = host + ":" + strconv.Itoa(defaultAlertmanagerPort) + } + } + resolvedHosts, err = s.resolver.Resolve(ctx, host, qtype) + if err != nil { + return errors.Wrap(err, "alertmanager resolve") + } + } else { + resolvedHosts = []string{host} + } + + for _, host := range resolvedHosts { + result = append(result, &url.URL{ + Scheme: u.Scheme, + Host: host, + Path: u.Path, + User: u.User, + }) + } + } + + s.mtx.Lock() + s.current = result + s.mtx.Unlock() + + return nil +} + +func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) { + qType = "" + parsedUrl, err = url.Parse(addr) + if err != nil { + return qType, nil, err + } + + // The Scheme might contain DNS resolver type separated by + so we split it a part. + if schemeParts := strings.Split(parsedUrl.Scheme, "+"); len(schemeParts) > 1 { + parsedUrl.Scheme = schemeParts[len(schemeParts)-1] + qType = dns.QType(strings.Join(schemeParts[:len(schemeParts)-1], "+")) + } + + switch parsedUrl.Scheme { + case "http", "https": + case "": + return "", nil, errors.New("The scheme should not be empty, e.g `http` or `https`") + default: + return "", nil, errors.New("Scheme should be `http` or `https`") + } + + return qType, parsedUrl, err +} diff --git a/pkg/alertmanager/alertmanager_test.go b/pkg/alertmanager/alertmanager_test.go new file mode 100644 index 0000000000..160a3bcead --- /dev/null +++ b/pkg/alertmanager/alertmanager_test.go @@ -0,0 +1,104 @@ +package alertmanager + +import ( + "context" + "net/url" + "testing" + + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestParseAlertmanagerAddress(t *testing.T) { + type expected struct { + hasErr bool + qtype dns.QType + url *url.URL + } + tests := []struct { + addr string + expected expected + }{ + // no schema or no support schema + {"alertmanager", expected{hasErr: true}}, + {"alertmanager:9093", expected{hasErr: true}}, + {"tcp://alertmanager:9093", expected{hasErr: true}}, + + // correct cases + {"http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType(""), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}}, + {"dns+http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType("dns"), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}}, + {"dnssrv+http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType("dnssrv"), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}}, + {"dnssrvnoa+http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType("dnssrvnoa"), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}}, + } + for _, tt := range tests { + gotQType, gotParsedUrl, err := parseAlertmanagerAddress(tt.addr) + if tt.expected.hasErr { + testutil.NotOk(t, err) + } else { + testutil.Ok(t, err) + } + testutil.Equals(t, tt.expected.qtype, gotQType) + testutil.Equals(t, tt.expected.url, gotParsedUrl) + } +} + +type mockResolver struct { + resultIPs map[string][]string + err error +} + +func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) { + if m.err != nil { + return nil, m.err + } + if res, ok := m.resultIPs[name]; ok { + return res, nil + } + return nil, errors.Errorf("mockResolver not found response for name: %s", name) +} + +func TestRuleAlertmanagerResolveWithPort(t *testing.T) { + mockResolver := mockResolver{ + resultIPs: map[string][]string{ + "alertmanager.com:19093": {"1.1.1.1:9300"}, + }, + } + + am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}} + + ctx := context.TODO() + err := am.Update(ctx) + testutil.Ok(t, err) + + expected := []*url.URL{ + { + Scheme: "http", + Host: "1.1.1.1:9300", + }, + } + gotURLs := am.Get() + testutil.Equals(t, expected, gotURLs) +} + +func TestRuleAlertmanagerResolveWithoutPort(t *testing.T) { + mockResolver := mockResolver{ + resultIPs: map[string][]string{ + "alertmanager.com:9093": {"1.1.1.1:9300"}, + }, + } + am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}} + + ctx := context.TODO() + err := am.Update(ctx) + testutil.Ok(t, err) + + expected := []*url.URL{ + { + Scheme: "http", + Host: "1.1.1.1:9300", + }, + } + gotURLs := am.Get() + testutil.Equals(t, expected, gotURLs) +}