diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0b1e8998ea350..1808fed392c3c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 #### Loki
 
 ##### Enhancements
+* [6821](https://github.com/grafana/loki/pull/6821) **kavirajk**: Introduce new cache type `embedded-cache`, an in-process cache that lets Loki run without an external cache (such as Memcached or Redis). It can run in two modes: `distributed: false` (the default, equivalent to the old `fifocache`) and `distributed: true`, which shards keys across peers when Loki runs in microservices or SSD mode.
 * [6691](https://github.com/grafana/loki/pull/6691) **dannykopping**: Update production-ready Loki cluster in docker-compose
 * [6317](https://github.com/grafana/loki/pull/6317) **dannykoping**: General: add cache usage statistics
 * [6444](https://github.com/grafana/loki/pull/6444) **aminesnow** Add TLS config to query frontend.
diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml
index 75b3d3968685f..d039f37e88ccd 100644
--- a/cmd/loki/loki-local-config.yaml
+++ b/cmd/loki/loki-local-config.yaml
@@ -16,6 +16,14 @@ common:
       kvstore:
         store: inmemory
 
+query_range:
+  results_cache:
+    cache:
+      embedded_cache:
+        enabled: true
+        distributed: true
+        max_size_mb: 100
+
 schema_config:
   configs:
     - from: 2020-10-24
diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md
index 2f97cec1139fb..935e1281fdcc0 100644
--- a/docs/sources/upgrading/_index.md
+++ b/docs/sources/upgrading/_index.md
@@ -33,6 +33,12 @@ The output is incredibly verbose as it shows the entire internal config struct u
 
 ### Loki
 
+#### Fifocache is deprecated
+
+We introduced a new cache called `embedded-cache`, an in-process cache that makes it possible to run Loki without an external cache (such as Memcached or Redis). It can run in two modes: `distributed: false` (the default, equivalent to the old `fifocache`) and `distributed: true`, which shards keys across peers when Loki runs in microservices or SSD mode.
+
+Currently `embedded-cache` with `distributed: true` can be enabled only for the results cache.
+
 #### Evenly spread queriers across kubernetes nodes
 
 We now evenly spread queriers across the available kubernetes nodes, but allowing more than one querier to be scheduled into the same node.
@@ -87,7 +93,7 @@ limits_config:
   split_queries_by_interval: 10m
 ```
 
-In 2.5.0 it can only be defined in the `limits_config` section, **Loki will fail to start if you do not remove the `split_queries_by_interval` config from the `query_range` section.** 
+In 2.5.0 it can only be defined in the `limits_config` section, **Loki will fail to start if you do not remove the `split_queries_by_interval` config from the `query_range` section.**
 
 Additionally, it has a new default value of `30m` rather than `0`.
 
@@ -130,7 +136,7 @@ Meanwhile, the legacy format is a string in the following format:
 * `parallelise_shardable_queries` under the `query_range` config now defaults to `true`.
 * `split_queries_by_interval` under the `limits_config` config now defaults to `30m`, it was `0s`.
 * `max_chunk_age` in the `ingester` config now defaults to `2h` previously it was `1h`.
-* `query_ingesters_within` under the `querier` config now defaults to `3h`, previously it was `0s`. Any query (or subquery) that has an end time more than `3h` ago will not be sent to the ingesters, this saves work on the ingesters for data they normally don't contain.  If you regularly write old data to Loki you may need to return this value to `0s` to always query ingesters.
+* `query_ingesters_within` under the `querier` config now defaults to `3h`, previously it was `0s`. Any query (or subquery) that has an end time more than `3h` ago will not be sent to the ingesters, this saves work on the ingesters for data they normally don't contain. If you regularly write old data to Loki you may need to return this value to `0s` to always query ingesters.
 * `max_concurrent` under the `querier` config now defaults to `10` instead of `20`.
 * `match_max_concurrent` under the `frontend_worker` config now defaults to true, this supersedes the `parallelism` setting which can now be removed from your config. Controlling query parallelism of a single process can now be done with the `querier` `max_concurrent` setting.
 * `flush_op_timeout` under the `ingester` configuration block now defaults to `10m`, increased from `10s`. This can help when replaying a large WAL on Loki startup, and avoid `msg="failed to flush" ... context deadline exceeded` errors.
@@ -378,7 +384,7 @@ server:
   grpc_server_ping_without_stream_allowed: true
 ```
 
-[This issue](https://github.com/grafana/loki/issues/4375) has some more information on the change. 
+[This issue](https://github.com/grafana/loki/issues/4375) has some more information on the change.
 
 #### Some metric prefixes have changed from `cortex_` to `loki_`
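Putting the new knobs together: below is a rough sketch of a distributed results-cache configuration, assembled from the yaml tags this change introduces (`query_range.results_cache.cache.embedded_cache` plus the singleton `common.embedded_cache` block). The `common` block is only needed to tune the peer ring; the values shown are the flag defaults, not requirements.

```yaml
query_range:
  results_cache:
    cache:
      embedded_cache:
        enabled: true
        distributed: true
        max_size_mb: 100

common:
  embedded_cache:
    listen_port: 4100
    ring:
      kvstore:
        store: memberlist
```
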
diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go
index fb1b1840aba29..0a5459911f7ef 100644
--- a/pkg/loki/common/common.go
+++ b/pkg/loki/common/common.go
@@ -3,11 +3,10 @@ package common
 import (
 	"flag"
 
-	"github.com/grafana/loki/pkg/storage/chunk/cache"
-
 	"github.com/grafana/dskit/flagext"
 	"github.com/grafana/dskit/netutil"
 
+	"github.com/grafana/loki/pkg/storage/chunk/cache"
 	"github.com/grafana/loki/pkg/storage/chunk/client/aws"
 	"github.com/grafana/loki/pkg/storage/chunk/client/azure"
 	"github.com/grafana/loki/pkg/storage/chunk/client/baidubce"
@@ -46,10 +45,8 @@ type Config struct {
 	// CompactorAddress is the http address of the compactor in the form http://host:port
 	CompactorAddress string `yaml:"compactor_address"`
 
-	// GroupCacheConfig is the configuration to use when groupcache is enabled.
-	//
-	// This is a common config because, when enabled, it is used across all caches
-	GroupCacheConfig cache.GroupCacheConfig `yaml:"groupcache"`
+	// Global embedded-cache config. Regardless of the cache type, singleton settings such as the ring configuration are needed when running in distributed fashion.
+	EmbeddedCacheConfig cache.EmbeddedCacheSingletonConfig `yaml:"embedded_cache"`
 }
 
 func (c *Config) RegisterFlags(f *flag.FlagSet) {
@@ -63,10 +60,9 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
 	throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.")
 	throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")
 
-	// flags that only live in common
-	c.GroupCacheConfig.RegisterFlagsWithPrefix("common.groupcache", "", f)
-
 	f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
+
+	c.EmbeddedCacheConfig.RegisterFlagsWithPrefix("common.embedded-cache", "", f)
 }
 
 type Storage struct {
diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go
index d34431da3a934..96a8c7db85920 100644
--- a/pkg/loki/config_wrapper.go
+++ b/pkg/loki/config_wrapper.go
@@ -96,8 +96,6 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {
 
 		applyInstanceConfigs(r, &defaults)
 
-		applyCommonCacheConfigs(r, &defaults)
-
 		applyCommonReplicationFactor(r, &defaults)
 
 		applyDynamicRingConfigs(r, &defaults)
@@ -166,7 +164,9 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
 		}
 		r.Frontend.FrontendV2.Addr = r.Common.InstanceAddr
 		r.IndexGateway.Ring.InstanceAddr = r.Common.InstanceAddr
-		r.Common.GroupCacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr
+		if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
+			r.Common.EmbeddedCacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr
+		}
 	}
 
 	if !reflect.DeepEqual(r.Common.InstanceInterfaceNames, defaults.Common.InstanceInterfaceNames) {
@@ -175,18 +175,9 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) {
 		}
 		r.Frontend.FrontendV2.InfNames = r.Common.InstanceInterfaceNames
 		r.IndexGateway.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
-		r.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
-	}
-}
-
-// applyCommonCacheConfigs applies to Loki components the cache-related configurations under the common config section
-// NOTE: only used for GroupCache at the moment
-// TODO: apply to other caches as well
-func applyCommonCacheConfigs(r, _ *ConfigWrapper) {
-	if r.Config.Common.GroupCacheConfig.Enabled {
-		r.Config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache = true
-		r.Config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache = true
-		r.Config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache = true
+		if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
+			r.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames
+		}
 	}
 }
 
@@ -315,17 +306,18 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit
 		r.IndexGateway.Ring.KVStore = rc.KVStore
 	}
 
-	// GroupCacheRing
-	if mergeWithExisting || reflect.DeepEqual(r.Common.GroupCacheConfig.Ring, defaults.Common.GroupCacheConfig.Ring) {
-		r.Common.GroupCacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
-		r.Common.GroupCacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
-		r.Common.GroupCacheConfig.Ring.InstancePort = rc.InstancePort
-		r.Common.GroupCacheConfig.Ring.InstanceAddr = rc.InstanceAddr
-		r.Common.GroupCacheConfig.Ring.InstanceID = rc.InstanceID
-		r.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
-		r.Common.GroupCacheConfig.Ring.InstanceZone = rc.InstanceZone
-		r.Common.GroupCacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
-		r.Common.GroupCacheConfig.Ring.KVStore = rc.KVStore
+	// EmbeddedCache distributed ring.
+	if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() &&
+		(mergeWithExisting || reflect.DeepEqual(r.Common.EmbeddedCacheConfig.Ring, defaults.Common.EmbeddedCacheConfig.Ring)) {
+		r.Common.EmbeddedCacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout
+		r.Common.EmbeddedCacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod
+		r.Common.EmbeddedCacheConfig.Ring.InstancePort = rc.InstancePort
+		r.Common.EmbeddedCacheConfig.Ring.InstanceAddr = rc.InstanceAddr
+		r.Common.EmbeddedCacheConfig.Ring.InstanceID = rc.InstanceID
+		r.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames
+		r.Common.EmbeddedCacheConfig.Ring.InstanceZone = rc.InstanceZone
+		r.Common.EmbeddedCacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
+		r.Common.EmbeddedCacheConfig.Ring.KVStore = rc.KVStore
 	}
 }
 
@@ -361,7 +353,7 @@ func applyTokensFilePath(cfg *ConfigWrapper) error {
 	if err != nil {
 		return err
 	}
-	cfg.Common.GroupCacheConfig.Ring.TokensFilePath = f
+	cfg.Common.EmbeddedCacheConfig.Ring.TokensFilePath = f
 
 	return nil
 }
@@ -440,8 +432,8 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
 		cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface)
 	}
 
-	if reflect.DeepEqual(cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames, defaults.Common.GroupCacheConfig.Ring.InstanceInterfaceNames) {
-		cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames, loopbackIface)
+	if reflect.DeepEqual(cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames, defaults.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames) {
+		cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames, loopbackIface)
 	}
 }
 
@@ -456,7 +448,7 @@ func applyMemberlistConfig(r *ConfigWrapper) {
 	r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr
 	r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr
 	r.IndexGateway.Ring.KVStore.Store = memberlistStr
-	r.Common.GroupCacheConfig.Ring.KVStore.Store = memberlistStr
+	r.Common.EmbeddedCacheConfig.Ring.KVStore.Store = memberlistStr
 }
 
 var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend")
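The config wrapper only overwrites ring settings that the user left at their registered defaults, which is what the `reflect.DeepEqual` checks above are doing. A standalone sketch of that pattern, with a hypothetical two-field ring config standing in for the real structs:

```go
package main

import (
	"fmt"
	"reflect"
)

// RingConfig is a hypothetical stand-in for the real ring config structs.
type RingConfig struct {
	InstanceAddr string
	InstancePort int
}

// applyIfUnset copies the common ring config into dst only when dst still
// equals its registered default, mirroring the checks in applyConfigToRings.
func applyIfUnset(dst, def *RingConfig, common RingConfig) {
	if reflect.DeepEqual(*dst, *def) {
		*dst = common
	}
}

func main() {
	def := RingConfig{InstancePort: 9095}
	userSet := RingConfig{InstanceAddr: "10.0.0.1", InstancePort: 9095} // user changed it
	untouched := def                                                    // still the default

	common := RingConfig{InstanceAddr: "192.168.1.1", InstancePort: 7946}

	applyIfUnset(&userSet, &def, common)
	applyIfUnset(&untouched, &def, common)

	fmt.Println(userSet.InstanceAddr)   // 10.0.0.1 (kept: the user set it)
	fmt.Println(untouched.InstanceAddr) // 192.168.1.1 (inherited from common)
}
```
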
diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go
index 0568c116f6d2f..d88624edd5ec9 100644
--- a/pkg/loki/config_wrapper_test.go
+++ b/pkg/loki/config_wrapper_test.go
@@ -821,23 +821,24 @@ ingester:
 		})
 	})
 
-	t.Run("common groupcache setting is applied to chunk, index, and result caches", func(t *testing.T) {
+	t.Run("embedded-cache setting is applied to result caches", func(t *testing.T) {
 		// ensure they are all false by default
 		config, _, _ := configWrapperFromYAML(t, minimalConfig, nil)
-		assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache)
-		assert.False(t, config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache)
-		assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache)
+		assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled)
+		assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Distributed)
 
 		configFileString := `---
-common:
-  groupcache:
-    enabled: true`
+query_range:
+  results_cache:
+    cache:
+      embedded_cache:
+        enabled: true
+        distributed: true`
 
 		config, _ = testContext(configFileString, nil)
-		assert.True(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache)
-		assert.True(t, config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache)
-		assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache)
+		assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled)
+		assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Distributed)
 	})
 }
 
@@ -867,16 +868,18 @@ chunk_store_config:
 		assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableFifoCache)
 	})
 
-	t.Run("no FIFO cache enabled by default if GroupCache is set", func(t *testing.T) {
+	t.Run("if distributed cache is set for results cache, FIFO cache should be disabled", func(t *testing.T) {
 		configFileString := `---
-common:
-  groupcache:
-    enabled: true`
+query_range:
+  results_cache:
+    cache:
+      embedded_cache:
+        enabled: true
+        distributed: true`
 
 		config, _, _ := configWrapperFromYAML(t, configFileString, nil)
-		assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableFifoCache)
-		assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableFifoCache)
-		assert.True(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache)
+		assert.True(t, config.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed())
+		assert.False(t, config.QueryRange.CacheConfig.EnableFifoCache)
 	})
 
 	t.Run("FIFO cache is enabled by default if no other cache is set", func(t *testing.T) {
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index 2aca337304fec..9e6beb8fa2919 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -255,7 +255,7 @@ type Loki struct {
 	queryScheduler           *scheduler.Scheduler
 	usageReport              *usagestats.Reporter
 	indexGatewayRingManager  *indexgateway.RingManager
-	groupcacheRingManager    *cache.GroupcacheRingManager
+	embeddedcacheRingManager *cache.GroupcacheRingManager
 
 	clientMetrics       storage.ClientMetrics
 	deleteClientMetrics *deletion.DeleteRequestClientMetrics
@@ -476,7 +476,7 @@ func (t *Loki) setupModuleManager() error {
 	mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule)
 	mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule)
 	mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule)
-	mm.RegisterModule(GroupCache, t.initGroupcache, modules.UserInvisibleModule)
+	mm.RegisterModule(EmbeddedCache, t.initEmbeddedCache, modules.UserInvisibleModule)
 	mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule)
 	mm.RegisterModule(OverridesExporter, t.initOverridesExporter)
 	mm.RegisterModule(TenantConfigs, t.initTenantConfigs, modules.UserInvisibleModule)
@@ -503,16 +503,16 @@ func (t *Loki) setupModuleManager() error {
 	// Add dependencies
 	deps := map[string][]string{
 		Ring:                     {RuntimeConfig, Server, MemberlistKV},
-		GroupCache:               {RuntimeConfig, Server, MemberlistKV},
+		EmbeddedCache:            {RuntimeConfig, Server, MemberlistKV},
 		UsageReport:              {},
 		Overrides:                {RuntimeConfig},
 		OverridesExporter:        {Overrides, Server},
 		TenantConfigs:            {RuntimeConfig},
 		Distributor:              {Ring, Server, Overrides, TenantConfigs, UsageReport},
-		Store:                    {Overrides, GroupCache, IndexGatewayRing},
+		Store:                    {Overrides, EmbeddedCache, IndexGatewayRing},
 		Ingester:                 {Store, Server, MemberlistKV, TenantConfigs, UsageReport},
 		Querier:                  {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport},
-		QueryFrontendTripperware: {Server, GroupCache, Overrides, TenantConfigs},
+		QueryFrontendTripperware: {Server, EmbeddedCache, Overrides, TenantConfigs},
 		QueryFrontend:            {QueryFrontendTripperware, UsageReport},
 		QueryScheduler:           {Server, Overrides, MemberlistKV, UsageReport},
 		Ruler:                    {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport},
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 186a98a186dfe..b87e0c9734a17 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -71,7 +71,7 @@ const maxChunkAgeForTableManager = 12 * time.Hour
 // The various modules that make up Loki.
 const (
 	Ring              string = "ring"
-	GroupCache        string = "groupcache"
+	EmbeddedCache     string = "embedded-cache"
 	RuntimeConfig     string = "runtime-config"
 	Overrides         string = "overrides"
 	OverridesExporter string = "overrides-exporter"
@@ -147,41 +147,47 @@ func (t *Loki) initRing() (_ services.Service, err error) {
 	return t.ring, nil
 }
 
-func (t *Loki) initGroupcache() (_ services.Service, err error) {
-	if !t.Cfg.Common.GroupCacheConfig.Enabled {
+func (t *Loki) initEmbeddedCache() (_ services.Service, err error) {
+	if !t.Cfg.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
 		return nil, nil
 	}
 
-	t.Cfg.Common.GroupCacheConfig.Ring.ListenPort = t.Cfg.Common.GroupCacheConfig.ListenPort
-	rm, err := cache.NewGroupcacheRingManager(t.Cfg.Common.GroupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer)
+	groupCacheConfig := cache.GroupCacheConfig{
+		Enabled:           true,
+		Ring:              t.Cfg.Common.EmbeddedCacheConfig.Ring,
+		MaxSizeMB:         t.Cfg.Common.EmbeddedCacheConfig.MaxSizeMB,
+		ListenPort:        t.Cfg.Common.EmbeddedCacheConfig.ListenPort,
+		HeartbeatInterval: t.Cfg.Common.EmbeddedCacheConfig.HeartbeatInterval,
+		HeartbeatTimeout:  t.Cfg.Common.EmbeddedCacheConfig.HeartbeatTimeout,
+		WriteByteTimeout:  t.Cfg.Common.EmbeddedCacheConfig.WriteByteTimeout,
+	}
+
+	groupCacheConfig.Ring.ListenPort = groupCacheConfig.ListenPort
+
+	rm, err := cache.NewGroupcacheRingManager(groupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer)
 	if err != nil {
-		return nil, gerrors.Wrap(err, "new groupcache ring manager")
+		return nil, gerrors.Wrap(err, "new embedded-cache ring manager")
 	}
 
-	t.groupcacheRingManager = rm
-	t.Server.HTTP.Path("/groupcache/ring").Methods("GET", "POST").Handler(t.groupcacheRingManager)
+	t.embeddedcacheRingManager = rm
+	t.Server.HTTP.Path("/embedded-cache/ring").Methods("GET", "POST").Handler(t.embeddedcacheRingManager)
 
-	gc, err := cache.NewGroupCache(rm, t.Cfg.Common.GroupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer)
+	gc, err := cache.NewGroupCache(rm, groupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer)
 	if err != nil {
 		return nil, err
 	}
 
-	t.Cfg.ChunkStoreConfig.ChunkCacheConfig.GroupCache = gc.NewGroup(
-		t.Cfg.ChunkStoreConfig.ChunkCacheConfig.Prefix+"groupcache",
-		&t.Cfg.ChunkStoreConfig.ChunkCacheConfig.GroupCacheConfig,
-		stats.ChunkCache,
-	)
-	t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.GroupCache = gc.NewGroup(
+	groupConfig := cache.GroupConfig{
+		MaxSizeMB: t.Cfg.QueryRange.CacheConfig.EmbeddedCache.MaxSizeMB,
+	}
+
+	t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Cache = gc.NewGroup(
 		t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Prefix+"groupcache",
-		&t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.GroupCacheConfig,
+		&groupConfig,
 		stats.ResultCache,
 	)
 
-	// The index cache generates too much traffic to be used. Make it a fifo cache
-	t.Cfg.StorageConfig.IndexQueriesCacheConfig.EnableFifoCache = true
-	t.Cfg.StorageConfig.IndexQueriesCacheConfig.Fifocache.MaxSizeBytes = fmt.Sprint(t.Cfg.Common.GroupCacheConfig.CapacityMB * 1e6)
-
-	return t.groupcacheRingManager, nil
+	return t.embeddedcacheRingManager, nil
 }
 
 func (t *Loki) initRuntimeConfig() (services.Service, error) {
@@ -901,7 +907,9 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
 	t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
 	t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
 	t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
-	t.Cfg.Common.GroupCacheConfig.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
+	if t.Cfg.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() {
+		t.Cfg.Common.EmbeddedCacheConfig.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV
+	}
 
 	t.Server.HTTP.Handle("/memberlist", t.MemberlistKV)
diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go
index 0f40e79770b30..f254c6b522506 100644
--- a/pkg/querier/queryrange/roundtrip.go
+++ b/pkg/querier/queryrange/roundtrip.go
@@ -52,6 +52,7 @@ func NewTripperware(
 		c   cache.Cache
 		err error
 	)
+
 	if cfg.CacheResults {
 		c, err = cache.New(cfg.CacheConfig, registerer, log, stats.ResultCache)
 		if err != nil {
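As the `roundtrip.go` hunk shows, the results cache is only built when results caching is switched on. The non-distributed mode involves no ring at all, so the minimal setup is just the query-range block; a sketch, assuming the usual `cache_results` toggle alongside it:

```yaml
query_range:
  cache_results: true
  results_cache:
    cache:
      embedded_cache:
        enabled: true    # distributed defaults to false: plain in-process cache
        max_size_mb: 100
        ttl: 1h
```
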
diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go
index a4babc2e6c203..e884266a64819 100644
--- a/pkg/storage/chunk/cache/cache.go
+++ b/pkg/storage/chunk/cache/cache.go
@@ -9,6 +9,7 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/grafana/loki/pkg/logqlmodel/stats"
@@ -25,8 +26,7 @@
 // Config for building Caches.
 type Config struct {
-	EnableFifoCache  bool `yaml:"enable_fifocache"`
-	EnableGroupCache bool `yaml:"enable_groupcache"`
+	EnableFifoCache bool `yaml:"enable_fifocache"`
 
 	DefaultValidity time.Duration `yaml:"default_validity"`
 
@@ -34,17 +34,12 @@ type Config struct {
 	Memcache       MemcachedConfig       `yaml:"memcached"`
 	MemcacheClient MemcachedClientConfig `yaml:"memcached_client"`
 	Redis          RedisConfig           `yaml:"redis"`
-	Fifocache      FifoCacheConfig       `yaml:"fifocache"`
-
-	// GroupcacheConfig is a local GroupCache config per cache
-	GroupCacheConfig GroupConfig `yaml:"groupcache"`
+	EmbeddedCache  EmbeddedCacheConfig   `yaml:"embedded_cache"`
+	Fifocache      FifoCacheConfig       `yaml:"fifocache"` // deprecated
 
 	// This is to name the cache metrics properly.
 	Prefix string `yaml:"prefix" doc:"hidden"`
 
-	// GroupCache is configured/initialized as part of modules and injected here
-	GroupCache Cache `yaml:"-"`
-
 	// For tests to inject specific implementations.
 	Cache Cache `yaml:"-"`
@@ -61,11 +56,11 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f
 	cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f)
 	cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f)
 	cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f)
+	cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix, description, f)
 
 	f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.")
 	f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.")
 	f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.")
-	f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).")
-	f.BoolVar(&cfg.EnableGroupCache, prefix+"cache.enable-groupcache", false, description+"Enable distributed in-memory cache.")
+	f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"(deprecated: use embedded-cache instead) Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).")
 
 	cfg.Prefix = prefix
 }
@@ -89,29 +84,52 @@ func IsRedisSet(cfg Config) bool {
 	return cfg.Redis.Endpoint != ""
 }
 
-func IsGroupCacheSet(cfg Config) bool {
-	return cfg.GroupCache != nil
+func IsEmbeddedCacheSet(cfg Config) bool {
+	return cfg.EmbeddedCache.Enabled
 }
 
-// IsCacheConfigured determines if memcached, redis, or groupcache have been configured
+// IsCacheConfigured determines if memcached, redis, or embedded-cache have been configured
 func IsCacheConfigured(cfg Config) bool {
-	return IsMemcacheSet(cfg) || IsRedisSet(cfg) || cfg.EnableGroupCache
+	return IsMemcacheSet(cfg) || IsRedisSet(cfg) || IsEmbeddedCacheSet(cfg)
 }
 
 // New creates a new Cache using Config.
 func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) (Cache, error) {
-	if cfg.Cache != nil {
+
+	// Additional check for embedded-cache in distributed mode: that cache is
+	// already initialized in the modules, but it still needs the stats
+	// collector wrapper applied below.
+	if cfg.Cache != nil && !cfg.EmbeddedCache.IsEnabledWithDistributed() {
 		return cfg.Cache, nil
 	}
 
 	var caches []Cache
-	if cfg.EnableFifoCache {
-		if cfg.Fifocache.TTL == 0 && cfg.DefaultValidity != 0 {
-			cfg.Fifocache.TTL = cfg.DefaultValidity
+
+	// Currently the fifo cache can be enabled in two ways:
+	// 1. cfg.EnableFifoCache (old, deprecated way)
+	// 2. cfg.EmbeddedCache.Enabled=true and cfg.EmbeddedCache.Distributed=false (new way)
+
+	if cfg.EnableFifoCache || (IsEmbeddedCacheSet(cfg) && !cfg.EmbeddedCache.Distributed) {
+		var fifocfg FifoCacheConfig
+
+		if cfg.EnableFifoCache {
+			level.Warn(logger).Log("msg", "fifocache config is deprecated. use embedded-cache instead")
+			fifocfg = cfg.Fifocache
+		}
+
+		if cfg.EmbeddedCache.IsEnabledWithoutDistributed() {
+			fifocfg = FifoCacheConfig{
+				MaxSizeBytes:  fmt.Sprint(cfg.EmbeddedCache.MaxSizeMB * 1e6),
+				TTL:           cfg.EmbeddedCache.TTL,
+				PurgeInterval: cfg.EmbeddedCache.PurgeInterval,
+			}
+		}
+
+		if fifocfg.TTL == 0 && cfg.DefaultValidity != 0 {
+			fifocfg.TTL = cfg.DefaultValidity
 		}
 
-		if cache := NewFifoCache(cfg.Prefix+"fifocache", cfg.Fifocache, reg, logger, cacheType); cache != nil {
-			caches = append(caches, CollectStats(Instrument(cfg.Prefix+"fifocache", cache, reg)))
+		if cache := NewFifoCache(cfg.Prefix+"embedded-cache", fifocfg, reg, logger, cacheType); cache != nil {
+			caches = append(caches, CollectStats(Instrument(cfg.Prefix+"embedded-cache", cache, reg)))
 		}
 	}
 
@@ -144,9 +162,9 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta
 		caches = append(caches, CollectStats(NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache, reg), reg)))
 	}
 
-	if IsGroupCacheSet(cfg) {
-		cacheName := cfg.Prefix + "groupcache"
-		caches = append(caches, CollectStats(Instrument(cacheName, cfg.GroupCache, reg)))
+	if IsEmbeddedCacheSet(cfg) && cfg.EmbeddedCache.Distributed {
+		cacheName := cfg.Prefix + "embedded-cache"
+		caches = append(caches, CollectStats(Instrument(cacheName, cfg.Cache, reg)))
 	}
 
 	cache := NewTiered(caches)
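`New` finishes by wrapping whatever was enabled into `NewTiered(caches)`, so a remote cache such as memcached or redis can still sit behind the in-process tier. The lookup idea behind tiering, reduced to a runnable toy (maps stand in for cache tiers; the real implementation also backfills earlier tiers and records stats):

```go
package main

import "fmt"

// tier is a toy stand-in for one cache tier.
type tier map[string][]byte

// fetch tries each tier in order and treats a key as found as soon as any
// tier has it; keys left over after the last tier are reported missing.
func fetch(tiers []tier, keys []string) (found map[string][]byte, missing []string) {
	found = map[string][]byte{}
	remaining := keys
	for _, t := range tiers {
		var next []string
		for _, k := range remaining {
			if v, ok := t[k]; ok {
				found[k] = v
			} else {
				next = append(next, k)
			}
		}
		remaining = next
		if len(remaining) == 0 {
			break
		}
	}
	return found, remaining
}

func main() {
	embedded := tier{"a": []byte("1")}  // in-process tier, consulted first
	memcached := tier{"b": []byte("2")} // remote tier, consulted second
	found, missing := fetch([]tier{embedded, memcached}, []string{"a", "b", "c"})
	fmt.Println(len(found), missing) // 2 [c]
}
```
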
diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go
new file mode 100644
index 0000000000000..f383638345586
--- /dev/null
+++ b/pkg/storage/chunk/cache/embeddedcache.go
@@ -0,0 +1,64 @@
+package cache
+
+import (
+	"flag"
+	"time"
+)
+
+const (
+	DefaultPurgeInterval = 1 * time.Minute
+)
+
+// EmbeddedCacheConfig represents in-process embedded cache config.
+// It can also be distributed, sharding keys across peers when run with microservices
+// or SSD mode.
+type EmbeddedCacheConfig struct {
+	Distributed bool          `yaml:"distributed,omitempty"`
+	Enabled     bool          `yaml:"enabled,omitempty"`
+	MaxSizeMB   int64         `yaml:"max_size_mb"`
+	TTL         time.Duration `yaml:"ttl"`
+
+	// PurgeInterval tells how often expired keys should be removed.
+	// By default it is `DefaultPurgeInterval`.
+	PurgeInterval time.Duration `yaml:"-"`
+}
+
+func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
+	f.BoolVar(&cfg.Enabled, prefix+"embedded-cache.enabled", false, description+"Whether embedded cache is enabled.")
+	f.BoolVar(&cfg.Distributed, prefix+"embedded-cache.distributed", false, description+"Whether embedded cache is enabled in distributed mode.")
+	f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.")
+	f.DurationVar(&cfg.TTL, prefix+"embedded-cache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.")
+}
+
+func (cfg *EmbeddedCacheConfig) IsEnabledWithDistributed() bool {
+	return cfg.Enabled && cfg.Distributed
+}
+
+func (cfg *EmbeddedCacheConfig) IsEnabledWithoutDistributed() bool {
+	return cfg.Enabled && !cfg.Distributed
+}
+
+// EmbeddedCacheSingletonConfig defines the global singleton needed by the embedded cache (particularly when run in distributed fashion).
+type EmbeddedCacheSingletonConfig struct {
+	// Distributed cache configs. Have no meaning if `Distributed=false`.
+	ListenPort int     `yaml:"listen_port,omitempty"`
+	Ring       RingCfg `yaml:"ring,omitempty"`
+
+	// Default capacity if none provided while creating each "Group".
+	MaxSizeMB int64 `yaml:"max_size_mb,omitempty"`
+
+	// Different timeouts
+	HeartbeatInterval time.Duration `yaml:"heartbeat_interval,omitempty"`
+	HeartbeatTimeout  time.Duration `yaml:"heartbeat_timeout,omitempty"`
+	WriteByteTimeout  time.Duration `yaml:"write_timeout,omitempty"`
+}
+
+func (cfg *EmbeddedCacheSingletonConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
+	f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for cache communication across peers when run in distributed fashion.")
+	cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f)
+	f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, "Maximum memory size of the cache in MB.")
+	f.DurationVar(&cfg.HeartbeatInterval, prefix+"embedded-cache.heartbeat-interval", time.Second, "If the connection is idle, the interval at which the cache sends heartbeats.")
+	f.DurationVar(&cfg.HeartbeatTimeout, prefix+"embedded-cache.heartbeat-timeout", time.Second, "Timeout for heartbeat responses.")
+	f.DurationVar(&cfg.WriteByteTimeout, prefix+"embedded-cache.write-timeout", time.Second, "Maximum time the cache will spend trying to write.")
+}
diff --git a/pkg/storage/chunk/cache/fifo_cache.go b/pkg/storage/chunk/cache/fifo_cache.go
index f2722466f5365..6871fb328884c 100644
--- a/pkg/storage/chunk/cache/fifo_cache.go
+++ b/pkg/storage/chunk/cache/fifo_cache.go
@@ -34,7 +34,7 @@ const (
 // FifoCacheConfig holds config for the FifoCache.
 type FifoCacheConfig struct {
 	MaxSizeBytes string `yaml:"max_size_bytes"`
-	MaxSizeItems int    `yaml:"max_size_items"`
+	MaxSizeItems int    `yaml:"max_size_items"` // deprecated
 	TTL          time.Duration `yaml:"ttl"`
 
 	DeprecatedValidity time.Duration `yaml:"validity"`
@@ -46,7 +46,7 @@ type FifoCacheConfig struct {
 // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
 func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) {
 	f.StringVar(&cfg.MaxSizeBytes, prefix+"fifocache.max-size-bytes", "1GB", description+"Maximum memory size of the cache in bytes. A unit suffix (KB, MB, GB) may be applied.")
-	f.IntVar(&cfg.MaxSizeItems, prefix+"fifocache.max-size-items", 0, description+"Maximum number of entries in the cache.")
+	f.IntVar(&cfg.MaxSizeItems, prefix+"fifocache.max-size-items", 0, description+"deprecated: Maximum number of entries in the cache.")
 	f.DurationVar(&cfg.TTL, prefix+"fifocache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.")
 	f.DurationVar(&cfg.DeprecatedValidity, prefix+"fifocache.duration", 0, "Deprecated (use ttl instead): "+description+"The expiry duration for the cache.")
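With the fifocache flags now marked deprecated, an existing block translates mechanically to the new one; `max_size_mb` is multiplied out to bytes (`MaxSizeMB * 1e6`) when the embedded cache builds its fifo backend, so the two configs below should behave about the same:

```yaml
# old, deprecated
fifocache:
  max_size_bytes: 100MB
  ttl: 1h

# new
embedded_cache:
  enabled: true
  max_size_mb: 100
  ttl: 1h
```
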
diff --git a/pkg/storage/chunk/cache/groupcache.go b/pkg/storage/chunk/cache/groupcache.go
index 72090c483ac73..bebd2417076fc 100644
--- a/pkg/storage/chunk/cache/groupcache.go
+++ b/pkg/storage/chunk/cache/groupcache.go
@@ -3,7 +3,6 @@ package cache
 import (
 	"context"
 	"crypto/tls"
-	"flag"
 	"fmt"
 	"net"
 	"net/http"
@@ -62,7 +61,7 @@ type RingCfg struct {
 type GroupCacheConfig struct {
 	Enabled bool    `yaml:"enabled,omitempty"`
 	Ring    RingCfg `yaml:"ring,omitempty"`
-	CapacityMB int64 `yaml:"capacity_per_cache_mb,omitempty"`
+	MaxSizeMB  int64 `yaml:"max_size_mb,omitempty"`
 	ListenPort int   `yaml:"listen_port,omitempty"`
 	HeartbeatInterval time.Duration `yaml:"heartbeat_interval,omitempty"`
 	HeartbeatTimeout  time.Duration `yaml:"heartbeat_timeout,omitempty"`
@@ -71,19 +70,6 @@ type GroupCacheConfig struct {
 	Cache Cache `yaml:"-"`
 }
 
-// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
-func (cfg *GroupCacheConfig) RegisterFlagsWithPrefix(prefix, _ string, f *flag.FlagSet) {
-	cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f)
-
-	f.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Whether or not groupcache is enabled")
-	f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication")
-	f.Int64Var(&cfg.CapacityMB, prefix+".capacity-per-cache-mb", 100, "Capacity of each groupcache group in MB (default: 100). "+
-		"NOTE: there are 3 caches (result, chunk, and index query), so the maximum used memory will be *triple* the value specified here.")
-	f.DurationVar(&cfg.HeartbeatInterval, "prefix.heartbeat-interval", time.Second, "If the connection is idle, the interval the cache will send heartbeats")
-	f.DurationVar(&cfg.HeartbeatTimeout, "prefix.heartbeat-timeout", time.Second, "Timeout for heartbeat responses")
-	f.DurationVar(&cfg.WriteByteTimeout, "prefix.write-timeout", time.Second, "Maximum time for the cache to try writing")
-}
-
 type ringManager interface {
 	Addr() string
 	Ring() ring.ReadRing
@@ -116,7 +102,7 @@ func NewGroupCache(rm ringManager, config GroupCacheConfig, logger log.Logger, r
 		wg:                   sync.WaitGroup{},
 		startWaitingForClose: cancel,
 		reg:                  reg,
-		cacheBytes:           config.CapacityMB * 1e6, // MB => B
+		cacheBytes:           config.MaxSizeMB * 1e6, // MB => B
 	}
 
 	go cache.serveGroupcache(config.ListenPort)
@@ -208,7 +194,7 @@ func (c *GroupCache) Stats() *groupcache.Stats {
 
 // Groupconfig represents config per Group.
 type GroupConfig struct {
-	CapacityMB int64 `yaml:"capacity_mb,omitempty"`
+	MaxSizeMB int64 `yaml:"max_size_mb,omitempty"`
 }
 
 type group struct {
@@ -234,8 +220,8 @@ func (c *GroupCache) NewGroup(name string, cfg *GroupConfig, ct stats.CacheType)
 	c.startWaitingForClose()
 
 	cap := c.cacheBytes
-	if cfg.CapacityMB != 0 {
-		cap = cfg.CapacityMB * 1e6 // MB into bytes
+	if cfg.MaxSizeMB != 0 {
+		cap = cfg.MaxSizeMB * 1e6 // MB into bytes
 	}
 
 	requestDuration := promauto.With(c.reg).NewHistogramVec(prometheus.HistogramOpts{
diff --git a/pkg/storage/chunk/cache/groupcache_test.go b/pkg/storage/chunk/cache/groupcache_test.go
index c75d2a727d0b5..d0501bae7c94f 100644
--- a/pkg/storage/chunk/cache/groupcache_test.go
+++ b/pkg/storage/chunk/cache/groupcache_test.go
@@ -51,7 +51,7 @@ func TestGroupCache(t *testing.T) {
 	assert.Equal(t, c1.(*group).cacheBytes, int64(1*1e6))
 
 	// pass explicitly capacity per group should take preference.
-	c2 := gc.NewGroup("test-group2", &GroupConfig{CapacityMB: 6}, "test2")
+	c2 := gc.NewGroup("test-group2", &GroupConfig{MaxSizeMB: 6}, "test2")
 	defer c.Stop()
 
 	assert.Equal(t, c2.(*group).cacheBytes, int64(6*1e6))
@@ -60,8 +60,8 @@ func TestGroupCache(t *testing.T) {
 
 func setupGroupCache() (*GroupCache, error) {
 	return NewGroupCache(&mockRingManager{}, GroupCacheConfig{
-		Enabled:    true,
-		CapacityMB: 1,
+		Enabled:   true,
+		MaxSizeMB: 1,
 	}, log.NewNopLogger(), nil)
 }
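The test above pins down the per-group capacity rule: a group takes its own `MaxSizeMB` when set and otherwise inherits the cache-wide value. Restated as a tiny runnable function (not the real types, just the arithmetic):

```go
package main

import "fmt"

// groupCapacityBytes mirrors the fallback in (*GroupCache).NewGroup:
// a group's own MaxSizeMB wins when non-zero, else the cache-wide default.
func groupCapacityBytes(defaultBytes, groupMaxSizeMB int64) int64 {
	if groupMaxSizeMB != 0 {
		return groupMaxSizeMB * 1e6 // MB => B
	}
	return defaultBytes
}

func main() {
	defaultBytes := int64(1 * 1e6) // GroupCacheConfig{MaxSizeMB: 1}
	fmt.Println(groupCapacityBytes(defaultBytes, 0)) // 1000000, like test-group1
	fmt.Println(groupCapacityBytes(defaultBytes, 6)) // 6000000, like test-group2
}
```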