Skip to content

Commit

Permalink
Update Cortex to master (#1785)
Browse files Browse the repository at this point in the history
* Update Cortex to v0.7.0-rc.0

Signed-off-by: Peter Štibraný <[email protected]>

* Fixed errors after updating to latest master.

Signed-off-by: Peter Štibraný <[email protected]>

* Fixed errors after updating to latest master.

Signed-off-by: Peter Štibraný <[email protected]>

* Fixed errors after updating to latest master.

queryrange.NewRetryMiddleware in NewLogFilterTripperware is now created
without metrics, as that would collide with metrics in NewMetricTripperware.

Signed-off-by: Peter Štibraný <[email protected]>

* Make sure to start client.Pool before using it.

Signed-off-by: Peter Štibraný <[email protected]>

* Use shared singleton memberlist KV. Fix queryrange metrics.

Signed-off-by: Peter Štibraný <[email protected]>

* Make lint happy

Signed-off-by: Peter Štibraný <[email protected]>

* go mod tidy

Signed-off-by: Peter Štibraný <[email protected]>

* Return err early.

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored Mar 10, 2020
1 parent 644d4be commit 5ae6ea7
Show file tree
Hide file tree
Showing 298 changed files with 13,174 additions and 13,630 deletions.
15 changes: 7 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/containerd/containerd v1.3.2 // indirect
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/cortexproject/cortex v0.6.1-0.20200219140319-baae166e3335
github.com/cortexproject/cortex v0.7.0-rc.0
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v0.7.3-0.20190817195342-4760db040282
Expand Down Expand Up @@ -39,24 +39,23 @@ require (
github.com/opentracing/opentracing-go v1.1.1-0.20200124165624-2876d2018785
github.com/pierrec/lz4 v2.3.1-0.20191115212037-9085dacd1e1e+incompatible
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.2.1
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4
github.com/prometheus/common v0.7.0
github.com/prometheus/prometheus v1.8.2-0.20200107122003-4708915ac6ef
github.com/prometheus/client_golang v1.5.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
github.com/prometheus/prometheus v1.8.2-0.20200213233353-b90be6f32a33
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd
github.com/stretchr/testify v1.5.1
github.com/tonistiigi/fifo v0.0.0-20190226154929-a9fb20d87448
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/ugorji/go v1.1.7 // indirect
github.com/weaveworks/common v0.0.0-20200201141823-27e183090ab1
github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a
go.etcd.io/etcd v0.0.0-20190815204525-8f85f0dc2607 // indirect
golang.org/x/net v0.0.0-20191112182307-2180aed22343
golang.org/x/sys v0.0.0-20191218084908-4a24b4065292 // indirect
google.golang.org/grpc v1.25.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.2.5
gopkg.in/yaml.v2 v2.2.7
k8s.io/klog v1.0.0
)

Expand Down
79 changes: 31 additions & 48 deletions go.sum

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
cortex_util "github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/pkg/errors"

"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -111,7 +113,10 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
return nil, err
}

distributorsRing.Start()
err = services.StartAndAwaitRunning(context.Background(), distributorsRing)
if err != nil {
return nil, err
}

ingestionRateStrategy = newGlobalIngestionRateStrategy(overrides, distributorsRing)
} else {
Expand All @@ -128,13 +133,18 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
}

if err := services.StartAndAwaitRunning(context.Background(), d.pool); err != nil {
return nil, errors.Wrap(err, "starting client pool")
}

return &d, nil
}

func (d *Distributor) Stop() {
if d.distributorsRing != nil {
d.distributorsRing.Shutdown()
_ = services.StopAndAwaitTerminated(context.Background(), d.distributorsRing)
}
_ = services.StopAndAwaitTerminated(context.Background(), d.pool)
}

// TODO taken from Cortex, see if we can refactor out an usable interface.
Expand Down
3 changes: 1 addition & 2 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util/flagext"

"github.com/grafana/loki/pkg/chunkenc"
Expand Down Expand Up @@ -177,7 +176,7 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {

// nolint
func defaultIngesterTestConfig(t *testing.T) Config {
kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, codec.NewProtoCodec("foo", ring.ProtoDescFactory))
kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, ring.GetCodec())
require.NoError(t, err)

cfg := Config{}
Expand Down
13 changes: 11 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
Expand Down Expand Up @@ -147,7 +148,10 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
return nil, err
}

i.lifecycler.Start()
err = services.StartAndAwaitRunning(context.Background(), i.lifecycler)
if err != nil {
return nil, err
}

// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
Expand Down Expand Up @@ -181,7 +185,12 @@ func (i *Ingester) Shutdown() {
close(i.quit)
i.done.Wait()

i.lifecycler.Shutdown()
i.stopIncomingRequests()

err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
if err != nil {
level.Error(util.Logger).Log("msg", "lifecycler failed", "err", err)
}
}

// Stopping helps cleaning up resources before actual shutdown
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ func (i *Ingester) checkFromIngesterIsInLeavingState(ctx context.Context, fromIn
return nil
}

// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {
// stopIncomingRequests is called when ingester is stopping
func (i *Ingester) stopIncomingRequests() {
i.shutdownMtx.Lock()
defer i.shutdownMtx.Unlock()

Expand Down
18 changes: 11 additions & 7 deletions pkg/ingester/transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import (
"testing"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -120,7 +120,7 @@ type testIngesterFactory struct {
}

func newTestIngesterFactory(t *testing.T) *testIngesterFactory {
kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, codec.NewProtoCodec("foo", ring.ProtoDescFactory))
kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, ring.GetCodec())
require.NoError(t, err)

return &testIngesterFactory{
Expand Down Expand Up @@ -196,7 +196,11 @@ func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption)
// unhealthy state, permanently stuck in the handler for claiming tokens.
go func() {
time.Sleep(time.Millisecond * 50)
c.i.lifecycler.Shutdown()
c.i.stopIncomingRequests() // used to be called from lifecycler, now it must be called *before* stopping lifecyler. (ingester does this on shutdown)
err := services.StopAndAwaitTerminated(context.Background(), c.i.lifecycler)
if err != nil {
level.Error(util.Logger).Log("msg", "lifecycler failed", "err", err)
}
}()

go func() {
Expand Down
3 changes: 3 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"

Expand Down Expand Up @@ -45,6 +46,7 @@ type Config struct {
Frontend frontend.Config `yaml:"frontend,omitempty"`
QueryRange queryrange.Config `yaml:"query_range,omitempty"`
RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"`
MemberlistKV memberlist.KVConfig `yaml:"memberlist"`
}

// RegisterFlags registers flag.
Expand Down Expand Up @@ -87,6 +89,7 @@ type Loki struct {
frontend *frontend.Frontend
stopper queryrange.Stopper
runtimeConfig *runtimeconfig.Manager
memberlistKV *memberlist.KVInit

httpAuthMiddleware middleware.Interface
}
Expand Down
61 changes: 49 additions & 12 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loki

import (
"context"
"fmt"
"net/http"
"os"
Expand All @@ -11,8 +12,11 @@ import (
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/ring/kv/memberlist"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"github.com/cortexproject/cortex/pkg/util/services"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -46,6 +50,7 @@ const (
QueryFrontend
Store
TableManager
MemberlistKV
All
)

Expand Down Expand Up @@ -80,6 +85,8 @@ func (m moduleName) String() string {
return "query-frontend"
case TableManager:
return "table-manager"
case MemberlistKV:
return "memberlist-kv"
case All:
return "all"
default:
Expand Down Expand Up @@ -134,7 +141,11 @@ func (t *Loki) initServer() (err error) {

func (t *Loki) initRing() (err error) {
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.ring, err = ring.New(t.cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey)
if err == nil {
err = services.StartAndAwaitRunning(context.Background(), t.ring)
}
if err != nil {
return
}
Expand All @@ -143,6 +154,10 @@ func (t *Loki) initRing() (err error) {
return
}

func (t *Loki) stopRing() (err error) {
return services.StopAndAwaitTerminated(context.Background(), t.ring)
}

func (t *Loki) initRuntimeConfig() (err error) {
if t.cfg.RuntimeConfig.LoadPath == "" {
t.cfg.RuntimeConfig.LoadPath = t.cfg.LimitsConfig.PerTenantOverrideConfig
Expand All @@ -154,12 +169,14 @@ func (t *Loki) initRuntimeConfig() (err error) {
validation.SetDefaultLimitsForYAMLUnmarshalling(t.cfg.LimitsConfig)

t.runtimeConfig, err = runtimeconfig.NewRuntimeConfigManager(t.cfg.RuntimeConfig, prometheus.DefaultRegisterer)
if err == nil {
err = services.StartAndAwaitRunning(context.Background(), t.runtimeConfig)
}
return err
}

func (t *Loki) stopRuntimeConfig() (err error) {
t.runtimeConfig.Stop()
return nil
return services.StopAndAwaitTerminated(context.Background(), t.runtimeConfig)
}

func (t *Loki) initOverrides() (err error) {
Expand All @@ -168,6 +185,8 @@ func (t *Loki) initOverrides() (err error) {
}

func (t *Loki) initDistributor() (err error) {
t.cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.overrides)
if err != nil {
return
Expand Down Expand Up @@ -226,6 +245,7 @@ func (t *Loki) initQuerier() (err error) {

func (t *Loki) initIngester() (err error) {
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.cfg.Ingester.LifecyclerConfig.ListenPort = &t.cfg.Server.GRPCListenPort
t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides)
if err != nil {
Expand Down Expand Up @@ -281,18 +301,15 @@ func (t *Loki) initTableManager() error {
bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig.Config)
util.CheckFatal("initializing bucket client", err)

t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient)
t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, prometheus.DefaultRegisterer)
if err != nil {
return err
}

t.tableManager.Start()
return nil
return services.StartAndAwaitRunning(context.Background(), t.tableManager)
}

func (t *Loki) stopTableManager() error {
t.tableManager.Stop()
return nil
return services.StopAndAwaitTerminated(context.Background(), t.tableManager)
}

func (t *Loki) initStore() (err error) {
Expand All @@ -307,15 +324,15 @@ func (t *Loki) stopStore() error {

func (t *Loki) initQueryFrontend() (err error) {
level.Debug(util.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.cfg.Frontend))
t.frontend, err = frontend.New(t.cfg.Frontend, util.Logger)
t.frontend, err = frontend.New(t.cfg.Frontend, util.Logger, prometheus.DefaultRegisterer)
if err != nil {
return
}
level.Debug(util.Logger).Log("msg", "initializing query range tripperware",
"config", fmt.Sprintf("%+v", t.cfg.QueryRange),
"limits", fmt.Sprintf("%+v", t.cfg.LimitsConfig),
)
tripperware, stopper, err := queryrange.NewTripperware(t.cfg.QueryRange, util.Logger, t.overrides)
tripperware, stopper, err := queryrange.NewTripperware(t.cfg.QueryRange, util.Logger, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return err
}
Expand Down Expand Up @@ -347,6 +364,20 @@ func (t *Loki) stopQueryFrontend() error {
return nil
}

func (t *Loki) initMemberlistKV() error {
t.cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer
t.cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
}
t.memberlistKV = memberlist.NewKVInit(&t.cfg.MemberlistKV)
return nil
}

func (t *Loki) stopMemberlistKV() error {
t.memberlistKV.Stop()
return nil
}

// listDeps recursively gets a list of dependencies for a passed moduleName
func listDeps(m moduleName) []moduleName {
deps := modules[m].deps
Expand Down Expand Up @@ -422,9 +453,15 @@ var modules = map[moduleName]module{
stop: (*Loki).stopRuntimeConfig,
},

MemberlistKV: {
init: (*Loki).initMemberlistKV,
stop: (*Loki).stopMemberlistKV,
},

Ring: {
deps: []moduleName{RuntimeConfig, Server},
deps: []moduleName{RuntimeConfig, Server, MemberlistKV},
init: (*Loki).initRing,
stop: (*Loki).stopRing,
},

Overrides: {
Expand All @@ -445,7 +482,7 @@ var modules = map[moduleName]module{
},

Ingester: {
deps: []moduleName{Store, Server},
deps: []moduleName{Store, Server, MemberlistKV},
init: (*Loki).initIngester,
stop: (*Loki).stopIngester,
stopping: (*Loki).stoppingIngester,
Expand Down
Loading

0 comments on commit 5ae6ea7

Please sign in to comment.