diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ce8b1dd0c296..8f2e81aa9b097 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -173,6 +173,8 @@ to include only the most relevant. * [5544](https://github.com/grafana/loki/pull/5544) **ssncferreira**: Update vectorAggEvaluator to fail for expressions without grouping * [5543](https://github.com/grafana/loki/pull/5543) **cyriltovena**: update loki go version to 1.17.8 * [5450](https://github.com/grafana/loki/pull/5450) **BenoitKnecht**: pkg/ruler/base: Add external_labels option +* [5484](https://github.com/grafana/loki/pull/5450) **sandeepsukhani**: Add support for per user index query readiness with limits overrides +* [5358](https://github.com/grafana/loki/pull/5358) **DylanGuedes**: Add `RingMode` support to `IndexGateway` * [5435](https://github.com/grafana/loki/pull/5435) **slim-bean**: set match_max_concurrent true by default * [5361](https://github.com/grafana/loki/pull/5361) **cyriltovena**: Add usage report into Loki. * [5243](https://github.com/grafana/loki/pull/5243) **owen-d**: Refactor/remove global splitby diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index d713ce608aee1..9b0bd0d27809c 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -85,6 +85,8 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { applyInstanceConfigs(r, &defaults) + applyCommonReplicationFactor(r, &defaults) + applyDynamicRingConfigs(r, &defaults) appendLoopbackInterface(r, &defaults) @@ -127,6 +129,7 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { r.Ruler.Ring.InstanceAddr = r.Common.InstanceAddr r.QueryScheduler.SchedulerRing.InstanceAddr = r.Common.InstanceAddr r.Frontend.FrontendV2.Addr = r.Common.InstanceAddr + r.IndexGateway.Ring.InstanceAddr = r.Common.InstanceAddr } if !reflect.DeepEqual(r.Common.InstanceInterfaceNames, defaults.Common.InstanceInterfaceNames) { @@ -136,6 +139,14 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { r.Ruler.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames r.QueryScheduler.SchedulerRing.InstanceInterfaceNames = r.Common.InstanceInterfaceNames r.Frontend.FrontendV2.InfNames = r.Common.InstanceInterfaceNames + r.IndexGateway.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames + } +} + +// applyCommonReplicationFactor apply the common replication factor to the Index Gateway ring. +func applyCommonReplicationFactor(r, defaults *ConfigWrapper) { + if !reflect.DeepEqual(r.Common.ReplicationFactor, defaults.Common.ReplicationFactor) { + r.IndexGateway.Ring.ReplicationFactor = r.Common.ReplicationFactor } } @@ -147,9 +158,9 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { // 2. If no explicit ring config is set, use the common ring configured if provided. // 3. If no common ring was provided, use the memberlist config if provided. // 4. If no common ring or memberlist were provided, use the ingester's ring configuration. - +// // When using the ingester or common ring config, the loopback interface will be appended to the end of -// the list of default interface names +// the list of default interface names. func applyDynamicRingConfigs(r, defaults *ConfigWrapper) { if !reflect.DeepEqual(r.Common.Ring, defaults.Common.Ring) { // common ring is provided, use that for all rings, merging with @@ -170,13 +181,13 @@ func applyDynamicRingConfigs(r, defaults *ConfigWrapper) { } } -//applyConfigToRings will reuse a given RingConfig everywhere else we have a ring configured. -//`mergeWithExisting` will be true when applying the common config, false when applying the ingester -//config. This decision was made since the ingester ring copying behavior is likely to be less intuitive, -//and was added as a stop-gap to prevent the new rings in 2.4 from breaking existing configs before 2.4 that only had an ingester -//ring defined. When `mergeWithExisting` is false, we will not apply any of the ring config to a ring that has -//any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified -//derivations, with the derivations taking precedence. +// applyConfigToRings will reuse a given RingConfig everywhere else we have a ring configured. +// `mergeWithExisting` will be true when applying the common config, false when applying the ingester +// config. This decision was made since the ingester ring copying behavior is likely to be less intuitive, +// and was added as a stop-gap to prevent the new rings in 2.4 from breaking existing configs before 2.4 that only had an ingester +// ring defined. When `mergeWithExisting` is false, we will not apply any of the ring config to a ring that has +// any deviations from defaults. When mergeWithExisting is true, the ring config is overlaid on top of any specified +// derivations, with the derivations taking precedence. func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWithExisting bool) { // Ingester - mergeWithExisting is false when applying the ingester config, and we only want to // change ingester ring values when applying the common config, so there's no need for the DeepEqual @@ -194,7 +205,6 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit r.Ingester.LifecyclerConfig.Zone = rc.InstanceZone r.Ingester.LifecyclerConfig.ListenPort = rc.ListenPort r.Ingester.LifecyclerConfig.ObservePeriod = rc.ObservePeriod - r.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor = r.Common.ReplicationFactor } // Distributor @@ -244,6 +254,19 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit r.CompactorConfig.CompactorRing.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled r.CompactorConfig.CompactorRing.KVStore = rc.KVStore } + + // IndexGateway + if mergeWithExisting || reflect.DeepEqual(r.IndexGateway.Ring, defaults.IndexGateway.Ring) { + r.IndexGateway.Ring.HeartbeatTimeout = rc.HeartbeatTimeout + r.IndexGateway.Ring.HeartbeatPeriod = rc.HeartbeatPeriod + r.IndexGateway.Ring.InstancePort = rc.InstancePort + r.IndexGateway.Ring.InstanceAddr = rc.InstanceAddr + r.IndexGateway.Ring.InstanceID = rc.InstanceID + r.IndexGateway.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.IndexGateway.Ring.InstanceZone = rc.InstanceZone + r.IndexGateway.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.IndexGateway.Ring.KVStore = rc.KVStore + } } func applyTokensFilePath(cfg *ConfigWrapper) error { @@ -268,6 +291,12 @@ func applyTokensFilePath(cfg *ConfigWrapper) error { } cfg.QueryScheduler.SchedulerRing.TokensFilePath = f + f, err = tokensFile(cfg, "indexgateway.tokens") + if err != nil { + return err + } + cfg.IndexGateway.Ring.TokensFilePath = f + return nil } @@ -304,8 +333,10 @@ func applyPathPrefixDefaults(r, defaults *ConfigWrapper) { } } -// appendLoopbackInterface will append the loopback interface to the interface names used for the ingester ring, -// v2 frontend, and common ring config unless an explicit list of names was provided. +// appendLoopbackInterface will append the loopback interface to the interface names used by the Loki components +// (ex: rings, v2 frontend, etc). +// +// The append won't occur for an specific component if an explicit list of net interface names is provided for that component. func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { loopbackIface, err := loki_net.LoopbackInterfaceName() if err != nil { @@ -339,6 +370,10 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { if reflect.DeepEqual(cfg.Ruler.Ring.InstanceInterfaceNames, defaults.Ruler.Ring.InstanceInterfaceNames) { cfg.Ruler.Ring.InstanceInterfaceNames = append(cfg.Ruler.Ring.InstanceInterfaceNames, loopbackIface) } + + if reflect.DeepEqual(cfg.IndexGateway.Ring.InstanceInterfaceNames, defaults.IndexGateway.Ring.InstanceInterfaceNames) { + cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface) + } } // applyMemberlistConfig will change the default ingester, distributor, ruler, and query scheduler ring configurations to use memberlist. @@ -351,6 +386,7 @@ func applyMemberlistConfig(r *ConfigWrapper) { r.Ruler.Ring.KVStore.Store = memberlistStr r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr + r.IndexGateway.Ring.KVStore.Store = memberlistStr } var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend") diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 365d328fb3b52..bc796be2771a0 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -182,9 +182,10 @@ memberlist: join_members: - foo.bar.example.com` - config, _ := testContext(configFileString, []string{"-ruler.ring.store", "inmemory"}) + config, _ := testContext(configFileString, []string{"-ruler.ring.store", "inmemory", "-index-gateway.ring.store", "etcd"}) assert.EqualValues(t, "inmemory", config.Ruler.Ring.KVStore.Store) + assert.EqualValues(t, "etcd", config.IndexGateway.Ring.KVStore.Store) assert.EqualValues(t, memberlistStr, config.Ingester.LifecyclerConfig.RingConfig.KVStore.Store) assert.EqualValues(t, memberlistStr, config.Distributor.DistributorRing.KVStore.Store) @@ -948,6 +949,7 @@ common: assert.Equal(t, "", config.Ingester.LifecyclerConfig.TokensFilePath) assert.Equal(t, "", config.CompactorConfig.CompactorRing.TokensFilePath) assert.Equal(t, "", config.QueryScheduler.SchedulerRing.TokensFilePath) + assert.Equal(t, "", config.IndexGateway.Ring.TokensFilePath) }) t.Run("tokens files should be set from common config when persist_tokens is true and path_prefix is defined", func(t *testing.T) { @@ -962,6 +964,7 @@ common: assert.Equal(t, "/loki/ingester.tokens", config.Ingester.LifecyclerConfig.TokensFilePath) assert.Equal(t, "/loki/compactor.tokens", config.CompactorConfig.CompactorRing.TokensFilePath) assert.Equal(t, "/loki/scheduler.tokens", config.QueryScheduler.SchedulerRing.TokensFilePath) + assert.Equal(t, "/loki/indexgateway.tokens", config.IndexGateway.Ring.TokensFilePath) }) t.Run("ingester config not applied to other rings if actual values set", func(t *testing.T) { @@ -975,6 +978,9 @@ compactor: query_scheduler: scheduler_ring: tokens_file_path: /sched/tokes +index_gateway: + ring: + tokens_file_path: /looki/tookens common: persist_tokens: true path_prefix: /loki @@ -985,6 +991,7 @@ common: assert.Equal(t, "/loki/toookens", config.Ingester.LifecyclerConfig.TokensFilePath) assert.Equal(t, "/foo/tokens", config.CompactorConfig.CompactorRing.TokensFilePath) assert.Equal(t, "/sched/tokes", config.QueryScheduler.SchedulerRing.TokensFilePath) + assert.Equal(t, "/looki/tookens", config.IndexGateway.Ring.TokensFilePath) }) t.Run("ingester ring configuration is used for other rings when no common ring or memberlist config is provided", func(t *testing.T) { @@ -1003,6 +1010,7 @@ ingester: assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, "etcd", config.IndexGateway.Ring.KVStore.Store) }) t.Run("memberlist configuration takes precedence over copying ingester config", func(t *testing.T) { @@ -1025,6 +1033,7 @@ ingester: assert.Equal(t, "memberlist", config.Ruler.Ring.KVStore.Store) assert.Equal(t, "memberlist", config.QueryScheduler.SchedulerRing.KVStore.Store) assert.Equal(t, "memberlist", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, "memberlist", config.IndexGateway.Ring.KVStore.Store) }) } @@ -1175,6 +1184,7 @@ func TestCommonRingConfigSection(t *testing.T) { assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, "etcd", config.IndexGateway.Ring.KVStore.Store) }) t.Run("if common ring is provided, reuse it for all rings that aren't explicitly set", func(t *testing.T) { @@ -1196,6 +1206,7 @@ ingester: assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, "etcd", config.IndexGateway.Ring.KVStore.Store) }) t.Run("if only ingester ring is provided, reuse it for all rings", func(t *testing.T) { @@ -1211,6 +1222,7 @@ ingester: assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, "etcd", config.IndexGateway.Ring.KVStore.Store) }) t.Run("if a ring is explicitly configured, don't override any part of it with ingester config", func(t *testing.T) { @@ -1242,6 +1254,9 @@ ingester: assert.Equal(t, "inmemory", config.CompactorConfig.CompactorRing.KVStore.Store) assert.Equal(t, 5*time.Minute, config.CompactorConfig.CompactorRing.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.IndexGateway.Ring.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.IndexGateway.Ring.HeartbeatPeriod) }) t.Run("if a ring is explicitly configured, merge common config with unconfigured parts of explicitly configured ring", func(t *testing.T) { @@ -1273,6 +1288,9 @@ distributor: assert.Equal(t, "inmemory", config.CompactorConfig.CompactorRing.KVStore.Store) assert.Equal(t, 5*time.Minute, config.CompactorConfig.CompactorRing.HeartbeatPeriod) + + assert.Equal(t, "inmemory", config.IndexGateway.Ring.KVStore.Store) + assert.Equal(t, 5*time.Minute, config.IndexGateway.Ring.HeartbeatPeriod) }) t.Run("ring configs provided via command line take precedence", func(t *testing.T) { @@ -1289,6 +1307,7 @@ distributor: assert.Equal(t, "consul", config.Ruler.Ring.KVStore.Store) assert.Equal(t, "consul", config.QueryScheduler.SchedulerRing.KVStore.Store) assert.Equal(t, "consul", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, "consul", config.IndexGateway.Ring.KVStore.Store) }) t.Run("common ring config take precedence over common memberlist config", func(t *testing.T) { @@ -1306,6 +1325,7 @@ common: assert.Equal(t, "etcd", config.Ruler.Ring.KVStore.Store) assert.Equal(t, "etcd", config.QueryScheduler.SchedulerRing.KVStore.Store) assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store) + assert.Equal(t, "etcd", config.IndexGateway.Ring.KVStore.Store) }) } @@ -1342,10 +1362,38 @@ func Test_replicationFactor(t *testing.T) { join_members: - foo.bar.example.com common: - replication_factor: 1` + replication_factor: 2` + config, _, err := configWrapperFromYAML(t, yamlContent, nil) + assert.NoError(t, err) + assert.Equal(t, 2, config.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor) + assert.Equal(t, 2, config.IndexGateway.Ring.ReplicationFactor) + }) +} + +func Test_IndexGatewayRingReplicationFactor(t *testing.T) { + t.Run("default replication factor is 3", func(t *testing.T) { + const emptyConfigString = `--- +server: + http_listen_port: 80` + config, _, err := configWrapperFromYAML(t, emptyConfigString, nil) + assert.NoError(t, err) + assert.Equal(t, 3, config.IndexGateway.Ring.ReplicationFactor) + }) + + t.Run("explicit replication factor for the index gateway should override all other definitions", func(t *testing.T) { + yamlContent := `ingester: + lifecycler: + ring: + replication_factor: 15 +common: + replication_factor: 30 +index_gateway: + ring: + replication_factor: 7` + config, _, err := configWrapperFromYAML(t, yamlContent, nil) assert.NoError(t, err) - assert.Equal(t, 1, config.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor) + assert.Equal(t, 7, config.IndexGateway.Ring.ReplicationFactor) }) } @@ -1368,6 +1416,9 @@ frontend: compactor: compactor_ring: instance_addr: mycompactor +index_gateway: + ring: + instance_addr: myindexgateway common: instance_addr: 99.99.99.99 ring: @@ -1380,6 +1431,7 @@ common: assert.Equal(t, "myscheduler", config.QueryScheduler.SchedulerRing.InstanceAddr) assert.Equal(t, "myqueryfrontend", config.Frontend.FrontendV2.Addr) assert.Equal(t, "mycompactor", config.CompactorConfig.CompactorRing.InstanceAddr) + assert.Equal(t, "myindexgateway", config.IndexGateway.Ring.InstanceAddr) }) t.Run("common instance addr is applied when addresses are not explicitly set", func(t *testing.T) { @@ -1393,6 +1445,7 @@ common: assert.Equal(t, "99.99.99.99", config.QueryScheduler.SchedulerRing.InstanceAddr) assert.Equal(t, "99.99.99.99", config.Frontend.FrontendV2.Addr) assert.Equal(t, "99.99.99.99", config.CompactorConfig.CompactorRing.InstanceAddr) + assert.Equal(t, "99.99.99.99", config.IndexGateway.Ring.InstanceAddr) }) t.Run("common instance addr doesn't supersede instance addr from common ring", func(t *testing.T) { @@ -1409,6 +1462,7 @@ common: assert.Equal(t, "22.22.22.22", config.QueryScheduler.SchedulerRing.InstanceAddr) assert.Equal(t, "99.99.99.99", config.Frontend.FrontendV2.Addr) // not a ring. assert.Equal(t, "22.22.22.22", config.CompactorConfig.CompactorRing.InstanceAddr) + assert.Equal(t, "22.22.22.22", config.IndexGateway.Ring.InstanceAddr) }) } @@ -1430,6 +1484,10 @@ query_scheduler: scheduler_ring: instance_interface_names: - myscheduler +index_gateway: + ring: + instance_interface_names: + - myindexgateway frontend: instance_interface_names: - myfrontend @@ -1451,6 +1509,7 @@ common: assert.Equal(t, []string{"myscheduler"}, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames) assert.Equal(t, []string{"myfrontend"}, config.Frontend.FrontendV2.InfNames) assert.Equal(t, []string{"mycompactor"}, config.CompactorConfig.CompactorRing.InstanceInterfaceNames) + assert.Equal(t, []string{"myindexgateway"}, config.IndexGateway.Ring.InstanceInterfaceNames) }) t.Run("common instance net interfaces is applied when others net interfaces are not explicitly set", func(t *testing.T) { @@ -1465,6 +1524,7 @@ common: assert.Equal(t, []string{"commoninterface"}, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames) assert.Equal(t, []string{"commoninterface"}, config.Frontend.FrontendV2.InfNames) assert.Equal(t, []string{"commoninterface"}, config.CompactorConfig.CompactorRing.InstanceInterfaceNames) + assert.Equal(t, []string{"commoninterface"}, config.IndexGateway.Ring.InstanceInterfaceNames) }) t.Run("common instance net interface doesn't supersede net interface from common ring", func(t *testing.T) { @@ -1483,5 +1543,6 @@ common: assert.Equal(t, []string{"ringsshouldusethis"}, config.QueryScheduler.SchedulerRing.InstanceInterfaceNames) assert.Equal(t, []string{"ringsshouldntusethis"}, config.Frontend.FrontendV2.InfNames) // not a ring. assert.Equal(t, []string{"ringsshouldusethis"}, config.CompactorConfig.CompactorRing.InstanceInterfaceNames) + assert.Equal(t, []string{"ringsshouldusethis"}, config.IndexGateway.Ring.InstanceInterfaceNames) }) } diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 6ef8b8c782291..af3e289687f7d 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -43,6 +43,7 @@ import ( "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" "github.com/grafana/loki/pkg/tracing" "github.com/grafana/loki/pkg/usagestats" "github.com/grafana/loki/pkg/util" @@ -66,6 +67,7 @@ type Config struct { IngesterClient client.Config `yaml:"ingester_client,omitempty"` Ingester ingester.Config `yaml:"ingester,omitempty"` StorageConfig storage.Config `yaml:"storage_config,omitempty"` + IndexGateway indexgateway.Config `yaml:"index_gateway"` ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"` SchemaConfig config.SchemaConfig `yaml:"schema_config,omitempty"` LimitsConfig validation.Limits `yaml:"limits_config,omitempty"` @@ -103,6 +105,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.IngesterClient.RegisterFlags(f) c.Ingester.RegisterFlags(f) c.StorageConfig.RegisterFlags(f) + c.IndexGateway.RegisterFlags(f) c.ChunkStoreConfig.RegisterFlags(f) c.SchemaConfig.RegisterFlags(f) c.LimitsConfig.RegisterFlags(f) @@ -249,6 +252,7 @@ type Loki struct { QueryFrontEndTripperware basetripper.Tripperware queryScheduler *scheduler.Scheduler usageReport *usagestats.Reporter + indexGatewayRing *ring.Ring clientMetrics storage.ClientMetrics @@ -483,6 +487,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(IndexGateway, t.initIndexGateway) mm.RegisterModule(QueryScheduler, t.initQueryScheduler) + mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule) mm.RegisterModule(UsageReport, t.initUsageReport) mm.RegisterModule(All, nil) @@ -497,7 +502,7 @@ func (t *Loki) setupModuleManager() error { OverridesExporter: {Overrides, Server}, TenantConfigs: {RuntimeConfig}, Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport}, - Store: {Overrides}, + Store: {Overrides, IndexGatewayRing}, Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport}, Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport}, QueryFrontendTripperware: {Server, Overrides, TenantConfigs}, @@ -506,8 +511,9 @@ func (t *Loki) setupModuleManager() error { Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport}, TableManager: {Server, UsageReport}, Compactor: {Server, Overrides, MemberlistKV, UsageReport}, - IndexGateway: {Server, Overrides, UsageReport}, + IndexGateway: {Server, Overrides, UsageReport, MemberlistKV}, IngesterQuerier: {Ring}, + IndexGatewayRing: {RuntimeConfig, Server, MemberlistKV}, All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor}, Read: {QueryScheduler, QueryFrontend, Querier, Ruler, Compactor}, Write: {Ingester, Distributor}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 98b8499c7779f..86bb0bea49201 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -10,7 +10,9 @@ import ( "os" "time" + "github.com/grafana/dskit/kv" "github.com/grafana/dskit/tenant" + gerrors "github.com/pkg/errors" "github.com/NYTimes/gziphandler" "github.com/go-kit/log" @@ -84,6 +86,7 @@ const ( MemberlistKV string = "memberlist-kv" Compactor string = "compactor" IndexGateway string = "index-gateway" + IndexGatewayRing string = "index-gateway-ring" QueryScheduler string = "query-scheduler" All string = "all" Read string = "read" @@ -413,6 +416,9 @@ func (t *Loki) initStore() (_ services.Service, err error) { } } + t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Mode = t.Cfg.IndexGateway.Mode + t.Cfg.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Ring = t.indexGatewayRing + var asyncStore bool if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) { boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg) @@ -761,6 +767,9 @@ func (t *Loki) initCompactor() (services.Service, error) { func (t *Loki) initIndexGateway() (services.Service, error) { t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly + t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort + objectClient, err := storage.NewObjectClient(t.Cfg.StorageConfig.BoltDBShipperConfig.SharedStoreType, t.Cfg.StorageConfig, t.clientMetrics) if err != nil { return nil, err @@ -771,11 +780,49 @@ func (t *Loki) initIndexGateway() (services.Service, error) { return nil, err } - gateway := indexgateway.NewIndexGateway(shipperIndexClient) + gateway, err := indexgateway.NewIndexGateway(t.Cfg.IndexGateway, util_log.Logger, prometheus.DefaultRegisterer, shipperIndexClient) + if err != nil { + return nil, err + } + + t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(gateway) indexgatewaypb.RegisterIndexGatewayServer(t.Server.GRPC, gateway) return gateway, nil } +func (t *Loki) initIndexGatewayRing() (_ services.Service, err error) { + if t.Cfg.IndexGateway.Mode != indexgateway.RingMode { + return + } + + t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly + t.Cfg.IndexGateway.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.IndexGateway.Ring.ListenPort = t.Cfg.Server.GRPCListenPort + ringCfg := t.Cfg.IndexGateway.Ring.ToRingConfig(t.Cfg.IndexGateway.Ring.ReplicationFactor) + reg := prometheus.WrapRegistererWithPrefix("loki_", prometheus.DefaultRegisterer) + + logger := util_log.Logger + ringStore, err := kv.NewClient( + ringCfg.KVStore, + ring.GetCodec(), + kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", reg), "index-gateway"), + logger, + ) + if err != nil { + return nil, gerrors.Wrap(err, "kv new client") + } + + t.indexGatewayRing, err = ring.NewWithStoreClientAndStrategy( + ringCfg, indexgateway.RingIdentifier, indexgateway.RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", reg), logger, + ) + if err != nil { + return nil, gerrors.Wrap(err, "new with store client and strategy") + } + + t.Server.HTTP.Path("/indexgateway/ring").Methods("GET", "POST").Handler(t.indexGatewayRing) + return t.indexGatewayRing, nil +} + func (t *Loki) initQueryScheduler() (services.Service, error) { // Set some config sections from other config sections in the config struct t.Cfg.QueryScheduler.SchedulerRing.ListenPort = t.Cfg.Server.GRPCListenPort diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index a2bc00846b483..aa204e3f8b071 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -145,7 +145,7 @@ func NewIndexClient(name string, cfg Config, schemaCfg config.SchemaConfig, limi return boltDBIndexClientWithShipper, nil } if cfg.BoltDBShipperConfig.Mode == shipper.ModeReadOnly && cfg.BoltDBShipperConfig.IndexGatewayClientConfig.Address != "" { - gateway, err := shipper.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer) + gateway, err := shipper.NewGatewayClient(cfg.BoltDBShipperConfig.IndexGatewayClientConfig, registerer, util_log.Logger) if err != nil { return nil, err } diff --git a/pkg/storage/stores/shipper/gateway_client.go b/pkg/storage/stores/shipper/gateway_client.go index 8c920dd6ce7d8..8efc15266430a 100644 --- a/pkg/storage/stores/shipper/gateway_client.go +++ b/pkg/storage/stores/shipper/gateway_client.go @@ -5,19 +5,27 @@ import ( "flag" "fmt" "io" + "math/rand" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/ring" + ring_client "github.com/grafana/dskit/ring/client" + "github.com/grafana/dskit/tenant" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/instrument" "google.golang.org/grpc" + "github.com/grafana/loki/pkg/distributor/clientpool" "github.com/grafana/loki/pkg/storage/stores/series/index" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" + "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" util_math "github.com/grafana/loki/pkg/util/math" ) @@ -27,32 +35,67 @@ const ( maxConcurrentGrpcCalls = 10 ) +// IndexGatewayClientConfig configures the Index Gateway client used to +// communicate with the Index Gateway server. type IndexGatewayClientConfig struct { - Address string `yaml:"server_address,omitempty"` + // Mode sets in which mode the client will operate. It is actually defined at the + // index_gateway YAML section and reused here. + Mode indexgateway.Mode `yaml:"-"` + + // PoolConfig defines the behavior of the gRPC connection pool used to communicate + // with the Index Gateway. + // + // Only relevant for the ring mode. + // It is defined at the distributors YAML section and reused here. + PoolConfig clientpool.PoolConfig `yaml:"-"` + + // Ring is the Index Gateway ring used to find the appropriate Index Gateway instance + // this client should talk to. + // + // Only relevant for the ring mode. + Ring ring.ReadRing `yaml:"-"` + + // GRPCClientConfig configures the gRPC connection between the Index Gateway client and the server. + // + // Used by both, ring and simple mode. GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"` -} -// RegisterFlags registers flags. -func (cfg *IndexGatewayClientConfig) RegisterFlags(f *flag.FlagSet) { - cfg.GRPCClientConfig.RegisterFlagsWithPrefix("", f) + // Address of the Index Gateway instance responsible for retaining the index for all tenants. + // + // Only relevant for the simple mode. + Address string `yaml:"server_address,omitempty"` } -// RegisterFlagsWithPrefix registers flags with prefix. -func (cfg *IndexGatewayClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f) +// RegisterFlagsWithPrefix register client-specific flags with the given prefix. +// +// Flags that are used by both, client and server, are defined in the indexgateway package. +func (i *IndexGatewayClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + i.GRPCClientConfig.RegisterFlagsWithPrefix(prefix+".grpc", f) + f.StringVar(&i.Address, prefix+".server-address", "", "Hostname or IP of the Index Gateway gRPC server running in simple mode.") +} - f.StringVar(&cfg.Address, prefix+".server-address", "", "Hostname or IP of the Index Gateway gRPC server.") +func (i *IndexGatewayClientConfig) RegisterFlags(f *flag.FlagSet) { + i.RegisterFlagsWithPrefix("index-gateway-client", f) } type GatewayClient struct { cfg IndexGatewayClientConfig storeGatewayClientRequestDuration *prometheus.HistogramVec - conn *grpc.ClientConn - grpcClient indexgatewaypb.IndexGatewayClient + + conn *grpc.ClientConn + grpcClient indexgatewaypb.IndexGatewayClient + + pool *ring_client.Pool + + ring ring.ReadRing } -func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer) (*GatewayClient, error) { +// NewGatewayClient instantiates a new client used to communicate with an Index Gateway instance. +// +// If it is configured to be in ring mode, a pool of GRPC connections to all Index Gateway instances is created. +// Otherwise, it creates a single GRPC connection to an Index Gateway instance running in simple mode. +func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, logger log.Logger) (*GatewayClient, error) { sgClient := &GatewayClient{ cfg: cfg, storeGatewayClientRequestDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ @@ -61,24 +104,44 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer) (*G Help: "Time (in seconds) spent serving requests when using boltdb shipper store gateway", Buckets: instrument.DefBuckets, }, []string{"operation", "status_code"}), + ring: cfg.Ring, } dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.storeGatewayClientRequestDuration)) if err != nil { - return nil, err + return nil, errors.Wrap(err, "index gateway grpc dial option") } - sgClient.conn, err = grpc.Dial(cfg.Address, dialOpts...) - if err != nil { - return nil, err + if sgClient.cfg.Mode == indexgateway.RingMode { + factory := func(addr string) (ring_client.PoolClient, error) { + igPool, err := NewIndexGatewayGRPCPool(addr, dialOpts) + if err != nil { + return nil, errors.Wrap(err, "new index gateway grpc pool") + } + + return igPool, nil + } + + sgClient.pool = clientpool.NewPool(cfg.PoolConfig, sgClient.ring, factory, logger) + } else { + sgClient.conn, err = grpc.Dial(cfg.Address, dialOpts...) + if err != nil { + return nil, errors.Wrap(err, "index gateway grpc dial") + } + + sgClient.grpcClient = indexgatewaypb.NewIndexGatewayClient(sgClient.conn) } - sgClient.grpcClient = indexgatewaypb.NewIndexGatewayClient(sgClient.conn) return sgClient, nil } +// Stop stops the execution of this gateway client. +// +// If it is in simple mode, the single GRPC connection is closed. Otherwise, nothing happens. func (s *GatewayClient) Stop() { - s.conn.Close() + if s.cfg.Mode == indexgateway.SimpleMode { + s.conn.Close() + } } func (s *GatewayClient) QueryPages(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error { @@ -110,9 +173,21 @@ func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, ca }) } - streamer, err := s.grpcClient.QueryIndex(ctx, &indexgatewaypb.QueryIndexRequest{Queries: gatewayQueries}) + if s.cfg.Mode == indexgateway.RingMode { + return s.ringModeDoQueries(ctx, gatewayQueries, queryKeyQueryMap, callback) + } + + return s.clientDoQueries(ctx, gatewayQueries, queryKeyQueryMap, callback, s.grpcClient) +} + +// clientDoQueries send a query request to an Index Gateway instance using the given gRPC client. +// +// It is used by both, simple and ring mode. +func (s *GatewayClient) clientDoQueries(ctx context.Context, gatewayQueries []*indexgatewaypb.IndexQuery, + queryKeyQueryMap map[string]index.Query, callback index.QueryPagesCallback, client indexgatewaypb.IndexGatewayClient) error { + streamer, err := client.QueryIndex(ctx, &indexgatewaypb.QueryIndexRequest{Queries: gatewayQueries}) if err != nil { - return err + return errors.Wrap(err, "query index") } for { @@ -136,6 +211,53 @@ func (s *GatewayClient) doQueries(ctx context.Context, queries []index.Query, ca return nil } +// ringModeDoQueries prepares an index query to be sent to the Index Gateway, and then sends it +// using the clientDoQueries implementation. +// +// The preparation and sending phase includes: +// 1. Extracting the tenant name from the query. +// 2. Fetching different Index Gateway instances assigned to the extracted tenant. +// 3. Iterating in parallel over all fetched Index Gateway instances, getting their gRPC connections +// from the pool and invoking clientDoQueries using their client. +func (s *GatewayClient) ringModeDoQueries(ctx context.Context, gatewayQueries []*indexgatewaypb.IndexQuery, queryKeyQueryMap map[string]index.Query, callback index.QueryPagesCallback) error { + userID, err := tenant.TenantID(ctx) + if err != nil { + return errors.Wrap(err, "index gateway client get tenant ID") + } + + bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() + + key := util.TokenFor(userID, "" /* labels */) + rs, err := s.ring.Get(key, ring.WriteNoExtend, bufDescs, bufHosts, bufZones) + if err != nil { + return errors.Wrap(err, "index gateway get ring") + } + + addrs := rs.GetAddresses() + // shuffle addresses to make sure we don't always access the same Index Gateway instances in sequence for same tenant. + rand.Shuffle(len(addrs), func(i, j int) { + addrs[i], addrs[j] = addrs[j], addrs[i] + }) + + for _, addr := range addrs { + genericClient, err := s.pool.GetClientFor(addr) + if err != nil { + level.Error(util_log.Logger).Log("msg", fmt.Sprintf("failed to get client for instance %s", addr), "err", err) + continue + } + + client := (genericClient.(indexgatewaypb.IndexGatewayClient)) + if err := s.clientDoQueries(ctx, gatewayQueries, queryKeyQueryMap, callback, client); err != nil { + level.Error(util_log.Logger).Log("msg", fmt.Sprintf("client do queries failed for instance %s", addr), "err", err) + continue + } + + return nil + } + + return fmt.Errorf("index gateway replicationSet clientDoQueries") +} + func (s *GatewayClient) NewWriteBatch() index.WriteBatch { panic("unsupported") } diff --git a/pkg/storage/stores/shipper/gateway_client_test.go b/pkg/storage/stores/shipper/gateway_client_test.go index e0c0c9c41f64f..cb7ae286dd88a 100644 --- a/pkg/storage/stores/shipper/gateway_client_test.go +++ b/pkg/storage/stores/shipper/gateway_client_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/grafana/dskit/flagext" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" @@ -29,6 +30,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/storage" "github.com/grafana/loki/pkg/storage/stores/shipper/testutil" "github.com/grafana/loki/pkg/storage/stores/shipper/util" + util_log "github.com/grafana/loki/pkg/util/log" util_math "github.com/grafana/loki/pkg/util/math" "github.com/grafana/loki/pkg/validation" ) @@ -122,10 +124,11 @@ func TestGatewayClient(t *testing.T) { defer cleanup() var cfg IndexGatewayClientConfig + cfg.Mode = indexgateway.SimpleMode flagext.DefaultValues(&cfg) cfg.Address = storeAddress - gatewayClient, err := NewGatewayClient(cfg, nil) + gatewayClient, err := NewGatewayClient(cfg, prometheus.DefaultRegisterer, util_log.Logger) require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "fake") @@ -228,7 +231,11 @@ func benchmarkIndexQueries(b *testing.B, queries []index.Query) { require.NoError(b, err) // initialize the index gateway server - gw := indexgateway.NewIndexGateway(tm) + var cfg indexgateway.Config + flagext.DefaultValues(&cfg) + + gw, err := indexgateway.NewIndexGateway(cfg, util_log.Logger, prometheus.DefaultRegisterer, tm) + require.NoError(b, err) indexgatewaypb.RegisterIndexGatewayServer(s, gw) go func() { if err := s.Serve(listener); err != nil { diff --git a/pkg/storage/stores/shipper/index_gateway_grpc_pool.go b/pkg/storage/stores/shipper/index_gateway_grpc_pool.go new file mode 100644 index 0000000000000..1b4ce22787e81 --- /dev/null +++ b/pkg/storage/stores/shipper/index_gateway_grpc_pool.go @@ -0,0 +1,36 @@ +package shipper + +import ( + "io" + + "github.com/pkg/errors" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" +) + +// IndexGatewayGRPCPool represents a pool of gRPC connections to different index gateway instances. +// +// Only used when Index Gateway is configured to run in ring mode. +type IndexGatewayGRPCPool struct { + grpc_health_v1.HealthClient + indexgatewaypb.IndexGatewayClient + io.Closer +} + +// NewIndexGatewayGRPCPool instantiates a new pool of IndexGateway GRPC connections. +// +// Internally, it also instantiates a protobuf index gateway client and a health client. +func NewIndexGatewayGRPCPool(address string, opts []grpc.DialOption) (*IndexGatewayGRPCPool, error) { + conn, err := grpc.Dial(address, opts...) + if err != nil { + return nil, errors.Wrap(err, "shipper new grpc pool dial") + } + + return &IndexGatewayGRPCPool{ + Closer: conn, + HealthClient: grpc_health_v1.NewHealthClient(conn), + IndexGatewayClient: indexgatewaypb.NewIndexGatewayClient(conn), + }, nil +} diff --git a/pkg/storage/stores/shipper/indexgateway/config.go b/pkg/storage/stores/shipper/indexgateway/config.go new file mode 100644 index 0000000000000..80beaceb33dbf --- /dev/null +++ b/pkg/storage/stores/shipper/indexgateway/config.go @@ -0,0 +1,85 @@ +package indexgateway + +import ( + "flag" + "fmt" + + loki_util "github.com/grafana/loki/pkg/util" +) + +// Mode represents in which mode an Index Gateway instance is running. +// +// Right now, two modes are supported: simple mode (default) and ring mode. +type Mode string + +// Set implements a flag interface, and is necessary to use the IndexGatewayClientMode as a flag. +func (i Mode) Set(v string) error { + switch v { + case string(SimpleMode): + // nolint:ineffassign + i = SimpleMode + case string(RingMode): + // nolint:ineffassign + i = RingMode + default: + return fmt.Errorf("mode %s not supported. list of supported modes: simple (default), ring", v) + } + return nil +} + +// String implements a flag interface, and is necessary to use the IndexGatewayClientMode as a flag. +func (i Mode) String() string { + switch i { + case RingMode: + return string(RingMode) + default: + return string(SimpleMode) + } +} + +const ( + // SimpleMode is a mode where an Index Gateway instance solely handle all the work. + SimpleMode Mode = "simple" + + // RingMode is a mode where different Index Gateway instances are assigned to handle different tenants. + // + // It is more horizontally scalable than the simple mode, but requires running a key-value store ring. + RingMode Mode = "ring" +) + +// RingCfg is a wrapper for our Index Gateway ring configuration plus the replication factor. +type RingCfg struct { + // InternalRingCfg configures the Index Gateway ring. + loki_util.RingConfig `yaml:",inline"` + + // ReplicationFactor defines how many Index Gateway instances are assigned to each tenant. + // + // Whenever the store queries the ring key-value store for the Index Gateway instance responsible for tenant X, + // multiple Index Gateway instances are expected to be returned as Index Gateway might be busy/locked for specific + // reasons (this is assured by the spikey behavior of Index Gateway latencies). + ReplicationFactor int `yaml:"replication_factor"` +} + +// RegisterFlagsWithPrefix register all Index Gateway flags related to its ring but with a proper store prefix to avoid conflicts. +func (cfg *RingCfg) RegisterFlags(prefix, storePrefix string, f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix(prefix, storePrefix, f) + f.IntVar(&cfg.ReplicationFactor, "replication-factor", 3, "how many index gateway instances are assigned to each tenant") +} + +// Config configures an Index Gateway server. +type Config struct { + // Mode configures in which mode the client will be running when querying and communicating with an Index Gateway instance. + Mode Mode `yaml:"mode"` + + // Ring configures the ring key-value store used to save and retrieve the different Index Gateway instances. + // + // In case it isn't explicitly set, it follows the same behavior of the other rings (ex: using the common configuration + // section and the ingester configuration by default). + Ring RingCfg `yaml:"ring,omitempty"` +} + +// RegisterFlags register all IndexGatewayClientConfig flags and all the flags of its subconfigs but with a prefix (ex: shipper). +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.Ring.RegisterFlags("index-gateway.", "collectors/", f) + f.StringVar((*string)(&cfg.Mode), "index-gateway.mode", SimpleMode.String(), "mode in which the index gateway client will be running") +} diff --git a/pkg/storage/stores/shipper/indexgateway/gateway.go b/pkg/storage/stores/shipper/indexgateway/gateway.go index 0a805553b1e25..a99b9104580f5 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway.go @@ -2,40 +2,200 @@ package indexgateway import ( "context" + "fmt" + "net/http" "sync" + "time" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/kv" + "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/storage/stores/series/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexgateway/indexgatewaypb" "github.com/grafana/loki/pkg/storage/stores/shipper/util" + util_log "github.com/grafana/loki/pkg/util/log" ) -const maxIndexEntriesPerResponse = 1000 +const ( + maxIndexEntriesPerResponse = 1000 + ringAutoForgetUnhealthyPeriods = 10 + ringNameForServer = "index-gateway" + ringNumTokens = 128 + ringCheckPeriod = 3 * time.Second + + // RingIdentifier is used as a unique name to register the Index Gateway ring. + RingIdentifier = "index-gateway" + + // RingKey is the name of the key used to register the different Index Gateway instances in the key-value store. + RingKey = "index-gateway" +) type IndexQuerier interface { QueryPages(ctx context.Context, queries []index.Query, callback index.QueryPagesCallback) error Stop() } -type gateway struct { +type Gateway struct { services.Service indexQuerier IndexQuerier + cfg Config + log log.Logger + + shipper IndexQuerier + + subservices *services.Manager + subservicesWatcher *services.FailureWatcher + + ringLifecycler *ring.BasicLifecycler + ring *ring.Ring } -func NewIndexGateway(indexQuerier IndexQuerier) *gateway { - g := &gateway{ +// NewIndexGateway instantiates a new Index Gateway and start its services. +// +// In case it is configured to be in ring mode, a Basic Service wrapping the ring client is started. +// Otherwise, it starts an Idle Service that doesn't have lifecycle hooks. +func NewIndexGateway(cfg Config, log log.Logger, registerer prometheus.Registerer, indexQuerier IndexQuerier) (*Gateway, error) { + g := &Gateway{ indexQuerier: indexQuerier, + cfg: cfg, + log: log, } - g.Service = services.NewIdleService(nil, func(failureCase error) error { - g.indexQuerier.Stop() - return nil - }) - return g + + if cfg.Mode == RingMode { + ringStore, err := kv.NewClient( + cfg.Ring.KVStore, + ring.GetCodec(), + kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("loki_", registerer), "index-gateway"), + log, + ) + if err != nil { + return nil, errors.Wrap(err, "create KV store client") + } + + lifecyclerCfg, err := cfg.Ring.ToLifecyclerConfig(ringNumTokens, log) + if err != nil { + return nil, errors.Wrap(err, "invalid ring lifecycler config") + } + + delegate := ring.BasicLifecyclerDelegate(g) + delegate = ring.NewLeaveOnStoppingDelegate(delegate, log) + delegate = ring.NewTokensPersistencyDelegate(cfg.Ring.TokensFilePath, ring.JOINING, delegate, log) + delegate = ring.NewAutoForgetDelegate(ringAutoForgetUnhealthyPeriods*cfg.Ring.HeartbeatTimeout, delegate, log) + + g.ringLifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, ringNameForServer, RingKey, ringStore, delegate, log, registerer) + if err != nil { + return nil, errors.Wrap(err, "index gateway create ring lifecycler") + } + + ringCfg := cfg.Ring.ToRingConfig(cfg.Ring.ReplicationFactor) + g.ring, err = ring.NewWithStoreClientAndStrategy(ringCfg, ringNameForServer, RingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("loki_", registerer), log) + if err != nil { + return nil, errors.Wrap(err, "index gateway create ring client") + } + + svcs := []services.Service{g.ringLifecycler, g.ring} + g.subservices, err = services.NewManager(svcs...) + if err != nil { + return nil, fmt.Errorf("new index gateway services manager: %w", err) + } + + g.subservicesWatcher = services.NewFailureWatcher() + g.subservicesWatcher.WatchManager(g.subservices) + g.Service = services.NewBasicService(g.starting, g.running, g.stopping) + } else { + g.Service = services.NewIdleService(nil, func(failureCase error) error { + g.indexQuerier.Stop() + return nil + }) + } + + return g, nil } -func (g *gateway) QueryIndex(request *indexgatewaypb.QueryIndexRequest, server indexgatewaypb.IndexGateway_QueryIndexServer) error { +// starting implements the Lifecycler interface and is one of the lifecycle hooks. +// +// Only invoked if the Index Gateway is in ring mode. +func (g *Gateway) starting(ctx context.Context) (err error) { + // In case this function will return error we want to unregister the instance + // from the ring. We do it ensuring dependencies are gracefully stopped if they + // were already started. + defer func() { + if err == nil || g.subservices == nil { + return + } + + if stopErr := services.StopManagerAndAwaitStopped(context.Background(), g.subservices); stopErr != nil { + level.Error(util_log.Logger).Log("msg", "failed to gracefully stop index gateway dependencies", "err", stopErr) + } + }() + + if err := services.StartManagerAndAwaitHealthy(ctx, g.subservices); err != nil { + return errors.Wrap(err, "unable to start index gateway subservices") + } + + // The BasicLifecycler does not automatically move state to ACTIVE such that any additional work that + // someone wants to do can be done before becoming ACTIVE. For the index gateway we don't currently + // have any additional work so we can become ACTIVE right away. + // Wait until the ring client detected this instance in the JOINING + // state to make sure that when we'll run the initial sync we already + // know the tokens assigned to this instance. + level.Info(util_log.Logger).Log("msg", "waiting until index gateway is JOINING in the ring") + if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.JOINING); err != nil { + return err + } + level.Info(util_log.Logger).Log("msg", "index gateway is JOINING in the ring") + + if err = g.ringLifecycler.ChangeState(ctx, ring.ACTIVE); err != nil { + return errors.Wrapf(err, "switch instance to %s in the ring", ring.ACTIVE) + } + + // Wait until the ring client detected this instance in the ACTIVE state to + // make sure that when we'll run the loop it won't be detected as a ring + // topology change. + level.Info(util_log.Logger).Log("msg", "waiting until index gateway is ACTIVE in the ring") + if err := ring.WaitInstanceState(ctx, g.ring, g.ringLifecycler.GetInstanceID(), ring.ACTIVE); err != nil { + return err + } + level.Info(util_log.Logger).Log("msg", "index gateway is ACTIVE in the ring") + + return nil +} + +// running implements the Lifecycler interface and is one of the lifecycle hooks. +// +// Only invoked if the Index Gateway is in ring mode. +func (g *Gateway) running(ctx context.Context) error { + t := time.NewTicker(ringCheckPeriod) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return nil + case err := <-g.subservicesWatcher.Chan(): + return errors.Wrap(err, "running index gateway subservice failed") + case <-t.C: + continue + // TODO: should we implement CAS check? + } + } +} + +// stopping implements the Lifecycler interface and is one of the lifecycle hooks. +// +// Only invoked if the Index Gateway is in ring mode. +func (g *Gateway) stopping(_ error) error { + level.Debug(util_log.Logger).Log("msg", "stopping index gateway") + defer g.indexQuerier.Stop() + return services.StopManagerAndAwaitStopped(context.Background(), g.subservices) +} + +func (g *Gateway) QueryIndex(request *indexgatewaypb.QueryIndexRequest, server indexgatewaypb.IndexGateway_QueryIndexServer) error { var outerErr error var innerErr error @@ -108,3 +268,12 @@ func buildResponses(query index.Query, batch index.ReadBatchResult, callback fun return nil } + +// ServeHTTP serves the HTTP route /indexgateway/ring. +func (g *Gateway) ServeHTTP(w http.ResponseWriter, req *http.Request) { + if g.cfg.Mode == RingMode { + g.ring.ServeHTTP(w, req) + } else { + w.Write([]byte("IndexGateway running with 'useIndexGatewayRing' disabled.")) + } +} diff --git a/pkg/storage/stores/shipper/indexgateway/gateway_test.go b/pkg/storage/stores/shipper/indexgateway/gateway_test.go index 475db802d4595..c883e2f3b9d64 100644 --- a/pkg/storage/stores/shipper/indexgateway/gateway_test.go +++ b/pkg/storage/stores/shipper/indexgateway/gateway_test.go @@ -108,7 +108,7 @@ func TestGateway_QueryIndex(t *testing.T) { }, } - gateway := gateway{} + gateway := Gateway{} responseSizes := []int{0, 99, maxIndexEntriesPerResponse, 2 * maxIndexEntriesPerResponse, 5*maxIndexEntriesPerResponse - 1} for i, responseSize := range responseSizes { query := index.Query{ diff --git a/pkg/storage/stores/shipper/indexgateway/lifecycle.go b/pkg/storage/stores/shipper/indexgateway/lifecycle.go new file mode 100644 index 0000000000000..05639b1f94ede --- /dev/null +++ b/pkg/storage/stores/shipper/indexgateway/lifecycle.go @@ -0,0 +1,28 @@ +package indexgateway + +import ( + "github.com/grafana/dskit/ring" +) + +func (g *Gateway) OnRingInstanceRegister(_ *ring.BasicLifecycler, ringDesc ring.Desc, instanceExists bool, instanceID string, instanceDesc ring.InstanceDesc) (ring.InstanceState, ring.Tokens) { + // When we initialize the index gateway instance in the ring we want to start from + // a clean situation, so whatever is the state we set it JOINING, while we keep existing + // tokens (if any) or the ones loaded from file. + var tokens []uint32 + if instanceExists { + tokens = instanceDesc.GetTokens() + } + + takenTokens := ringDesc.GetTokens() + newTokens := ring.GenerateTokens(ringNumTokens-len(tokens), takenTokens) + + // Tokens sorting will be enforced by the parent caller. + tokens = append(tokens, newTokens...) + + return ring.JOINING, tokens +} + +func (g *Gateway) OnRingInstanceTokens(_ *ring.BasicLifecycler, _ ring.Tokens) {} +func (g *Gateway) OnRingInstanceStopping(_ *ring.BasicLifecycler) {} +func (g *Gateway) OnRingInstanceHeartbeat(_ *ring.BasicLifecycler, _ *ring.Desc, _ *ring.InstanceDesc) { +} diff --git a/pkg/storage/stores/shipper/shipper_index_client.go b/pkg/storage/stores/shipper/shipper_index_client.go index 473bbad0161d0..ad4c0e63fde8c 100644 --- a/pkg/storage/stores/shipper/shipper_index_client.go +++ b/pkg/storage/stores/shipper/shipper_index_client.go @@ -15,8 +15,6 @@ import ( "github.com/weaveworks/common/instrument" "go.etcd.io/bbolt" - "github.com/grafana/loki/pkg/util/spanlogger" - "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/chunk/client/local" chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util" @@ -26,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/uploads" shipper_util "github.com/grafana/loki/pkg/storage/stores/shipper/util" util_log "github.com/grafana/loki/pkg/util/log" + "github.com/grafana/loki/pkg/util/spanlogger" ) const (