diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f4d6f93205..5c62ff19acd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ * [CHANGE] Added new metric `cortex_compactor_disk_out_of_space_errors_total` which counts how many times a compaction failed due to the compactor being out of disk, alert if there is a single increase. #8237 #8278 * [CHANGE] Store-gateway: Remove experimental parameter `-blocks-storage.bucket-store.series-selection-strategy`. The default strategy is now `worst-case`. #8702 * [CHANGE] Store-gateway: Rename `-blocks-storage.bucket-store.series-selection-strategies.worst-case-series-preference` to `-blocks-storage.bucket-store.series-fetch-preference` and promote to stable. #8702 +* [CHANGE] Querier, store-gateway: remove the deprecated `-querier.prefer-streaming-chunks-from-store-gateways` flag. Streaming chunks from store-gateways is now always enabled. #8696 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8671 * [FEATURE] Experimental Kafka-based ingest storage.
#6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 * What it is: diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 2a2fa4ac0f5..f66c446bbf2 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -1865,17 +1865,6 @@ "fieldType": "boolean", "fieldCategory": "advanced" }, - { - "kind": "field", - "name": "prefer_streaming_chunks_from_store_gateways", - "required": false, - "desc": "Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.", - "fieldValue": null, - "fieldDefaultValue": true, - "fieldFlag": "querier.prefer-streaming-chunks-from-store-gateways", - "fieldType": "boolean", - "fieldCategory": "experimental" - }, { "kind": "field", "name": "streaming_chunks_per_ingester_series_buffer_size", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 7e8e67bf699..44a500261b3 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1911,8 +1911,6 @@ Usage of ./cmd/mimir/mimir: If true, when querying ingesters, only the minimum required ingesters required to reach quorum will be queried initially, with other ingesters queried only if needed due to failures from the initial set of ingesters. Enabling this option reduces resource consumption for the happy path at the cost of increased latency for the unhappy path. 
(default true) -querier.minimize-ingester-requests-hedging-delay duration Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s) - -querier.prefer-streaming-chunks-from-store-gateways - [experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this. (default true) -querier.promql-experimental-functions-enabled [experimental] Enable experimental PromQL functions. This config option should be set on query-frontend too when query sharding is enabled. -querier.query-engine string diff --git a/docs/sources/mimir/configure/about-versioning.md b/docs/sources/mimir/configure/about-versioning.md index 3dd72fff66b..7e552097365 100644 --- a/docs/sources/mimir/configure/about-versioning.md +++ b/docs/sources/mimir/configure/about-versioning.md @@ -146,7 +146,6 @@ The following features are currently experimental: - `-ingester.client.circuit-breaker.cooldown-period` - Querier - Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`) - - Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`) - Limiting queries based on the estimated number of chunks that will be used (`-querier.max-estimated-fetched-chunks-per-query-multiplier`) - Max concurrency for tenant federated queries (`-tenant-federation.max-concurrent`) - Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`) @@ -215,8 +214,6 @@ The following features or configuration parameters are currently deprecated and - `-ingester.return-only-grpc-errors` - Ingester client - `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled` -- Querier - - the 
flag `-querier.prefer-streaming-chunks-from-store-gateways` The following features or configuration parameters are currently deprecated and will be **removed in a future release (to be announced)**: diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index fd59b0e4faa..4ca753d1913 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -1452,12 +1452,6 @@ store_gateway_client: # CLI flag: -querier.shuffle-sharding-ingesters-enabled [shuffle_sharding_ingesters_enabled: | default = true] -# (experimental) Request store-gateways stream chunks. Store-gateways will only -# respond with a stream of chunks if the target store-gateway supports this, and -# this preference will be ignored by store-gateways that do not support this. -# CLI flag: -querier.prefer-streaming-chunks-from-store-gateways -[prefer_streaming_chunks_from_store_gateways: | default = true] - # (advanced) Number of series to buffer per ingester when streaming chunks from # ingesters. 
# CLI flag: -querier.streaming-chunks-per-ingester-buffer-size diff --git a/integration/querier_test.go b/integration/querier_test.go index 1f19bbb46f8..ba26267d8a7 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -28,22 +28,14 @@ import ( ) func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { - for _, streamingEnabled := range []bool{true, false} { - t.Run(fmt.Sprintf("streaming=%t", streamingEnabled), func(t *testing.T) { - testQuerierWithBlocksStorageRunningInMicroservicesMode(t, streamingEnabled, generateFloatSeries) - }) - } + testQuerierWithBlocksStorageRunningInMicroservicesMode(t, generateFloatSeries) } func TestQuerierWithBlocksStorageRunningInMicroservicesModeWithHistograms(t *testing.T) { - for _, streamingEnabled := range []bool{true, false} { - t.Run(fmt.Sprintf("streaming=%t", streamingEnabled), func(t *testing.T) { - testQuerierWithBlocksStorageRunningInMicroservicesMode(t, streamingEnabled, generateHistogramSeries) - }) - } + testQuerierWithBlocksStorageRunningInMicroservicesMode(t, generateHistogramSeries) } -func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, streamingEnabled bool, seriesGenerator func(name string, ts time.Time, additionalLabels ...prompb.Label) (series []prompb.TimeSeries, vector model.Vector, matrix model.Matrix)) { +func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, seriesGenerator func(name string, ts time.Time, additionalLabels ...prompb.Label) (series []prompb.TimeSeries, vector model.Vector, matrix model.Matrix)) { tests := map[string]struct { tenantShardSize int indexCacheBackend string @@ -158,7 +150,6 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream "-store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize), "-query-frontend.query-stats-enabled": "true", "-query-frontend.parallelize-shardable-queries": strconv.FormatBool(testCfg.queryShardingEnabled), - 
"-querier.prefer-streaming-chunks-from-store-gateways": strconv.FormatBool(streamingEnabled), }) // Start store-gateways. @@ -234,21 +225,15 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream // Make sure the querier is using the bucket index blocks finder. require.NoError(t, querier.WaitSumMetrics(e2e.Greater(0), "cortex_bucket_index_loads_total")) - comparingFunction := e2e.Equals - if streamingEnabled { - // Some metrics can be higher when streaming is enabled. The exact number is not deterministic in every case. - comparingFunction = e2e.GreaterOrEqual - } - // Check the in-memory index cache metrics (in the store-gateway). - require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(9), "thanos_store_index_cache_requests_total")) // 5 + 2 + 2 - require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty + require.NoError(t, storeGateways.WaitSumMetrics(e2e.GreaterOrEqual(9), "thanos_store_index_cache_requests_total")) // 5 + 2 + 2; can be higher because streaming is always enabled and the exact number is not deterministic in every case + require.NoError(t, storeGateways.WaitSumMetrics(e2e.GreaterOrEqual(0), "thanos_store_index_cache_hits_total")) // no cache hit because the cache was empty if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2+2+3), "thanos_store_index_cache_items")) // 2 series both for postings and series cache, 2 expanded postings on one block, 3 on another one require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2+2+3), "thanos_store_index_cache_items_added_total")) // 2 series both for postings and series cache, 2 expanded postings on one block, 3 on another one } else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { - require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(9*2), "thanos_cache_operations_total")) // one set for each get + require.NoError(t, storeGateways.WaitSumMetrics(e2e.GreaterOrEqual(9*2), 
"thanos_cache_operations_total")) // one set for each get } // Query back again the 1st series from storage. This time it should use the index cache. @@ -258,14 +243,14 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream assert.Equal(t, expectedVector1, result.(model.Vector)) expectedFetchedSeries++ // Storage only. - require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(9+2), "thanos_store_index_cache_requests_total")) - require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache + require.NoError(t, storeGateways.WaitSumMetrics(e2e.GreaterOrEqual(9+2), "thanos_store_index_cache_requests_total")) + require.NoError(t, storeGateways.WaitSumMetrics(e2e.GreaterOrEqual(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2+2+3), "thanos_store_index_cache_items")) // as before require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2+2+3), "thanos_store_index_cache_items_added_total")) // as before } else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { - require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(9*2+2), "thanos_cache_operations_total")) // as before + 2 gets (expanded postings and series) + require.NoError(t, storeGateways.WaitSumMetrics(e2e.GreaterOrEqual(9*2+2), "thanos_cache_operations_total")) // as before + 2 gets (expanded postings and series) } // Query range. We expect 1 data point with a value of 3 (number of series). 
@@ -899,82 +884,77 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) { func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { const blockRangePeriod = 5 * time.Second - for _, streamingEnabled := range []bool{true, false} { - t.Run(fmt.Sprintf("store-gateway streaming enabled: %v", streamingEnabled), func(t *testing.T) { - s, err := e2e.NewScenario(networkName) - require.NoError(t, err) - defer s.Close() + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() - // Configure the blocks storage to frequently compact TSDB head - // and ship blocks to the storage. - flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{ - "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), - "-blocks-storage.tsdb.ship-interval": "1s", - "-blocks-storage.bucket-store.sync-interval": "1s", - "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), - "-querier.max-fetched-series-per-query": "3", - "-querier.prefer-streaming-chunks-from-store-gateways": strconv.FormatBool(streamingEnabled), - }) + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + "-querier.max-fetched-series-per-query": "3", + }) - // Start dependencies. - consul := e2edb.NewConsul() - minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) - memcached := e2ecache.NewMemcached() - require.NoError(t, s.StartAndWaitReady(consul, minio, memcached)) + // Start dependencies. 
+ consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, minio, memcached)) - // Add the memcached address to the flags. - flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort) + // Add the memcached address to the flags. + flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort) - // Start Mimir components. - distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags) - ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags) - storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags) - require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway)) + // Start Mimir components. + distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags) + ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags) + storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags) + require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway)) - querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags) - require.NoError(t, s.StartAndWaitReady(querier)) + querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags) + require.NoError(t, s.StartAndWaitReady(querier)) - c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") - require.NoError(t, err) + c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) - // Push some series to Mimir. 
- series1Name := "series_1" - series2Name := "series_2" - series3Name := "series_3" - series4Name := "series_4" - series1Timestamp := time.Now() - series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2) - series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2) - series4Timestamp := series1Timestamp.Add(blockRangePeriod * 3) + // Push some series to Mimir. + series1Name := "series_1" + series2Name := "series_2" + series3Name := "series_3" + series4Name := "series_4" + series1Timestamp := time.Now() + series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series4Timestamp := series1Timestamp.Add(blockRangePeriod * 3) - series1, _, _ := generateFloatSeries(series1Name, series1Timestamp, prompb.Label{Name: series1Name, Value: series1Name}) - series2, _, _ := generateHistogramSeries(series2Name, series2Timestamp, prompb.Label{Name: series2Name, Value: series2Name}) - series3, _, _ := generateFloatSeries(series3Name, series3Timestamp, prompb.Label{Name: series3Name, Value: series3Name}) - series4, _, _ := generateHistogramSeries(series4Name, series4Timestamp, prompb.Label{Name: series4Name, Value: series4Name}) + series1, _, _ := generateFloatSeries(series1Name, series1Timestamp, prompb.Label{Name: series1Name, Value: series1Name}) + series2, _, _ := generateHistogramSeries(series2Name, series2Timestamp, prompb.Label{Name: series2Name, Value: series2Name}) + series3, _, _ := generateFloatSeries(series3Name, series3Timestamp, prompb.Label{Name: series3Name, Value: series3Name}) + series4, _, _ := generateHistogramSeries(series4Name, series4Timestamp, prompb.Label{Name: series4Name, Value: series4Name}) - res, err := c.Push(series1) - require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - res, err = c.Push(series2) - require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) 
+ res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) - result, err := c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series2Timestamp.Add(1*time.Hour), blockRangePeriod) - require.NoError(t, err) - require.Equal(t, model.ValMatrix, result.Type()) + result, err := c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series2Timestamp.Add(1*time.Hour), blockRangePeriod) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, result.Type()) - res, err = c.Push(series3) - require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) - res, err = c.Push(series4) - require.NoError(t, err) - require.Equal(t, 200, res.StatusCode) + res, err = c.Push(series3) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + res, err = c.Push(series4) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) - _, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod) - require.Error(t, err) - assert.ErrorContains(t, err, "the query exceeded the maximum number of series") - }) - } + _, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod) + require.Error(t, err) + assert.ErrorContains(t, err, "the query exceeded the maximum number of series") } func TestHashCollisionHandling(t *testing.T) { diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 44dc2fa323e..4a1459565e6 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -263,9 +263,6 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa ) streamingBufferSize := querierCfg.StreamingChunksPerStoreGatewaySeriesBufferSize - if !querierCfg.PreferStreamingChunksFromStoreGateways { - streamingBufferSize = 0 - } return NewBlocksStoreQueryable(stores, finder, consistency, limits, querierCfg.QueryStoreAfter, 
streamingBufferSize, logger, reg) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index ed59fb58ba0..576678a581c 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -50,7 +50,6 @@ type Config struct { ShuffleShardingIngestersEnabled bool `yaml:"shuffle_sharding_ingesters_enabled" category:"advanced"` - PreferStreamingChunksFromStoreGateways bool `yaml:"prefer_streaming_chunks_from_store_gateways" category:"experimental"` // Enabled by default as of Mimir 2.13, remove altogether in 2.14. PreferAvailabilityZone string `yaml:"prefer_availability_zone" category:"experimental" doc:"hidden"` StreamingChunksPerIngesterSeriesBufferSize uint64 `yaml:"streaming_chunks_per_ingester_series_buffer_size" category:"advanced"` StreamingChunksPerStoreGatewaySeriesBufferSize uint64 `yaml:"streaming_chunks_per_store_gateway_series_buffer_size" category:"advanced"` @@ -77,7 +76,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.") f.DurationVar(&cfg.QueryStoreAfter, queryStoreAfterFlag, 12*time.Hour, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. If this option is enabled, the time range of the query sent to the store-gateway will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.") f.BoolVar(&cfg.ShuffleShardingIngestersEnabled, "querier.shuffle-sharding-ingesters-enabled", true, fmt.Sprintf("Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -%s. 
If this setting is false or -%s is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).", validation.QueryIngestersWithinFlag, validation.QueryIngestersWithinFlag)) - f.BoolVar(&cfg.PreferStreamingChunksFromStoreGateways, "querier.prefer-streaming-chunks-from-store-gateways", true, "Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.") f.StringVar(&cfg.PreferAvailabilityZone, "querier.prefer-availability-zone", "", "Preferred availability zone to query ingesters from when using the ingest storage.") const minimiseIngesterRequestsFlagName = "querier.minimize-ingester-requests"