From 86e53096ab9f9886e9f74b936c0f3d82111ef1aa Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Tue, 7 Sep 2021 15:21:53 +0200 Subject: [PATCH] query: Added tests for compatiblity for old components without InfoAPI. Signed-off-by: Bartlomiej Plotka --- pkg/query/endpointset.go | 1 - test/e2e/compact_test.go | 2 +- test/e2e/e2ethanos/services.go | 27 ++-- test/e2e/exemplars_api_test.go | 154 ++++++++--------------- test/e2e/metadata_api_test.go | 4 +- test/e2e/query_frontend_test.go | 40 ++++-- test/e2e/query_test.go | 217 +++++++++++++++++++++++++++++--- test/e2e/receive_test.go | 16 +-- test/e2e/rule_test.go | 2 +- test/e2e/rules_api_test.go | 12 +- test/e2e/store_gateway_test.go | 2 +- test/e2e/targets_api_test.go | 10 +- 12 files changed, 329 insertions(+), 158 deletions(-) diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 5b6f04532a..13625aa828 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -105,7 +105,6 @@ func (es *grpcEndpointSpec) getMetadataUsingStoreAPI(ctx context.Context, client } func (es *grpcEndpointSpec) fillExpectedAPIs(componentType component.Component, mintime, maxTime int64) infopb.InfoResponse { - switch componentType { case component.Sidecar: return infopb.InfoResponse{ diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index d19bc778ec..2359816ed8 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -453,7 +453,7 @@ func testCompactWithStoreGateway(t *testing.T, penaltyDedup bool) { testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_sync_failures_total")) testutil.Ok(t, str.WaitSumMetrics(e2e.Equals(0), "thanos_blocks_meta_modified")) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{str.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", str.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 54a46600bb..e71a39b213 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -96,6 +96,10 @@ func NewPrometheus(sharedDir, name, config, promImage string, enableFeatures ... } func NewPrometheusWithSidecar(sharedDir, netName, name, config, promImage string, enableFeatures ...string) (*e2e.HTTPService, *Service, error) { + return NewPrometheusWithSidecarCustomImage(sharedDir, netName, name, config, promImage, DefaultImage(), enableFeatures...) +} + +func NewPrometheusWithSidecarCustomImage(sharedDir, netName, name, config, promImage string, sidecarImage string, enableFeatures ...string) (*e2e.HTTPService, *Service, error) { prom, dataDir, err := NewPrometheus(sharedDir, name, config, promImage, enableFeatures...) if err != nil { return nil, nil, err @@ -104,7 +108,7 @@ func NewPrometheusWithSidecar(sharedDir, netName, name, config, promImage string sidecar := NewService( fmt.Sprintf("sidecar-%s", name), - DefaultImage(), + sidecarImage, e2e.NewCommand("sidecar", e2e.BuildArgs(map[string]string{ "--debug.name": fmt.Sprintf("sidecar-%v", name), "--grpc-address": ":9091", @@ -129,6 +133,7 @@ type QuerierBuilder struct { name string routePrefix string externalPrefix string + image string storeAddresses []string fileSDStoreAddresses []string @@ -140,35 +145,41 @@ type QuerierBuilder struct { tracingConfig string } -func NewQuerierBuilder(sharedDir, name string, storeAddresses []string) *QuerierBuilder { +func NewQuerierBuilder(sharedDir, name string, storeAddresses ...string) *QuerierBuilder { return &QuerierBuilder{ sharedDir: sharedDir, name: name, storeAddresses: storeAddresses, + image: DefaultImage(), } } -func (q *QuerierBuilder) WithFileSDStoreAddresses(fileSDStoreAddresses []string) *QuerierBuilder { +func (q *QuerierBuilder) WithImage(image string) *QuerierBuilder { + q.image = image + return q +} + +func (q *QuerierBuilder) WithFileSDStoreAddresses(fileSDStoreAddresses ...string) *QuerierBuilder { q.fileSDStoreAddresses = fileSDStoreAddresses return q } -func (q *QuerierBuilder) WithRuleAddresses(ruleAddresses []string) *QuerierBuilder { +func (q *QuerierBuilder) WithRuleAddresses(ruleAddresses ...string) *QuerierBuilder { q.ruleAddresses = ruleAddresses return q } -func (q *QuerierBuilder) WithTargetAddresses(targetAddresses []string) *QuerierBuilder { +func (q *QuerierBuilder) WithTargetAddresses(targetAddresses ...string) *QuerierBuilder { q.targetAddresses = targetAddresses return q } -func (q *QuerierBuilder) WithExemplarAddresses(exemplarAddresses []string) *QuerierBuilder { +func (q *QuerierBuilder) WithExemplarAddresses(exemplarAddresses ...string) *QuerierBuilder { q.exemplarAddresses = exemplarAddresses return q } -func (q *QuerierBuilder) WithMetadataAddresses(metadataAddresses []string) *QuerierBuilder { +func (q *QuerierBuilder) WithMetadataAddresses(metadataAddresses ...string) *QuerierBuilder { q.metadataAddresses = metadataAddresses return q } @@ -260,7 +271,7 @@ func (q *QuerierBuilder) Build() (*Service, error) { querier := NewService( fmt.Sprintf("querier-%v", q.name), - DefaultImage(), + q.image, e2e.NewCommand("query", args...), e2e.NewHTTPReadinessProbe(8080, "/-/ready", 200, 200), 8080, diff --git a/test/e2e/exemplars_api_test.go b/test/e2e/exemplars_api_test.go index 0379258c7c..15aa749ecd 100644 --- a/test/e2e/exemplars_api_test.go +++ b/test/e2e/exemplars_api_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cortexproject/cortex/integration/e2e" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/timestamp" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -26,68 +27,42 @@ func TestExemplarsAPI_Fanout(t *testing.T) { netName := "e2e_test_exemplars_fanout" - var ( - prom1, prom2 *e2e.HTTPService - sidecar1, sidecar2 *e2ethanos.Service - err error - s *e2e.Scenario - ) - - s, err = e2e.NewScenario(netName) + s, err := e2e.NewScenario(netName) testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, s)) - // 2x Prometheus. - _, sidecar1, err = e2ethanos.NewPrometheusWithSidecar( - s.SharedDir(), - netName, - "prom1", - defaultPromConfig("ha", 0, "", ""), - e2ethanos.DefaultPrometheusImage(), - e2ethanos.FeatureExemplarStorage, - ) - testutil.Ok(t, err) - _, sidecar2, err = e2ethanos.NewPrometheusWithSidecar( - s.SharedDir(), - netName, - "prom2", - defaultPromConfig("ha", 1, "", ""), - e2ethanos.DefaultPrometheusImage(), - e2ethanos.FeatureExemplarStorage, - ) - testutil.Ok(t, err) - - tracingCfg := fmt.Sprintf(`type: JAEGER + stores := []string{ + e2e.NetworkContainerHostPort(netName, "sidecar-prom1", 9091), // TODO(bwplotka): Use newer e2e lib to handle this in type safe manner. + e2e.NetworkContainerHostPort(netName, "sidecar-prom2", 9091), // TODO(bwplotka): Use newer e2e lib to handle this in type safe manner. + } + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "query", stores...). + WithExemplarAddresses(stores...). + WithTracingConfig(fmt.Sprintf(`type: JAEGER config: sampler_type: const sampler_param: 1 - service_name: %s`, s.NetworkName()+"-query") - - stores := []string{sidecar1.NetworkEndpointFor(s.NetworkName(), 9091), sidecar2.NetworkEndpointFor(s.NetworkName(), 9091)} - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "query", stores). - WithExemplarAddresses(stores). - WithTracingConfig(tracingCfg). + service_name: %s`, s.NetworkName()+"-query")). Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) // Recreate Prometheus and sidecar with Thanos query scrape target. - prom1, sidecar1, err = e2ethanos.NewPrometheusWithSidecar( + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar( s.SharedDir(), netName, "prom1", defaultPromConfig("ha", 0, "", "", "localhost:9090", q.NetworkHTTPEndpoint()), e2ethanos.DefaultPrometheusImage(), - "exemplar-storage", + e2ethanos.FeatureExemplarStorage, ) testutil.Ok(t, err) - prom2, sidecar2, err = e2ethanos.NewPrometheusWithSidecar( + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar( s.SharedDir(), netName, "prom2", defaultPromConfig("ha", 1, "", "", "localhost:9090", q.NetworkHTTPEndpoint()), e2ethanos.DefaultPrometheusImage(), - "exemplar-storage", + e2ethanos.FeatureExemplarStorage, ) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) @@ -103,88 +78,65 @@ config: end := timestamp.FromTime(now.Add(time.Hour)) // Send HTTP requests to thanos query to trigger exemplars. - labelNames(t, ctx, q.HTTPEndpoint(), nil, start, end, func(res []string) bool { - return true - }) + labelNames(t, ctx, q.HTTPEndpoint(), nil, start, end, func(res []string) bool { return true }) t.Run("Basic exemplars query", func(t *testing.T) { - requiredSeriesLabels := map[string]string{ + queryExemplars(t, ctx, q.HTTPEndpoint(), `http_request_duration_seconds_bucket{handler="label_names"}`, start, end, exemplarsOnExpectedSeries(map[string]string{ "__name__": "http_request_duration_seconds_bucket", "handler": "label_names", "job": "myself", "method": "get", "prometheus": "ha", - } - queryExemplars(t, ctx, q.HTTPEndpoint(), `http_request_duration_seconds_bucket{handler="label_names"}`, - start, end, func(data []*exemplarspb.ExemplarData) bool { - if len(data) != 1 { - return false - } - - // Compare series labels. - seriesLabels := labelpb.ZLabelSetsToPromLabelSets(data[0].SeriesLabels) - for _, lbls := range seriesLabels { - for k, v := range requiredSeriesLabels { - if lbls.Get(k) != v { - return false - } - } - } - - // Make sure the exemplar contains the correct traceID label. - for _, exemplar := range data[0].Exemplars { - for _, lbls := range labelpb.ZLabelSetsToPromLabelSets(exemplar.Labels) { - if !lbls.Has(traceIDLabel) { - return false - } - } - } - return true - }) + })) }) t.Run("Exemplars query with matched external label", func(t *testing.T) { - requiredSeriesLabels := map[string]string{ + // Here replica is an external label. + queryExemplars(t, ctx, q.HTTPEndpoint(), `http_request_duration_seconds_bucket{handler="label_names", replica="0"}`, start, end, exemplarsOnExpectedSeries(map[string]string{ "__name__": "http_request_duration_seconds_bucket", "handler": "label_names", "job": "myself", "method": "get", "prometheus": "ha", - } - // Here replica is an external label. - queryExemplars(t, ctx, q.HTTPEndpoint(), `http_request_duration_seconds_bucket{handler="label_names", replica="0"}`, - start, end, func(data []*exemplarspb.ExemplarData) bool { - if len(data) != 1 { - return false - } - - // Compare series labels. - seriesLabels := labelpb.ZLabelSetsToPromLabelSets(data[0].SeriesLabels) - for _, lbls := range seriesLabels { - for k, v := range requiredSeriesLabels { - if lbls.Get(k) != v { - return false - } - } - } - - // Make sure the exemplar contains the correct traceID label. - for _, exemplar := range data[0].Exemplars { - for _, lbls := range labelpb.ZLabelSetsToPromLabelSets(exemplar.Labels) { - if !lbls.Has(traceIDLabel) { - return false - } - } - } - return true - }) + })) }) t.Run("Exemplars query doesn't match external label", func(t *testing.T) { // Here replica is an external label, but it doesn't match. queryExemplars(t, ctx, q.HTTPEndpoint(), `http_request_duration_seconds_bucket{handler="label_names", replica="foo"}`, - start, end, func(data []*exemplarspb.ExemplarData) bool { - return len(data) == 0 + start, end, func(data []*exemplarspb.ExemplarData) error { + if len(data) > 0 { + return errors.Errorf("expected no examplers, got %v", data) + } + return nil }) }) } + +func exemplarsOnExpectedSeries(requiredSeriesLabels map[string]string) func(data []*exemplarspb.ExemplarData) error { + return func(data []*exemplarspb.ExemplarData) error { + if len(data) != 1 { + return errors.Errorf("unexpected result size, expected 1, got: %v", len(data)) + } + + // Compare series labels. + seriesLabels := labelpb.ZLabelSetsToPromLabelSets(data[0].SeriesLabels) + for _, lbls := range seriesLabels { + for k, v := range requiredSeriesLabels { + if lbls.Get(k) != v { + return errors.Errorf("unexpected labels in result, expected %v, got: %v", requiredSeriesLabels, seriesLabels) + } + } + } + + // Make sure the exemplar contains the correct traceID label. + for _, exemplar := range data[0].Exemplars { + for _, lbls := range labelpb.ZLabelSetsToPromLabelSets(exemplar.Labels) { + if !lbls.Has(traceIDLabel) { + return errors.Errorf("unexpected labels in exemplar, expected %v, got: %v", traceIDLabel, exemplar.Labels) + } + } + } + return nil + } +} diff --git a/test/e2e/metadata_api_test.go b/test/e2e/metadata_api_test.go index 83ca8dea08..eda8b32594 100644 --- a/test/e2e/metadata_api_test.go +++ b/test/e2e/metadata_api_test.go @@ -49,8 +49,8 @@ func TestMetadataAPI_Fanout(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) stores := []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()} - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "query", stores). - WithMetadataAddresses(stores). + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "query", stores...). + WithMetadataAddresses(stores...). Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index dafb39de36..23c55eb214 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/cortexproject/cortex/integration/e2e" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" @@ -34,7 +35,7 @@ func TestQueryFrontend(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom, sidecar)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", sidecar.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -107,8 +108,11 @@ func TestQueryFrontend(t *testing.T) { timestamp.FromTime(now.Add(time.Hour)), 14, promclient.QueryOptions{}, - func(res model.Matrix) bool { - return len(res) > 0 + func(res model.Matrix) error { + if len(res) > 0 { + return errors.Errorf("expected some results, got nothing") + } + return nil }, ) @@ -146,8 +150,11 @@ func TestQueryFrontend(t *testing.T) { timestamp.FromTime(now.Add(time.Hour)), 14, promclient.QueryOptions{}, - func(res model.Matrix) bool { - return len(res) > 0 + func(res model.Matrix) error { + if len(res) > 0 { + return errors.Errorf("expected some results, got nothing") + } + return nil }, ) @@ -188,8 +195,11 @@ func TestQueryFrontend(t *testing.T) { timestamp.FromTime(now.Add(24*time.Hour)), 14, promclient.QueryOptions{}, - func(res model.Matrix) bool { - return len(res) > 0 + func(res model.Matrix) error { + if len(res) > 0 { + return errors.Errorf("expected some results, got nothing") + } + return nil }, ) @@ -388,7 +398,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom, sidecar)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{sidecar.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", sidecar.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -448,8 +458,11 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { timestamp.FromTime(now.Add(time.Hour)), 14, promclient.QueryOptions{}, - func(res model.Matrix) bool { - return len(res) > 0 + func(res model.Matrix) error { + if len(res) > 0 { + return errors.Errorf("expected some results, got nothing") + } + return nil }, ) @@ -475,8 +488,11 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { timestamp.FromTime(now.Add(time.Hour)), 14, promclient.QueryOptions{}, - func(res model.Matrix) bool { - return len(res) > 0 + func(res model.Matrix) error { + if len(res) > 0 { + return errors.Errorf("expected some results, got nothing") + } + return nil }, ) diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 234f405a7a..eca3af1b20 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -23,6 +23,10 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" + "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" "github.com/thanos-io/thanos/pkg/exemplars/exemplarspb" "github.com/thanos-io/thanos/pkg/promclient" @@ -113,8 +117,8 @@ func TestQuery(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2, prom3, sidecar3, prom4, sidecar4)) // Querier. Both fileSD and directly by flags. - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}). - WithFileSDStoreAddresses([]string{sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()). + WithFileSDStoreAddresses(sidecar3.GRPCNetworkEndpoint(), sidecar4.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -189,7 +193,7 @@ func TestQueryExternalPrefixWithoutReverseProxy(t *testing.T) { externalPrefix := "test" - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", nil). + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1"). WithExternalPrefix(externalPrefix).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -206,7 +210,7 @@ func TestQueryExternalPrefix(t *testing.T) { externalPrefix := "thanos" - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", nil). + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1"). WithExternalPrefix(externalPrefix).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -229,7 +233,7 @@ func TestQueryExternalPrefixAndRoutePrefix(t *testing.T) { externalPrefix := "thanos" routePrefix := "test" - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", nil). + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1"). WithRoutePrefix(routePrefix). WithExternalPrefix(externalPrefix). Build() @@ -261,7 +265,7 @@ func TestQueryLabelNames(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -312,7 +316,7 @@ func TestQueryLabelValues(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), receiver.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -342,6 +346,189 @@ func TestQueryLabelValues(t *testing.T) { ) } +func TestQueryCompatibilityWithPreInfoAPI(t *testing.T) { + t.Parallel() + + for i, tcase := range []struct { + queryImage string + sidecarImage string + }{ + { + queryImage: e2ethanos.DefaultImage(), + sidecarImage: "quay.io/thanos/thanos:v0.22.0", // Thanos components from version before 0.23 does not have new InfoAPI. + }, + { + queryImage: "quay.io/thanos/thanos:v0.22.0", // Thanos querier from version before 0.23 did not know about InfoAPI. + sidecarImage: e2ethanos.DefaultImage(), + }, + } { + i := i + t.Run(fmt.Sprintf("%+v", tcase), func(t *testing.T) { + net := fmt.Sprintf("e2e_test_query_comp_query_%d", i) + s, err := e2e.NewScenario(net) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, s)) + + promRulesSubDir := filepath.Join("rules") + testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), promRulesSubDir), os.ModePerm)) + // Create the abort_on_partial_response alert for Prometheus. + // We don't create the warn_on_partial_response alert as Prometheus has strict yaml unmarshalling. + createRuleFile(t, filepath.Join(s.SharedDir(), promRulesSubDir, "rules.yaml"), testAlertRuleAbortOnPartialResponse) + + p1, s1, err := e2ethanos.NewPrometheusWithSidecarCustomImage( + s.SharedDir(), + net, + "p1", + defaultPromConfig("p1", 0, "", filepath.Join(e2e.ContainerSharedDir, promRulesSubDir, "*.yaml"), "localhost:9090", e2e.NetworkContainerHostPort(net, "querier-1", 8080)), // TODO(bwplotka): Use newer e2e lib to handle this in type safe manner. + e2ethanos.DefaultPrometheusImage(), + tcase.sidecarImage, + e2ethanos.FeatureExemplarStorage, + ) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(p1, s1)) + + // Newest querier with old --rules --meta etc flags. + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", s1.GRPCNetworkEndpoint()). + WithMetadataAddresses(s1.GRPCNetworkEndpoint()). + WithExemplarAddresses(s1.GRPCNetworkEndpoint()). + WithTargetAddresses(s1.GRPCNetworkEndpoint()). + WithRuleAddresses(s1.GRPCNetworkEndpoint()). + WithTracingConfig(fmt.Sprintf(`type: JAEGER +config: + sampler_type: const + sampler_param: 1 + service_name: %s`, s.NetworkName()+"-query")). // Use fake tracing config to trigger exemplar. + WithImage(tcase.queryImage). + Build() + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + // We should have single TCP connection, since all APIs are against the same server. + testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics)) + + queryAndAssertSeries(t, ctx, q.HTTPEndpoint(), queryUpWithoutInstance, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "job": "myself", + "prometheus": "p1", + "replica": "0", + }, + }) + + // We expect rule and other APIs to work. + + // Metadata. + { + var promMeta map[string][]metadatapb.Meta + // Wait metadata response to be ready as Prometheus gets metadata after scrape. + testutil.Ok(t, runutil.Retry(3*time.Second, ctx.Done(), func() error { + promMeta, err = promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+p1.HTTPEndpoint()), "", -1) + testutil.Ok(t, err) + if len(promMeta) > 0 { + return nil + } + return fmt.Errorf("empty metadata response from Prometheus") + })) + + thanosMeta, err := promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", -1) + testutil.Ok(t, err) + testutil.Assert(t, len(thanosMeta) > 0, "got empty metadata response from Thanos") + + // Metadata response from Prometheus and Thanos Querier should be the same after deduplication. + metadataEqual(t, thanosMeta, promMeta) + } + + // Exemplars. + { + now := time.Now() + start := timestamp.FromTime(now.Add(-time.Hour)) + end := timestamp.FromTime(now.Add(time.Hour)) + + // Send HTTP requests to thanos query to trigger exemplars. + labelNames(t, ctx, q.HTTPEndpoint(), nil, start, end, func(res []string) bool { + return true + }) + + queryExemplars(t, ctx, q.HTTPEndpoint(), `http_request_duration_seconds_bucket{handler="label_names"}`, start, end, exemplarsOnExpectedSeries(map[string]string{ + "__name__": "http_request_duration_seconds_bucket", + "handler": "label_names", + "job": "myself", + "method": "get", + "prometheus": "p1", + })) + } + + // Targets. + { + targetAndAssert(t, ctx, q.HTTPEndpoint(), "", &targetspb.TargetDiscovery{ + ActiveTargets: []*targetspb.ActiveTarget{ + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: fmt.Sprintf("e2e_test_query_comp_query_%d-querier-1:8080", i)}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "p1"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: fmt.Sprintf("e2e_test_query_comp_query_%d-querier-1:8080", i)}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "p1"}, + }}, + ScrapePool: "myself", + ScrapeUrl: fmt.Sprintf("http://e2e_test_query_comp_query_%d-querier-1:8080/metrics", i), + Health: targetspb.TargetHealth_UP, + }, + { + DiscoveredLabels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "__address__", Value: "localhost:9090"}, + {Name: "__metrics_path__", Value: "/metrics"}, + {Name: "__scheme__", Value: "http"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "p1"}, + }}, + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "instance", Value: "localhost:9090"}, + {Name: "job", Value: "myself"}, + {Name: "prometheus", Value: "p1"}, + }}, + ScrapePool: "myself", + ScrapeUrl: "http://localhost:9090/metrics", + Health: targetspb.TargetHealth_UP, + }, + }, + DroppedTargets: []*targetspb.DroppedTarget{}, + }) + } + + // Rules. + { + ruleAndAssert(t, ctx, q.HTTPEndpoint(), "", []*rulespb.RuleGroup{ + { + Name: "example_abort", + File: "/shared/rules/rules.yaml", + Rules: []*rulespb.Rule{ + rulespb.NewAlertingRule(&rulespb.Alert{ + Name: "TestAlert_AbortOnPartialResponse", + State: rulespb.AlertState_FIRING, + Query: "absent(some_metric)", + Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{ + {Name: "prometheus", Value: "p1"}, + {Name: "severity", Value: "page"}, + }}, + }), + }, + }, + }) + } + }) + } +} + func checkNetworkRequests(t *testing.T, addr string) { ctx, cancel := chromedp.NewContext(context.Background()) t.Cleanup(cancel) @@ -485,7 +672,7 @@ func series(t *testing.T, ctx context.Context, addr string, matchers []*labels.M } //nolint:unparam -func rangeQuery(t *testing.T, ctx context.Context, addr, q string, start, end, step int64, opts promclient.QueryOptions, check func(res model.Matrix) bool) { +func rangeQuery(t *testing.T, ctx context.Context, addr, q string, start, end, step int64, opts promclient.QueryOptions, check func(res model.Matrix) error) { t.Helper() logger := log.NewLogfmtLogger(os.Stdout) @@ -500,15 +687,15 @@ func rangeQuery(t *testing.T, ctx context.Context, addr, q string, start, end, s return errors.Errorf("unexpected warnings %s", warnings) } - if check(res) { - return nil + if err := check(res); err != nil { + return errors.Wrap(err, "result check failed") } - return errors.Errorf("unexpected results size %d", len(res)) + return nil })) } -func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, end int64, check func(data []*exemplarspb.ExemplarData) bool) { +func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, end int64, check func(data []*exemplarspb.ExemplarData) error) { t.Helper() logger := log.NewLogfmtLogger(os.Stdout) @@ -520,10 +707,10 @@ func queryExemplars(t *testing.T, ctx context.Context, addr, q string, start, en return err } - if check(res) { - return nil + if err := check(res); err != nil { + return errors.Wrap(err, "exemplar check failed") } - return errors.Errorf("unexpected results size %d", len(res)) + return nil })) } diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index e63656f54b..bb335d5c66 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -89,7 +89,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{i.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", i.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -176,7 +176,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{i1.GRPCNetworkEndpoint(), i2.GRPCNetworkEndpoint(), i3.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", i1.GRPCNetworkEndpoint(), i2.GRPCNetworkEndpoint(), i3.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -289,7 +289,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1, prom2)) //Setup Querier - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{i1.GRPCNetworkEndpoint(), i2.GRPCNetworkEndpoint(), i3.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", i1.GRPCNetworkEndpoint(), i2.GRPCNetworkEndpoint(), i3.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -391,7 +391,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -467,7 +467,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1, prom2, prom3)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -542,7 +542,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint(), r3.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -614,7 +614,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(prom1)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -689,7 +689,7 @@ func TestReceive(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1)) testutil.Ok(t, s.StartAndWaitReady(prom2)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{r1.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", r1.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 3e7f6a2069..fb7289ca90 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -256,7 +256,7 @@ func TestRule(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r)) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{r.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", r.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go index 5b993cd812..86f97cdf18 100644 --- a/test/e2e/rules_api_test.go +++ b/test/e2e/rules_api_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/cortexproject/cortex/integration/e2e" + "github.com/go-kit/kit/log" "github.com/pkg/errors" http_util "github.com/thanos-io/thanos/pkg/http" @@ -71,8 +72,8 @@ func TestRulesAPI_Fanout(t *testing.T) { testutil.Ok(t, err) stores := []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), r1.NetworkEndpointFor(s.NetworkName(), 9091), r2.NetworkEndpointFor(s.NetworkName(), 9091)} - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "query", stores). - WithRuleAddresses(stores). + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "query", stores...). + WithRuleAddresses(stores...). Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -150,14 +151,17 @@ func ruleAndAssert(t *testing.T, ctx context.Context, addr, typ string, want []* fmt.Println("ruleAndAssert: Waiting for results for rules type", typ) var result []*rulespb.RuleGroup - testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + + logger := log.NewLogfmtLogger(os.Stdout) + testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { res, err := promclient.NewDefaultClient().RulesInGRPC(ctx, mustURLParse(t, "http://"+addr), typ) if err != nil { return err } if len(result) != len(res) { - fmt.Println("ruleAndAssert: New result:", res) + fmt.Println("ruleAndAssert: new result:", res) + result = res } if len(res) != len(want) { diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index f34919d901..92f51ac93c 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -62,7 +62,7 @@ func TestStoreGateway(t *testing.T) { // Ensure bucket UI. ensureGETStatusCode(t, http.StatusOK, "http://"+path.Join(s1.HTTPEndpoint(), "loaded")) - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", []string{s1.GRPCNetworkEndpoint()}).Build() + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", s1.GRPCNetworkEndpoint()).Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) diff --git a/test/e2e/targets_api_test.go b/test/e2e/targets_api_test.go index 15fee88d25..7b8bb33fb1 100644 --- a/test/e2e/targets_api_test.go +++ b/test/e2e/targets_api_test.go @@ -6,12 +6,14 @@ package e2e_test import ( "context" "fmt" + "os" "reflect" "sort" "testing" "time" "github.com/cortexproject/cortex/integration/e2e" + "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/thanos-io/thanos/pkg/promclient" @@ -53,8 +55,8 @@ func TestTargetsAPI_Fanout(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) stores := []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint()} - q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "query", stores). - WithTargetAddresses(stores). + q, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "query", stores...). + WithTargetAddresses(stores...). Build() testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(q)) @@ -98,13 +100,13 @@ func TestTargetsAPI_Fanout(t *testing.T) { }) } -//nolint:unused func targetAndAssert(t *testing.T, ctx context.Context, addr, state string, want *targetspb.TargetDiscovery) { t.Helper() fmt.Println("targetAndAssert: Waiting for results for targets state", state) - testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + logger := log.NewLogfmtLogger(os.Stdout) + testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { res, err := promclient.NewDefaultClient().TargetsInGRPC(ctx, mustURLParse(t, "http://"+addr), state) if err != nil { return err