Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sync part2] Moved client prometheus code to separate promclient package. #729

Merged
merged 1 commit into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 47 additions & 47 deletions Gopkg.lock

Large diffs are not rendered by default.

125 changes: 9 additions & 116 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
Expand All @@ -28,6 +27,7 @@ import (
"github.com/improbable-eng/thanos/pkg/discovery/dns"
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/promclient"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
"github.com/improbable-eng/thanos/pkg/store"
Expand All @@ -38,7 +38,6 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/discovery/file"
"github.com/prometheus/prometheus/discovery/targetgroup"
Expand Down Expand Up @@ -273,16 +272,21 @@ func runRule(
}

// Add DNS resolved addresses from static flags and file SD.
// TODO(bwplotka): Consider generating addresses in *url.URL
addrs = append(addrs, dnsProvider.Addresses()...)

removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
vec, err := queryPrometheusInstant(ctx, logger, addrs[i], q, t)
u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i]))
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "url parse %s", addrs[i])
}
return vec, nil

span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]")
v, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, true)
span.Finish()
return v, err
}
return nil, errors.Errorf("no query peer reachable")
}
Expand Down Expand Up @@ -628,117 +632,6 @@ func runRule(
return nil
}

// Scalar response consists of array with mixed types so it needs to be
// unmarshaled separatelly.
func convertScalarJSONToVector(scalarJSONResult json.RawMessage) (model.Vector, error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those two are 1:1 moved, nothing changed.

var (
// Do not specify exact length of the expected slice since JSON unmarshaling
// would make the leght fit the size and we won't be able to check the length afterwards.
resultPointSlice []json.RawMessage
resultTime model.Time
resultValue model.SampleValue
)
if err := json.Unmarshal(scalarJSONResult, &resultPointSlice); err != nil {
return nil, err
}
if len(resultPointSlice) != 2 {
return nil, errors.Errorf("invalid scalar result format %v, expected timestamp -> value tuple", resultPointSlice)
}
if err := json.Unmarshal(resultPointSlice[0], &resultTime); err != nil {
return nil, errors.Wrapf(err, "unmarshaling scalar time from %v", resultPointSlice)
}
if err := json.Unmarshal(resultPointSlice[1], &resultValue); err != nil {
return nil, errors.Wrapf(err, "unmarshaling scalar value from %v", resultPointSlice)
}
return model.Vector{&model.Sample{
Metric: model.Metric{},
Value: resultValue,
Timestamp: resultTime}}, nil
}

func queryPrometheusInstant(ctx context.Context, logger log.Logger, addr, query string, t time.Time) (promql.Vector, error) {
u, err := url.Parse(fmt.Sprintf("http://%s/api/v1/query", addr))
if err != nil {
return nil, err
}
params := url.Values{}
params.Add("query", query)
params.Add("time", t.Format(time.RFC3339Nano))
params.Add("dedup", "true")
u.RawQuery = params.Encode()

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, err
}

span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]")
defer span.Finish()

req = req.WithContext(ctx)

client := &http.Client{
Transport: tracing.HTTPTripperware(logger, http.DefaultTransport),
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

// Decode only ResultType and load Result only as RawJson since we don't know
// structure of the Result yet.
var m struct {
Data struct {
ResultType string `json:"resultType"`
Result json.RawMessage `json:"result"`
} `json:"data"`
}

if err = json.NewDecoder(resp.Body).Decode(&m); err != nil {
return nil, err
}

var vectorResult model.Vector

// Decode the Result depending on the ResultType
// Currently only `vector` and `scalar` types are supported
switch m.Data.ResultType {
case promql.ValueTypeVector:
if err = json.Unmarshal(m.Data.Result, &vectorResult); err != nil {
return nil, err
}
case promql.ValueTypeScalar:
vectorResult, err = convertScalarJSONToVector(m.Data.Result)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("unknown response type: '%q'", m.Data.ResultType)
}

vec := make(promql.Vector, 0, len(vectorResult))

for _, e := range vectorResult {
lset := make(promlabels.Labels, 0, len(e.Metric))

for k, v := range e.Metric {
lset = append(lset, promlabels.Label{
Name: string(k),
Value: string(v),
})
}
sort.Sort(lset)

vec = append(vec, promql.Sample{
Metric: lset,
Point: promql.Point{T: int64(e.Timestamp), V: float64(e.Value)},
})
}

return vec, nil
}

type alertmanagerSet struct {
resolver dns.Resolver
addrs []string
Expand Down
31 changes: 2 additions & 29 deletions cmd/thanos/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,12 @@ import (
"github.com/improbable-eng/thanos/pkg/discovery/dns"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)

func TestRule_UnmarshalScalarResponse(t *testing.T) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1:1 moved.

var (
scalarJSONResult = []byte(`[1541196373.677,"1"]`)
invalidLengthScalarJSONResult = []byte(`[1541196373.677,"1", "nonsence"]`)
invalidDataScalarJSONResult = []byte(`["foo","bar"]`)

vectorResult model.Vector
expectedVector = model.Vector{&model.Sample{
Metric: model.Metric{},
Value: 1,
Timestamp: model.Time(1541196373677)}}
)
// Test valid input.
vectorResult, err := convertScalarJSONToVector(scalarJSONResult)
testutil.Ok(t, err)
testutil.Equals(t, vectorResult.String(), expectedVector.String())

// Test invalid length of scalar data structure.
vectorResult, err = convertScalarJSONToVector(invalidLengthScalarJSONResult)
testutil.NotOk(t, err)

// Test invalid format of scalar data.
vectorResult, err = convertScalarJSONToVector(invalidDataScalarJSONResult)
testutil.NotOk(t, err)
}

func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) {
mockResolver := mockResolver{
resultIPs: map[string][]string{
"alertmanager.com:9093": []string{"1.1.1.1:9300"},
"alertmanager.com:9093": {"1.1.1.1:9300"},
},
}
am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}}
Expand All @@ -62,7 +35,7 @@ func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) {
func TestRule_AlertmanagerResolveWithPort(t *testing.T) {
mockResolver := mockResolver{
resultIPs: map[string][]string{
"alertmanager.com:19093": []string{"1.1.1.1:9300"},
"alertmanager.com:19093": {"1.1.1.1:9300"},
},
}
am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}}
Expand Down
67 changes: 3 additions & 64 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ package main

import (
"context"
"encoding/json"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"sync"
"time"

Expand All @@ -19,6 +14,7 @@ import (
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/promclient"
"github.com/improbable-eng/thanos/pkg/reloader"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
Expand All @@ -31,7 +27,6 @@ import (
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"
)

func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name string) {
Expand Down Expand Up @@ -253,7 +248,7 @@ func runSidecar(
}
}()

if err := isPrometheusDirAccesible(dataDir); err != nil {
if err := promclient.IsWALDirAccesible(dataDir); err != nil {
level.Error(logger).Log("err", err)
}

Expand Down Expand Up @@ -286,20 +281,6 @@ func runSidecar(
return nil
}

func isPrometheusDirAccesible(dir string) error {
const errMsg = "WAL directory is not accessible. Please ensure that the WAL / TSDB directory is accessible by the sidecar."
f, err := os.Stat(filepath.Join(dir, "wal"))
if err != nil {
return errors.Wrap(err, errMsg)
}

if !f.IsDir() {
return errors.New(errMsg)
}

return nil
}

type promMetadata struct {
promURL *url.URL

Expand All @@ -310,7 +291,7 @@ type promMetadata struct {
}

func (s *promMetadata) UpdateLabels(ctx context.Context, logger log.Logger) error {
elset, err := queryExternalLabels(ctx, logger, s.promURL)
elset, err := promclient.ExternalLabels(ctx, logger, s.promURL)
if err != nil {
return err
}
Expand Down Expand Up @@ -357,45 +338,3 @@ func (s *promMetadata) Timestamps() (mint int64, maxt int64) {

return s.mint, s.maxt
}

func queryExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labels.Labels, error) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1:1 moved.

u := *base
u.Path = path.Join(u.Path, "/api/v1/status/config")

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return nil, errors.Wrap(err, "create request")
}
resp, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return nil, errors.Wrapf(err, "request config against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Errorf("failed to read body")
}

if resp.StatusCode != 200 {
return nil, errors.Errorf("got non-200 response code: %v, response: %v", resp.StatusCode, string(b))
}

var d struct {
Data struct {
YAML string `json:"yaml"`
} `json:"data"`
}
if err := json.Unmarshal(b, &d); err != nil {
return nil, errors.Wrapf(err, "unmarshal response: %v", string(b))
}
var cfg struct {
Global struct {
ExternalLabels map[string]string `yaml:"external_labels"`
} `yaml:"global"`
}
if err := yaml.Unmarshal([]byte(d.Data.YAML), &cfg); err != nil {
return nil, errors.Wrapf(err, "parse Prometheus config: %v", d.Data.YAML)
}
return labels.FromMap(cfg.Global.ExternalLabels), nil
}
38 changes: 0 additions & 38 deletions cmd/thanos/sidecar_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions pkg/extprom/extprom.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Package extprom is covering code that is used for extending native Prometheus packages functionality.

package extprom

import (
Expand Down
Loading