diff --git a/CHANGELOG.md b/CHANGELOG.md index 7245f028937a1..6135d5bfdea01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ Check the history of the branch FIXME. * [6952](https://github.com/grafana/loki/pull/6952) **DylanGuedes**: Experimental: Introduce a new feature named stream sharding. ##### Fixes +* [7453](https://github.com/grafana/loki/pull/7453) **periklis**: Add single compactor http client for delete and gennumber clients * [7426](https://github.com/grafana/loki/pull/7426) **periklis**: Add missing compactor delete client tls client config * [7238](https://github.com/grafana/loki/pull/7328) **periklis**: Fix internal server bootstrap for query frontend * [7288](https://github.com/grafana/loki/pull/7288) **ssncferreira**: Fix query mapping in AST mapper `rangemapper` to support the new `VectorExpr` expression. diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 309b381038854..a667be14d7fd4 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -74,7 +74,7 @@ type Config struct { InternalServer internalserver.Config `yaml:"internal_server,omitempty"` Distributor distributor.Config `yaml:"distributor,omitempty"` Querier querier.Config `yaml:"querier,omitempty"` - DeleteClient deletion.Config `yaml:"delete_client,omitempty"` + CompactorClient compactor.ClientConfig `yaml:"delete_client,omitempty"` IngesterClient client.Config `yaml:"ingester_client,omitempty"` Ingester ingester.Config `yaml:"ingester,omitempty"` StorageConfig storage.Config `yaml:"storage_config,omitempty"` @@ -115,7 +115,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Common.RegisterFlags(f) c.Distributor.RegisterFlags(f) c.Querier.RegisterFlags(f) - c.DeleteClient.RegisterFlags(f) + c.CompactorClient.RegisterFlags(f) c.IngesterClient.RegisterFlags(f) c.Ingester.RegisterFlags(f) c.StorageConfig.RegisterFlags(f) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 3168e03075da3..d547cc131bb94 100644 --- a/pkg/loki/modules.go +++ 
b/pkg/loki/modules.go @@ -350,7 +350,7 @@ func (t *Loki) initQuerier() (services.Service, error) { toMerge := []middleware.Interface{ httpreq.ExtractQueryMetricsMiddleware(), } - if t.supportIndexDeleteRequest() { + if t.supportIndexDeleteRequest() && t.Cfg.CompactorConfig.RetentionEnabled { toMerge = append( toMerge, queryrangebase.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader), @@ -660,7 +660,8 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) { t.Cfg.QueryRange, util_log.Logger, t.overrides, - t.Cfg.SchemaConfig, t.cacheGenerationLoader, + t.Cfg.SchemaConfig, + t.cacheGenerationLoader, t.Cfg.CompactorConfig.RetentionEnabled, prometheus.DefaultRegisterer, ) if err != nil { @@ -679,7 +680,13 @@ func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) { if err != nil { return nil, err } - client, err = generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}) + + httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) + if err != nil { + return nil, err + } + + client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient) if err != nil { return nil, err } @@ -1112,7 +1119,7 @@ func (t *Loki) initUsageReport() (services.Service, error) { } func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overrides) (deletion.DeleteRequestsClient, error) { - if !t.supportIndexDeleteRequest() { + if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled { return deletion.NewNoOpDeleteRequestsStore(), nil } @@ -1121,7 +1128,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri return nil, err } - httpClient, err := deletion.NewDeleteHTTPClient(t.Cfg.DeleteClient) + httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient) if err != nil { return nil, err } diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 
678a602fa6f97..28dd30aff9882 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -52,7 +52,7 @@ func Test_seriesLimiter(t *testing.T) { cfg.CacheResults = false // split in 7 with 2 in // max. l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour) - tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -237,7 +237,7 @@ func Test_MaxQueryLookBack(t *testing.T) { tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{ maxQueryLookback: 1 * time.Hour, maxQueryParallelism: 1, - }, config.SchemaConfig{}, nil, nil) + }, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } diff --git a/pkg/querier/queryrange/queryrangebase/results_cache.go b/pkg/querier/queryrange/queryrangebase/results_cache.go index bd6c783f96462..24a0cfb7d0a1c 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache.go @@ -163,6 +163,7 @@ type resultsCache struct { merger Merger cacheGenNumberLoader CacheGenNumberLoader shouldCache ShouldCacheFn + retentionEnabled bool metrics *ResultsCacheMetrics } @@ -181,6 +182,7 @@ func NewResultsCacheMiddleware( extractor Extractor, cacheGenNumberLoader CacheGenNumberLoader, shouldCache ShouldCacheFn, + retentionEnabled bool, metrics *ResultsCacheMetrics, ) (Middleware, error) { if cacheGenNumberLoader != nil { @@ -199,6 +201,7 @@ func NewResultsCacheMiddleware( splitter: splitter, cacheGenNumberLoader: cacheGenNumberLoader, shouldCache: shouldCache, + retentionEnabled: retentionEnabled, metrics: metrics, } }), nil @@ -214,7 +217,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) { return s.next.Do(ctx, r) } - if s.cacheGenNumberLoader != nil { + if 
s.cacheGenNumberLoader != nil && s.retentionEnabled { ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(tenantIDs)) } diff --git a/pkg/querier/queryrange/queryrangebase/results_cache_test.go b/pkg/querier/queryrange/queryrangebase/results_cache_test.go index 8a4cf0f818510..e73989b2e3fd6 100644 --- a/pkg/querier/queryrange/queryrangebase/results_cache_test.go +++ b/pkg/querier/queryrange/queryrangebase/results_cache_test.go @@ -765,6 +765,7 @@ func TestResultsCache(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, + false, nil, ) require.NoError(t, err) @@ -807,6 +808,7 @@ func TestResultsCacheRecent(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, + false, nil, ) require.NoError(t, err) @@ -871,6 +873,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, + false, nil, ) require.NoError(t, err) @@ -910,6 +913,7 @@ func Test_resultsCache_MissingData(t *testing.T) { PrometheusResponseExtractor{}, nil, nil, + false, nil, ) require.NoError(t, err) @@ -1021,6 +1025,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { PrometheusResponseExtractor{}, nil, tc.shouldCache, + false, nil, ) require.NoError(t, err) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 7982db1e2c0cb..bb83bdd4c1b3b 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -45,6 +45,7 @@ func NewTripperware( limits Limits, schema config.SchemaConfig, cacheGenNumLoader queryrangebase.CacheGenNumberLoader, + retentionEnabled bool, registerer prometheus.Registerer, ) (queryrangebase.Tripperware, Stopper, error) { metrics := NewMetrics(registerer) @@ -65,7 +66,7 @@ func NewTripperware( } metricsTripperware, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, c, - cacheGenNumLoader, PrometheusExtractor{}, metrics, registerer) + cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, registerer) if err != nil 
{ return nil, nil, err } @@ -395,6 +396,7 @@ func NewMetricTripperware( codec queryrangebase.Codec, c cache.Cache, cacheGenNumLoader queryrangebase.CacheGenNumberLoader, + retentionEnabled bool, extractor queryrangebase.Extractor, metrics *Metrics, registerer prometheus.Registerer, @@ -427,6 +429,7 @@ func NewMetricTripperware( func(r queryrangebase.Request) bool { return !r.GetCachingOptions().Disabled }, + retentionEnabled, metrics.ResultsCacheMetrics, ) if err != nil { diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 7aa6acab9b84f..2a17dc1de2d45 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -110,7 +110,7 @@ var ( // those tests are mostly for testing the glue between all component and make sure they activate correctly. func TestMetricsTripperware(t *testing.T) { l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour) - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -173,7 +173,7 @@ func TestMetricsTripperware(t *testing.T) { } func TestLogFilterTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -222,7 +222,7 @@ func TestLogFilterTripperware(t *testing.T) { func TestInstantQueryTripperware(t *testing.T) { testShardingConfig := testConfig testShardingConfig.ShardedQueries = true - tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, 
nil, nil) + tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -258,7 +258,7 @@ func TestInstantQueryTripperware(t *testing.T) { } func TestSeriesTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -299,7 +299,7 @@ func TestSeriesTripperware(t *testing.T) { } func TestLabelsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -345,7 +345,7 @@ func TestLabelsTripperware(t *testing.T) { } func TestLogNoFilter(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } @@ -381,7 +381,7 @@ func TestLogNoFilter(t *testing.T) { func TestRegexpParamsSupport(t *testing.T) { l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour) - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil) if stopper != nil { 
defer stopper.Stop() } @@ -464,7 +464,7 @@ func TestPostQueries(t *testing.T) { } func TestEntriesLimitsTripperware(t *testing.T) { - tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, nil) + tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, false, nil) if stopper != nil { defer stopper.Stop() } diff --git a/pkg/storage/stores/indexshipper/compactor/compactor_client.go b/pkg/storage/stores/indexshipper/compactor/compactor_client.go new file mode 100644 index 0000000000000..e37cdcab5e446 --- /dev/null +++ b/pkg/storage/stores/indexshipper/compactor/compactor_client.go @@ -0,0 +1,42 @@ +package compactor + +import ( + "flag" + "net/http" + "time" + + "github.com/grafana/dskit/crypto/tls" +) + +// Config for compactor's delete and generation-number client +type ClientConfig struct { + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) { + prefix := "boltdb.shipper.compactor.delete_client" + f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, + "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.") + cfg.TLS.RegisterFlagsWithPrefix(prefix, f) +} + +// NewCompactorHTTPClient returns a pointer to a http client instance based on the +// delete client tls settings. 
+func NewCompactorHTTPClient(cfg ClientConfig) (*http.Client, error) { + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConns = 250 + transport.MaxIdleConnsPerHost = 250 + + if cfg.TLSEnabled { + tlsCfg, err := cfg.TLS.GetTLSConfig() + if err != nil { + return nil, err + } + + transport.TLSClientConfig = tlsCfg + } + + return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil +} diff --git a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go index 91c9d30d49eb8..3e8639e50c4c7 100644 --- a/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go +++ b/pkg/storage/stores/indexshipper/compactor/deletion/delete_requests_client.go @@ -3,7 +3,6 @@ package deletion import ( "context" "encoding/json" - "flag" "fmt" "io" "net/http" @@ -14,8 +13,6 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/dskit/crypto/tls" - "github.com/grafana/loki/pkg/util/log" ) @@ -24,20 +21,6 @@ const ( getDeletePath = "/loki/api/v1/delete" ) -// Config for compactor's delete client -type Config struct { - TLSEnabled bool `yaml:"tls_enabled"` - TLS tls.ClientConfig `yaml:",inline"` -} - -// RegisterFlags adds the flags required to config this to the given FlagSet. -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - prefix := "boltdb.shipper.compactor.delete_client" - f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false, - "Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. 
If set to false, insecure connection to HTTP server will be used.") - cfg.TLS.RegisterFlagsWithPrefix(prefix, f) -} - type DeleteRequestsClient interface { GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) Stop() @@ -69,25 +52,6 @@ func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption { } } -// NewDeleteHTTPClient return a pointer to a http client instance based on the -// delete client tls settings. -func NewDeleteHTTPClient(cfg Config) (*http.Client, error) { - transport := http.DefaultTransport.(*http.Transport).Clone() - transport.MaxIdleConns = 250 - transport.MaxIdleConnsPerHost = 250 - - if cfg.TLSEnabled { - tlsCfg, err := cfg.TLS.GetTLSConfig() - if err != nil { - return nil, err - } - - transport.TLSClientConfig = tlsCfg - } - - return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil -} - func NewDeleteRequestsClient(addr string, c httpClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) { u, err := url.Parse(addr) if err != nil {