Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add return error support to MetricsQueryRequest.WithStartEnd() #8392

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pkg/frontend/querymiddleware/cardinality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,37 +278,37 @@ func Test_cardinalityEstimateBucket_QueryRequest_requestEquality(t *testing.T) {
tenantA: "1",
tenantB: "1",
requestA: rangeQuery,
requestB: rangeQuery.WithStartEnd(rangeQuery.GetStart()+5*time.Minute.Milliseconds(), rangeQuery.GetEnd()+5*time.Minute.Milliseconds()),
requestB: mustSucceed(rangeQuery.WithStartEnd(rangeQuery.GetStart()+5*time.Minute.Milliseconds(), rangeQuery.GetEnd()+5*time.Minute.Milliseconds())),
expectedEqual: true,
},
{
name: "same tenant, same query with start time in different bucket",
tenantA: "1",
tenantB: "1",
requestA: rangeQuery,
requestB: rangeQuery.WithStartEnd(
requestB: mustSucceed(rangeQuery.WithStartEnd(
rangeQuery.GetStart()+2*cardinalityEstimateBucketSize.Milliseconds(),
rangeQuery.GetEnd()+2*cardinalityEstimateBucketSize.Milliseconds(),
),
)),
expectedEqual: false,
},
{
name: "same tenant, same query with start time in same bucket and range size in same bucket",
tenantA: "1",
tenantB: "1",
requestA: rangeQuery,
requestB: rangeQuery.WithStartEnd(rangeQuery.GetStart(), rangeQuery.GetEnd()+time.Second.Milliseconds()),
requestB: mustSucceed(rangeQuery.WithStartEnd(rangeQuery.GetStart(), rangeQuery.GetEnd()+time.Second.Milliseconds())),
expectedEqual: true,
},
{
name: "same tenant, same query with start time in same bucket and range size in different bucket",
tenantA: "1",
tenantB: "1",
requestA: rangeQuery,
requestB: rangeQuery.WithStartEnd(
requestB: mustSucceed(rangeQuery.WithStartEnd(
rangeQuery.GetStart()+5*time.Minute.Milliseconds(),
rangeQuery.GetEnd()+2*cardinalityEstimateBucketSize.Milliseconds(),
),
)),
expectedEqual: false,
},
// The following two test cases test consistent hashing of queries, which is used
Expand All @@ -318,21 +318,21 @@ func Test_cardinalityEstimateBucket_QueryRequest_requestEquality(t *testing.T) {
tenantA: "1",
tenantB: "1",
requestA: rangeQuerySum,
requestB: rangeQuerySum.WithStartEnd(
requestB: mustSucceed(rangeQuerySum.WithStartEnd(
rangeQuery.GetStart()+(cardinalityEstimateBucketSize/2).Milliseconds(),
rangeQuery.GetEnd()+(cardinalityEstimateBucketSize/2).Milliseconds(),
),
)),
expectedEqual: false,
},
{
name: "same tenant, same query with start time less than a bucket width apart and in the same bucket",
tenantA: "1",
tenantB: "1",
requestA: rangeQuery,
requestB: rangeQuery.WithStartEnd(
requestB: mustSucceed(rangeQuery.WithStartEnd(
rangeQuery.GetStart()+(cardinalityEstimateBucketSize/2).Milliseconds(),
rangeQuery.GetEnd()+(cardinalityEstimateBucketSize/2).Milliseconds(),
),
)),
expectedEqual: true,
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type MetricsQueryRequest interface {
WithID(id int64) MetricsQueryRequest
// WithStartEnd clone the current request with different start and end timestamp.
// Implementations must ensure minT and maxT are recalculated when the start and end timestamp change.
WithStartEnd(startTime int64, endTime int64) MetricsQueryRequest
WithStartEnd(startTime int64, endTime int64) (MetricsQueryRequest, error)
// WithQuery clones the current request with a different query; returns error if query parse fails.
// Implementations must ensure minT and maxT are recalculated when the query changes.
WithQuery(string) (MetricsQueryRequest, error)
Expand Down
12 changes: 10 additions & 2 deletions pkg/frontend/querymiddleware/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ func TestMetricsQuery_MinMaxTime(t *testing.T) {
}

func TestMetricsQuery_WithStartEnd_TransformConsistency(t *testing.T) {

startTime, err := time.Parse(time.RFC3339, "2024-02-21T00:00:00-08:00")
require.NoError(t, err)
endTime, err := time.Parse(time.RFC3339, "2024-02-22T00:00:00-08:00")
Expand Down Expand Up @@ -353,7 +352,8 @@ func TestMetricsQuery_WithStartEnd_TransformConsistency(t *testing.T) {
// apply WithStartEnd
newStart := testCase.updatedStartTime.UnixMilli()
newEnd := testCase.updatedEndTime.UnixMilli()
updatedMetricsQuery := testCase.initialMetricsQuery.WithStartEnd(newStart, newEnd)
updatedMetricsQuery, err := testCase.initialMetricsQuery.WithStartEnd(newStart, newEnd)
require.NoError(t, err)

require.Equal(t, testCase.expectedUpdatedMinT, updatedMetricsQuery.GetMinT())
require.Equal(t, testCase.expectedUpdatedMaxT, updatedMetricsQuery.GetMaxT())
Expand Down Expand Up @@ -1613,3 +1613,11 @@ func TestPrometheusCodec_DecodeMultipleTimes(t *testing.T) {
func newTestPrometheusCodec() Codec {
return NewPrometheusCodec(prometheus.NewPedanticRegistry(), 0*time.Minute, formatJSON)
}

func mustSucceed[T any](value T, err error) T {
if err != nil {
panic(err)
Copy link
Contributor

@krajorama krajorama Jun 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: To me panic in test implies that it's catching a programming error that should never happen in real life. I'd use require.NoError() or assert.NoError and pass t.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. It's a bit annoying having to pass t because can't be a wrapper anymore. The closest thing would be:

func mustNoError[T any](t *testing.T) func(value T, err error) T {
       return func(value T, err error) T {
               require.NoError(t, err)
               return value
       }
}

And then you can call it like this:

-                       requestB:      mustSucceed(rangeQuery.WithStartEnd(rangeQuery.GetStart()+5*time.Minute.Milliseconds(), rangeQuery.GetEnd()+5*time.Minute.Milliseconds())),
+                       requestB:      mustNoError[MetricsQueryRequest](t)(rangeQuery.WithStartEnd(rangeQuery.GetStart()+5*time.Minute.Milliseconds(), rangeQuery.GetEnd()+5*time.Minute.Milliseconds())),

I think this latter version makes code a bit more ugly. Given the panic affects only tests I will move on.

}

return value
}
5 changes: 4 additions & 1 deletion pkg/frontend/querymiddleware/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ func (l limitsMiddleware) Do(ctx context.Context, r MetricsQueryRequest) (Respon
"maxQueryLookback", maxQueryLookback,
"blocksRetentionPeriod", blocksRetentionPeriod)

r = r.WithStartEnd(minStartTime, r.GetEnd())
r, err = r.WithStartEnd(minStartTime, r.GetEnd())
if err != nil {
return nil, apierror.New(apierror.TypeInternal, err.Error())
}
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/frontend/querymiddleware/model_extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (r *PrometheusRangeQueryRequest) WithID(id int64) MetricsQueryRequest {
}

// WithStartEnd clones the current `PrometheusRangeQueryRequest` with a new `start` and `end` timestamp.
func (r *PrometheusRangeQueryRequest) WithStartEnd(start int64, end int64) MetricsQueryRequest {
func (r *PrometheusRangeQueryRequest) WithStartEnd(start int64, end int64) (MetricsQueryRequest, error) {
newRequest := *r
newRequest.start = start
newRequest.end = end
Expand All @@ -164,7 +164,7 @@ func (r *PrometheusRangeQueryRequest) WithStartEnd(start int64, end int64) Metri
newRequest.queryExpr, newRequest.GetStart(), newRequest.GetEnd(), newRequest.GetStep(), newRequest.lookbackDelta,
)
}
return &newRequest
return &newRequest, nil
}

// WithQuery clones the current `PrometheusRangeQueryRequest` with a new query; returns error if query parse fails.
Expand Down Expand Up @@ -355,15 +355,15 @@ func (r *PrometheusInstantQueryRequest) WithID(id int64) MetricsQueryRequest {
}

// WithStartEnd clones the current `PrometheusInstantQueryRequest` with a new `time` timestamp.
func (r *PrometheusInstantQueryRequest) WithStartEnd(time int64, _ int64) MetricsQueryRequest {
func (r *PrometheusInstantQueryRequest) WithStartEnd(time int64, _ int64) (MetricsQueryRequest, error) {
newRequest := *r
newRequest.time = time
if newRequest.queryExpr != nil {
newRequest.minT, newRequest.maxT = decodeQueryMinMaxTime(
newRequest.queryExpr, newRequest.GetStart(), newRequest.GetEnd(), newRequest.GetStep(), newRequest.lookbackDelta,
)
}
return &newRequest
return &newRequest, nil
}

// WithQuery clones the current `PrometheusInstantQueryRequest` with a new query; returns error if query parse fails.
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1686,7 +1686,7 @@ func TestQuerySharding_ShouldUseCardinalityEstimate(t *testing.T) {
}{
{
"range query",
req.WithStartEnd(util.TimeToMillis(start), util.TimeToMillis(end)).WithEstimatedSeriesCountHint(55_000),
mustSucceed(req.WithStartEnd(util.TimeToMillis(start), util.TimeToMillis(end))).WithEstimatedSeriesCountHint(55_000),
6,
},
{
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (r *remoteReadQueryRequest) WithHeaders([]*PrometheusHeader) MetricsQueryRe
panic("not implemented")
}

func (r *remoteReadQueryRequest) WithStartEnd(_ int64, _ int64) MetricsQueryRequest {
func (r *remoteReadQueryRequest) WithStartEnd(_ int64, _ int64) (MetricsQueryRequest, error) {
panic("not implemented")
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/frontend/querymiddleware/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,11 @@ func partitionCacheExtents(req MetricsQueryRequest, extents []Extent, minCacheEx

// If there is a bit missing at the front, make a request for that.
if start < extent.Start {
r := req.WithStartEnd(start, extent.Start)
r, err := req.WithStartEnd(start, extent.Start)
if err != nil {
return nil, nil, err
}

requests = append(requests, r)
}
res, err := extent.toResponse()
Expand Down Expand Up @@ -499,7 +503,11 @@ func partitionCacheExtents(req MetricsQueryRequest, extents []Extent, minCacheEx

// Lastly, make a request for any data missing at the end.
if start < req.GetEnd() {
r := req.WithStartEnd(start, req.GetEnd())
r, err := req.WithStartEnd(start, req.GetEnd())
if err != nil {
return nil, nil, err
}

requests = append(requests, r)
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/frontend/querymiddleware/split_and_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,10 @@ func splitQueryByInterval(req MetricsQueryRequest, interval time.Duration) ([]Me
if err != nil {
return nil, err
}
splitReq = splitReq.WithStartEnd(start, end)
splitReq, err = splitReq.WithStartEnd(start, end)
if err != nil {
return nil, err
}
reqs = append(reqs, splitReq)

start = end + splitReq.GetStep()
Expand Down
8 changes: 6 additions & 2 deletions pkg/frontend/querymiddleware/split_and_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,9 @@ func TestSplitAndCacheMiddleware_ResultsCache(t *testing.T) {
assert.Equal(t, uint32(1), queryStats.LoadSplitQueries())

// Doing request with new end time should do one more query.
req = req.WithStartEnd(req.GetStart(), req.GetEnd()+step)
req, err = req.WithStartEnd(req.GetStart(), req.GetEnd()+step)
require.NoError(t, err)

_, err = rc.Do(ctx, req)
require.NoError(t, err)
require.Equal(t, 2, downstreamReqs)
Expand Down Expand Up @@ -558,7 +560,9 @@ func TestSplitAndCacheMiddleware_ResultsCache_EnabledCachingOfStepUnalignedReque
assert.Equal(t, uint32(1), queryStats.LoadSplitQueries())

// New request with slightly different Start time will not reuse the cached result.
req = req.WithStartEnd(parseTimeRFC3339(t, "2021-10-15T10:00:05Z").Unix()*1000, parseTimeRFC3339(t, "2021-10-15T12:00:05Z").Unix()*1000)
req, err = req.WithStartEnd(parseTimeRFC3339(t, "2021-10-15T10:00:05Z").Unix()*1000, parseTimeRFC3339(t, "2021-10-15T12:00:05Z").Unix()*1000)
require.NoError(t, err)

resp, err = rc.Do(ctx, req)
require.NoError(t, err)
require.Equal(t, 2, downstreamReqs)
Expand Down
7 changes: 6 additions & 1 deletion pkg/frontend/querymiddleware/step_align.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ func (s *stepAlignMiddleware) Do(ctx context.Context, r MetricsQueryRequest) (Re
"step", r.GetStep(),
)

return s.next.Do(ctx, r.WithStartEnd(start, end))
updatedReq, err := r.WithStartEnd(start, end)
if err != nil {
return nil, err
}

return s.next.Do(ctx, updatedReq)
}
}

Expand Down
Loading