Skip to content

Commit

Permalink
Add single compactor http client for delete and gennumber clients (gr…
Browse files Browse the repository at this point in the history
  • Loading branch information
periklis authored and lxwzy committed Nov 7, 2022
1 parent 74d997d commit b91f15f
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* [7270](https://github.com/grafana/loki/pull/7270) **wilfriedroset**: Add support for `username` to redis cache configuration.

##### Fixes
* [7453](https://github.com/grafana/loki/pull/7453) **periklis**: Add single compactor http client for delete and gennumber clients
* [7426](https://github.com/grafana/loki/pull/7426) **periklis**: Add missing compactor delete client tls client config
* [7238](https://github.com/grafana/loki/pull/7328) **periklis**: Fix internal server bootstrap for query frontend
* [7288](https://github.com/grafana/loki/pull/7288) **ssncferreira**: Fix query mapping in AST mapper `rangemapper` to support the new `VectorExpr` expression.
Expand Down
4 changes: 2 additions & 2 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type Config struct {
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Distributor distributor.Config `yaml:"distributor,omitempty"`
Querier querier.Config `yaml:"querier,omitempty"`
DeleteClient deletion.Config `yaml:"delete_client,omitempty"`
CompactorClient compactor.ClientConfig `yaml:"compactor_client,omitempty"`
IngesterClient client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
Expand Down Expand Up @@ -115,7 +115,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Common.RegisterFlags(f)
c.Distributor.RegisterFlags(f)
c.Querier.RegisterFlags(f)
c.DeleteClient.RegisterFlags(f)
c.CompactorClient.RegisterFlags(f)
c.IngesterClient.RegisterFlags(f)
c.Ingester.RegisterFlags(f)
c.StorageConfig.RegisterFlags(f)
Expand Down
17 changes: 12 additions & 5 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
toMerge := []middleware.Interface{
httpreq.ExtractQueryMetricsMiddleware(),
}
if t.supportIndexDeleteRequest() {
if t.supportIndexDeleteRequest() && t.Cfg.CompactorConfig.RetentionEnabled {
toMerge = append(
toMerge,
queryrangebase.CacheGenNumberHeaderSetterMiddleware(t.cacheGenerationLoader),
Expand Down Expand Up @@ -660,7 +660,8 @@ func (t *Loki) initQueryFrontendTripperware() (_ services.Service, err error) {
t.Cfg.QueryRange,
util_log.Logger,
t.overrides,
t.Cfg.SchemaConfig, t.cacheGenerationLoader,
t.Cfg.SchemaConfig,
t.cacheGenerationLoader, t.Cfg.CompactorConfig.RetentionEnabled,
prometheus.DefaultRegisterer,
)
if err != nil {
Expand All @@ -679,7 +680,13 @@ func (t *Loki) initCacheGenerationLoader() (_ services.Service, err error) {
if err != nil {
return nil, err
}
client, err = generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})

httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
if err != nil {
return nil, err
}

client, err = generationnumber.NewGenNumberClient(compactorAddress, httpClient)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1112,7 +1119,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
}

func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overrides) (deletion.DeleteRequestsClient, error) {
if !t.supportIndexDeleteRequest() {
if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}

Expand All @@ -1121,7 +1128,7 @@ func (t *Loki) deleteRequestsClient(clientType string, limits *validation.Overri
return nil, err
}

httpClient, err := deletion.NewDeleteHTTPClient(t.Cfg.DeleteClient)
httpClient, err := compactor.NewCompactorHTTPClient(t.Cfg.CompactorClient)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Test_seriesLimiter(t *testing.T) {
cfg.CacheResults = false
// split in 7 with 2 in // max.
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, time.Hour)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(cfg, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -237,7 +237,7 @@ func Test_MaxQueryLookBack(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{
maxQueryLookback: 1 * time.Hour,
maxQueryParallelism: 1,
}, config.SchemaConfig{}, nil, nil)
}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/querier/queryrange/queryrangebase/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ type resultsCache struct {
merger Merger
cacheGenNumberLoader CacheGenNumberLoader
shouldCache ShouldCacheFn
retentionEnabled bool
metrics *ResultsCacheMetrics
}

Expand All @@ -181,6 +182,7 @@ func NewResultsCacheMiddleware(
extractor Extractor,
cacheGenNumberLoader CacheGenNumberLoader,
shouldCache ShouldCacheFn,
retentionEnabled bool,
metrics *ResultsCacheMetrics,
) (Middleware, error) {
if cacheGenNumberLoader != nil {
Expand All @@ -199,6 +201,7 @@ func NewResultsCacheMiddleware(
splitter: splitter,
cacheGenNumberLoader: cacheGenNumberLoader,
shouldCache: shouldCache,
retentionEnabled: retentionEnabled,
metrics: metrics,
}
}), nil
Expand All @@ -214,7 +217,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
return s.next.Do(ctx, r)
}

if s.cacheGenNumberLoader != nil {
if s.cacheGenNumberLoader != nil && s.retentionEnabled {
ctx = cache.InjectCacheGenNumber(ctx, s.cacheGenNumberLoader.GetResultsCacheGenNumber(tenantIDs))
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/querier/queryrange/queryrangebase/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ func TestResultsCache(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
false,
nil,
)
require.NoError(t, err)
Expand Down Expand Up @@ -807,6 +808,7 @@ func TestResultsCacheRecent(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
false,
nil,
)
require.NoError(t, err)
Expand Down Expand Up @@ -871,6 +873,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
false,
nil,
)
require.NoError(t, err)
Expand Down Expand Up @@ -910,6 +913,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
PrometheusResponseExtractor{},
nil,
nil,
false,
nil,
)
require.NoError(t, err)
Expand Down Expand Up @@ -1021,6 +1025,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
PrometheusResponseExtractor{},
nil,
tc.shouldCache,
false,
nil,
)
require.NoError(t, err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func NewTripperware(
limits Limits,
schema config.SchemaConfig,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
retentionEnabled bool,
registerer prometheus.Registerer,
) (queryrangebase.Tripperware, Stopper, error) {
metrics := NewMetrics(registerer)
Expand All @@ -65,7 +66,7 @@ func NewTripperware(
}

metricsTripperware, err := NewMetricTripperware(cfg, log, limits, schema, LokiCodec, c,
cacheGenNumLoader, PrometheusExtractor{}, metrics, registerer)
cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, registerer)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -395,6 +396,7 @@ func NewMetricTripperware(
codec queryrangebase.Codec,
c cache.Cache,
cacheGenNumLoader queryrangebase.CacheGenNumberLoader,
retentionEnabled bool,
extractor queryrangebase.Extractor,
metrics *Metrics,
registerer prometheus.Registerer,
Expand Down Expand Up @@ -427,6 +429,7 @@ func NewMetricTripperware(
func(r queryrangebase.Request) bool {
return !r.GetCachingOptions().Disabled
},
retentionEnabled,
metrics.ResultsCacheMetrics,
)
if err != nil {
Expand Down
16 changes: 8 additions & 8 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var (
// those tests are mostly for testing the glue between all component and make sure they activate correctly.
func TestMetricsTripperware(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: math.MaxInt32, maxQueryParallelism: 1}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestMetricsTripperware(t *testing.T) {
}

func TestLogFilterTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestLogFilterTripperware(t *testing.T) {
func TestInstantQueryTripperware(t *testing.T) {
testShardingConfig := testConfig
testShardingConfig.ShardedQueries = true
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testShardingConfig, util_log.Logger, fakeLimits{maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestInstantQueryTripperware(t *testing.T) {
}

func TestSeriesTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestSeriesTripperware(t *testing.T) {
}

func TestLabelsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxQueryLength: 48 * time.Hour, maxQueryParallelism: 1}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -345,7 +345,7 @@ func TestLabelsTripperware(t *testing.T) {
}

func TestLogNoFilter(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestLogNoFilter(t *testing.T) {

func TestRegexpParamsSupport(t *testing.T) {
l := WithSplitByLimits(fakeLimits{maxSeries: 1, maxQueryParallelism: 2}, 4*time.Hour)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, l, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down Expand Up @@ -464,7 +464,7 @@ func TestPostQueries(t *testing.T) {
}

func TestEntriesLimitsTripperware(t *testing.T) {
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, nil)
tpw, stopper, err := NewTripperware(testConfig, util_log.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, config.SchemaConfig{}, nil, false, nil)
if stopper != nil {
defer stopper.Stop()
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/storage/stores/indexshipper/compactor/compactor_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package compactor

import (
"flag"
"net/http"
"time"

"github.com/grafana/dskit/crypto/tls"
)

// Config for compactor's generation-number client
type ClientConfig struct {
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *ClientConfig) RegisterFlags(f *flag.FlagSet) {
prefix := "boltdb.shipper.compactor.client"
f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false,
"Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.")
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}

// NewDeleteHTTPClient return a pointer to a http client instance based on the
// delete client tls settings.
func NewCompactorHTTPClient(cfg ClientConfig) (*http.Client, error) {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = 250
transport.MaxIdleConnsPerHost = 250

if cfg.TLSEnabled {
tlsCfg, err := cfg.TLS.GetTLSConfig()
if err != nil {
return nil, err
}

transport.TLSClientConfig = tlsCfg
}

return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package deletion
import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
Expand All @@ -14,8 +13,6 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/dskit/crypto/tls"

"github.com/grafana/loki/pkg/util/log"
)

Expand All @@ -24,20 +21,6 @@ const (
getDeletePath = "/loki/api/v1/delete"
)

// Config for compactor's delete client
type Config struct {
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
prefix := "boltdb.shipper.compactor.delete_client"
f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", false,
"Enable TLS in the HTTP client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to HTTP server will be used.")
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}

type DeleteRequestsClient interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
Stop()
Expand Down Expand Up @@ -69,25 +52,6 @@ func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption {
}
}

// NewDeleteHTTPClient return a pointer to a http client instance based on the
// delete client tls settings.
func NewDeleteHTTPClient(cfg Config) (*http.Client, error) {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConns = 250
transport.MaxIdleConnsPerHost = 250

if cfg.TLSEnabled {
tlsCfg, err := cfg.TLS.GetTLSConfig()
if err != nil {
return nil, err
}

transport.TLSClientConfig = tlsCfg
}

return &http.Client{Timeout: 5 * time.Second, Transport: transport}, nil
}

func NewDeleteRequestsClient(addr string, c httpClient, deleteClientMetrics *DeleteRequestClientMetrics, clientType string, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) {
u, err := url.Parse(addr)
if err != nil {
Expand Down

0 comments on commit b91f15f

Please sign in to comment.