Skip to content

Commit

Permalink
Relay histograms from the gRPC Query API
Browse files Browse the repository at this point in the history
Native histograms are currently not returned by the gRPC Query API.

This commit fixes that.
  • Loading branch information
fpetkovski committed Mar 2, 2023
1 parent fdeea39 commit d181add
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 41 deletions.
12 changes: 8 additions & 4 deletions pkg/api/query/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
}
case promql.Vector:
for _, sample := range vector {
floats, histograms := prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point})
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}),
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: floats,
Histograms: histograms,
}
if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
Expand Down Expand Up @@ -205,9 +207,11 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
switch matrix := result.Value.(type) {
case promql.Matrix:
for _, series := range matrix {
floats, histograms := prompb.SamplesFromPromqlPoints(series.Points)
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: prompb.SamplesFromPromqlPoints(series.Points),
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: floats,
Histograms: histograms,
}
if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/query/remote_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

// Opts are the options for a PromQL query.
Expand Down Expand Up @@ -192,6 +193,12 @@ func (r *remoteQuery) Exec(ctx context.Context) *promql.Result {
V: s.Value,
})
}
for _, h := range ts.Histograms {
series.Points = append(series.Points, promql.Point{
T: h.Timestamp,
H: prompb.HistogramProtoToFloatHistogram(h),
})
}
result = append(result, series)
}
level.Debug(r.logger).Log("Executed query", "query", r.qs, "time", time.Since(start))
Expand Down
3 changes: 1 addition & 2 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/prometheus/prometheus/tsdb"

"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

Expand Down Expand Up @@ -134,7 +133,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
}

for _, hp := range t.Histograms {
h := storepb.HistogramProtoToHistogram(hp)
h := prompb.HistogramProtoToHistogram(hp)
ref, err = app.AppendHistogram(ref, lset, hp.Timestamp, h, nil)
switch err {
case storage.ErrOutOfOrderSample:
Expand Down
28 changes: 0 additions & 28 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ import (

"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"

"github.com/thanos-io/thanos/pkg/store/labelpb"
prompb "github.com/thanos-io/thanos/pkg/store/storepb/prompb"
)

var PartialResponseStrategyValues = func() []string {
Expand Down Expand Up @@ -554,29 +552,3 @@ func (m *QueryHints) IsSafeToExecute() bool {

return false
}

// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
func HistogramProtoToHistogram(hp prompb.Histogram) *histogram.Histogram {
return &histogram.Histogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}

func spansProtoToSpans(s []*prompb.BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}
93 changes: 86 additions & 7 deletions pkg/store/storepb/prompb/samples.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package prompb

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/promql"
)

Expand All @@ -24,14 +25,92 @@ func SamplesFromSamplePairs(samples []model.SamplePair) []Sample {

// SamplesFromPromqlPoints converts a slice of promql.Point
// to a slice of Sample.
func SamplesFromPromqlPoints(samples []promql.Point) []Sample {
result := make([]Sample, 0, len(samples))
func SamplesFromPromqlPoints(samples []promql.Point) ([]Sample, []Histogram) {
floats := make([]Sample, 0, len(samples))
histograms := make([]Histogram, 0, len(samples))
for _, s := range samples {
result = append(result, Sample{
Value: s.V,
Timestamp: s.T,
})
if s.H == nil {
floats = append(floats, Sample{
Value: s.V,
Timestamp: s.T,
})
} else {
histograms = append(histograms, FloatHistogramToHistogramProto(s.T, s.H))
}
}

return result
return floats, histograms
}

// HistogramProtoToHistogram extracts a (normal integer) Histogram from the
// provided proto message. The caller has to make sure that the proto message
// represents an interger histogram and not a float histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L529-L542.
func HistogramProtoToHistogram(hp Histogram) *histogram.Histogram {
return &histogram.Histogram{
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountInt(),
Count: hp.GetCountInt(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveDeltas(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeDeltas(),
}
}

// FloatHistogramToHistogramProto converts a float histogram to a protobuf type.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L587-L601.
func FloatHistogramToHistogramProto(timestamp int64, fh *histogram.FloatHistogram) Histogram {
return Histogram{
Count: &Histogram_CountFloat{CountFloat: fh.Count},
Sum: fh.Sum,
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
ZeroCount: &Histogram_ZeroCountFloat{ZeroCountFloat: fh.ZeroCount},
NegativeSpans: spansToSpansProto(fh.NegativeSpans),
NegativeCounts: fh.NegativeBuckets,
PositiveSpans: spansToSpansProto(fh.PositiveSpans),
PositiveCounts: fh.PositiveBuckets,
ResetHint: Histogram_ResetHint(fh.CounterResetHint),
Timestamp: timestamp,
}
}

// HistogramProtoToFloatHistogram extracts a (normal integer) Histogram from the
// provided proto message to a Float Histogram. The caller has to make sure that
// the proto message represents an float histogram and not a integer histogram.
// Taken from https://github.com/prometheus/prometheus/blob/d33eb3ab17616a54b97d9f7791c791a79823f279/storage/remote/codec.go#L547-L560.
func HistogramProtoToFloatHistogram(hp Histogram) *histogram.FloatHistogram {
return &histogram.FloatHistogram{
CounterResetHint: histogram.CounterResetHint(hp.ResetHint),
Schema: hp.Schema,
ZeroThreshold: hp.ZeroThreshold,
ZeroCount: hp.GetZeroCountFloat(),
Count: hp.GetCountFloat(),
Sum: hp.Sum,
PositiveSpans: spansProtoToSpans(hp.GetPositiveSpans()),
PositiveBuckets: hp.GetPositiveCounts(),
NegativeSpans: spansProtoToSpans(hp.GetNegativeSpans()),
NegativeBuckets: hp.GetNegativeCounts(),
}
}

func spansToSpansProto(s []histogram.Span) []*BucketSpan {
spans := make([]*BucketSpan, len(s))
for i := 0; i < len(s); i++ {
spans[i] = &BucketSpan{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}

func spansProtoToSpans(s []*BucketSpan) []histogram.Span {
spans := make([]histogram.Span, len(s))
for i := 0; i < len(s); i++ {
spans[i] = histogram.Span{Offset: s[i].Offset, Length: s[i].Length}
}

return spans
}

0 comments on commit d181add

Please sign in to comment.