From 4c655bffd40497a933313b5d95f82de7631ac507 Mon Sep 17 00:00:00 2001 From: zenador Date: Fri, 8 Dec 2023 22:45:20 +0800 Subject: [PATCH] Add unit test to compare histogram_quantile results with and without query sharding (#6525) * Add TestQuerySharding_NonMonotonicHistogramBuckets * Update new unit test for hidden warning --- .../querymiddleware/querysharding_test.go | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/pkg/frontend/querymiddleware/querysharding_test.go b/pkg/frontend/querymiddleware/querysharding_test.go index d8a6edcf423..70af9933a1f 100644 --- a/pkg/frontend/querymiddleware/querysharding_test.go +++ b/pkg/frontend/querymiddleware/querysharding_test.go @@ -769,6 +769,82 @@ func TestQuerySharding_Correctness(t *testing.T) { } } +func TestQuerySharding_NonMonotonicHistogramBuckets(t *testing.T) { + queries := []string{ + `histogram_quantile(1, sum by(le) (rate(metric_histogram_bucket[1m])))`, + } + + series := []*promql.StorageSeries{} + for i := 0; i < 100; i++ { + series = append(series, newSeries(labels.FromStrings(labels.MetricName, "metric_histogram_bucket", "app", strconv.Itoa(i), "le", "10"), start.Add(-lookbackDelta), end, step, arithmeticSequence(1))) + series = append(series, newSeries(labels.FromStrings(labels.MetricName, "metric_histogram_bucket", "app", strconv.Itoa(i), "le", "20"), start.Add(-lookbackDelta), end, step, arithmeticSequence(3))) + series = append(series, newSeries(labels.FromStrings(labels.MetricName, "metric_histogram_bucket", "app", strconv.Itoa(i), "le", "30"), start.Add(-lookbackDelta), end, step, arithmeticSequence(3))) + series = append(series, newSeries(labels.FromStrings(labels.MetricName, "metric_histogram_bucket", "app", strconv.Itoa(i), "le", "40"), start.Add(-lookbackDelta), end, step, arithmeticSequence(3))) + series = append(series, newSeries(labels.FromStrings(labels.MetricName, "metric_histogram_bucket", "app", strconv.Itoa(i), "le", "+Inf"), start.Add(-lookbackDelta), end, step, arithmeticSequence(3))) + } + + // Create a queryable on the fixtures. + queryable := storageSeriesQueryable(series) + + engine := newEngine() + downstream := &downstreamHandler{ + engine: engine, + queryable: queryable, + } + + for _, query := range queries { + t.Run(query, func(t *testing.T) { + req := &PrometheusRangeQueryRequest{ + Path: "/query_range", + Start: util.TimeToMillis(start), + End: util.TimeToMillis(end), + Step: step.Milliseconds(), + Query: query, + } + + // Run the query without sharding. + expectedRes, err := downstream.Do(context.Background(), req) + require.Nil(t, err) + + expectedPrometheusRes := expectedRes.(*PrometheusResponse) + sort.Sort(byLabels(expectedPrometheusRes.Data.Result)) + + // Ensure the query produces some results. + require.NotEmpty(t, expectedPrometheusRes.Data.Result) + requireValidSamples(t, expectedPrometheusRes.Data.Result) + + // Ensure the bucket monotonicity has not been fixed by PromQL engine. + require.Len(t, expectedPrometheusRes.GetWarnings(), 0) + + for _, numShards := range []int{8, 16} { + t.Run(fmt.Sprintf("shards=%d", numShards), func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + shardingware := newQueryShardingMiddleware( + log.NewNopLogger(), + engine, + mockLimits{totalShards: numShards}, + 0, + reg, + ) + + // Run the query with sharding. + shardedRes, err := shardingware.Wrap(downstream).Do(user.InjectOrgID(context.Background(), "test"), req) + require.Nil(t, err) + + // Ensure the two results matches (float precision can slightly differ, there's no guarantee in PromQL engine too + // if you rerun the same query twice). + shardedPrometheusRes := shardedRes.(*PrometheusResponse) + sort.Sort(byLabels(shardedPrometheusRes.Data.Result)) + approximatelyEquals(t, expectedPrometheusRes, shardedPrometheusRes) + + // Ensure the warning about bucket monotonicity from PromQL engine is hidden. + require.Len(t, shardedPrometheusRes.GetWarnings(), 0) + }) + } + }) + } +} + // requireValidSamples ensures the query produces some results which are not NaN. func requireValidSamples(t *testing.T, result []SampleStream) { t.Helper()