querier, store-gateway: promote chunks streaming to stable (#8696)
preparing for the 2.14 release

Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov authored Jul 23, 2024
1 parent b7a4a03 commit 1498076
Showing 8 changed files with 69 additions and 115 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
 * [CHANGE] Added new metric `cortex_compactor_disk_out_of_space_errors_total` which counts how many times a compaction failed due to the compactor being out of disk, alert if there is a single increase. #8237 #8278
 * [CHANGE] Store-gateway: Remove experimental parameter `-blocks-storage.bucket-store.series-selection-strategy`. The default strategy is now `worst-case`. #8702
 * [CHANGE] Store-gateway: Rename `-blocks-storage.bucket-store.series-selection-strategies.worst-case-series-preference` to `-blocks-storage.bucket-store.series-fetch-preference` and promote to stable. #8702
+* [CHANGE] Querier, store-gateway: remove deprecated `-querier.prefer-streaming-chunks-from-store-gateways=true`. Streaming from store-gateways is now always enabled. #8696
 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8671
 * [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750
   * What it is:
11 changes: 0 additions & 11 deletions cmd/mimir/config-descriptor.json
@@ -1865,17 +1865,6 @@
       "fieldType": "boolean",
       "fieldCategory": "advanced"
     },
-    {
-      "kind": "field",
-      "name": "prefer_streaming_chunks_from_store_gateways",
-      "required": false,
-      "desc": "Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.",
-      "fieldValue": null,
-      "fieldDefaultValue": true,
-      "fieldFlag": "querier.prefer-streaming-chunks-from-store-gateways",
-      "fieldType": "boolean",
-      "fieldCategory": "experimental"
-    },
     {
      "kind": "field",
      "name": "streaming_chunks_per_ingester_series_buffer_size",
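The descriptor JSON above is generated from the querier's Go flag registrations, so deleting this entry corresponds to deleting a config struct field and its flag registration. A minimal sketch of what such a registration plausibly looked like — the struct shape, tags, and RegisterFlags convention here are assumptions inferred from the descriptor fields, not verbatim Mimir source:

    package querier

    import "flag"

    type Config struct {
    	// Deleted by #8696: streaming from store-gateways is now always enabled,
    	// so the preference no longer needs a field.
    	PreferStreamingChunksFromStoreGateways bool `yaml:"prefer_streaming_chunks_from_store_gateways" category:"experimental"`
    }

    // RegisterFlags wires the field to the CLI flag; a descriptor generator can
    // read the flag name, default, and help text from registrations like this.
    func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
    	f.BoolVar(&cfg.PreferStreamingChunksFromStoreGateways,
    		"querier.prefer-streaming-chunks-from-store-gateways",
    		true,
    		"Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this.")
    }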
2 changes: 0 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
@@ -1911,8 +1911,6 @@ Usage of ./cmd/mimir/mimir:
    	If true, when querying ingesters, only the minimum required ingesters required to reach quorum will be queried initially, with other ingesters queried only if needed due to failures from the initial set of ingesters. Enabling this option reduces resource consumption for the happy path at the cost of increased latency for the unhappy path. (default true)
   -querier.minimize-ingester-requests-hedging-delay duration
    	Delay before initiating requests to further ingesters when request minimization is enabled and the initially selected set of ingesters have not all responded. Ignored if -querier.minimize-ingester-requests is not enabled. (default 3s)
-  -querier.prefer-streaming-chunks-from-store-gateways
-   	[experimental] Request store-gateways stream chunks. Store-gateways will only respond with a stream of chunks if the target store-gateway supports this, and this preference will be ignored by store-gateways that do not support this. (default true)
   -querier.promql-experimental-functions-enabled
    	[experimental] Enable experimental PromQL functions. This config option should be set on query-frontend too when query sharding is enabled.
   -querier.query-engine string
3 changes: 0 additions & 3 deletions docs/sources/mimir/configure/about-versioning.md
@@ -146,7 +146,6 @@ The following features are currently experimental:
   - `-ingester.client.circuit-breaker.cooldown-period`
 - Querier
   - Use of Redis cache backend (`-blocks-storage.bucket-store.metadata-cache.backend=redis`)
-  - Streaming chunks from store-gateway to querier (`-querier.prefer-streaming-chunks-from-store-gateways`)
   - Limiting queries based on the estimated number of chunks that will be used (`-querier.max-estimated-fetched-chunks-per-query-multiplier`)
   - Max concurrency for tenant federated queries (`-tenant-federation.max-concurrent`)
   - Maximum response size for active series queries (`-querier.active-series-results-max-size-bytes`)
@@ -215,8 +214,6 @@ The following features or configuration parameters are currently deprecated and
   - `-ingester.return-only-grpc-errors`
 - Ingester client
   - `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled`
-- Querier
-  - the flag `-querier.prefer-streaming-chunks-from-store-gateways`
 
 The following features or configuration parameters are currently deprecated and will be **removed in a future release (to be announced)**:
 
docs/sources/mimir/configure/configuration-parameters/index.md
@@ -1452,12 +1452,6 @@ store_gateway_client:
 # CLI flag: -querier.shuffle-sharding-ingesters-enabled
 [shuffle_sharding_ingesters_enabled: <boolean> | default = true]
 
-# (experimental) Request store-gateways stream chunks. Store-gateways will only
-# respond with a stream of chunks if the target store-gateway supports this, and
-# this preference will be ignored by store-gateways that do not support this.
-# CLI flag: -querier.prefer-streaming-chunks-from-store-gateways
-[prefer_streaming_chunks_from_store_gateways: <boolean> | default = true]
-
 # (advanced) Number of series to buffer per ingester when streaming chunks from
 # ingesters.
 # CLI flag: -querier.streaming-chunks-per-ingester-buffer-size
156 changes: 68 additions & 88 deletions integration/querier_test.go
@@ -28,22 +28,14 @@ import (
 )
 
 func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
-	for _, streamingEnabled := range []bool{true, false} {
-		t.Run(fmt.Sprintf("streaming=%t", streamingEnabled), func(t *testing.T) {
-			testQuerierWithBlocksStorageRunningInMicroservicesMode(t, streamingEnabled, generateFloatSeries)
-		})
-	}
+	testQuerierWithBlocksStorageRunningInMicroservicesMode(t, generateFloatSeries)
 }
 
 func TestQuerierWithBlocksStorageRunningInMicroservicesModeWithHistograms(t *testing.T) {
-	for _, streamingEnabled := range []bool{true, false} {
-		t.Run(fmt.Sprintf("streaming=%t", streamingEnabled), func(t *testing.T) {
-			testQuerierWithBlocksStorageRunningInMicroservicesMode(t, streamingEnabled, generateHistogramSeries)
-		})
-	}
+	testQuerierWithBlocksStorageRunningInMicroservicesMode(t, generateHistogramSeries)
 }
 
-func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, streamingEnabled bool, seriesGenerator func(name string, ts time.Time, additionalLabels ...prompb.Label) (series []prompb.TimeSeries, vector model.Vector, matrix model.Matrix)) {
+func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, seriesGenerator func(name string, ts time.Time, additionalLabels ...prompb.Label) (series []prompb.TimeSeries, vector model.Vector, matrix model.Matrix)) {
 	tests := map[string]struct {
 		tenantShardSize   int
 		indexCacheBackend string
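The deleted wrapper above is Go's standard pattern for running one test body under each value of a feature flag, with a named subtest per variant; once the flag went away, the loop collapsed to a direct call. For reference, a self-contained sketch of the pattern with placeholder names (this is illustrative, not Mimir code):

    package example

    import (
    	"fmt"
    	"testing"
    )

    // runScenario stands in for the shared test body; in the real test this was
    // testQuerierWithBlocksStorageRunningInMicroservicesMode.
    func runScenario(t *testing.T, featureEnabled bool) {
    	t.Helper()
    	// ... exercise the system with the feature toggled ...
    }

    func TestWithAndWithoutFeature(t *testing.T) {
    	// Run the same body once per flag value, each as a named subtest,
    	// so a failure reports which variant broke.
    	for _, enabled := range []bool{true, false} {
    		t.Run(fmt.Sprintf("feature=%t", enabled), func(t *testing.T) {
    			runScenario(t, enabled)
    		})
    	}
    }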
@@ -158,7 +150,6 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream
 				"-store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize),
 				"-query-frontend.query-stats-enabled": "true",
 				"-query-frontend.parallelize-shardable-queries": strconv.FormatBool(testCfg.queryShardingEnabled),
-				"-querier.prefer-streaming-chunks-from-store-gateways": strconv.FormatBool(streamingEnabled),
 			})
 
 			// Start store-gateways.
@@ -234,21 +225,15 @@ func testQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T, stream
 		// Make sure the querier is using the bucket index blocks finder.
 		require.NoError(t, querier.WaitSumMetrics(e2e.Greater(0), "cortex_bucket_index_loads_total"))
 
-		comparingFunction := e2e.Equals
-		if streamingEnabled {
-			// Some metrics can be higher when streaming is enabled. The exact number is not deterministic in every case.
-			comparingFunction = e2e.GreaterOrEqual
-		}
-
 		// Check the in-memory index cache metrics (in the store-gateway).
-		require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(9), "thanos_store_index_cache_requests_total")) // 5 + 2 + 2
-		require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty
+		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_requests_total")) // 5 + 2 + 2
+		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty
 
 		if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
 			require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2+2+3), "thanos_store_index_cache_items")) // 2 series both for postings and series cache, 2 expanded postings on one block, 3 on another one
 			require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2+2+3), "thanos_store_index_cache_items_added_total")) // 2 series both for postings and series cache, 2 expanded postings on one block, 3 on another one
 		} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
-			require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(9*2), "thanos_cache_operations_total")) // one set for each get
+			require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9*2), "thanos_cache_operations_total")) // one set for each get
 		}
 
 		// Query back again the 1st series from storage. This time it should use the index cache.
@@ -258,14 +243,14 @@
 		assert.Equal(t, expectedVector1, result.(model.Vector))
 		expectedFetchedSeries++ // Storage only.
 
-		require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(9+2), "thanos_store_index_cache_requests_total"))
-		require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache
+		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9+2), "thanos_store_index_cache_requests_total"))
+		require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache
 
 		if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory {
 			require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2+2+3), "thanos_store_index_cache_items")) // as before
 			require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2*2+2+3), "thanos_store_index_cache_items_added_total")) // as before
 		} else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached {
-			require.NoError(t, storeGateways.WaitSumMetrics(comparingFunction(9*2+2), "thanos_cache_operations_total")) // as before + 2 gets (expanded postings and series)
+			require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9*2+2), "thanos_cache_operations_total")) // as before + 2 gets (expanded postings and series)
 		}
 
 		// Query range. We expect 1 data point with a value of 3 (number of series).
@@ -899,82 +884,77 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) {
 func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) {
 	const blockRangePeriod = 5 * time.Second
 
-	for _, streamingEnabled := range []bool{true, false} {
-		t.Run(fmt.Sprintf("store-gateway streaming enabled: %v", streamingEnabled), func(t *testing.T) {
-			s, err := e2e.NewScenario(networkName)
-			require.NoError(t, err)
-			defer s.Close()
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
 
-			// Configure the blocks storage to frequently compact TSDB head
-			// and ship blocks to the storage.
-			flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{
-				"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
-				"-blocks-storage.tsdb.ship-interval": "1s",
-				"-blocks-storage.bucket-store.sync-interval": "1s",
-				"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
-				"-querier.max-fetched-series-per-query": "3",
-				"-querier.prefer-streaming-chunks-from-store-gateways": strconv.FormatBool(streamingEnabled),
-			})
+	// Configure the blocks storage to frequently compact TSDB head
+	// and ship blocks to the storage.
+	flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{
+		"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
+		"-blocks-storage.tsdb.ship-interval": "1s",
+		"-blocks-storage.bucket-store.sync-interval": "1s",
+		"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
+		"-querier.max-fetched-series-per-query": "3",
+	})
 
-			// Start dependencies.
-			consul := e2edb.NewConsul()
-			minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
-			memcached := e2ecache.NewMemcached()
-			require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+	memcached := e2ecache.NewMemcached()
+	require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))
 
-			// Add the memcached address to the flags.
-			flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
+	// Add the memcached address to the flags.
+	flags["-blocks-storage.bucket-store.index-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
 
-			// Start Mimir components.
-			distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
-			ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
-			storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags)
-			require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway))
+	// Start Mimir components.
+	distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
+	ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
+	storeGateway := e2emimir.NewStoreGateway("store-gateway", consul.NetworkHTTPEndpoint(), flags)
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway))
 
-			querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
-			require.NoError(t, s.StartAndWaitReady(querier))
+	querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
+	require.NoError(t, s.StartAndWaitReady(querier))
 
-			c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
-			require.NoError(t, err)
+	c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1")
+	require.NoError(t, err)
 
-			// Push some series to Mimir.
-			series1Name := "series_1"
-			series2Name := "series_2"
-			series3Name := "series_3"
-			series4Name := "series_4"
-			series1Timestamp := time.Now()
-			series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
-			series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
-			series4Timestamp := series1Timestamp.Add(blockRangePeriod * 3)
+	// Push some series to Mimir.
+	series1Name := "series_1"
+	series2Name := "series_2"
+	series3Name := "series_3"
+	series4Name := "series_4"
+	series1Timestamp := time.Now()
+	series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
+	series3Timestamp := series1Timestamp.Add(blockRangePeriod * 2)
+	series4Timestamp := series1Timestamp.Add(blockRangePeriod * 3)
 
-			series1, _, _ := generateFloatSeries(series1Name, series1Timestamp, prompb.Label{Name: series1Name, Value: series1Name})
-			series2, _, _ := generateHistogramSeries(series2Name, series2Timestamp, prompb.Label{Name: series2Name, Value: series2Name})
-			series3, _, _ := generateFloatSeries(series3Name, series3Timestamp, prompb.Label{Name: series3Name, Value: series3Name})
-			series4, _, _ := generateHistogramSeries(series4Name, series4Timestamp, prompb.Label{Name: series4Name, Value: series4Name})
+	series1, _, _ := generateFloatSeries(series1Name, series1Timestamp, prompb.Label{Name: series1Name, Value: series1Name})
+	series2, _, _ := generateHistogramSeries(series2Name, series2Timestamp, prompb.Label{Name: series2Name, Value: series2Name})
+	series3, _, _ := generateFloatSeries(series3Name, series3Timestamp, prompb.Label{Name: series3Name, Value: series3Name})
+	series4, _, _ := generateHistogramSeries(series4Name, series4Timestamp, prompb.Label{Name: series4Name, Value: series4Name})
 
-			res, err := c.Push(series1)
-			require.NoError(t, err)
-			require.Equal(t, 200, res.StatusCode)
-			res, err = c.Push(series2)
-			require.NoError(t, err)
-			require.Equal(t, 200, res.StatusCode)
+	res, err := c.Push(series1)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+	res, err = c.Push(series2)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
 
-			result, err := c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series2Timestamp.Add(1*time.Hour), blockRangePeriod)
-			require.NoError(t, err)
-			require.Equal(t, model.ValMatrix, result.Type())
+	result, err := c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series2Timestamp.Add(1*time.Hour), blockRangePeriod)
+	require.NoError(t, err)
+	require.Equal(t, model.ValMatrix, result.Type())
 
-			res, err = c.Push(series3)
-			require.NoError(t, err)
-			require.Equal(t, 200, res.StatusCode)
-			res, err = c.Push(series4)
-			require.NoError(t, err)
-			require.Equal(t, 200, res.StatusCode)
+	res, err = c.Push(series3)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
+	res, err = c.Push(series4)
+	require.NoError(t, err)
+	require.Equal(t, 200, res.StatusCode)
 
-			_, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod)
-			require.Error(t, err)
-			assert.ErrorContains(t, err, "the query exceeded the maximum number of series")
-		})
-	}
+	_, err = c.QueryRange("{__name__=~\"series_.+\"}", series1Timestamp, series4Timestamp.Add(1*time.Hour), blockRangePeriod)
+	require.Error(t, err)
+	assert.ErrorContains(t, err, "the query exceeded the maximum number of series")
 }
 
 func TestHashCollisionHandling(t *testing.T) {
3 changes: 0 additions & 3 deletions pkg/querier/blocks_store_queryable.go
@@ -263,9 +263,6 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
 	)
 
 	streamingBufferSize := querierCfg.StreamingChunksPerStoreGatewaySeriesBufferSize
-	if !querierCfg.PreferStreamingChunksFromStoreGateways {
-		streamingBufferSize = 0
-	}
 
 	return NewBlocksStoreQueryable(stores, finder, consistency, limits, querierCfg.QueryStoreAfter, streamingBufferSize, logger, reg)
 }
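Until this change, a buffer size of zero was the querier-side sentinel meaning "do not stream chunks from store-gateways", and the deleted branch is what mapped the flag onto that sentinel; with the flag gone, the configured per-series buffer size is always passed through. A rough illustration of the old convention, with simplified names and signatures that are assumptions rather than the actual Mimir API:

    package main

    import "fmt"

    // chooseBufferSize mirrors the deleted logic: a zero buffer size acted as
    // the "streaming disabled" signal further down the call chain.
    func chooseBufferSize(configured int, preferStreaming bool) int {
    	if !preferStreaming {
    		return 0 // sentinel: fall back to non-streaming chunk fetching
    	}
    	return configured
    }

    func main() {
    	// Old behavior: the flag could zero the buffer and turn streaming off.
    	fmt.Println(chooseBufferSize(256, false)) // 0
    	// New behavior: the configured value is always used.
    	fmt.Println(chooseBufferSize(256, true)) // 256
    }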
