Skip to content

Commit

Permalink
packaging alertmanager sets and add a tests.
Browse files Browse the repository at this point in the history
Signed-off-by: johncming <[email protected]>
  • Loading branch information
johncming committed Dec 15, 2019
1 parent a0e1771 commit d34701d
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 187 deletions.
93 changes: 4 additions & 89 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"net"
"net/http"
"net/url"
"os"
Expand All @@ -13,7 +12,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand All @@ -33,6 +31,7 @@ import (
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/util/strutil"
"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/alertmanager"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
Expand Down Expand Up @@ -288,7 +287,7 @@ func runRule(

// Run rule evaluation and alert notifications.
var (
alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver))
alertmgrs = alertmanager.NewAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver))
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosrule.NewManager(dataDir)
)
Expand Down Expand Up @@ -353,7 +352,7 @@ func runRule(
}
{
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
sdr := alert.NewSender(logger, reg, alertmgrs.get, nil, alertmgrsTimeout)
sdr := alert.NewSender(logger, reg, alertmgrs.Get, nil, alertmgrsTimeout)
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
Expand All @@ -375,7 +374,7 @@ func runRule(

g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if err := alertmgrs.update(ctx); err != nil {
if err := alertmgrs.Update(ctx); err != nil {
level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err)
alertMngrAddrResolutionErrors.Inc()
}
Expand Down Expand Up @@ -615,90 +614,6 @@ func runRule(
return nil
}

type alertmanagerSet struct {
resolver dns.Resolver
addrs []string
mtx sync.Mutex
current []*url.URL
}

func newAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet {
return &alertmanagerSet{
resolver: dns.NewResolver(dnsSDResolver.ToResolver(logger)),
addrs: addrs,
}
}

func (s *alertmanagerSet) get() []*url.URL {
s.mtx.Lock()
defer s.mtx.Unlock()
return s.current
}

const defaultAlertmanagerPort = 9093

func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) {
qType = ""
parsedUrl, err = url.Parse(addr)
if err != nil {
return qType, nil, err
}
// The Scheme might contain DNS resolver type separated by + so we split it a part.
if schemeParts := strings.Split(parsedUrl.Scheme, "+"); len(schemeParts) > 1 {
parsedUrl.Scheme = schemeParts[len(schemeParts)-1]
qType = dns.QType(strings.Join(schemeParts[:len(schemeParts)-1], "+"))
}
return qType, parsedUrl, err
}

func (s *alertmanagerSet) update(ctx context.Context) error {
var result []*url.URL
for _, addr := range s.addrs {
var (
qtype dns.QType
resolvedDomain []string
)

qtype, u, err := parseAlertmanagerAddress(addr)
if err != nil {
return errors.Wrapf(err, "parse URL %q", addr)
}

// Get only the host and resolve it if needed.
host := u.Host
if qtype != "" {
if qtype == dns.A {
_, _, err = net.SplitHostPort(host)
if err != nil {
// The host could be missing a port. Append the defaultAlertmanagerPort.
host = host + ":" + strconv.Itoa(defaultAlertmanagerPort)
}
}
resolvedDomain, err = s.resolver.Resolve(ctx, host, qtype)
if err != nil {
return errors.Wrap(err, "alertmanager resolve")
}
} else {
resolvedDomain = []string{host}
}

for _, resolved := range resolvedDomain {
result = append(result, &url.URL{
Scheme: u.Scheme,
Host: resolved,
Path: u.Path,
User: u.User,
})
}
}

s.mtx.Lock()
s.current = result
s.mtx.Unlock()

return nil
}

func parseFlagLabels(s []string) (labels.Labels, error) {
var lset labels.Labels
for _, l := range s {
Expand Down
98 changes: 0 additions & 98 deletions cmd/thanos/rule_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package main

import (
"context"
"net/url"
"testing"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -49,97 +45,3 @@ func Test_parseFlagLabels(t *testing.T) {
testutil.Equals(t, err != nil, td.expectErr)
}
}

func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) {
mockResolver := mockResolver{
resultIPs: map[string][]string{
"alertmanager.com:9093": {"1.1.1.1:9300"},
},
}
am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}}

ctx := context.TODO()
err := am.update(ctx)
testutil.Ok(t, err)

expected := []*url.URL{
{
Scheme: "http",
Host: "1.1.1.1:9300",
},
}
gotURLs := am.get()
testutil.Equals(t, expected, gotURLs)
}

func TestRule_AlertmanagerResolveWithPort(t *testing.T) {
mockResolver := mockResolver{
resultIPs: map[string][]string{
"alertmanager.com:19093": {"1.1.1.1:9300"},
},
}
am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}}

ctx := context.TODO()
err := am.update(ctx)
testutil.Ok(t, err)

expected := []*url.URL{
{
Scheme: "http",
Host: "1.1.1.1:9300",
},
}
gotURLs := am.get()
testutil.Equals(t, expected, gotURLs)
}

type mockResolver struct {
resultIPs map[string][]string
err error
}

func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) {
if m.err != nil {
return nil, m.err
}
if res, ok := m.resultIPs[name]; ok {
return res, nil
}
return nil, errors.Errorf("mockResolver not found response for name: %s", name)
}

func Test_ParseAlertmanagerAddress(t *testing.T) {
var tData = []struct {
address string
expectQueryType dns.QType
expectUrl *url.URL
expectError error
}{
{
address: "http://user:[email protected]:3289",
expectQueryType: dns.QType(""),
expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")},
expectError: nil,
},
{
address: "dnssrvnoa+http://user:[email protected]:3289",
expectQueryType: dns.QType("dnssrvnoa"),
expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")},
expectError: nil,
},
{
address: "foo+bar+http://foo.bar:3289",
expectQueryType: dns.QType("foo+bar"),
expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http"},
expectError: nil,
},
}

for _, d := range tData {
q, u, e := parseAlertmanagerAddress(d.address)
testutil.Equals(t, d.expectError, e)
testutil.Equals(t, d.expectUrl, u)
testutil.Equals(t, d.expectQueryType, q)
}
}
127 changes: 127 additions & 0 deletions pkg/alertmanager/alertmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package alertmanager

import (
"context"
"net"
"net/url"
"strconv"
"strings"
"sync"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/discovery/dns"
)

const (
defaultAlertmanagerPort = 9093
)

// Alertmanager replica URLs to push firing alerts. Ruler claims success if
// push to at least one alertmanager from discovered succeeds. The scheme
//should not be empty e.g `http` might be used. The scheme may be prefixed
//with 'dns+' or 'dnssrv+' to detect Alertmanager IPs through respective
//DNS lookups. The port defaults to 9093 or the SRV record's value.
//The URL path is used as a prefix for the regular Alertmanager API path.
type AlertManager interface {
// Gets the address of the configured alertmanager
Get() []*url.URL

// Update and parse the raw url
Update(ctx context.Context) error
}

type alertmanagerSet struct {
resolver dns.Resolver
addrs []string
mtx sync.Mutex
current []*url.URL
}

func NewAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet {
return &alertmanagerSet{
resolver: dns.NewResolver(dnsSDResolver.ToResolver(logger)),
addrs: addrs,
}
}

// Gets the address of the configured alertmanager
func (s *alertmanagerSet) Get() []*url.URL {
s.mtx.Lock()
defer s.mtx.Unlock()

return s.current
}

// Update and parse the raw url
func (s *alertmanagerSet) Update(ctx context.Context) error {
var result []*url.URL
for _, addr := range s.addrs {
var (
qtype dns.QType
resolvedHosts []string
)

qtype, u, err := parseAlertmanagerAddress(addr)
if err != nil {
return errors.Wrapf(err, "parse URL %q", addr)
}

// Get only the host and resolve it if needed.
host := u.Host
if qtype != "" {
if qtype == dns.A {
_, _, err = net.SplitHostPort(host)
if err != nil {
// The host could be missing a port. Append the defaultAlertmanagerPort.
host = host + ":" + strconv.Itoa(defaultAlertmanagerPort)
}
}
resolvedHosts, err = s.resolver.Resolve(ctx, host, qtype)
if err != nil {
return errors.Wrap(err, "alertmanager resolve")
}
} else {
resolvedHosts = []string{host}
}

for _, host := range resolvedHosts {
result = append(result, &url.URL{
Scheme: u.Scheme,
Host: host,
Path: u.Path,
User: u.User,
})
}
}

s.mtx.Lock()
s.current = result
s.mtx.Unlock()

return nil
}

func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) {
qType = ""
parsedUrl, err = url.Parse(addr)
if err != nil {
return qType, nil, err
}

// The Scheme might contain DNS resolver type separated by + so we split it a part.
if schemeParts := strings.Split(parsedUrl.Scheme, "+"); len(schemeParts) > 1 {
parsedUrl.Scheme = schemeParts[len(schemeParts)-1]
qType = dns.QType(strings.Join(schemeParts[:len(schemeParts)-1], "+"))
}

switch parsedUrl.Scheme {
case "http", "https":
case "":
return "", nil, errors.New("The scheme should not be empty, e.g `http` or `https`")
default:
return "", nil, errors.New("Scheme should be `http` or `https`")
}

return qType, parsedUrl, err
}
Loading

0 comments on commit d34701d

Please sign in to comment.