Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relay histograms from the gRPC Query API #6178

Merged
merged 1 commit into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
}
case promql.Vector:
for _, sample := range vector {
floats, histograms := prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point})
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}),
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: floats,
Histograms: histograms,
}
if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
Expand Down Expand Up @@ -205,9 +207,11 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
switch matrix := result.Value.(type) {
case promql.Matrix:
for _, series := range matrix {
floats, histograms := prompb.SamplesFromPromqlPoints(series.Points)
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: prompb.SamplesFromPromqlPoints(series.Points),
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: floats,
Histograms: histograms,
}
if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

// Opts are the options for a PromQL query.
Expand Down Expand Up @@ -192,6 +193,12 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
V: s.Value,
})
}
for _, h := range ts.Histograms {
series.Points = append(series.Points, promql.Point{
T: h.Timestamp,
H: prompb.HistogramProtoToFloatHistogram(h),
})
}
result = append(result, series)
}
level.Debug(r.logger).Log("Executed query", "query", r.qs, "time", time.Since(start))
Expand Down
3 changes: 1 addition & 2 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

Expand Down Expand Up @@ -134,7 +133,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
}

for _, hp := range t.Histograms {
h := storepb.HistogramProtoToHistogram(hp)
h := prompb.HistogramProtoToHistogram(hp)
ref, err = app.AppendHistogram(ref, lset, hp.Timestamp, h, nil)
switch err {
case storage.ErrOutOfOrderSample:
Expand Down
28 changes: 0 additions & 28 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ import (

"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"

"github.com/thanos-io/thanos/pkg/store/labelpb"
prompb "github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

var PartialResponseStrategyValues = func() []string {
Expand Down Expand Up @@ -554,29 +552,3 @@ func (m *QueryHints) IsSafeToExecute() bool {

return false
}

// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
return &histogram.Histogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}

func spansProtoToSpans(s []*prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}
93 changes: 86 additions & 7 deletions pkg/store/storepb/prompb/samples.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package prompb

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql"
)

Expand All @@ -24,14 +25,92 @@ func SamplesFromSamplePairs(samples []model.SamplePair) []Sample {

// SamplesFromPromqlPoints converts a slice of promql.Point
// to a slice of Sample.
func SamplesFromPromqlPoints(samples []promql.Point) []Sample {
result := make([]Sample, 0, len(samples))
func SamplesFromPromqlPoints(samples []promql.Point) ([]Sample, []Histogram) {
floats := make([]Sample, 0, len(samples))
histograms := make([]Histogram, 0, len(samples))
for _, s := range samples {
result = append(result, Sample{
Value: s.V,
Timestamp: s.T,
})
if s.H == nil {
floats = append(floats, Sample{
Value: s.V,
Timestamp: s.T,
})
} else {
histograms = append(histograms, FloatHistogramToHistogramProto(s.T, s.H))
}
}

return result
return floats, histograms
}

// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L529-L542.
func HistogramProtoToHistogram(hp Histogram) *histogram.Histogram {
return &histogram.Histogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}

// FloatHistogramToHistogramProto converts a float histogram to a protobuf type.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L587-L601.
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}

// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents an float histogram and not a integer histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L547-L560.
func HistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}

func spansToSpansProto(s []histogram.Span) []*BucketSpan {
spans := make([]*BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = &BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}

func spansProtoToSpans(s []*BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}