From 1f529c670c3d8f589a38581fb93af228dcc0d94c Mon Sep 17 00:00:00 2001 From: Periklis Tsirakidis Date: Tue, 16 Jun 2020 15:24:54 +0200 Subject: [PATCH 1/5] Bump cortex deps to include non-experimental memberlist service --- go.mod | 2 +- go.sum | 4 +- .../cortex/pkg/chunk/chunk_store.go | 12 +- .../cortex/pkg/chunk/composite_store.go | 5 + .../cortex/pkg/chunk/purger/purger.go | 87 +++-- .../cortex/pkg/chunk/series_store.go | 5 + .../cortexproject/cortex/pkg/cortex/cortex.go | 2 +- .../cortex/pkg/cortex/modules.go | 8 +- .../cortex/pkg/ingester/client/compat.go | 31 +- .../cortex/pkg/querier/frontend/frontend.go | 47 +-- .../pkg/querier/queryrange/query_range.go | 2 + .../cortex/pkg/ring/kv/client.go | 2 +- .../pkg/ring/kv/memberlist/kv_init_service.go | 63 ++++ .../cortex/pkg/ring/kv/memberlist/kv_state.go | 38 --- .../ring/kv/memberlist/memberlist_client.go | 300 ++++++++++++++---- .../cortex/pkg/ring/kv/memberlist/metrics.go | 19 +- .../cortex/tools/querytee/proxy.go | 10 +- .../cortex/tools/querytee/proxy_endpoint.go | 141 ++++---- .../cortex/tools/querytee/proxy_metrics.go | 18 +- vendor/modules.txt | 2 +- 20 files changed, 562 insertions(+), 236 deletions(-) create mode 100644 vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_init_service.go delete mode 100644 vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_state.go diff --git a/go.mod b/go.mod index fe0517a91bb2d..fd776fd648ebb 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/containerd/containerd v1.3.2 // indirect github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf - github.com/cortexproject/cortex v1.1.1-0.20200609120740-6bd667db776a + github.com/cortexproject/cortex v1.1.1-0.20200616130854-34b45d1180c3 github.com/davecgh/go-spew v1.1.1 github.com/docker/distribution v2.7.1+incompatible // indirect github.com/docker/docker v0.7.3-0.20190817195342-4760db040282 diff --git a/go.sum b/go.sum index 40a98feab99ba..350dd0c2920a9 100644 --- a/go.sum +++ b/go.sum @@ -233,8 +233,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cortexproject/cortex v0.6.1-0.20200228110116-92ab6cbe0995/go.mod h1:3Xa3DjJxtpXqxcMGdk850lcIRb81M0fyY1MQ6udY134= -github.com/cortexproject/cortex v1.1.1-0.20200609120740-6bd667db776a h1:7K5ZOo5c+G+nyhG0CNeL6g5LnyjTUMykA8ceC2YgTYs= -github.com/cortexproject/cortex v1.1.1-0.20200609120740-6bd667db776a/go.mod h1:gySKqWpKaHBAFML0dOQyglOhxlJ/Eyfslf4QNBJjPoY= +github.com/cortexproject/cortex v1.1.1-0.20200616130854-34b45d1180c3 h1:PebLohuxr0MSgWYAc/iUCFpxIdZyISk7jKUuQKTRbRw= +github.com/cortexproject/cortex v1.1.1-0.20200616130854-34b45d1180c3/go.mod h1:gySKqWpKaHBAFML0dOQyglOhxlJ/Eyfslf4QNBJjPoY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8= diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go index 
aa556b5d07710..bd69eedeea384 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go @@ -505,6 +505,11 @@ func (c *baseStore) lookupEntriesByQueries(ctx context.Context, queries []IndexQ log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries") defer log.Span.Finish() + // Nothing to do if there are no queries. + if len(queries) == 0 { + return nil, nil + } + var lock sync.Mutex var entries []IndexEntry err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool { @@ -527,7 +532,12 @@ func (c *baseStore) lookupEntriesByQueries(ctx context.Context, queries []IndexQ return entries, err } -func (c *baseStore) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { +func (c *baseStore) parseIndexEntries(_ context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { + // Nothing to do if there are no entries. + if len(entries) == 0 { + return nil, nil + } + result := make([]string, 0, len(entries)) for _, entry := range entries { chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go index 659ef65edf6e0..46e055fcb1308 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go @@ -162,6 +162,11 @@ func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, t return err } + // Skip it if there are no chunks. + if len(ids) == 0 { + return nil + } + chunkIDs = append(chunkIDs, ids...) fetchers = append(fetchers, fetcher...) return nil diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/purger.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/purger.go index a5e62ced6716c..8b463ef89ad2e 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/purger.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/purger/purger.go @@ -102,6 +102,13 @@ type DataPurger struct { inProcessRequestIDs map[string]string inProcessRequestIDsMtx sync.RWMutex + // We do not want to limit pulling new delete requests to a fixed interval which otherwise would limit number of delete requests we process per user. + // While loading delete requests if we find more requests from user pending to be processed, we just set their id in usersWithPendingRequests and + // when a user's delete request gets processed we just check this map to see whether we want to load more requests without waiting for next ticker to load new batch. 
+ usersWithPendingRequests map[string]struct{} + usersWithPendingRequestsMtx sync.Mutex + pullNewRequestsChan chan struct{} + pendingPlansCount map[string]int // per request pending plan count pendingPlansCountMtx sync.Mutex @@ -113,31 +120,23 @@ func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, util.WarnExperimentalUse("Delete series API") dataPurger := DataPurger{ - cfg: cfg, - deleteStore: deleteStore, - chunkStore: chunkStore, - objectClient: storageClient, - metrics: newPurgerMetrics(registerer), - executePlansChan: make(chan deleteRequestWithLogger, 50), - workerJobChan: make(chan workerJob, 50), - inProcessRequestIDs: map[string]string{}, - pendingPlansCount: map[string]int{}, + cfg: cfg, + deleteStore: deleteStore, + chunkStore: chunkStore, + objectClient: storageClient, + metrics: newPurgerMetrics(registerer), + pullNewRequestsChan: make(chan struct{}, 1), + executePlansChan: make(chan deleteRequestWithLogger, 50), + workerJobChan: make(chan workerJob, 50), + inProcessRequestIDs: map[string]string{}, + usersWithPendingRequests: map[string]struct{}{}, + pendingPlansCount: map[string]int{}, } - dataPurger.Service = services.NewTimerService(time.Hour, dataPurger.init, dataPurger.runOneIteration, dataPurger.stop) + dataPurger.Service = services.NewBasicService(dataPurger.init, dataPurger.loop, dataPurger.stop) return &dataPurger, nil } -// Run keeps pulling delete requests for planning after initializing necessary things -func (dp *DataPurger) runOneIteration(ctx context.Context) error { - err := dp.pullDeleteRequestsToPlanDeletes() - if err != nil { - level.Error(util.Logger).Log("msg", "error pulling delete requests for building plans", "err", err) - } - // Don't return error here, or Timer service will stop. - return nil -} - // init starts workers, scheduler and then loads in process delete requests func (dp *DataPurger) init(ctx context.Context) error { for i := 0; i < dp.cfg.NumWorkers; i++ { @@ -151,6 +150,29 @@ func (dp *DataPurger) init(ctx context.Context) error { return dp.loadInprocessDeleteRequests() } +func (dp *DataPurger) loop(ctx context.Context) error { + loadRequestsTicker := time.NewTicker(time.Hour) + defer loadRequestsTicker.Stop() + + loadRequests := func() { + err := dp.pullDeleteRequestsToPlanDeletes() + if err != nil { + level.Error(util.Logger).Log("msg", "error pulling delete requests for building plans", "err", err) + } + } + + for { + select { + case <-loadRequestsTicker.C: + loadRequests() + case <-dp.pullNewRequestsChan: + loadRequests() + case <-ctx.Done(): + return nil + } + } +} + // Stop waits until all background tasks stop. 
func (dp *DataPurger) stop(_ error) error { dp.wg.Wait() @@ -183,6 +205,21 @@ func (dp *DataPurger) workerJobCleanup(job workerJob) { dp.inProcessRequestIDsMtx.Lock() delete(dp.inProcessRequestIDs, job.userID) dp.inProcessRequestIDsMtx.Unlock() + + // request loading of more delete request if + // - user has more pending requests and + // - we do not have a pending request to load more requests + dp.usersWithPendingRequestsMtx.Lock() + defer dp.usersWithPendingRequestsMtx.Unlock() + if _, ok := dp.usersWithPendingRequests[job.userID]; ok { + delete(dp.usersWithPendingRequests, job.userID) + select { + case dp.pullNewRequestsChan <- struct{}{}: + // sent + default: + // already sent + } + } } else { dp.pendingPlansCountMtx.Unlock() } @@ -345,12 +382,16 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error { } dp.inProcessRequestIDsMtx.RLock() - inprocessDeleteRequstID := dp.inProcessRequestIDs[deleteRequest.UserID] + inprocessDeleteRequestID := dp.inProcessRequestIDs[deleteRequest.UserID] dp.inProcessRequestIDsMtx.RUnlock() - if inprocessDeleteRequstID != "" { + if inprocessDeleteRequestID != "" { + dp.usersWithPendingRequestsMtx.Lock() + dp.usersWithPendingRequests[deleteRequest.UserID] = struct{}{} + dp.usersWithPendingRequestsMtx.Unlock() + level.Debug(util.Logger).Log("msg", "skipping delete request processing for now since another request from same user is already in process", - "inprocess_request_id", inprocessDeleteRequstID, + "inprocess_request_id", inprocessDeleteRequestID, "skipped_request_id", deleteRequest.RequestID, "user_id", deleteRequest.UserID) continue } diff --git a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go index 74f3f67b57ff6..4928ca9b96455 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go +++ b/vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go @@ -182,6 +182,11 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, userID string, from, thr level.Debug(log).Log("chunks-post-filtering", len(chunks)) chunksPerQuery.Observe(float64(len(chunks))) + // We should return an empty chunks slice if there are no chunks. + if len(chunks) == 0 { + return [][]Chunk{}, []*Fetcher{}, nil + } + return [][]Chunk{chunks}, []*Fetcher{c.baseStore.Fetcher}, nil } diff --git a/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go b/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go index cb3e9185e422c..d1389cc8a13f7 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go +++ b/vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go @@ -216,7 +216,7 @@ type Cortex struct { Alertmanager *alertmanager.MultitenantAlertmanager Compactor *compactor.Compactor StoreGateway *storegateway.StoreGateway - MemberlistKV *memberlist.KVInit + MemberlistKV *memberlist.KVInitService // Queryable that the querier should use to query the long // term storage. It depends on the storage engine used. 
diff --git a/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go b/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go index 99e2ec3134828..eac5c04b0c289 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go +++ b/vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go @@ -475,12 +475,8 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { t.Cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), } - t.MemberlistKV = memberlist.NewKVInit(&t.Cfg.MemberlistKV) - - return services.NewIdleService(nil, func(_ error) error { - t.MemberlistKV.Stop() - return nil - }), nil + t.MemberlistKV = memberlist.NewKVInitService(&t.Cfg.MemberlistKV) + return t.MemberlistKV, nil } func (t *Cortex) initDataPurger() (services.Service, error) { diff --git a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/compat.go b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/compat.go index 6114bf04e6418..9f966903b7914 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ingester/client/compat.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ingester/client/compat.go @@ -229,19 +229,34 @@ func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels { // get in input labels whose data structure is reused. func FromLabelAdaptersToLabelsWithCopy(input []LabelAdapter) labels.Labels { result := make(labels.Labels, len(input)) + + size := 0 + for _, l := range input { + size += len(l.Name) + size += len(l.Value) + } + + // Copy all strings into the buffer, and use 'yoloString' to convert buffer + // slices to strings. + buf := make([]byte, size) + for i, l := range input { - result[i] = labels.Label{ - Name: copyString(l.Name), - Value: copyString(l.Value), - } + result[i].Name, buf = copyStringToBuffer(l.Name, buf) + result[i].Value, buf = copyStringToBuffer(l.Value, buf) } return result } -func copyString(in string) string { - out := make([]byte, len(in)) - copy(out, in) - return string(out) +// Copies string to buffer (which must be big enough), and converts buffer slice containing +// the string copy into new string. +func copyStringToBuffer(in string, buf []byte) (string, []byte) { + l := len(in) + c := copy(buf, in) + if c != l { + panic("not copied full string") + } + + return yoloString(buf[0:l]), buf[l:] } // FromLabelsToLabelAdapters casts labels.Labels to []LabelAdapter. diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/frontend/frontend.go b/vendor/github.com/cortexproject/cortex/pkg/querier/frontend/frontend.go index 9bd092513c2bf..f2fba3c8191a5 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/frontend/frontend.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/frontend/frontend.go @@ -51,7 +51,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.") f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.") f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.") - f.DurationVar(&cfg.LogQueriesLongerThan, "frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. 0 to disable.") + f.DurationVar(&cfg.LogQueriesLongerThan, "frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. Set to 0 to disable. 
Set to < 0 to enable on all queries.") } // Frontend queues HTTP requests, dispatches them to backends, and handles retries @@ -164,34 +164,41 @@ func (f *Frontend) handle(w http.ResponseWriter, r *http.Request) { resp, err := f.roundTripper.RoundTrip(r) queryResponseTime := time.Since(startTime) - if f.cfg.LogQueriesLongerThan > 0 && queryResponseTime > f.cfg.LogQueriesLongerThan { + if err != nil { + writeError(w, err) + } else { + hs := w.Header() + for h, vs := range resp.Header { + hs[h] = vs + } + w.WriteHeader(resp.StatusCode) + io.Copy(w, resp.Body) + } + + // If LogQueriesLongerThan is set to <0 we log every query, if it is set to 0 query logging + // is disabled + if f.cfg.LogQueriesLongerThan != 0 && queryResponseTime > f.cfg.LogQueriesLongerThan { logMessage := []interface{}{ - "msg", "slow query", + "msg", "slow query detected", + "method", r.Method, "host", r.Host, "path", r.URL.Path, "time_taken", queryResponseTime.String(), } - for k, v := range r.URL.Query() { - logMessage = append(logMessage, fmt.Sprintf("qs_%s", k), strings.Join(v, ",")) - } - pf := r.PostForm.Encode() - if pf != "" { - logMessage = append(logMessage, "body", pf) + + // Ensure the form has been parsed so all the parameters are present + err = r.ParseForm() + if err != nil { + level.Warn(util.WithContext(r.Context(), f.log)).Log("msg", "unable to parse form for request", "error", err) } - level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...) - } - if err != nil { - writeError(w, err) - return - } + // Attempt to iterate through the Form to log any filled in values + for k, v := range r.Form { + logMessage = append(logMessage, fmt.Sprintf("param_%s", k), strings.Join(v, ",")) + } - hs := w.Header() - for h, vs := range resp.Header { - hs[h] = vs + level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...) } - w.WriteHeader(resp.StatusCode) - io.Copy(w, resp.Body) } func writeError(w http.ResponseWriter, err error) { diff --git a/vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/query_range.go b/vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/query_range.go index f50866bf32b11..04d14498928f0 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/query_range.go +++ b/vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/query_range.go @@ -266,6 +266,8 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http. return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format") } + sp.LogFields(otlog.Int("series", len(a.Data.Result))) + b, err := json.Marshal(a) if err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err) diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/client.go b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/client.go index 2c334e7d99728..5aecb8c98e0a7 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/client.go @@ -61,7 +61,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f flagsPrefix = "ring." } f.StringVar(&cfg.Prefix, flagsPrefix+"prefix", defaultPrefix, "The prefix for the keys in the store. Should end with a /.") - f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. 
Supported values are: consul, etcd, inmemory, multi, memberlist (experimental).") + f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi.") } // Client is a high-level client for key-value stores (such as Etcd and diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_init_service.go b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_init_service.go new file mode 100644 index 0000000000000..65caeb6750c01 --- /dev/null +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_init_service.go @@ -0,0 +1,63 @@ +package memberlist + +import ( + "context" + "sync" + + "github.com/cortexproject/cortex/pkg/util/services" +) + +// This service initialized memberlist.KV on first call to GetMemberlistKV, and starts it. On stop, +// KV is stopped too. If KV fails, error is reported from the service. +type KVInitService struct { + services.Service + + // config used for initialization + cfg *KVConfig + + // init function, to avoid multiple initializations. + init sync.Once + + // state + kv *KV + err error + watcher *services.FailureWatcher +} + +func NewKVInitService(cfg *KVConfig) *KVInitService { + kvinit := &KVInitService{ + cfg: cfg, + watcher: services.NewFailureWatcher(), + } + kvinit.Service = services.NewBasicService(nil, kvinit.running, kvinit.stopping) + return kvinit +} + +// This method will initialize Memberlist.KV on first call, and add it to service failure watcher. +func (kvs *KVInitService) GetMemberlistKV() (*KV, error) { + kvs.init.Do(func() { + kvs.kv = NewKV(*kvs.cfg) + kvs.watcher.WatchService(kvs.kv) + kvs.err = kvs.kv.StartAsync(context.Background()) + }) + + return kvs.kv, kvs.err +} + +func (kvs *KVInitService) running(ctx context.Context) error { + select { + case <-ctx.Done(): + return nil + case err := <-kvs.watcher.Chan(): + // Only happens if KV service was actually initialized in GetMemberlistKV and it fails. + return err + } +} + +func (kvs *KVInitService) stopping(_ error) error { + if kvs.kv == nil { + return nil + } + + return services.StopAndAwaitTerminated(context.Background(), kvs.kv) +} diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_state.go b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_state.go deleted file mode 100644 index b75cc208c4f1f..0000000000000 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_state.go +++ /dev/null @@ -1,38 +0,0 @@ -package memberlist - -import ( - "sync" -) - -// This struct holds state of initialization of memberlist.KV instance. -type KVInit struct { - // config used for initialization - cfg *KVConfig - - // init function, to avoid multiple initializations. 
- init sync.Once - - // state - kv *KV - err error -} - -func NewKVInit(cfg *KVConfig) *KVInit { - return &KVInit{ - cfg: cfg, - } -} - -func (kvs *KVInit) GetMemberlistKV() (*KV, error) { - kvs.init.Do(func() { - kvs.kv, kvs.err = NewKV(*kvs.cfg) - }) - - return kvs.kv, kvs.err -} - -func (kvs *KVInit) Stop() { - if kvs.kv != nil { - kvs.kv.Stop() - } -} diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go index 66e0f9b2ce7c0..cce9dd6659da2 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go @@ -3,6 +3,7 @@ package memberlist import ( "bytes" "context" + "crypto/rand" "encoding/binary" "errors" "flag" @@ -15,11 +16,11 @@ import ( "github.com/go-kit/kit/log/level" "github.com/hashicorp/memberlist" "github.com/prometheus/client_golang/prometheus" - "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/services" ) const ( @@ -48,11 +49,21 @@ func NewClient(kv *KV, codec codec.Codec) (*Client, error) { // List is part of kv.Client interface. func (c *Client) List(ctx context.Context, prefix string) ([]string, error) { + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return nil, err + } + return c.kv.List(prefix), nil } // Get is part of kv.Client interface. func (c *Client) Get(ctx context.Context, key string) (interface{}, error) { + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return nil, err + } + return c.kv.Get(key, c.codec) } @@ -63,24 +74,57 @@ func (c *Client) Delete(ctx context.Context, key string) error { // CAS is part of kv.Client interface func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return err + } + return c.kv.CAS(ctx, key, c.codec, f) } // WatchKey is part of kv.Client interface. func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return + } + c.kv.WatchKey(ctx, key, c.codec, f) } // WatchPrefix calls f whenever any value stored under prefix changes. // Part of kv.Client interface. func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) { + err := c.awaitKVRunningOrStopping(ctx) + if err != nil { + return + } + c.kv.WatchPrefix(ctx, prefix, c.codec, f) } +// We want to use KV in Running and Stopping states. +func (c *Client) awaitKVRunningOrStopping(ctx context.Context) error { + s := c.kv.State() + switch s { + case services.Running, services.Stopping: + return nil + case services.New, services.Starting: + err := c.kv.AwaitRunning(ctx) + if ns := c.kv.State(); ns == services.Stopping { + return nil + } + return err + default: + return fmt.Errorf("unexpected state: %v", s) + } +} + // KVConfig is a config for memberlist.KV type KVConfig struct { // Memberlist options. 
NodeName string `yaml:"node_name"` + RandomizeNodeName bool `yaml:"randomize_node_name"` StreamTimeout time.Duration `yaml:"stream_timeout"` RetransmitMult int `yaml:"retransmit_factor"` PushPullInterval time.Duration `yaml:"pull_push_interval"` @@ -91,7 +135,11 @@ type KVConfig struct { // List of members to join JoinMembers flagext.StringSlice `yaml:"join_members"` + MinJoinBackoff time.Duration `yaml:"min_join_backoff"` + MaxJoinBackoff time.Duration `yaml:"max_join_backoff"` + MaxJoinRetries int `yaml:"max_join_retries"` AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"` + RejoinInterval time.Duration `yaml:"rejoin_interval"` // Remove LEFT ingesters from ring after this timeout. LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"` @@ -113,10 +161,15 @@ type KVConfig struct { func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet, prefix string) { // "Defaults to hostname" -- memberlist sets it to hostname by default. f.StringVar(&cfg.NodeName, prefix+"memberlist.nodename", "", "Name of the node in memberlist cluster. Defaults to hostname.") // memberlist.DefaultLANConfig will put hostname here. + f.BoolVar(&cfg.RandomizeNodeName, prefix+"memberlist.randomize-node-name", true, "Add random suffix to the node name.") f.DurationVar(&cfg.StreamTimeout, prefix+"memberlist.stream-timeout", 0, "The timeout for establishing a connection with a remote node, and for read/write operations. Uses memberlist LAN defaults if 0.") f.IntVar(&cfg.RetransmitMult, prefix+"memberlist.retransmit-factor", 0, "Multiplication factor used when sending out messages (factor * log(N+1)).") - f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times. Memberlist store is EXPERIMENTAL.") + f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times.") + f.DurationVar(&cfg.MinJoinBackoff, prefix+"memberlist.min-join-backoff", 1*time.Second, "Min backoff duration to join other cluster members.") + f.DurationVar(&cfg.MaxJoinBackoff, prefix+"memberlist.max-join-backoff", 1*time.Minute, "Max backoff duration to join other cluster members.") + f.IntVar(&cfg.MaxJoinRetries, prefix+"memberlist.max-join-retries", 10, "Max number of retries to join other cluster members.") f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", true, "If this node fails to join memberlist cluster, abort.") + f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.") f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.") f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 5*time.Second, "Timeout for leaving memberlist cluster.") f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", 0, "How often to gossip. 
Uses memberlist LAN defaults if 0.") @@ -128,16 +181,31 @@ func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet, prefix string) { cfg.TCPTransport.RegisterFlags(f, prefix) } +func generateRandomSuffix() string { + suffix := make([]byte, 4) + _, err := rand.Read(suffix) + if err != nil { + level.Error(util.Logger).Log("msg", "failed to generate random suffix", "err", err) + return "error" + } + return fmt.Sprintf("%2x", suffix) +} + // KV implements Key-Value store on top of memberlist library. KV store has API similar to kv.Client, // except methods also need explicit codec for each operation. +// KV is a Service. It needs to be started first, and is only usable once it enters Running state. +// If joining of the cluster if configured, it is done in Running state, and if join fails and Abort flag is set, service +// fails. type KV struct { - cfg KVConfig + services.Service + + cfg KVConfig + + // Protects access to memberlist and broadcasts fields. + initWG sync.WaitGroup memberlist *memberlist.Memberlist broadcasts *memberlist.TransmitLimitedQueue - // Disabled on Stop() - casBroadcastsEnabled *atomic.Bool - // KV Store. storeMu sync.Mutex store map[string]valueDesc @@ -199,46 +267,70 @@ var ( errTooManyRetries = errors.New("too many retries") ) -// NewKV creates new Client instance. If cfg.JoinMembers is set, it will also try to connect -// to these members and join the cluster. If that fails and AbortIfJoinFails is true, error is returned and no -// client is created. -func NewKV(cfg KVConfig) (*KV, error) { - util.WarnExperimentalUse("Gossip memberlist ring") - +// NewKV creates new gossip-based KV service. Note that service needs to be started, until then it doesn't initialize +// gossiping part. Only after service is in Running state, it is really gossiping. +// Starting the service will also trigger connecting to the existing memberlist cluster, if cfg.JoinMembers is set. +// If that fails and AbortIfJoinFails is true, error is returned and service enters Failed state. 
+func NewKV(cfg KVConfig) *KV { cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace - tr, err := NewTCPTransport(cfg.TCPTransport) + mlkv := &KV{ + cfg: cfg, + store: make(map[string]valueDesc), + codecs: make(map[string]codec.Codec), + watchers: make(map[string][]chan string), + prefixWatchers: make(map[string][]chan string), + shutdown: make(chan struct{}), + maxCasRetries: maxCasRetries, + } + + mlkv.createAndRegisterMetrics() + + for _, c := range cfg.Codecs { + mlkv.codecs[c.CodecID()] = c + } + mlkv.Service = services.NewBasicService(mlkv.starting, mlkv.running, mlkv.stopping) + return mlkv +} + +func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) { + tr, err := NewTCPTransport(m.cfg.TCPTransport) if err != nil { return nil, fmt.Errorf("failed to create transport: %v", err) } mlCfg := memberlist.DefaultLANConfig() + mlCfg.Delegate = m - if cfg.StreamTimeout != 0 { - mlCfg.TCPTimeout = cfg.StreamTimeout + if m.cfg.StreamTimeout != 0 { + mlCfg.TCPTimeout = m.cfg.StreamTimeout + } + if m.cfg.RetransmitMult != 0 { + mlCfg.RetransmitMult = m.cfg.RetransmitMult } - if cfg.RetransmitMult != 0 { - mlCfg.RetransmitMult = cfg.RetransmitMult + if m.cfg.PushPullInterval != 0 { + mlCfg.PushPullInterval = m.cfg.PushPullInterval } - if cfg.PushPullInterval != 0 { - mlCfg.PushPullInterval = cfg.PushPullInterval + if m.cfg.GossipInterval != 0 { + mlCfg.GossipInterval = m.cfg.GossipInterval } - if cfg.GossipInterval != 0 { - mlCfg.GossipInterval = cfg.GossipInterval + if m.cfg.GossipNodes != 0 { + mlCfg.GossipNodes = m.cfg.GossipNodes } - if cfg.GossipNodes != 0 { - mlCfg.GossipNodes = cfg.GossipNodes + if m.cfg.GossipToTheDeadTime > 0 { + mlCfg.GossipToTheDeadTime = m.cfg.GossipToTheDeadTime } - if cfg.GossipToTheDeadTime > 0 { - mlCfg.GossipToTheDeadTime = cfg.GossipToTheDeadTime + if m.cfg.DeadNodeReclaimTime > 0 { + mlCfg.DeadNodeReclaimTime = m.cfg.DeadNodeReclaimTime } - if cfg.DeadNodeReclaimTime > 0 { - mlCfg.DeadNodeReclaimTime = cfg.DeadNodeReclaimTime + if m.cfg.NodeName != "" { + mlCfg.Name = m.cfg.NodeName } - if cfg.NodeName != "" { - mlCfg.Name = cfg.NodeName + if m.cfg.RandomizeNodeName { + mlCfg.Name = mlCfg.Name + "-" + generateRandomSuffix() + level.Info(util.Logger).Log("msg", "Using memberlist cluster node name", "name", mlCfg.Name) } mlCfg.LogOutput = newMemberlistLoggerAdapter(util.Logger, false) @@ -247,55 +339,80 @@ func NewKV(cfg KVConfig) (*KV, error) { // Memberlist uses UDPBufferSize to figure out how many messages it can put into single "packet". // As we don't use UDP for sending packets, we can use higher value here. mlCfg.UDPBufferSize = 10 * 1024 * 1024 + return mlCfg, nil +} - mlkv := &KV{ - cfg: cfg, - store: make(map[string]valueDesc), - codecs: make(map[string]codec.Codec), - watchers: make(map[string][]chan string), - prefixWatchers: make(map[string][]chan string), - shutdown: make(chan struct{}), - maxCasRetries: maxCasRetries, - casBroadcastsEnabled: atomic.NewBool(true), - } +func (m *KV) starting(_ context.Context) error { + util.WarnExperimentalUse("Gossip memberlist ring") - mlCfg.Delegate = mlkv + mlCfg, err := m.buildMemberlistConfig() + if err != nil { + return err + } + // Wait for memberlist and broadcasts fields creation because + // memberlist may start calling delegate methods if it + // receives traffic. 
+ // See https://godoc.org/github.com/hashicorp/memberlist#Delegate + // + // Note: We cannot check for Starting state, as we want to use delegate during cluster joining process + // that happens in Starting state. + m.initWG.Add(1) list, err := memberlist.Create(mlCfg) if err != nil { - return nil, fmt.Errorf("failed to create memberlist: %v", err) + return fmt.Errorf("failed to create memberlist: %v", err) } - // finish delegate initialization - mlkv.memberlist = list - mlkv.broadcasts = &memberlist.TransmitLimitedQueue{ + // Finish delegate initialization. + m.memberlist = list + m.broadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: list.NumMembers, - RetransmitMult: cfg.RetransmitMult, + RetransmitMult: m.cfg.RetransmitMult, } + m.initWG.Done() - // Almost ready... - mlkv.createAndRegisterMetrics() - - for _, c := range cfg.Codecs { - mlkv.codecs[c.CodecID()] = c - } + return nil +} - // Join the cluster - if len(cfg.JoinMembers) > 0 { - reached, err := mlkv.JoinMembers(cfg.JoinMembers) - if err != nil && cfg.AbortIfJoinFails { - _ = mlkv.memberlist.Shutdown() - return nil, err - } +var errFailedToJoinCluster = errors.New("failed to join memberlist cluster on startup") +func (m *KV) running(ctx context.Context) error { + // Join the cluster, if configured. We want this to happen in Running state, because started memberlist + // is good enough for usage from Client (which checks for Running state), even before it connects to the cluster. + if len(m.cfg.JoinMembers) > 0 { + err := m.joinMembersOnStartup(ctx, m.cfg.JoinMembers) if err != nil { level.Error(util.Logger).Log("msg", "failed to join memberlist cluster", "err", err) - } else { - level.Info(util.Logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached) + + if m.cfg.AbortIfJoinFails { + return errFailedToJoinCluster + } } } - return mlkv, nil + var tickerChan <-chan time.Time = nil + if m.cfg.RejoinInterval > 0 { + t := time.NewTicker(m.cfg.RejoinInterval) + defer t.Stop() + + tickerChan = t.C + } + + for { + select { + case <-tickerChan: + reached, err := m.memberlist.Join(m.cfg.JoinMembers) + if err == nil { + level.Info(util.Logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached) + } else { + // Don't report error from rejoin, otherwise KV service would be stopped completely. + level.Warn(util.Logger).Log("msg", "re-joining memberlist cluster failed", "err", err) + } + + case <-ctx.Done(): + return nil + } + } } // GetCodec returns codec for given ID or nil. @@ -304,23 +421,65 @@ func (m *KV) GetCodec(codecID string) codec.Codec { } // GetListeningPort returns port used for listening for memberlist communication. Useful when BindPort is set to 0. +// This call is only valid after KV service has been started. func (m *KV) GetListeningPort() int { return int(m.memberlist.LocalNode().Port) } // JoinMembers joins the cluster with given members. // See https://godoc.org/github.com/hashicorp/memberlist#Memberlist.Join +// This call is only valid after KV service has been started and is still running. func (m *KV) JoinMembers(members []string) (int, error) { + if m.State() != services.Running { + return 0, fmt.Errorf("service not Running") + } return m.memberlist.Join(members) } -// Stop tries to leave memberlist cluster and then shutdown memberlist client. 
+func (m *KV) joinMembersOnStartup(ctx context.Context, members []string) error { + reached, err := m.memberlist.Join(m.cfg.JoinMembers) + if err == nil { + level.Info(util.Logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached) + return nil + } + + if m.cfg.MaxJoinRetries <= 0 { + return err + } + + level.Debug(util.Logger).Log("msg", "attempt to join memberlist cluster failed", "retries", 0, "err", err) + lastErr := err + + cfg := util.BackoffConfig{ + MinBackoff: m.cfg.MinJoinBackoff, + MaxBackoff: m.cfg.MaxJoinBackoff, + MaxRetries: m.cfg.MaxJoinRetries, + } + + backoff := util.NewBackoff(ctx, cfg) + + for backoff.Ongoing() { + backoff.Wait() + + reached, err := m.memberlist.Join(members) + if err != nil { + lastErr = err + level.Debug(util.Logger).Log("msg", "attempt to join memberlist cluster failed", "retries", backoff.NumRetries(), "err", err) + continue + } + + level.Info(util.Logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached) + return nil + } + + return lastErr +} + +// While Stopping, we try to leave memberlist cluster and then shutdown memberlist client. // We do this in order to send out last messages, typically that ingester has LEFT the ring. -func (m *KV) Stop() { +func (m *KV) stopping(_ error) error { level.Info(util.Logger).Log("msg", "leaving memberlist cluster") - m.casBroadcastsEnabled.Store(false) - // Wait until broadcast queue is empty, but don't wait for too long. // Also don't wait if there is just one node left. // Problem is that broadcast queue is also filled up by state changes received from other nodes, @@ -348,6 +507,7 @@ func (m *KV) Stop() { if err != nil { level.Error(util.Logger).Log("msg", "error when shutting down memberlist client", "err", err) } + return nil } // List returns all known keys under a given prefix. @@ -581,7 +741,7 @@ outer: m.casSuccesses.Inc() m.notifyWatchers(key) - if m.casBroadcastsEnabled.Load() { + if m.State() == services.Running { m.broadcastNewValue(key, change, newver, codec) } else { level.Warn(util.Logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key) @@ -680,6 +840,8 @@ func (m *KV) NodeMeta(limit int) []byte { // NotifyMsg is method from Memberlist Delegate interface // Called when single message is received, i.e. what our broadcastNewValue has sent. func (m *KV) NotifyMsg(msg []byte) { + m.initWG.Wait() + m.numberOfReceivedMessages.Inc() m.totalSizeOfReceivedMessages.Add(float64(len(msg))) @@ -740,6 +902,8 @@ func (m *KV) queueBroadcast(key string, content []string, version uint, message // GetBroadcasts is method from Memberlist Delegate interface // It returns all pending broadcasts (within the size limit) func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { + m.initWG.Wait() + return m.broadcasts.GetBroadcasts(overhead, limit) } @@ -749,6 +913,8 @@ func (m *KV) GetBroadcasts(overhead, limit int) [][]byte { // Here we dump our entire state -- all keys and their values. There is no limit on message size here, // as Memberlist uses 'stream' operations for transferring this state. func (m *KV) LocalState(join bool) []byte { + m.initWG.Wait() + m.numberOfPulls.Inc() m.storeMu.Lock() @@ -799,6 +965,8 @@ func (m *KV) LocalState(join bool) []byte { // // Data is full state of remote KV store, as generated by `LocalState` method (run on another node). 
func (m *KV) MergeRemoteState(data []byte, join bool) { + m.initWG.Wait() + m.numberOfPushes.Inc() m.totalSizeOfPushes.Add(float64(len(data))) diff --git a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/metrics.go b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/metrics.go index b3a7fa504beb5..e0fcf7c9964df 100644 --- a/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/metrics.go +++ b/vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/metrics.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/services" ) func (m *KV) createAndRegisterMetrics() { @@ -69,7 +70,11 @@ func (m *KV) createAndRegisterMetrics() { Name: "messages_in_broadcast_queue", Help: "Number of user messages in the broadcast queue", }, func() float64 { - return float64(m.broadcasts.NumQueued()) + // m.broadcasts is not set before Starting state + if m.State() == services.Running || m.State() == services.Stopping { + return float64(m.broadcasts.NumQueued()) + } + return 0 }) m.totalSizeOfBroadcastMessagesInQueue = prometheus.NewGauge(prometheus.GaugeOpts{ @@ -116,7 +121,11 @@ func (m *KV) createAndRegisterMetrics() { Name: "cluster_members_count", Help: "Number of members in memberlist cluster", }, func() float64 { - return float64(m.memberlist.NumMembers()) + // m.memberlist is not set before Starting state + if m.State() == services.Running || m.State() == services.Stopping { + return float64(m.memberlist.NumMembers()) + } + return 0 }) m.memberlistHealthScore = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ @@ -125,7 +134,11 @@ func (m *KV) createAndRegisterMetrics() { Name: "cluster_node_health_score", Help: "Health score of this cluster. Lower value is better. 0 = healthy", }, func() float64 { - return float64(m.memberlist.GetHealthScore()) + // m.memberlist is not set before Starting state + if m.State() == services.Running || m.State() == services.Stopping { + return float64(m.memberlist.GetHealthScore()) + } + return 0 }) m.watchPrefixDroppedNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{ diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go index c5013fff8bc1a..7d2802e2cf568 100644 --- a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go @@ -34,7 +34,7 @@ type ProxyConfig struct { func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.ServerServicePort, "server.service-port", 80, "The port where the query-tee service listens to.") f.StringVar(&cfg.BackendEndpoints, "backend.endpoints", "", "Comma separated list of backend endpoints to query.") - f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client.") + f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. 
If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.") f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.") f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.") } @@ -42,7 +42,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) { type Route struct { Path string RouteName string - Methods string + Methods []string ResponseComparator ResponsesComparator } @@ -63,7 +63,7 @@ type Proxy struct { func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer prometheus.Registerer) (*Proxy, error) { if cfg.CompareResponses && cfg.PreferredBackend == "" { - return nil, fmt.Errorf("when enabling comparion of results -backend.preferred flag must be set to hostname of preferred backend") + return nil, fmt.Errorf("when enabling comparison of results -backend.preferred flag must be set to hostname of preferred backend") } p := &Proxy{ @@ -134,12 +134,12 @@ func (p *Proxy) Start() error { })) // register routes - var comparator ResponsesComparator for _, route := range p.routes { + var comparator ResponsesComparator if p.cfg.CompareResponses { comparator = route.ResponseComparator } - router.Path(route.Path).Methods(route.Methods).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator)) + router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator)) } p.srvListener = listener diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_endpoint.go b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_endpoint.go index 91121f35e7a58..6e16367fa1a25 100644 --- a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_endpoint.go +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_endpoint.go @@ -7,10 +7,10 @@ import ( "sync" "time" - "github.com/cortexproject/cortex/pkg/util" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + + "github.com/cortexproject/cortex/pkg/util" ) type ResponsesComparator interface { @@ -23,17 +23,29 @@ type ProxyEndpoint struct { logger log.Logger comparator ResponsesComparator + // Whether for this endpoint there's a preferred backend configured. + hasPreferredBackend bool + // The route name used to track metrics. routeName string } func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator) *ProxyEndpoint { + hasPreferredBackend := false + for _, backend := range backends { + if backend.preferred { + hasPreferredBackend = true + break + } + } + return &ProxyEndpoint{ - backends: backends, - routeName: routeName, - metrics: metrics, - logger: logger, - comparator: comparator, + backends: backends, + routeName: routeName, + metrics: metrics, + logger: logger, + comparator: comparator, + hasPreferredBackend: hasPreferredBackend, } } @@ -41,9 +53,29 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { level.Debug(p.logger).Log("msg", "Received request", "path", r.URL.Path, "query", r.URL.RawQuery) // Send the same request to all backends. 
+ resCh := make(chan *backendResponse, len(p.backends)) + go p.executeBackendRequests(r, resCh) + + // Wait for the first response that's feasible to be sent back to the client. + downstreamRes := p.waitBackendResponseForDownstream(resCh) + + if downstreamRes.err != nil { + http.Error(w, downstreamRes.err.Error(), http.StatusInternalServerError) + } else { + w.WriteHeader(downstreamRes.status) + if _, err := w.Write(downstreamRes.body); err != nil { + level.Warn(p.logger).Log("msg", "Unable to write response", "err", err) + } + } + + p.metrics.responsesTotal.WithLabelValues(downstreamRes.backend.name, r.Method, p.routeName).Inc() +} + +func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *backendResponse) { + responses := make([]*backendResponse, 0, len(p.backends)) + wg := sync.WaitGroup{} wg.Add(len(p.backends)) - resCh := make(chan *backendResponse, len(p.backends)) for _, b := range p.backends { b := b @@ -60,9 +92,7 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { status: status, body: body, err: err, - elapsed: elapsed, } - resCh <- res // Log with a level based on the backend response. lvl := level.Debug @@ -71,6 +101,14 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { } lvl(p.logger).Log("msg", "Backend response", "path", r.URL.Path, "query", r.URL.RawQuery, "backend", b.name, "status", status, "elapsed", elapsed) + p.metrics.requestDuration.WithLabelValues(res.backend.name, r.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(elapsed.Seconds()) + + // Keep track of the response if required. + if p.comparator != nil { + responses = append(responses, res) + } + + resCh <- res }() } @@ -78,59 +116,55 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) { wg.Wait() close(resCh) - // Collect all responses and track metrics for each of them. - responses := make([]*backendResponse, 0, len(p.backends)) - for res := range resCh { - responses = append(responses, res) - - p.metrics.durationMetric.WithLabelValues(res.backend.name, r.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(res.elapsed.Seconds()) - } - - // Select the response to send back to the client. - downstreamRes := p.pickResponseForDownstream(responses) - if downstreamRes.err != nil { - http.Error(w, downstreamRes.err.Error(), http.StatusInternalServerError) - } else { - w.WriteHeader(downstreamRes.status) - if _, err := w.Write(downstreamRes.body); err != nil { - level.Warn(p.logger).Log("msg", "Unable to write response", "err", err) - } - } - + // Compare responses. 
if p.comparator != nil { - go func() { - expectedResponse := responses[0] - actualResponse := responses[1] - if responses[1].backend.preferred { - expectedResponse, actualResponse = actualResponse, expectedResponse - } + expectedResponse := responses[0] + actualResponse := responses[1] + if responses[1].backend.preferred { + expectedResponse, actualResponse = actualResponse, expectedResponse + } - result := resultSuccess - err := p.compareResponses(expectedResponse, actualResponse) - if err != nil { - level.Error(util.Logger).Log("msg", "response comparison failed", "route-name", p.routeName, - "query", r.URL.RawQuery, "err", err) - result = resultFailed - } + result := comparisonSuccess + err := p.compareResponses(expectedResponse, actualResponse) + if err != nil { + level.Error(util.Logger).Log("msg", "response comparison failed", "route-name", p.routeName, + "query", r.URL.RawQuery, "err", err) + result = comparisonFailed + } - p.metrics.responsesComparedTotal.WithLabelValues(p.routeName, result).Inc() - }() + p.metrics.responsesComparedTotal.WithLabelValues(p.routeName, result).Inc() } } -func (p *ProxyEndpoint) pickResponseForDownstream(responses []*backendResponse) *backendResponse { - // Look for a successful response from the preferred backend. - for _, res := range responses { - if res.backend.preferred && res.succeeded() { +func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResponse) *backendResponse { + var ( + responses = make([]*backendResponse, 0, len(p.backends)) + preferredResponseReceived = false + ) + + for res := range resCh { + // If the response is successful we can immediately return it if: + // - There's no preferred backend configured + // - Or this response is from the preferred backend + // - Or the preferred backend response has already been received and wasn't successful + if res.succeeded() && (!p.hasPreferredBackend || res.backend.preferred || preferredResponseReceived) { return res } - } - // Look for any other successful response. - for _, res := range responses { - if res.succeeded() { - return res + // If we received a non successful response from the preferred backend, then we can + // return the first successful response received so far (if any). + if res.backend.preferred && !res.succeeded() { + preferredResponseReceived = true + + for _, prevRes := range responses { + if prevRes.succeeded() { + return prevRes + } + } } + + // Otherwise we keep track of it for later. + responses = append(responses, res) } // No successful response, so let's pick the first one. 
@@ -159,7 +193,6 @@ type backendResponse struct { status int body []byte err error - elapsed time.Duration } func (r *backendResponse) succeeded() bool { diff --git a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go index 26bef1e69d359..5fda10bbfbe38 100644 --- a/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go +++ b/vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go @@ -7,28 +7,34 @@ import ( ) const ( - resultSuccess = "success" - resultFailed = "fail" + comparisonSuccess = "success" + comparisonFailed = "fail" ) type ProxyMetrics struct { - durationMetric *prometheus.HistogramVec + requestDuration *prometheus.HistogramVec + responsesTotal *prometheus.CounterVec responsesComparedTotal *prometheus.CounterVec } func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics { m := &ProxyMetrics{ - durationMetric: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + requestDuration: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex_querytee", Name: "request_duration_seconds", Help: "Time (in seconds) spent serving HTTP requests.", Buckets: instrument.DefBuckets, }, []string{"backend", "method", "route", "status_code"}), + responsesTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex_querytee", + Name: "responses_total", + Help: "Total number of responses sent back to the client by the selected backend.", + }, []string{"backend", "method", "route"}), responsesComparedTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex_querytee", Name: "responses_compared_total", - Help: "Total number of responses compared per route name by result", - }, []string{"route_name", "result"}), + Help: "Total number of responses compared per route name by result.", + }, []string{"route", "result"}), } return m diff --git a/vendor/modules.txt b/vendor/modules.txt index 874ceba742dbb..2f39999817150 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -142,7 +142,7 @@ github.com/coreos/go-systemd/journal github.com/coreos/go-systemd/sdjournal # github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f github.com/coreos/pkg/capnslog -# github.com/cortexproject/cortex v1.1.1-0.20200609120740-6bd667db776a +# github.com/cortexproject/cortex v1.1.1-0.20200616130854-34b45d1180c3 github.com/cortexproject/cortex/pkg/alertmanager github.com/cortexproject/cortex/pkg/alertmanager/alerts github.com/cortexproject/cortex/pkg/alertmanager/alerts/configdb From f46c9c74b9f83bc6d2708b71de71686cfb59fd33 Mon Sep 17 00:00:00 2001 From: Periklis Tsirakidis Date: Tue, 16 Jun 2020 15:25:50 +0200 Subject: [PATCH 2/5] Replace memberlist service in favor of cortex provided service --- pkg/loki/loki.go | 2 +- pkg/loki/modules.go | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 7b7db40ca3697..475b8842d6b06 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -123,7 +123,7 @@ type Loki struct { frontend *frontend.Frontend stopper queryrange.Stopper runtimeConfig *runtimeconfig.Manager - memberlistKV *memberlist.KVInit + memberlistKV *memberlist.KVInitService httpAuthMiddleware middleware.Interface } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index f9dc3fe75a039..1bff071dd04a4 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -324,11 +324,9 @@ 
func (t *Loki) initMemberlistKV() (services.Service, error) { t.cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), } - t.memberlistKV = memberlist.NewKVInit(&t.cfg.MemberlistKV) - return services.NewIdleService(nil, func(_ error) error { - t.memberlistKV.Stop() - return nil - }), nil + + t.memberlistKV = memberlist.NewKVInitService(&t.cfg.MemberlistKV) + return t.memberlistKV, nil } // activePeriodConfig type returns index type which would be applicable to logs that would be pushed starting now From 856829aadd60b1958174b775adf94ec05d7a8cab Mon Sep 17 00:00:00 2001 From: Periklis Tsirakidis Date: Tue, 16 Jun 2020 15:41:16 +0200 Subject: [PATCH 3/5] Fix route method param for querytee tool --- cmd/querytee/main.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/querytee/main.go b/cmd/querytee/main.go index 0ea26142f2384..009977039fb34 100644 --- a/cmd/querytee/main.go +++ b/cmd/querytee/main.go @@ -63,15 +63,15 @@ func lokiReadRoutes() []querytee.Route { samplesComparator.RegisterSamplesType(loghttp.ResultTypeStream, compareStreams) return []querytee.Route{ - {Path: "/loki/api/v1/query_range", RouteName: "api_v1_query_range", Methods: "GET", ResponseComparator: samplesComparator}, - {Path: "/loki/api/v1/query", RouteName: "api_v1_query", Methods: "GET", ResponseComparator: samplesComparator}, - {Path: "/loki/api/v1/label", RouteName: "api_v1_label", Methods: "GET", ResponseComparator: nil}, - {Path: "/loki/api/v1/labels", RouteName: "api_v1_labels", Methods: "GET", ResponseComparator: nil}, - {Path: "/loki/api/v1/label/{name}/values", RouteName: "api_v1_label_name_values", Methods: "GET", ResponseComparator: nil}, - {Path: "/loki/api/v1/series", RouteName: "api_v1_series", Methods: "GET", ResponseComparator: nil}, - {Path: "/api/prom/query", RouteName: "api_prom_query", Methods: "GET", ResponseComparator: samplesComparator}, - {Path: "/api/prom/label", RouteName: "api_prom_label", Methods: "GET", ResponseComparator: nil}, - {Path: "/api/prom/label/{name}/values", RouteName: "api_prom_label_name_values", Methods: "GET", ResponseComparator: nil}, - {Path: "/api/prom/series", RouteName: "api_prom_series", Methods: "GET", ResponseComparator: nil}, + {Path: "/loki/api/v1/query_range", RouteName: "api_v1_query_range", Methods: []string{"GET"}, ResponseComparator: samplesComparator}, + {Path: "/loki/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET"}, ResponseComparator: samplesComparator}, + {Path: "/loki/api/v1/label", RouteName: "api_v1_label", Methods: []string{"GET"}, ResponseComparator: nil}, + {Path: "/loki/api/v1/labels", RouteName: "api_v1_labels", Methods: []string{"GET"}, ResponseComparator: nil}, + {Path: "/loki/api/v1/label/{name}/values", RouteName: "api_v1_label_name_values", Methods: []string{"GET"}, ResponseComparator: nil}, + {Path: "/loki/api/v1/series", RouteName: "api_v1_series", Methods: []string{"GET"}, ResponseComparator: nil}, + {Path: "/api/prom/query", RouteName: "api_prom_query", Methods: []string{"GET"}, ResponseComparator: samplesComparator}, + {Path: "/api/prom/label", RouteName: "api_prom_label", Methods: []string{"GET"}, ResponseComparator: nil}, + {Path: "/api/prom/label/{name}/values", RouteName: "api_prom_label_name_values", Methods: []string{"GET"}, ResponseComparator: nil}, + {Path: "/api/prom/series", RouteName: "api_prom_series", Methods: []string{"GET"}, ResponseComparator: nil}, } } From 58fa405dcadf7ad84dbe6777fcccecc43a2e5552 Mon Sep 17 00:00:00 2001 From: Periklis Tsirakidis Date: Tue, 16 
Jun 2020 18:50:19 +0200 Subject: [PATCH 4/5] Register memberlist flags --- pkg/loki/loki.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 475b8842d6b06..95127c9474995 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -83,6 +83,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Worker.RegisterFlags(f) c.QueryRange.RegisterFlags(f) c.RuntimeConfig.RegisterFlags(f) + c.MemberlistKV.RegisterFlags(f, "") c.Tracing.RegisterFlags(f) } From e9712e05822b2d264c0b56a7f630084ce697e2b1 Mon Sep 17 00:00:00 2001 From: Periklis Tsirakidis Date: Wed, 17 Jun 2020 11:01:03 +0200 Subject: [PATCH 5/5] Add documentation and example for memberlist --- docs/configuration/README.md | 108 ++++++++++++++++++++++++++++++++- docs/configuration/examples.md | 74 +++++++++++++++++++++- 2 files changed, 180 insertions(+), 2 deletions(-) diff --git a/docs/configuration/README.md b/docs/configuration/README.md index eab37fea8e1d2..969e2c052b876 100644 --- a/docs/configuration/README.md +++ b/docs/configuration/README.md @@ -15,6 +15,7 @@ Configuration examples can be found in the [Configuration Examples](examples.md) * [ingester_config](#ingester_config) * [lifecycler_config](#lifecycler_config) * [ring_config](#ring_config) +* [memberlist_config](#memberlist_config) * [storage_config](#storage_config) * [cache_config](#cache_config) * [chunk_store_config](#chunk_store_config) @@ -78,6 +79,10 @@ Supported contents and default values of `loki.yaml`: # key value store. [ingester: <ingester_config>] +# Configuration for a memberlist gossip ring. Only applies if +# the store is "memberlist". +[memberlist: <memberlist_config>] + # Configures where Loki will store data. [storage_config: <storage_config>] @@ -371,7 +376,7 @@ The `ring_config` is used to discover and connect to Ingesters. ```yaml kvstore: # The backend storage to use for the ring. Supported values are - # consul, etcd, inmemory + # consul, etcd, inmemory, memberlist store: <string> # The prefix for the keys in the store. Should end with a /. @@ -414,6 +419,107 @@ kvstore: [replication_factor: <int> | default = 3] ``` + +## memberlist_config + +The `memberlist_config` block configures the gossip ring that distributors, +ingesters and queriers use to discover each other and connect. A single, shared +configuration across all three components ensures they form one common ring. + +```yaml +# Name of the node in the memberlist cluster. Defaults to the hostname. +# CLI flag: -memberlist.nodename +[node_name: <string> | default = ""] + +# Add a random suffix to the node name. +# CLI flag: -memberlist.randomize-node-name +[randomize_node_name: <boolean> | default = true] + +# The timeout for establishing a connection with a remote node, and for +# read/write operations. Uses memberlist LAN defaults if 0. +# CLI flag: -memberlist.stream-timeout +[stream_timeout: <duration> | default = 0s] + +# Multiplication factor used when sending out messages (factor * log(N+1)). +# CLI flag: -memberlist.retransmit-factor +[retransmit_factor: <int> | default = 0] + +# How often to use pull/push sync. Uses memberlist LAN defaults if 0. +# CLI flag: -memberlist.pullpush-interval +[pull_push_interval: <duration> | default = 0s] + +# How often to gossip. Uses memberlist LAN defaults if 0. +# CLI flag: -memberlist.gossip-interval +[gossip_interval: <duration> | default = 0s] + +# How many nodes to gossip to. Uses memberlist LAN defaults if 0. +# CLI flag: -memberlist.gossip-nodes +[gossip_nodes: <int> | default = 0] + +# How long to keep gossiping to dead nodes, to give them a chance to refute their +# death. Uses memberlist LAN defaults if 0.
+# CLI flag: -memberlist.gossip-to-dead-nodes-time +[gossip_to_dead_nodes_time: <duration> | default = 0s] + +# How soon a dead node's name can be reclaimed with a new address. Defaults to 0, +# which disables reclaiming. +# CLI flag: -memberlist.dead-node-reclaim-time +[dead_node_reclaim_time: <duration> | default = 0s] + +# Other cluster members to join. Can be specified multiple times. +# CLI flag: -memberlist.join +[join_members: <list of string> | default = ] + +# Min backoff duration to join other cluster members. +# CLI flag: -memberlist.min-join-backoff +[min_join_backoff: <duration> | default = 1s] + +# Max backoff duration to join other cluster members. +# CLI flag: -memberlist.max-join-backoff +[max_join_backoff: <duration> | default = 1m] + +# Max number of retries to join other cluster members. +# CLI flag: -memberlist.max-join-retries +[max_join_retries: <int> | default = 10] + +# If this node fails to join the memberlist cluster, abort. +# CLI flag: -memberlist.abort-if-join-fails +[abort_if_cluster_join_fails: <boolean> | default = true] + +# If not 0, how often to rejoin the cluster. Occasional rejoining can help to fix +# cluster splits, and is harmless otherwise. For example, when only a few +# components are used as seed nodes (via -memberlist.join), rejoining is +# recommended. If -memberlist.join points to a dynamic service that resolves to +# all gossiping nodes (e.g. a Kubernetes headless service), then rejoining +# is not needed. +# CLI flag: -memberlist.rejoin-interval +[rejoin_interval: <duration> | default = 0s] + +# How long to keep LEFT ingesters in the ring. +# CLI flag: -memberlist.left-ingesters-timeout +[left_ingesters_timeout: <duration> | default = 5m] + +# Timeout for leaving the memberlist cluster. +# CLI flag: -memberlist.leave-timeout +[leave_timeout: <duration> | default = 5s] + +# IP address to listen on for gossip messages. Multiple addresses may be +# specified. Defaults to 0.0.0.0 +# CLI flag: -memberlist.bind-addr +[bind_addr: <list of string> | default = ] + +# Port to listen on for gossip messages. +# CLI flag: -memberlist.bind-port +[bind_port: <int> | default = 7946] + +# Timeout used when connecting to other nodes to send a packet. +# CLI flag: -memberlist.packet-dial-timeout +[packet_dial_timeout: <duration> | default = 5s] + +# Timeout for writing 'packet' data. +# CLI flag: -memberlist.packet-write-timeout +[packet_write_timeout: <duration> | default = 5s] +``` + ## storage_config The `storage_config` block configures one of many possible stores for both the diff --git a/docs/configuration/examples.md b/docs/configuration/examples.md index a01734424b448..46dceba7a97f2 100644 --- a/docs/configuration/examples.md +++ b/docs/configuration/examples.md @@ -4,7 +4,8 @@ 2. [Google Cloud Storage](#google-cloud-storage) 3. [Cassandra Index](#cassandra-index) 4. [AWS](#aws) -5. [Using the query-frontend](#query-frontend) +5. [Almost zero dependencies setup with Memberlist and BoltDB Shipper](#almost-zero-dependencies-setup) +6. [Using the query-frontend](#query-frontend) ## Complete Local config @@ -146,6 +147,77 @@ storage_config: s3forcepathstyle: true ``` + +## Almost zero dependencies setup + +This configuration deploys Loki with only a storage dependency, e.g. an +S3-compatible API such as MinIO. The ring is based on memberlist gossip +and the index is shipped to the object store via [boltdb-shipper](../operations/storage/boltdb-shipper.md).
+ + +```yaml +auth_enabled: false + +server: + http_listen_port: 3100 + +distributor: + ring: + store: memberlist + +ingester: + lifecycler: + ring: + kvstore: + store: memberlist + replication_factor: 1 + final_sleep: 0s + chunk_idle_period: 5m + chunk_retain_period: 30s + +memberlist: + abort_if_cluster_join_fails: false + + # Expose this port on all distributor, ingester + # and querier replicas. + bind_port: 7946 + + # You can use a headless k8s service for all distributor, + # ingester and querier components. + join_members: + - loki-gossip-ring.loki.svc.cluster.local:7946 + + max_join_backoff: 1m + max_join_retries: 10 + min_join_backoff: 1s + +schema_config: + configs: + - from: 2020-05-15 + store: boltdb_shipper + object_store: s3 + schema: v11 + index: + prefix: index_ + period: 168h + +storage_config: + boltdb_shipper: + active_index_directory: /data/loki/index + cache_location: /data/loki/index_cache + resync_interval: 5s + shared_store: s3 + + aws: + s3: s3://access_key:secret_access_key@custom_endpoint/bucket_name + s3forcepathstyle: true + +limits_config: + enforce_metric_name: false + reject_old_samples: true + reject_old_samples_max_age: 168h + +``` + ## Query Frontend [example configuration](./query-frontend.md)
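The example above joins the gossip ring through `loki-gossip-ring.loki.svc.cluster.local:7946`. On Kubernetes, that DNS name is typically provided by a headless Service spanning the distributor, ingester and querier pods. The following sketch is not part of the patch series: the Service name and namespace are inferred from the `join_members` address, and the `app: loki` selector is a hypothetical placeholder that must match however the Loki pods are actually labelled.

```yaml
# Hypothetical headless Service backing the gossip ring from the example above.
apiVersion: v1
kind: Service
metadata:
  name: loki-gossip-ring      # yields loki-gossip-ring.loki.svc.cluster.local
  namespace: loki
spec:
  clusterIP: None             # headless: DNS returns the pod IPs directly
  publishNotReadyAddresses: true
  selector:
    app: loki                 # placeholder; must match the Loki pod labels
  ports:
    - name: gossip
      port: 7946              # must match the memberlist bind_port
      targetPort: 7946
      protocol: TCP
```

Making the Service headless (`clusterIP: None`) lets the name resolve to every pod IP rather than a single virtual IP, so `-memberlist.join` can seed the ring with all gossiping members at once; `publishNotReadyAddresses: true` is optional but helps the ring bootstrap before the pods report ready.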