Skip to content

Commit

Permalink
fix(promhandler): support limit parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Aug 12, 2024
1 parent ec12e54 commit db7462d
Showing 1 changed file with 44 additions and 6 deletions.
50 changes: 44 additions & 6 deletions internal/promhandler/promhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func NewPromAPI(
}
}

var errResultTruncated = errors.New("results truncated due to limit")

// GetLabelValues implements getLabelValues operation.
// GET /api/v1/label/{label}/values
func (h *PromAPI) GetLabelValues(ctx context.Context, params promapi.GetLabelValuesParams) (*promapi.LabelValuesResponse, error) {
Expand All @@ -72,6 +74,7 @@ func (h *PromAPI) GetLabelValues(ctx context.Context, params promapi.GetLabelVal
if err != nil {
return nil, validationErr("parse match", err)
}
hints := &storage.LabelHints{Limit: params.Limit.Or(0)}

q, err := h.querier(ctx, mint, maxt)
if err != nil {
Expand All @@ -88,10 +91,14 @@ func (h *PromAPI) GetLabelValues(ctx context.Context, params promapi.GetLabelVal
matchers = sets[0]
}

values, annots, err := q.LabelValues(ctx, params.Label, matchers...)
values, annots, err := q.LabelValues(ctx, params.Label, hints, matchers...)
if err != nil {
return nil, executionErr("get label values", err)
}
if l := params.Limit.Or(-1); l > 0 && len(values) >= l {
values = values[:l]
annots.Add(errResultTruncated)
}

warnings, infos := annots.AsStrings("", 0, 0)
return &promapi.LabelValuesResponse{
Expand All @@ -114,7 +121,7 @@ func (h *PromAPI) GetLabelValues(ctx context.Context, params promapi.GetLabelVal
grp.Go(func() error {
ctx := grpCtx

vals, w, err := q.LabelValues(ctx, params.Label, set...)
vals, w, err := q.LabelValues(ctx, params.Label, hints, set...)
if err != nil {
return err
}
Expand All @@ -135,6 +142,12 @@ func (h *PromAPI) GetLabelValues(ctx context.Context, params promapi.GetLabelVal

data := maps.Keys(dedup)
slices.Sort(data)
// Truncating data AFTER reading whole data into memory is suboptimal, yet
// it makes output consistent.
if l := params.Limit.Or(-1); l > 0 && len(data) >= l {
data = data[:l]
annots.Add(errResultTruncated)
}

warnings, infos := annots.AsStrings("", 0, 0)
return &promapi.LabelValuesResponse{
Expand All @@ -161,6 +174,7 @@ func (h *PromAPI) GetLabels(ctx context.Context, params promapi.GetLabelsParams)
if err != nil {
return nil, validationErr("parse match", err)
}
hints := &storage.LabelHints{Limit: params.Limit.Or(0)}

q, err := h.querier(ctx, mint, maxt)
if err != nil {
Expand All @@ -177,10 +191,14 @@ func (h *PromAPI) GetLabels(ctx context.Context, params promapi.GetLabelsParams)
matchers = sets[0]
}

values, annots, err := q.LabelNames(ctx, matchers...)
values, annots, err := q.LabelNames(ctx, hints, matchers...)
if err != nil {
return nil, executionErr("label names", err)
}
if l := params.Limit.Or(-1); l > 0 && len(values) >= l {
values = values[:l]
annots.Add(errResultTruncated)
}

warnings, infos := annots.AsStrings("", 0, 0)
return &promapi.LabelsResponse{
Expand All @@ -203,7 +221,7 @@ func (h *PromAPI) GetLabels(ctx context.Context, params promapi.GetLabelsParams)
grp.Go(func() error {
ctx := grpCtx

vals, w, err := q.LabelNames(ctx, set...)
vals, w, err := q.LabelNames(ctx, hints, set...)
if err != nil {
return err
}
Expand All @@ -224,6 +242,12 @@ func (h *PromAPI) GetLabels(ctx context.Context, params promapi.GetLabelsParams)

data := maps.Keys(dedup)
slices.Sort(data)
// Truncating data AFTER reading whole data into memory is suboptimal, yet
// it makes output consistent.
if l := params.Limit.Or(-1); l > 0 && len(data) >= l {
data = data[:l]
annots.Add(errResultTruncated)
}

warnings, infos := annots.AsStrings("", 0, 0)
return &promapi.LabelsResponse{
Expand Down Expand Up @@ -460,6 +484,7 @@ func (h *PromAPI) GetSeries(ctx context.Context, params promapi.GetSeriesParams)

var result storage.SeriesSet
if osq, ok := q.(metricstorage.OptimizedSeriesQuerier); ok {
// TODO(tdakkota): pass limit.
result = osq.OnlySeries(ctx, false, mint.UnixMilli(), maxt.UnixMilli(), matchers...)
} else {
result, err = h.querySeries(ctx, q, mint, maxt, matchers, params)
Expand All @@ -468,8 +493,19 @@ func (h *PromAPI) GetSeries(ctx context.Context, params promapi.GetSeriesParams)
}
}

var data []promapi.LabelSet
var (
data []promapi.LabelSet
annots = result.Warnings()

limit = params.Limit.Or(-1)
)
for result.Next() {
if limit > 0 && len(data) >= limit {
data = data[:limit]
annots.Add(errResultTruncated)
break
}

series := result.At()
data = append(data, series.Labels().Map())
}
Expand All @@ -486,7 +522,8 @@ func (h *PromAPI) GetSeries(ctx context.Context, params promapi.GetSeriesParams)
}, nil
}

func (h *PromAPI) querySeries(ctx context.Context,
func (h *PromAPI) querySeries(
ctx context.Context,
q storage.Querier,
mint, maxt time.Time,
matchers [][]*labels.Matcher,
Expand All @@ -496,6 +533,7 @@ func (h *PromAPI) querySeries(ctx context.Context,
hints = &storage.SelectHints{
Start: mint.UnixMilli(),
End: maxt.UnixMilli(),
Limit: params.Limit.Or(0),
Func: "series",
}
result storage.SeriesSet
Expand Down

0 comments on commit db7462d

Please sign in to comment.