Skip to content

Commit

Permalink
Moved client prometheus code to separate promclient package.
Browse files Browse the repository at this point in the history
Nothing changed, except fixed IsWALDirAccessible (as WAL is actually a dir).
Otherwise just moved code.

Signed-off-by: Bartek Plotka <[email protected]>
  • Loading branch information
bwplotka committed Jan 14, 2019
1 parent 1b82733 commit 51da5d9
Show file tree
Hide file tree
Showing 8 changed files with 330 additions and 253 deletions.
120 changes: 3 additions & 117 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package main

import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"net"
Expand All @@ -19,16 +17,16 @@ import (
"syscall"
"time"

"github.com/improbable-eng/thanos/pkg/extprom"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/alert"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/promclient"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
"github.com/improbable-eng/thanos/pkg/store"
Expand All @@ -39,7 +37,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/targetgroup"
Expand Down Expand Up @@ -271,7 +268,7 @@ func runRule(
removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
vec, err := queryPrometheusInstant(ctx, logger, addrs[i], q, t)
vec, err := promclient.QueryInstant(ctx, logger, addrs[i], q, t)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -606,117 +603,6 @@ func runRule(
return nil
}

// Scalar response consists of array with mixed types so it needs to be
// unmarshaled separatelly.
func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) {
var (
// Do not specify exact length of the expected slice since JSON unmarshaling
// would make the leght fit the size and we won't be able to check the length afterwards.
resultPointSlice []json.RawMessage
resultTime model.Time
resultValue model.SampleValue
)
if err := json.Unmarshal(scalarJSONResult, &resultPointSlice); err != nil {
return nil, err
}
if len(resultPointSlice) != 2 {
return nil, errors.Errorf("invalid scalar result format %v, expected timestamp -> value tuple", resultPointSlice)
}
if err := json.Unmarshal(resultPointSlice[0], &resultTime); err != nil {
return nil, errors.Wrapf(err, "unmarshaling scalar time from %v", resultPointSlice)
}
if err := json.Unmarshal(resultPointSlice[1], &resultValue); err != nil {
return nil, errors.Wrapf(err, "unmarshaling scalar value from %v", resultPointSlice)
}
return model.Vector{&model.Sample{
Metric: model.Metric{},
Value: resultValue,
Timestamp: resultTime}}, nil
}

func queryPrometheusInstant(ctx context.Context, logger log.Logger, addr, query string, t time.Time) (promql.Vector, error) {
u, err := url.Parse(fmt.Sprintf("http://%s/api/v1/query", addr))
if err != nil {
return nil, err
}
params := url.Values{}
params.Add("query", query)
params.Add("time", t.Format(time.RFC3339Nano))
params.Add("dedup", "true")
u.RawQuery = params.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}

span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]")
defer span.Finish()

req = req.WithContext(ctx)

client := &http.Client{
Transport: tracing.HTTPTripperware(logger, http.DefaultTransport),
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

// Decode only ResultType and load Result only as RawJson since we don't know
// structure of the Result yet.
var m struct {
Data struct {
ResultType string `json:"resultType"`
Result json.RawMessage `json:"result"`
} `json:"data"`
}

if err = json.NewDecoder(resp.Body).Decode(&m); err != nil {
return nil, err
}

var vectorResult model.Vector

// Decode the Result depending on the ResultType
// Currently only `vector` and `scalar` types are supported
switch m.Data.ResultType {
case promql.ValueTypeVector:
if err = json.Unmarshal(m.Data.Result, &vectorResult); err != nil {
return nil, err
}
case promql.ValueTypeScalar:
vectorResult, err = convertScalarJSONToVector(m.Data.Result)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("unknown response type: '%q'", m.Data.ResultType)
}

vec := make(promql.Vector, 0, len(vectorResult))

for _, e := range vectorResult {
lset := make(promlabels.Labels, 0, len(e.Metric))

for k, v := range e.Metric {
lset = append(lset, promlabels.Label{
Name: string(k),
Value: string(v),
})
}
sort.Sort(lset)

vec = append(vec, promql.Sample{
Metric: lset,
Point: promql.Point{T: int64(e.Timestamp), V: float64(e.Value)},
})
}

return vec, nil
}

type alertmanagerSet struct {
resolver dns.Resolver
addrs []string
Expand Down
31 changes: 2 additions & 29 deletions cmd/thanos/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,12 @@ import (
"github.com/improbable-eng/thanos/pkg/discovery/dns"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)

func TestRule_UnmarshalScalarResponse(t *testing.T) {
var (
scalarJSONResult = []byte(`[1541196373.677,"1"]`)
invalidLengthScalarJSONResult = []byte(`[1541196373.677,"1", "nonsence"]`)
invalidDataScalarJSONResult = []byte(`["foo","bar"]`)

vectorResult model.Vector
expectedVector = model.Vector{&model.Sample{
Metric: model.Metric{},
Value: 1,
Timestamp: model.Time(1541196373677)}}
)
// Test valid input.
vectorResult, err := convertScalarJSONToVector(scalarJSONResult)
testutil.Ok(t, err)
testutil.Equals(t, vectorResult.String(), expectedVector.String())

// Test invalid length of scalar data structure.
vectorResult, err = convertScalarJSONToVector(invalidLengthScalarJSONResult)
testutil.NotOk(t, err)

// Test invalid format of scalar data.
vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult)
testutil.NotOk(t, err)
}

func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) {
mockResolver := mockResolver{
resultIPs: map[string][]string{
"alertmanager.com:9093": []string{"1.1.1.1:9300"},
"alertmanager.com:9093": {"1.1.1.1:9300"},
},
}
am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}}
Expand All @@ -62,7 +35,7 @@ func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) {
func TestRule_AlertmanagerResolveWithPort(t *testing.T) {
mockResolver := mockResolver{
resultIPs: map[string][]string{
"alertmanager.com:19093": []string{"1.1.1.1:9300"},
"alertmanager.com:19093": {"1.1.1.1:9300"},
},
}
am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}}
Expand Down
67 changes: 3 additions & 64 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ package main

import (
"context"
"encoding/json"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"sync"
"time"

Expand All @@ -19,6 +14,7 @@ import (
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/promclient"
"github.com/improbable-eng/thanos/pkg/reloader"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
Expand All @@ -31,7 +27,6 @@ import (
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"
)

func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) {
Expand Down Expand Up @@ -253,7 +248,7 @@ func runSidecar(
}
}()

if err := isPrometheusDirAccesible(dataDir); err != nil {
if err := promclient.IsWALDirAccesible(dataDir); err != nil {
level.Error(logger).Log("err", err)
}

Expand Down Expand Up @@ -286,20 +281,6 @@ func runSidecar(
return nil
}

func isPrometheusDirAccesible(dir string) error {
const errMsg = "WAL file is not accessible. Does block shipper dir is a TSDB directory? If yes it is shared with TSDB?"
f, err := os.Stat(filepath.Join(dir, "WAL"))
if err != nil {
return errors.Wrap(err, errMsg)
}

if f.IsDir() {
return errors.New(errMsg)
}

return nil
}

type metadata struct {
promURL *url.URL

Expand All @@ -310,7 +291,7 @@ type metadata struct {
}

func (s *metadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
elset, err := queryExternalLabels(ctx, logger, s.promURL)
elset, err := promclient.ExternalLabels(ctx, logger, s.promURL)
if err != nil {
return err
}
Expand Down Expand Up @@ -357,45 +338,3 @@ func (s *metadata) Timestamps() (mint int64, maxt int64) {

return s.mint, s.maxt
}

func queryExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labels.Labels, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/status/config")

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "create request")
}
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return nil, errors.Wrapf(err, "request config against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Errorf("failed to read body")
}

if resp.StatusCode != 200 {
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

var d struct {
Data struct {
YAML string `json:"yaml"`
} `json:"data"`
}
if err := json.Unmarshal(b, &d); err != nil {
return nil, errors.Wrapf(err, "unmarshal response: %v", string(b))
}
var cfg struct {
Global struct {
ExternalLabels map[string]string `yaml:"external_labels"`
} `yaml:"global"`
}
if err := yaml.Unmarshal([]byte(d.Data.YAML), &cfg); err != nil {
return nil, errors.Wrapf(err, "parse Prometheus config: %v", d.Data.YAML)
}
return labels.FromMap(cfg.Global.ExternalLabels), nil
}
38 changes: 0 additions & 38 deletions cmd/thanos/sidecar_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions pkg/extprom/extprom.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Package extprom is covering code that is used for extending native Prometheus packages functionality.

package extprom

import (
Expand Down
Loading

0 comments on commit 51da5d9

Please sign in to comment.