From 32d6e07c8ce96520b58ac247ea1009c07e2cedfc Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Mon, 1 Aug 2022 15:27:03 +0200 Subject: [PATCH 01/19] chore(groupcache): Rename groupcache into memcache{distributed: true} 1. Introduced new config called memorycache config. 2. It has flag `distributed`. By enabling it, we use groupcache Going forward. We will bring fifocache into memcache config(with distributed:false) This PR is just a POC. To show how we can simplify in-built memory cache config and bring it into single config. Signed-off-by: Kaviraj --- pkg/loki/common/common.go | 6 ++-- pkg/loki/config_wrapper.go | 38 ++++++++++++++------------ pkg/loki/modules.go | 21 ++++++++++---- pkg/storage/chunk/cache/cache.go | 5 ++-- pkg/storage/chunk/cache/groupcache.go | 20 ++++++-------- pkg/storage/chunk/cache/memorycache.go | 37 +++++++++++++++++++++++++ 6 files changed, 88 insertions(+), 39 deletions(-) create mode 100644 pkg/storage/chunk/cache/memorycache.go diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index fb1b1840aba29..a492cc98068e9 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -46,10 +46,10 @@ type Config struct { // CompactorAddress is the http address of the compactor in the form http://host:port CompactorAddress string `yaml:"compactor_address"` - // GroupCacheConfig is the configuration to use when groupcache is enabled. + // MemorycacheConfig is the configuration to use when in-memory cache is enabled. // // This is a common config because, when enabled, it is used across all caches - GroupCacheConfig cache.GroupCacheConfig `yaml:"groupcache"` + Memorycache cache.MemorycacheConfig `yaml:"memorycache"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { @@ -64,7 +64,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.") // flags that only live in common - c.GroupCacheConfig.RegisterFlagsWithPrefix("common.groupcache", "", f) + c.Memorycache.RegisterFlagsWithPrefix("common.memorycache", "", f) f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port") } diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 47d75a61ff244..7d807442467eb 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -155,7 +155,9 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { } r.Frontend.FrontendV2.Addr = r.Common.InstanceAddr r.IndexGateway.Ring.InstanceAddr = r.Common.InstanceAddr - r.Common.GroupCacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr + if r.Common.Memorycache.Distributed { + r.Common.Memorycache.Ring.InstanceAddr = r.Common.InstanceAddr + } } if !reflect.DeepEqual(r.Common.InstanceInterfaceNames, defaults.Common.InstanceInterfaceNames) { @@ -164,7 +166,9 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { } r.Frontend.FrontendV2.InfNames = r.Common.InstanceInterfaceNames r.IndexGateway.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames - r.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames + if r.Common.Memorycache.Distributed { + r.Common.Memorycache.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames + } } } @@ -172,7 +176,7 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { // NOTE: only used for GroupCache at the moment // TODO: apply to other caches as well func applyCommonCacheConfigs(r, _ *ConfigWrapper) { - if r.Config.Common.GroupCacheConfig.Enabled { + if r.Config.Common.Memorycache.Enabled && r.Config.Common.Memorycache.Distributed { r.Config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache = true r.Config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache = true r.Config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache = true @@ -305,16 +309,16 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit } // GroupCacheRing - if mergeWithExisting || reflect.DeepEqual(r.Common.GroupCacheConfig.Ring, defaults.Common.GroupCacheConfig.Ring) { - r.Common.GroupCacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout - r.Common.GroupCacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod - r.Common.GroupCacheConfig.Ring.InstancePort = rc.InstancePort - r.Common.GroupCacheConfig.Ring.InstanceAddr = rc.InstanceAddr - r.Common.GroupCacheConfig.Ring.InstanceID = rc.InstanceID - r.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames - r.Common.GroupCacheConfig.Ring.InstanceZone = rc.InstanceZone - r.Common.GroupCacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.Common.GroupCacheConfig.Ring.KVStore = rc.KVStore + if mergeWithExisting || reflect.DeepEqual(r.Common.Memorycache.Ring, defaults.Common.Memorycache.Ring) { + r.Common.Memorycache.Ring.HeartbeatTimeout = rc.HeartbeatTimeout + r.Common.Memorycache.Ring.HeartbeatPeriod = rc.HeartbeatPeriod + r.Common.Memorycache.Ring.InstancePort = rc.InstancePort + r.Common.Memorycache.Ring.InstanceAddr = rc.InstanceAddr + r.Common.Memorycache.Ring.InstanceID = rc.InstanceID + r.Common.Memorycache.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.Common.Memorycache.Ring.InstanceZone = rc.InstanceZone + r.Common.Memorycache.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.Common.Memorycache.Ring.KVStore = rc.KVStore } } @@ -350,7 +354,7 @@ func applyTokensFilePath(cfg *ConfigWrapper) error { if err != nil { return err } - cfg.Common.GroupCacheConfig.Ring.TokensFilePath = f + cfg.Common.Memorycache.Ring.TokensFilePath = f return nil } @@ -429,8 +433,8 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface) } - if reflect.DeepEqual(cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames, defaults.Common.GroupCacheConfig.Ring.InstanceInterfaceNames) { - cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.GroupCacheConfig.Ring.InstanceInterfaceNames, loopbackIface) + if reflect.DeepEqual(cfg.Common.Memorycache.Ring.InstanceInterfaceNames, defaults.Common.Memorycache.Ring.InstanceInterfaceNames) { + cfg.Common.Memorycache.Ring.InstanceInterfaceNames = append(cfg.Common.Memorycache.Ring.InstanceInterfaceNames, loopbackIface) } } @@ -445,7 +449,7 @@ func applyMemberlistConfig(r *ConfigWrapper) { r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr r.IndexGateway.Ring.KVStore.Store = memberlistStr - r.Common.GroupCacheConfig.Ring.KVStore.Store = memberlistStr + r.Common.Memorycache.Ring.KVStore.Store = memberlistStr } var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend") diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 2994494b1a2c4..d58bc8b54bbeb 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -148,12 +148,19 @@ func (t *Loki) initRing() (_ services.Service, err error) { } func (t *Loki) initGroupcache() (_ services.Service, err error) { - if !t.Cfg.Common.GroupCacheConfig.Enabled { + if !t.Cfg.Common.Memorycache.Enabled || + !t.Cfg.Common.Memorycache.Distributed { return nil, nil } - t.Cfg.Common.GroupCacheConfig.Ring.ListenPort = t.Cfg.Common.GroupCacheConfig.ListenPort - rm, err := cache.NewGroupcacheRingManager(t.Cfg.Common.GroupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer) + groupConfig := cache.GroupCacheConfig{ + Enabled: true, + Ring: t.Cfg.Common.Memorycache.Ring, + CapacityMB: t.Cfg.Common.Memorycache.MaxSizeMB, + ListenPort: t.Cfg.Common.Memorycache.ListenPort, + } + + rm, err := cache.NewGroupcacheRingManager(groupConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, gerrors.Wrap(err, "new groupcache ring manager") } @@ -161,7 +168,7 @@ func (t *Loki) initGroupcache() (_ services.Service, err error) { t.groupcacheRingManager = rm t.Server.HTTP.Path("/groupcache/ring").Methods("GET", "POST").Handler(t.groupcacheRingManager) - gc, err := cache.NewGroupCache(rm, t.Cfg.Common.GroupCacheConfig, t.Server, util_log.Logger, prometheus.DefaultRegisterer) + gc, err := cache.NewGroupCache(rm, groupConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -179,7 +186,7 @@ func (t *Loki) initGroupcache() (_ services.Service, err error) { // The index cache generates too much traffic to be used. Make it a fifo cache t.Cfg.StorageConfig.IndexQueriesCacheConfig.EnableFifoCache = true - t.Cfg.StorageConfig.IndexQueriesCacheConfig.Fifocache.MaxSizeBytes = fmt.Sprint(t.Cfg.Common.GroupCacheConfig.CapacityMB * 1e6) + t.Cfg.StorageConfig.IndexQueriesCacheConfig.Fifocache.MaxSizeBytes = fmt.Sprint(t.Cfg.Common.Memorycache.MaxSizeMB * 1e6) return t.groupcacheRingManager, nil } @@ -901,7 +908,9 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - t.Cfg.Common.GroupCacheConfig.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + if t.Cfg.Common.Memorycache.Distributed { + t.Cfg.Common.Memorycache.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + } t.Server.HTTP.Handle("/memberlist", t.MemberlistKV) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index a4babc2e6c203..a2d5e1fa7e685 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -34,10 +34,11 @@ type Config struct { Memcache MemcachedConfig `yaml:"memcached"` MemcacheClient MemcachedClientConfig `yaml:"memcached_client"` Redis RedisConfig `yaml:"redis"` - Fifocache FifoCacheConfig `yaml:"fifocache"` + Memorycache MemorycacheConfig `yaml:"memorycache"` + Fifocache FifoCacheConfig `yaml:"fifocache"` // depreciated // GroupcacheConfig is a local GroupCache config per cache - GroupCacheConfig GroupConfig `yaml:"groupcache"` + GroupCacheConfig GroupConfig `yaml:"groupcache"` // depreicated // This is to name the cache metrics properly. Prefix string `yaml:"prefix" doc:"hidden"` diff --git a/pkg/storage/chunk/cache/groupcache.go b/pkg/storage/chunk/cache/groupcache.go index 79954ece36a06..0c09c77fba054 100644 --- a/pkg/storage/chunk/cache/groupcache.go +++ b/pkg/storage/chunk/cache/groupcache.go @@ -3,7 +3,6 @@ package cache import ( "context" "crypto/tls" - "flag" "fmt" "net" "net/http" @@ -21,7 +20,6 @@ import ( "github.com/mailgun/groupcache/v2" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/weaveworks/common/server" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -74,22 +72,22 @@ type GroupConfig struct { CapacityMB int64 `yaml:"capacity_mb,omitempty"` } -// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet -func (cfg *GroupCacheConfig) RegisterFlagsWithPrefix(prefix, _ string, f *flag.FlagSet) { - cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) +// // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +// func (cfg *GroupCacheConfig) RegisterFlagsWithPrefix(prefix, _ string, f *flag.FlagSet) { +// cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) - f.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Whether or not groupcache is enabled") - f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication") - f.Int64Var(&cfg.CapacityMB, prefix+".capacity-per-cache-mb", 100, "Capacity of each groupcache group in MB (default: 100). "+ - "NOTE: there are 3 caches (result, chunk, and index query), so the maximum used memory will be *triple* the value specified here.") -} +// f.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Whether or not groupcache is enabled") +// f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication") +// f.Int64Var(&cfg.CapacityMB, prefix+".capacity-per-cache-mb", 100, "Capacity of each groupcache group in MB (default: 100). "+ +// "NOTE: there are 3 caches (result, chunk, and index query), so the maximum used memory will be *triple* the value specified here.") +// } type ringManager interface { Addr() string Ring() ring.ReadRing } -func NewGroupCache(rm ringManager, config GroupCacheConfig, server *server.Server, logger log.Logger, reg prometheus.Registerer) (*GroupCache, error) { +func NewGroupCache(rm ringManager, config GroupCacheConfig, logger log.Logger, reg prometheus.Registerer) (*GroupCache, error) { addr := fmt.Sprintf("http://%s", rm.Addr()) level.Info(logger).Log("msg", "groupcache local address set to", "addr", addr) diff --git a/pkg/storage/chunk/cache/memorycache.go b/pkg/storage/chunk/cache/memorycache.go new file mode 100644 index 0000000000000..399547e691acc --- /dev/null +++ b/pkg/storage/chunk/cache/memorycache.go @@ -0,0 +1,37 @@ +package cache + +import ( + "flag" + "time" +) + +const ( + DefaultPurgeInterval = 1 * time.Minute +) + +// MemorycacheConfig represents in-process memory cache config. +// It can also be distributed sharding keys across peers when microservice +// or SSD mode. +type MemorycacheConfig struct { + Distributed bool `yaml:"distributed,omitempty"` + Enabled bool `yaml:"enabled,omitempty"` + MaxSizeMB int64 `yaml:"max_size_mb"` + MaxItems int `yaml:"max_items"` + TTL time.Duration `yaml:"ttl"` + + // PurgeInterval tell how often should we remove keys that are expired. + // by default it takes `DefaultPurgeInterval` + PurgeInterval time.Duration + + // distributed cache configs. Have no meaning if `Distributed=false`. + Ring RingCfg `yaml:"ring,omitempty"` + ListenPort int `yaml:"listen_port,omitempty"` +} + +func (cfg *MemorycacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { + f.Int64Var(&cfg.MaxSizeMB, prefix+"memorycache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.") + f.IntVar(&cfg.MaxItems, prefix+"memorycache.max-items", 0, description+"Maximum number of entries in the cache.") + f.DurationVar(&cfg.TTL, prefix+"memorycache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.") + cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) + f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication") +} From 2e2f3d42659d0f5f51113fa76ce9d60a6e225baa Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 3 Aug 2022 11:12:22 +0200 Subject: [PATCH 02/19] Memorycache -> EmbeddedCache --- pkg/loki/common/common.go | 10 ----- pkg/loki/config_wrapper.go | 52 +++++++++------------- pkg/loki/config_wrapper_test.go | 37 ++++++++------- pkg/loki/modules.go | 25 ++++------- pkg/storage/chunk/cache/cache.go | 24 +++++----- pkg/storage/chunk/cache/groupcache.go | 10 ----- pkg/storage/chunk/cache/groupcache_test.go | 4 +- pkg/storage/chunk/cache/memorycache.go | 22 ++++++--- 8 files changed, 75 insertions(+), 109 deletions(-) diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index a492cc98068e9..721ffed9d12e3 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -3,8 +3,6 @@ package common import ( "flag" - "github.com/grafana/loki/pkg/storage/chunk/cache" - "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/netutil" @@ -45,11 +43,6 @@ type Config struct { // CompactorAddress is the http address of the compactor in the form http://host:port CompactorAddress string `yaml:"compactor_address"` - - // MemorycacheConfig is the configuration to use when in-memory cache is enabled. - // - // This is a common config because, when enabled, it is used across all caches - Memorycache cache.MemorycacheConfig `yaml:"memorycache"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { @@ -63,9 +56,6 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.") throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.") - // flags that only live in common - c.Memorycache.RegisterFlagsWithPrefix("common.memorycache", "", f) - f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port") } diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 7d807442467eb..e7f1ec59466d5 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -85,8 +85,6 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source { applyInstanceConfigs(r, &defaults) - applyCommonCacheConfigs(r, &defaults) - applyCommonReplicationFactor(r, &defaults) applyDynamicRingConfigs(r, &defaults) @@ -155,8 +153,8 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { } r.Frontend.FrontendV2.Addr = r.Common.InstanceAddr r.IndexGateway.Ring.InstanceAddr = r.Common.InstanceAddr - if r.Common.Memorycache.Distributed { - r.Common.Memorycache.Ring.InstanceAddr = r.Common.InstanceAddr + if r.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { + r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceAddr = r.Common.InstanceAddr } } @@ -166,23 +164,12 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { } r.Frontend.FrontendV2.InfNames = r.Common.InstanceInterfaceNames r.IndexGateway.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames - if r.Common.Memorycache.Distributed { - r.Common.Memorycache.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames + if r.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { + r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames } } } -// applyCommonCacheConfigs applies to Loki components the cache-related configurations under the common config section -// NOTE: only used for GroupCache at the moment -// TODO: apply to other caches as well -func applyCommonCacheConfigs(r, _ *ConfigWrapper) { - if r.Config.Common.Memorycache.Enabled && r.Config.Common.Memorycache.Distributed { - r.Config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache = true - r.Config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache = true - r.Config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache = true - } -} - // applyCommonReplicationFactor apply the common replication factor to the Index Gateway ring. func applyCommonReplicationFactor(r, defaults *ConfigWrapper) { if !reflect.DeepEqual(r.Common.ReplicationFactor, defaults.Common.ReplicationFactor) { @@ -308,17 +295,18 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit r.IndexGateway.Ring.KVStore = rc.KVStore } - // GroupCacheRing - if mergeWithExisting || reflect.DeepEqual(r.Common.Memorycache.Ring, defaults.Common.Memorycache.Ring) { - r.Common.Memorycache.Ring.HeartbeatTimeout = rc.HeartbeatTimeout - r.Common.Memorycache.Ring.HeartbeatPeriod = rc.HeartbeatPeriod - r.Common.Memorycache.Ring.InstancePort = rc.InstancePort - r.Common.Memorycache.Ring.InstanceAddr = rc.InstanceAddr - r.Common.Memorycache.Ring.InstanceID = rc.InstanceID - r.Common.Memorycache.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames - r.Common.Memorycache.Ring.InstanceZone = rc.InstanceZone - r.Common.Memorycache.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.Common.Memorycache.Ring.KVStore = rc.KVStore + // EmbeddedCache distributed ring. + if r.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() && + (mergeWithExisting || reflect.DeepEqual(r.QueryRange.CacheConfig.Embeddedcache.Ring, defaults.QueryRange.CacheConfig.Embeddedcache.Ring)) { + r.QueryRange.CacheConfig.Embeddedcache.Ring.HeartbeatTimeout = rc.HeartbeatTimeout + r.QueryRange.CacheConfig.Embeddedcache.Ring.HeartbeatPeriod = rc.HeartbeatPeriod + r.QueryRange.CacheConfig.Embeddedcache.Ring.InstancePort = rc.InstancePort + r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceAddr = rc.InstanceAddr + r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceID = rc.InstanceID + r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceZone = rc.InstanceZone + r.QueryRange.CacheConfig.Embeddedcache.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.QueryRange.CacheConfig.Embeddedcache.Ring.KVStore = rc.KVStore } } @@ -354,7 +342,7 @@ func applyTokensFilePath(cfg *ConfigWrapper) error { if err != nil { return err } - cfg.Common.Memorycache.Ring.TokensFilePath = f + cfg.QueryRange.CacheConfig.Embeddedcache.Ring.TokensFilePath = f return nil } @@ -433,8 +421,8 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface) } - if reflect.DeepEqual(cfg.Common.Memorycache.Ring.InstanceInterfaceNames, defaults.Common.Memorycache.Ring.InstanceInterfaceNames) { - cfg.Common.Memorycache.Ring.InstanceInterfaceNames = append(cfg.Common.Memorycache.Ring.InstanceInterfaceNames, loopbackIface) + if reflect.DeepEqual(cfg.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames, defaults.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames) { + cfg.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames = append(cfg.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames, loopbackIface) } } @@ -449,7 +437,7 @@ func applyMemberlistConfig(r *ConfigWrapper) { r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr r.IndexGateway.Ring.KVStore.Store = memberlistStr - r.Common.Memorycache.Ring.KVStore.Store = memberlistStr + r.QueryRange.CacheConfig.Embeddedcache.Ring.KVStore.Store = memberlistStr } var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend") diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index 0568c116f6d2f..bcd29dd182fad 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -821,23 +821,24 @@ ingester: }) }) - t.Run("common groupcache setting is applied to chunk, index, and result caches", func(t *testing.T) { + t.Run("embedded-cache setting is applied to result caches", func(t *testing.T) { // ensure they are all false by default config, _, _ := configWrapperFromYAML(t, minimalConfig, nil) - assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache) - assert.False(t, config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache) - assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache) + assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.Embeddedcache.Enabled) + assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.Embeddedcache.Distributed) configFileString := `--- -common: - groupcache: - enabled: true` +query_range: + results_cache: + cache: + embedded_cache: + enabled: true + distributed: true` config, _ = testContext(configFileString, nil) - assert.True(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache) - assert.True(t, config.StorageConfig.IndexQueriesCacheConfig.EnableGroupCache) - assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableGroupCache) + assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.Embeddedcache.Enabled) + assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.Embeddedcache.Distributed) }) } @@ -867,16 +868,18 @@ chunk_store_config: assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableFifoCache) }) - t.Run("no FIFO cache enabled by default if GroupCache is set", func(t *testing.T) { + t.Run("if distributed cache is set for results cache, FIFO cache should be disabled.", func(t *testing.T) { configFileString := `--- -common: - groupcache: - enabled: true` +query_range: + results_cache: + cache: + embedded_cache: + enabled: true + distributed: true` config, _, _ := configWrapperFromYAML(t, configFileString, nil) - assert.False(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableFifoCache) - assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EnableFifoCache) - assert.True(t, config.ChunkStoreConfig.ChunkCacheConfig.EnableGroupCache) + assert.True(t, config.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed()) + assert.False(t, config.QueryRange.CacheConfig.EnableFifoCache) }) t.Run("FIFO cache is enabled by default if no other cache is set", func(t *testing.T) { diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index d58bc8b54bbeb..8bd36b0e3b9c8 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -148,16 +148,16 @@ func (t *Loki) initRing() (_ services.Service, err error) { } func (t *Loki) initGroupcache() (_ services.Service, err error) { - if !t.Cfg.Common.Memorycache.Enabled || - !t.Cfg.Common.Memorycache.Distributed { + // Currently groupcache can only be enabled for results cache. + if !t.Cfg.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { return nil, nil } groupConfig := cache.GroupCacheConfig{ Enabled: true, - Ring: t.Cfg.Common.Memorycache.Ring, - CapacityMB: t.Cfg.Common.Memorycache.MaxSizeMB, - ListenPort: t.Cfg.Common.Memorycache.ListenPort, + Ring: t.Cfg.QueryRange.CacheConfig.Embeddedcache.Ring, + CapacityMB: t.Cfg.QueryRange.CacheConfig.Embeddedcache.MaxSizeMB, + ListenPort: t.Cfg.QueryRange.CacheConfig.Embeddedcache.ListenPort, } rm, err := cache.NewGroupcacheRingManager(groupConfig, util_log.Logger, prometheus.DefaultRegisterer) @@ -173,21 +173,12 @@ func (t *Loki) initGroupcache() (_ services.Service, err error) { return nil, err } - t.Cfg.ChunkStoreConfig.ChunkCacheConfig.GroupCache = gc.NewGroup( - t.Cfg.ChunkStoreConfig.ChunkCacheConfig.Prefix+"groupcache", - &t.Cfg.ChunkStoreConfig.ChunkCacheConfig.GroupCacheConfig, - stats.ChunkCache, - ) - t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.GroupCache = gc.NewGroup( + t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Cache = gc.NewGroup( t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Prefix+"groupcache", &t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.GroupCacheConfig, stats.ResultCache, ) - // The index cache generates too much traffic to be used. Make it a fifo cache - t.Cfg.StorageConfig.IndexQueriesCacheConfig.EnableFifoCache = true - t.Cfg.StorageConfig.IndexQueriesCacheConfig.Fifocache.MaxSizeBytes = fmt.Sprint(t.Cfg.Common.Memorycache.MaxSizeMB * 1e6) - return t.groupcacheRingManager, nil } @@ -908,8 +899,8 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - if t.Cfg.Common.Memorycache.Distributed { - t.Cfg.Common.Memorycache.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + if t.Cfg.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { + t.Cfg.QueryRange.CacheConfig.Embeddedcache.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV } t.Server.HTTP.Handle("/memberlist", t.MemberlistKV) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index a2d5e1fa7e685..a065dc1b00104 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -25,8 +25,7 @@ type Cache interface { // Config for building Caches. type Config struct { - EnableFifoCache bool `yaml:"enable_fifocache"` - EnableGroupCache bool `yaml:"enable_groupcache"` + EnableFifoCache bool `yaml:"enable_fifocache"` DefaultValidity time.Duration `yaml:"default_validity"` @@ -34,7 +33,7 @@ type Config struct { Memcache MemcachedConfig `yaml:"memcached"` MemcacheClient MemcachedClientConfig `yaml:"memcached_client"` Redis RedisConfig `yaml:"redis"` - Memorycache MemorycacheConfig `yaml:"memorycache"` + Embeddedcache EmbeddedcacheConfig `yaml:"embedded_cache"` Fifocache FifoCacheConfig `yaml:"fifocache"` // depreciated // GroupcacheConfig is a local GroupCache config per cache @@ -43,8 +42,8 @@ type Config struct { // This is to name the cache metrics properly. Prefix string `yaml:"prefix" doc:"hidden"` - // GroupCache is configured/initialized as part of modules and injected here - GroupCache Cache `yaml:"-"` + // // GroupCache is configured/initialized as part of modules and injected here + // GroupCache Cache `yaml:"-"` // For tests to inject specific implementations. Cache Cache `yaml:"-"` @@ -66,7 +65,6 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).") - f.BoolVar(&cfg.EnableGroupCache, prefix+"cache.enable-groupcache", false, description+"Enable distributed in-memory cache.") cfg.Prefix = prefix } @@ -90,13 +88,13 @@ func IsRedisSet(cfg Config) bool { return cfg.Redis.Endpoint != "" } -func IsGroupCacheSet(cfg Config) bool { - return cfg.GroupCache != nil +func IsEmbeddedCacheSet(cfg Config) bool { + return cfg.Embeddedcache.Enabled } -// IsCacheConfigured determines if memcached, redis, or groupcache have been configured +// IsCacheConfigured determines if memcached, redis, or embedded-cache have been configured func IsCacheConfigured(cfg Config) bool { - return IsMemcacheSet(cfg) || IsRedisSet(cfg) || cfg.EnableGroupCache + return IsMemcacheSet(cfg) || IsRedisSet(cfg) || IsEmbeddedCacheSet(cfg) } // New creates a new Cache using Config. @@ -145,9 +143,9 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta caches = append(caches, CollectStats(NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache, reg), reg))) } - if IsGroupCacheSet(cfg) { - cacheName := cfg.Prefix + "groupcache" - caches = append(caches, CollectStats(Instrument(cacheName, cfg.GroupCache, reg))) + if IsEmbeddedCacheSet(cfg) { + cacheName := cfg.Prefix + "embedded-cache" + caches = append(caches, CollectStats(Instrument(cacheName, cfg.Cache, reg))) } cache := NewTiered(caches) diff --git a/pkg/storage/chunk/cache/groupcache.go b/pkg/storage/chunk/cache/groupcache.go index 0c09c77fba054..11315b85f40ba 100644 --- a/pkg/storage/chunk/cache/groupcache.go +++ b/pkg/storage/chunk/cache/groupcache.go @@ -72,16 +72,6 @@ type GroupConfig struct { CapacityMB int64 `yaml:"capacity_mb,omitempty"` } -// // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet -// func (cfg *GroupCacheConfig) RegisterFlagsWithPrefix(prefix, _ string, f *flag.FlagSet) { -// cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) - -// f.BoolVar(&cfg.Enabled, prefix+".enabled", false, "Whether or not groupcache is enabled") -// f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication") -// f.Int64Var(&cfg.CapacityMB, prefix+".capacity-per-cache-mb", 100, "Capacity of each groupcache group in MB (default: 100). "+ -// "NOTE: there are 3 caches (result, chunk, and index query), so the maximum used memory will be *triple* the value specified here.") -// } - type ringManager interface { Addr() string Ring() ring.ReadRing diff --git a/pkg/storage/chunk/cache/groupcache_test.go b/pkg/storage/chunk/cache/groupcache_test.go index 199a0dcd3274c..c75d2a727d0b5 100644 --- a/pkg/storage/chunk/cache/groupcache_test.go +++ b/pkg/storage/chunk/cache/groupcache_test.go @@ -5,8 +5,6 @@ import ( "testing" "github.com/go-kit/log" - "github.com/gorilla/mux" - "github.com/weaveworks/common/server" "github.com/grafana/dskit/ring" "github.com/stretchr/testify/assert" @@ -64,7 +62,7 @@ func setupGroupCache() (*GroupCache, error) { return NewGroupCache(&mockRingManager{}, GroupCacheConfig{ Enabled: true, CapacityMB: 1, - }, &server.Server{HTTP: mux.NewRouter()}, log.NewNopLogger(), nil) + }, log.NewNopLogger(), nil) } type mockRingManager struct{} diff --git a/pkg/storage/chunk/cache/memorycache.go b/pkg/storage/chunk/cache/memorycache.go index 399547e691acc..84f273bbcf42e 100644 --- a/pkg/storage/chunk/cache/memorycache.go +++ b/pkg/storage/chunk/cache/memorycache.go @@ -9,10 +9,10 @@ const ( DefaultPurgeInterval = 1 * time.Minute ) -// MemorycacheConfig represents in-process memory cache config. -// It can also be distributed sharding keys across peers when microservice +// EmbeddedcacheConfig represents in-process embedded cache config. +// It can also be distributed, sharding keys across peers when run with microservices // or SSD mode. -type MemorycacheConfig struct { +type EmbeddedcacheConfig struct { Distributed bool `yaml:"distributed,omitempty"` Enabled bool `yaml:"enabled,omitempty"` MaxSizeMB int64 `yaml:"max_size_mb"` @@ -28,10 +28,18 @@ type MemorycacheConfig struct { ListenPort int `yaml:"listen_port,omitempty"` } -func (cfg *MemorycacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - f.Int64Var(&cfg.MaxSizeMB, prefix+"memorycache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.") - f.IntVar(&cfg.MaxItems, prefix+"memorycache.max-items", 0, description+"Maximum number of entries in the cache.") - f.DurationVar(&cfg.TTL, prefix+"memorycache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.") +func (cfg *EmbeddedcacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { + f.Int64Var(&cfg.MaxSizeMB, prefix+".max-size-mb", 100, description+"Maximum memory size of the cache in MB.") + f.IntVar(&cfg.MaxItems, prefix+".max-items", 0, description+"Maximum number of entries in the cache.") + f.DurationVar(&cfg.TTL, prefix+".ttl", time.Hour, description+"The time to live for items in the cache before they get purged.") cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication") } + +func (em EmbeddedcacheConfig) IsEnabledWithDistributed() bool { + return em.Enabled && em.Distributed +} + +func (em EmbeddedcacheConfig) IsEnabledWithoutDistributed() bool { + return em.Enabled && !em.Distributed +} From 0c0ae42f2cc68a30a671e51fae49a3427d5db8f1 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 3 Aug 2022 12:02:32 +0200 Subject: [PATCH 03/19] Bring fifocache into embeddedcache tent Signed-off-by: Kaviraj --- pkg/loki/loki.go | 10 +++--- pkg/loki/modules.go | 15 +++++---- pkg/storage/chunk/cache/cache.go | 36 ++++++++++++++++------ pkg/storage/chunk/cache/groupcache.go | 16 ++-------- pkg/storage/chunk/cache/groupcache_test.go | 16 +--------- 5 files changed, 43 insertions(+), 50 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 2aca337304fec..9e6beb8fa2919 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -255,7 +255,7 @@ type Loki struct { queryScheduler *scheduler.Scheduler usageReport *usagestats.Reporter indexGatewayRingManager *indexgateway.RingManager - groupcacheRingManager *cache.GroupcacheRingManager + embeddedcacheRingManager *cache.GroupcacheRingManager clientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics @@ -476,7 +476,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig, modules.UserInvisibleModule) mm.RegisterModule(MemberlistKV, t.initMemberlistKV, modules.UserInvisibleModule) mm.RegisterModule(Ring, t.initRing, modules.UserInvisibleModule) - mm.RegisterModule(GroupCache, t.initGroupcache, modules.UserInvisibleModule) + mm.RegisterModule(Embededcache, t.initEmbeddedCache, modules.UserInvisibleModule) mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule) mm.RegisterModule(OverridesExporter, t.initOverridesExporter) mm.RegisterModule(TenantConfigs, t.initTenantConfigs, modules.UserInvisibleModule) @@ -503,16 +503,16 @@ func (t *Loki) setupModuleManager() error { // Add dependencies deps := map[string][]string{ Ring: {RuntimeConfig, Server, MemberlistKV}, - GroupCache: {RuntimeConfig, Server, MemberlistKV}, + Embededcache: {RuntimeConfig, Server, MemberlistKV}, UsageReport: {}, Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, Server}, TenantConfigs: {RuntimeConfig}, Distributor: {Ring, Server, Overrides, TenantConfigs, UsageReport}, - Store: {Overrides, GroupCache, IndexGatewayRing}, + Store: {Overrides, Embededcache, IndexGatewayRing}, Ingester: {Store, Server, MemberlistKV, TenantConfigs, UsageReport}, Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs, UsageReport}, - QueryFrontendTripperware: {Server, GroupCache, Overrides, TenantConfigs}, + QueryFrontendTripperware: {Server, Embededcache, Overrides, TenantConfigs}, QueryFrontend: {QueryFrontendTripperware, UsageReport}, QueryScheduler: {Server, Overrides, MemberlistKV, UsageReport}, Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs, UsageReport}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 8bd36b0e3b9c8..4e1f14f2d7faf 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -71,7 +71,7 @@ const maxChunkAgeForTableManager = 12 * time.Hour // The various modules that make up Loki. const ( Ring string = "ring" - GroupCache string = "groupcache" + Embededcache string = "embedded-cache" RuntimeConfig string = "runtime-config" Overrides string = "overrides" OverridesExporter string = "overrides-exporter" @@ -147,8 +147,7 @@ func (t *Loki) initRing() (_ services.Service, err error) { return t.ring, nil } -func (t *Loki) initGroupcache() (_ services.Service, err error) { - // Currently groupcache can only be enabled for results cache. +func (t *Loki) initEmbeddedCache() (_ services.Service, err error) { if !t.Cfg.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { return nil, nil } @@ -162,24 +161,24 @@ func (t *Loki) initGroupcache() (_ services.Service, err error) { rm, err := cache.NewGroupcacheRingManager(groupConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { - return nil, gerrors.Wrap(err, "new groupcache ring manager") + return nil, gerrors.Wrap(err, "new embedded-cache ring manager") } - t.groupcacheRingManager = rm - t.Server.HTTP.Path("/groupcache/ring").Methods("GET", "POST").Handler(t.groupcacheRingManager) + t.embeddedcacheRingManager = rm + t.Server.HTTP.Path("/groupcache/ring").Methods("GET", "POST").Handler(t.embeddedcacheRingManager) gc, err := cache.NewGroupCache(rm, groupConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } + // We support distributed embedded cache only for results currently. t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Cache = gc.NewGroup( t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Prefix+"groupcache", - &t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.GroupCacheConfig, stats.ResultCache, ) - return t.groupcacheRingManager, nil + return t.embeddedcacheRingManager, nil } func (t *Loki) initRuntimeConfig() (services.Service, error) { diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index a065dc1b00104..2c6f740f1ecfb 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -36,9 +36,6 @@ type Config struct { Embeddedcache EmbeddedcacheConfig `yaml:"embedded_cache"` Fifocache FifoCacheConfig `yaml:"fifocache"` // depreciated - // GroupcacheConfig is a local GroupCache config per cache - GroupCacheConfig GroupConfig `yaml:"groupcache"` // depreicated - // This is to name the cache metrics properly. Prefix string `yaml:"prefix" doc:"hidden"` @@ -104,12 +101,32 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta } var caches []Cache - if cfg.EnableFifoCache { - if cfg.Fifocache.TTL == 0 && cfg.DefaultValidity != 0 { - cfg.Fifocache.TTL = cfg.DefaultValidity + + // Currently fifocache can be enabled in two ways. + // 1. cfg.EnableFifocache (old deprecated way) + // 2. cfg.Embeddedcache.Enabled=true and cfg.Embeddedcache.Distributed=false (new way) + // if cfg.EnableFifoCache || (cfg.Embeddedcache.IsEnabledWithoutDistributed()) { + if IsEmbeddedCacheSet(cfg) && !cfg.Embeddedcache.Distributed { + var fifocfg FifoCacheConfig + + if cfg.EnableFifoCache { + fifocfg = cfg.Fifocache + } + + if cfg.Embeddedcache.IsEnabledWithoutDistributed() { + fifocfg = FifoCacheConfig{ + MaxSizeBytes: fmt.Sprint(cfg.Embeddedcache.MaxSizeMB * 1e6), + MaxSizeItems: cfg.Embeddedcache.MaxItems, + TTL: cfg.Embeddedcache.TTL, + PurgeInterval: cfg.Embeddedcache.PurgeInterval, + } + } + + if fifocfg.TTL == 0 && cfg.DefaultValidity != 0 { + fifocfg.TTL = cfg.DefaultValidity } - if cache := NewFifoCache(cfg.Prefix+"fifocache", cfg.Fifocache, reg, logger, cacheType); cache != nil { + if cache := NewFifoCache(cfg.Prefix+"fifocache", fifocfg, reg, logger, cacheType); cache != nil { caches = append(caches, CollectStats(Instrument(cfg.Prefix+"fifocache", cache, reg))) } } @@ -143,8 +160,9 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta caches = append(caches, CollectStats(NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache, reg), reg))) } - if IsEmbeddedCacheSet(cfg) { - cacheName := cfg.Prefix + "embedded-cache" + if IsEmbeddedCacheSet(cfg) && cfg.Embeddedcache.Distributed { + cacheName := cfg.Prefix + "groupcache" + caches = append(caches, CollectStats(Instrument(cacheName, cfg.Cache, reg))) } diff --git a/pkg/storage/chunk/cache/groupcache.go b/pkg/storage/chunk/cache/groupcache.go index 11315b85f40ba..0e1ee93ed6f07 100644 --- a/pkg/storage/chunk/cache/groupcache.go +++ b/pkg/storage/chunk/cache/groupcache.go @@ -67,11 +67,6 @@ type GroupCacheConfig struct { Cache Cache `yaml:"-"` } -// Groupconfig represents config per Group. -type GroupConfig struct { - CapacityMB int64 `yaml:"capacity_mb,omitempty"` -} - type ringManager interface { Addr() string Ring() ring.ReadRing @@ -203,7 +198,7 @@ type group struct { storeDuration prometheus.Observer } -func (c *GroupCache) NewGroup(name string, cfg *GroupConfig, ct stats.CacheType) Cache { +func (c *GroupCache) NewGroup(name string, ct stats.CacheType) Cache { // Return a known error on miss to track which keys need to be inserted missGetter := groupcache.GetterFunc(func(_ context.Context, _ string, _ groupcache.Sink) error { return ErrGroupcacheMiss @@ -212,11 +207,6 @@ func (c *GroupCache) NewGroup(name string, cfg *GroupConfig, ct stats.CacheType) c.wg.Add(1) c.startWaitingForClose() - cap := c.cacheBytes - if cfg.CapacityMB != 0 { - cap = cfg.CapacityMB * 1e6 // MB into bytes - } - requestDuration := promauto.With(c.reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "loki", Name: "groupcache_request_duration_seconds", @@ -226,8 +216,8 @@ func (c *GroupCache) NewGroup(name string, cfg *GroupConfig, ct stats.CacheType) }, []string{"operation"}) g := &group{ - cache: groupcache.NewGroup(name, cap, missGetter), - cacheBytes: cap, + cache: groupcache.NewGroup(name, c.cacheBytes, missGetter), + cacheBytes: c.cacheBytes, logger: c.logger, wg: &c.wg, cacheType: ct, diff --git a/pkg/storage/chunk/cache/groupcache_test.go b/pkg/storage/chunk/cache/groupcache_test.go index c75d2a727d0b5..394da4007a4bc 100644 --- a/pkg/storage/chunk/cache/groupcache_test.go +++ b/pkg/storage/chunk/cache/groupcache_test.go @@ -7,7 +7,6 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/ring" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -15,7 +14,7 @@ func TestGroupCache(t *testing.T) { gc, err := setupGroupCache() require.Nil(t, err) - c := gc.NewGroup("test-group", &GroupConfig{}, "test") + c := gc.NewGroup("test-group", "test") defer c.Stop() keys := []string{"key1", "key2", "key3"} @@ -43,19 +42,6 @@ func TestGroupCache(t *testing.T) { for i := 0; i < len(miss); i++ { require.Equal(t, miss[i], missed[i]) } - - // passing empty GroupConfig should use global `CapacityMB`.(which is 1MB). - c1 := gc.NewGroup("test-group1", &GroupConfig{}, "test1") - defer c.Stop() - - assert.Equal(t, c1.(*group).cacheBytes, int64(1*1e6)) - - // pass explicitly capacity per group should take preference. - c2 := gc.NewGroup("test-group2", &GroupConfig{CapacityMB: 6}, "test2") - defer c.Stop() - - assert.Equal(t, c2.(*group).cacheBytes, int64(6*1e6)) - } func setupGroupCache() (*GroupCache, error) { From 570b3708e4b17ce7452a7413451518a3e0dd08a9 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 3 Aug 2022 12:20:33 +0200 Subject: [PATCH 04/19] Fix edge case with enabling fifocache Signed-off-by: Kaviraj --- pkg/storage/chunk/cache/cache.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 2c6f740f1ecfb..7096cb7b840ac 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -105,8 +105,9 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta // Currently fifocache can be enabled in two ways. // 1. cfg.EnableFifocache (old deprecated way) // 2. cfg.Embeddedcache.Enabled=true and cfg.Embeddedcache.Distributed=false (new way) + // if cfg.EnableFifoCache || (cfg.Embeddedcache.IsEnabledWithoutDistributed()) { - if IsEmbeddedCacheSet(cfg) && !cfg.Embeddedcache.Distributed { + if cfg.EnableFifoCache || (IsEmbeddedCacheSet(cfg) && !cfg.Embeddedcache.Distributed) { var fifocfg FifoCacheConfig if cfg.EnableFifoCache { From d1b93ccab4b194221c3801dd8e7b68edf3960386 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 3 Aug 2022 12:38:32 +0200 Subject: [PATCH 05/19] Fix missing flags for embedded cache Signed-off-by: Kaviraj --- pkg/storage/chunk/cache/cache.go | 1 + pkg/storage/chunk/cache/memorycache.go | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 7096cb7b840ac..20dac27cfc6f8 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -58,6 +58,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f) cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.Embeddedcache.RegisterFlagsWithPrefix(prefix, description, f) f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.") f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") diff --git a/pkg/storage/chunk/cache/memorycache.go b/pkg/storage/chunk/cache/memorycache.go index 84f273bbcf42e..579aa28a00eda 100644 --- a/pkg/storage/chunk/cache/memorycache.go +++ b/pkg/storage/chunk/cache/memorycache.go @@ -21,7 +21,7 @@ type EmbeddedcacheConfig struct { // PurgeInterval tell how often should we remove keys that are expired. // by default it takes `DefaultPurgeInterval` - PurgeInterval time.Duration + PurgeInterval time.Duration `yaml:"-"` // distributed cache configs. Have no meaning if `Distributed=false`. Ring RingCfg `yaml:"ring,omitempty"` @@ -29,11 +29,13 @@ type EmbeddedcacheConfig struct { } func (cfg *EmbeddedcacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - f.Int64Var(&cfg.MaxSizeMB, prefix+".max-size-mb", 100, description+"Maximum memory size of the cache in MB.") - f.IntVar(&cfg.MaxItems, prefix+".max-items", 0, description+"Maximum number of entries in the cache.") - f.DurationVar(&cfg.TTL, prefix+".ttl", time.Hour, description+"The time to live for items in the cache before they get purged.") + f.BoolVar(&cfg.Enabled, prefix+"embedded-cache.enabled", false, description+"Whether embedded cache is enabled.") + f.BoolVar(&cfg.Distributed, prefix+"embedded-cache.distributed", false, description+"Whether embedded cache is enabled with distributed mode.") + f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.") + f.IntVar(&cfg.MaxItems, prefix+"embedded-cache.max-items", 0, description+"Maximum number of entries in the cache.") + f.DurationVar(&cfg.TTL, prefix+"embedded-cache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.") cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) - f.IntVar(&cfg.ListenPort, prefix+".listen_port", 4100, "The port to use for groupcache communication") + f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for groupcache communication") } func (em EmbeddedcacheConfig) IsEnabledWithDistributed() bool { From 4035664818a526038e515356061bddbfc4064257 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 3 Aug 2022 12:45:43 +0200 Subject: [PATCH 06/19] Add changelog Signed-off-by: Kaviraj --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b1e8998ea350..626305c337da4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ #### Loki ##### Enhancements +* [6821](https://github.com/grafana/loki/pull/6821) **kavirajk**: Introduce new cache type `embedded-cache` which is a in-process cache system that runs loki without any dependencies of external cache (like memcached, redis, etc). It can be run in two mode `distributed: false` (default and same as old `fifocache`) and `distributed: true` which runs cache is distributed fashing sharing keys between peers if Loki is run in microservices or SSD mode. * [6691](https://github.com/grafana/loki/pull/6691) **dannykopping**: Update production-ready Loki cluster in docker-compose * [6317](https://github.com/grafana/loki/pull/6317) **dannykoping**: General: add cache usage statistics * [6444](https://github.com/grafana/loki/pull/6444) **aminesnow** Add TLS config to query frontend. From 7e6bed9f0d073a73fce8c39564d4e61461daa6fa Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 3 Aug 2022 12:48:53 +0200 Subject: [PATCH 07/19] Warning message for fifocache config Signed-off-by: Kaviraj --- pkg/storage/chunk/cache/cache.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 20dac27cfc6f8..99a78ab391ab7 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -8,6 +8,7 @@ import ( "github.com/pkg/errors" + "github.com/go-kit/kit/log/level" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" @@ -62,7 +63,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.") f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") - f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).") + f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"(deprecated use embedded-cache instead) Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).") cfg.Prefix = prefix } @@ -112,6 +113,7 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta var fifocfg FifoCacheConfig if cfg.EnableFifoCache { + level.Warn(logger).Log("msg", "fifocache config is deprecated. use embedded-cache instead") fifocfg = cfg.Fifocache } From d4976b3ba9a89a2d7257c7715e177cb210d09992 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 3 Aug 2022 13:17:28 +0200 Subject: [PATCH 08/19] Make linter happy Signed-off-by: Kaviraj --- pkg/storage/chunk/cache/cache.go | 2 +- .../chunk/cache/{memorycache.go => embeddedcache.go} | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) rename pkg/storage/chunk/cache/{memorycache.go => embeddedcache.go} (89%) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 99a78ab391ab7..5bea1a7191469 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -8,8 +8,8 @@ import ( "github.com/pkg/errors" - "github.com/go-kit/kit/log/level" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/logqlmodel/stats" diff --git a/pkg/storage/chunk/cache/memorycache.go b/pkg/storage/chunk/cache/embeddedcache.go similarity index 89% rename from pkg/storage/chunk/cache/memorycache.go rename to pkg/storage/chunk/cache/embeddedcache.go index 579aa28a00eda..0319f5b8a21a6 100644 --- a/pkg/storage/chunk/cache/memorycache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -38,10 +38,10 @@ func (cfg *EmbeddedcacheConfig) RegisterFlagsWithPrefix(prefix, description stri f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for groupcache communication") } -func (em EmbeddedcacheConfig) IsEnabledWithDistributed() bool { - return em.Enabled && em.Distributed +func (cfg *EmbeddedcacheConfig) IsEnabledWithDistributed() bool { + return cfg.Enabled && cfg.Distributed } -func (em EmbeddedcacheConfig) IsEnabledWithoutDistributed() bool { - return em.Enabled && !em.Distributed +func (cfg *EmbeddedcacheConfig) IsEnabledWithoutDistributed() bool { + return cfg.Enabled && !cfg.Distributed } From 7c476ba23d75d677fbb62249ab5b4842834536dc Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Wed, 3 Aug 2022 18:20:48 +0200 Subject: [PATCH 09/19] Fix listenport issue Signed-off-by: Kaviraj --- pkg/loki/modules.go | 2 ++ pkg/storage/chunk/cache/cache.go | 3 --- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 4e1f14f2d7faf..0720aa9a8e1f2 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -159,6 +159,8 @@ func (t *Loki) initEmbeddedCache() (_ services.Service, err error) { ListenPort: t.Cfg.QueryRange.CacheConfig.Embeddedcache.ListenPort, } + groupConfig.Ring.ListenPort = groupConfig.ListenPort + rm, err := cache.NewGroupcacheRingManager(groupConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, gerrors.Wrap(err, "new embedded-cache ring manager") diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 5bea1a7191469..19f7a1f20092e 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -40,9 +40,6 @@ type Config struct { // This is to name the cache metrics properly. Prefix string `yaml:"prefix" doc:"hidden"` - // // GroupCache is configured/initialized as part of modules and injected here - // GroupCache Cache `yaml:"-"` - // For tests to inject specific implementations. Cache Cache `yaml:"-"` From 993b870c4fd6e20fddd5eeb4627d04010f92f57d Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Aug 2022 11:00:10 +0200 Subject: [PATCH 10/19] idk. review remarks Signed-off-by: Kaviraj --- cmd/loki/loki-local-config.yaml | 8 ++++++ pkg/loki/common/common.go | 6 +++++ pkg/loki/config_wrapper.go | 32 ++++++++++++------------ pkg/loki/modules.go | 21 ++++++++++------ pkg/storage/chunk/cache/embeddedcache.go | 25 +++++++++++++----- pkg/storage/chunk/cache/groupcache.go | 20 +++++++++++---- 6 files changed, 77 insertions(+), 35 deletions(-) diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index 75b3d3968685f..d039f37e88ccd 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -16,6 +16,14 @@ common: kvstore: store: inmemory +query_range: + results_cache: + cache: + embedded_cache: + enabled: true + distributed: true + max_size_mb: 100 + schema_config: configs: - from: 2020-10-24 diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index 721ffed9d12e3..0c89969d27995 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -6,6 +6,7 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/netutil" + "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/client/aws" "github.com/grafana/loki/pkg/storage/chunk/client/azure" "github.com/grafana/loki/pkg/storage/chunk/client/baidubce" @@ -43,6 +44,9 @@ type Config struct { // CompactorAddress is the http address of the compactor in the form http://host:port CompactorAddress string `yaml:"compactor_address"` + + // Global groupcache config. Independent of what type of cache uses the groupcache, we need some singleton configs like Ring configuration. Shouldn't be exposed to user config + EmbeddedcacheConfig cache.EmbeddedcacheSingletonConfig `yaml:"embedded_cache"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { @@ -57,6 +61,8 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.") f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port") + + c.EmbeddedcacheConfig.RegisterFlagsWithPrefix("common.embedded-cache", "", f) } type Storage struct { diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 8b9e154f40169..0e9c2a7b1e82f 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -165,7 +165,7 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { r.Frontend.FrontendV2.Addr = r.Common.InstanceAddr r.IndexGateway.Ring.InstanceAddr = r.Common.InstanceAddr if r.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { - r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceAddr = r.Common.InstanceAddr + r.Common.EmbeddedcacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr } } @@ -176,7 +176,7 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { r.Frontend.FrontendV2.InfNames = r.Common.InstanceInterfaceNames r.IndexGateway.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames if r.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { - r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames + r.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames } } } @@ -308,16 +308,16 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit // EmbeddedCache distributed ring. if r.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() && - (mergeWithExisting || reflect.DeepEqual(r.QueryRange.CacheConfig.Embeddedcache.Ring, defaults.QueryRange.CacheConfig.Embeddedcache.Ring)) { - r.QueryRange.CacheConfig.Embeddedcache.Ring.HeartbeatTimeout = rc.HeartbeatTimeout - r.QueryRange.CacheConfig.Embeddedcache.Ring.HeartbeatPeriod = rc.HeartbeatPeriod - r.QueryRange.CacheConfig.Embeddedcache.Ring.InstancePort = rc.InstancePort - r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceAddr = rc.InstanceAddr - r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceID = rc.InstanceID - r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames - r.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceZone = rc.InstanceZone - r.QueryRange.CacheConfig.Embeddedcache.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.QueryRange.CacheConfig.Embeddedcache.Ring.KVStore = rc.KVStore + (mergeWithExisting || reflect.DeepEqual(r.Common.EmbeddedcacheConfig.Ring, defaults.Common.EmbeddedcacheConfig.Ring)) { + r.Common.EmbeddedcacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout + r.Common.EmbeddedcacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod + r.Common.EmbeddedcacheConfig.Ring.InstancePort = rc.InstancePort + r.Common.EmbeddedcacheConfig.Ring.InstanceAddr = rc.InstanceAddr + r.Common.EmbeddedcacheConfig.Ring.InstanceID = rc.InstanceID + r.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.Common.EmbeddedcacheConfig.Ring.InstanceZone = rc.InstanceZone + r.Common.EmbeddedcacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.Common.EmbeddedcacheConfig.Ring.KVStore = rc.KVStore } } @@ -353,7 +353,7 @@ func applyTokensFilePath(cfg *ConfigWrapper) error { if err != nil { return err } - cfg.QueryRange.CacheConfig.Embeddedcache.Ring.TokensFilePath = f + cfg.Common.EmbeddedcacheConfig.Ring.TokensFilePath = f return nil } @@ -432,8 +432,8 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface) } - if reflect.DeepEqual(cfg.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames, defaults.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames) { - cfg.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames = append(cfg.QueryRange.CacheConfig.Embeddedcache.Ring.InstanceInterfaceNames, loopbackIface) + if reflect.DeepEqual(cfg.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames, defaults.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames) { + cfg.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames, loopbackIface) } } @@ -448,7 +448,7 @@ func applyMemberlistConfig(r *ConfigWrapper) { r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr r.IndexGateway.Ring.KVStore.Store = memberlistStr - r.QueryRange.CacheConfig.Embeddedcache.Ring.KVStore.Store = memberlistStr + r.Common.EmbeddedcacheConfig.Ring.KVStore.Store = memberlistStr } var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend") diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 0720aa9a8e1f2..ce91a3e79078a 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -152,16 +152,16 @@ func (t *Loki) initEmbeddedCache() (_ services.Service, err error) { return nil, nil } - groupConfig := cache.GroupCacheConfig{ + groupCacheConfig := cache.GroupCacheConfig{ Enabled: true, - Ring: t.Cfg.QueryRange.CacheConfig.Embeddedcache.Ring, - CapacityMB: t.Cfg.QueryRange.CacheConfig.Embeddedcache.MaxSizeMB, - ListenPort: t.Cfg.QueryRange.CacheConfig.Embeddedcache.ListenPort, + Ring: t.Cfg.Common.EmbeddedcacheConfig.Ring, + MaxSizeMB: t.Cfg.Common.EmbeddedcacheConfig.MaxSizeMB, + ListenPort: t.Cfg.Common.EmbeddedcacheConfig.ListenPort, } - groupConfig.Ring.ListenPort = groupConfig.ListenPort + groupCacheConfig.Ring.ListenPort = groupCacheConfig.ListenPort - rm, err := cache.NewGroupcacheRingManager(groupConfig, util_log.Logger, prometheus.DefaultRegisterer) + rm, err := cache.NewGroupcacheRingManager(groupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, gerrors.Wrap(err, "new embedded-cache ring manager") } @@ -169,14 +169,19 @@ func (t *Loki) initEmbeddedCache() (_ services.Service, err error) { t.embeddedcacheRingManager = rm t.Server.HTTP.Path("/groupcache/ring").Methods("GET", "POST").Handler(t.embeddedcacheRingManager) - gc, err := cache.NewGroupCache(rm, groupConfig, util_log.Logger, prometheus.DefaultRegisterer) + gc, err := cache.NewGroupCache(rm, groupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } + groupConfig := cache.GroupConfig{ + MaxSizeMB: t.Cfg.QueryRange.CacheConfig.Embeddedcache.MaxSizeMB, + } + // We support distributed embedded cache only for results currently. t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Cache = gc.NewGroup( t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Prefix+"groupcache", + &groupConfig, stats.ResultCache, ) @@ -901,7 +906,7 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV if t.Cfg.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { - t.Cfg.QueryRange.CacheConfig.Embeddedcache.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Common.EmbeddedcacheConfig.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV } t.Server.HTTP.Handle("/memberlist", t.MemberlistKV) diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go index 0319f5b8a21a6..23b508f0213e0 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -16,16 +16,15 @@ type EmbeddedcacheConfig struct { Distributed bool `yaml:"distributed,omitempty"` Enabled bool `yaml:"enabled,omitempty"` MaxSizeMB int64 `yaml:"max_size_mb"` - MaxItems int `yaml:"max_items"` + MaxItems int `yaml:"max_items"` // TODO(kavi): should we stop supporting it? TTL time.Duration `yaml:"ttl"` // PurgeInterval tell how often should we remove keys that are expired. // by default it takes `DefaultPurgeInterval` PurgeInterval time.Duration `yaml:"-"` - // distributed cache configs. Have no meaning if `Distributed=false`. - Ring RingCfg `yaml:"ring,omitempty"` - ListenPort int `yaml:"listen_port,omitempty"` + // singleton configs(irrespective of what caches uses it). Mainly useful for distributed caches. + globalConfig EmbeddedcacheSingletonConfig `yaml:"-"` } func (cfg *EmbeddedcacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { @@ -34,8 +33,7 @@ func (cfg *EmbeddedcacheConfig) RegisterFlagsWithPrefix(prefix, description stri f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.") f.IntVar(&cfg.MaxItems, prefix+"embedded-cache.max-items", 0, description+"Maximum number of entries in the cache.") f.DurationVar(&cfg.TTL, prefix+"embedded-cache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.") - cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) - f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for groupcache communication") + } func (cfg *EmbeddedcacheConfig) IsEnabledWithDistributed() bool { @@ -45,3 +43,18 @@ func (cfg *EmbeddedcacheConfig) IsEnabledWithDistributed() bool { func (cfg *EmbeddedcacheConfig) IsEnabledWithoutDistributed() bool { return cfg.Enabled && !cfg.Distributed } + +type EmbeddedcacheSingletonConfig struct { + // distributed cache configs. Have no meaning if `Distributed=false`. + ListenPort int `yaml:"listen_port,omitempty"` + Ring RingCfg `yaml:"ring,omitempty"` + + // Default capacity if none provided while creating each "Group". + MaxSizeMB int64 `yaml:"max_size__mb,omitempty"` +} + +func (cfg *EmbeddedcacheSingletonConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { + f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for groupcache communication") + cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) + f.Int64Var(&cfg.MaxSizeMB, prefix+".max-size-mb", 100, "Maximum memory size of the cache in MB.") +} diff --git a/pkg/storage/chunk/cache/groupcache.go b/pkg/storage/chunk/cache/groupcache.go index 0e1ee93ed6f07..d931a1772d179 100644 --- a/pkg/storage/chunk/cache/groupcache.go +++ b/pkg/storage/chunk/cache/groupcache.go @@ -61,7 +61,7 @@ type RingCfg struct { type GroupCacheConfig struct { Enabled bool `yaml:"enabled,omitempty"` Ring RingCfg `yaml:"ring,omitempty"` - CapacityMB int64 `yaml:"capacity_per_cache_mb,omitempty"` + MaxSizeMB int64 `yaml:"max_size_mb,omitempty"` ListenPort int `yaml:"listen_port,omitempty"` Cache Cache `yaml:"-"` @@ -95,7 +95,7 @@ func NewGroupCache(rm ringManager, config GroupCacheConfig, logger log.Logger, r wg: sync.WaitGroup{}, startWaitingForClose: cancel, reg: reg, - cacheBytes: config.CapacityMB * 1e6, // MB => B + cacheBytes: config.MaxSizeMB * 1e6, // MB => B } go cache.serveGroupcache(config.ListenPort) @@ -185,6 +185,11 @@ func (c *GroupCache) Stats() *groupcache.Stats { return &c.cache.Stats } +// Groupconfig represents config per Group. +type GroupConfig struct { + MaxSizeMB int64 `yaml:"max_size_mb,omitempty"` +} + type group struct { cache *groupcache.Group logger log.Logger @@ -198,7 +203,7 @@ type group struct { storeDuration prometheus.Observer } -func (c *GroupCache) NewGroup(name string, ct stats.CacheType) Cache { +func (c *GroupCache) NewGroup(name string, cfg *GroupConfig, ct stats.CacheType) Cache { // Return a known error on miss to track which keys need to be inserted missGetter := groupcache.GetterFunc(func(_ context.Context, _ string, _ groupcache.Sink) error { return ErrGroupcacheMiss @@ -207,6 +212,11 @@ func (c *GroupCache) NewGroup(name string, ct stats.CacheType) Cache { c.wg.Add(1) c.startWaitingForClose() + cap := c.cacheBytes + if cfg.MaxSizeMB != 0 { + cap = cfg.MaxSizeMB * 1e6 // MB into bytes + } + requestDuration := promauto.With(c.reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "loki", Name: "groupcache_request_duration_seconds", @@ -216,8 +226,8 @@ func (c *GroupCache) NewGroup(name string, ct stats.CacheType) Cache { }, []string{"operation"}) g := &group{ - cache: groupcache.NewGroup(name, c.cacheBytes, missGetter), - cacheBytes: c.cacheBytes, + cache: groupcache.NewGroup(name, cap, missGetter), + cacheBytes: cap, logger: c.logger, wg: &c.wg, cacheType: ct, From 0c16ab822fa695450e0f95cea1d6e522a7f1bd2f Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Aug 2022 11:24:01 +0200 Subject: [PATCH 11/19] PR remarks 1. update changelog and upgrade guide 2. s/Embeddedcache/EmbeddedCache/g 3. /groupcache/ handler -> /embedded-cache/ 4. some typos 5. Fix some cache prefixes Signed-off-by: Kaviraj Signed-off-by: Kaviraj --- CHANGELOG.md | 2 +- docs/sources/upgrading/_index.md | 12 ++++++-- pkg/loki/common/common.go | 4 +-- pkg/loki/config_wrapper.go | 38 ++++++++++++------------ pkg/loki/config_wrapper_test.go | 10 +++---- pkg/loki/modules.go | 17 +++++------ pkg/storage/chunk/cache/cache.go | 33 ++++++++++---------- pkg/storage/chunk/cache/embeddedcache.go | 16 +++++----- 8 files changed, 68 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 626305c337da4..1808fed392c3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ #### Loki ##### Enhancements -* [6821](https://github.com/grafana/loki/pull/6821) **kavirajk**: Introduce new cache type `embedded-cache` which is a in-process cache system that runs loki without any dependencies of external cache (like memcached, redis, etc). It can be run in two mode `distributed: false` (default and same as old `fifocache`) and `distributed: true` which runs cache is distributed fashing sharing keys between peers if Loki is run in microservices or SSD mode. +* [6821](https://github.com/grafana/loki/pull/6821) **kavirajk**: Introduce new cache type `embedded-cache` which is an in-process cache system that runs loki without the need for an external cache (like memcached, redis, etc). It can be run in two modes `distributed: false` (default, and same as old `fifocache`) and `distributed: true` which runs cache in distributed fashion sharding keys across peers if Loki is run in microservices or SSD mode. * [6691](https://github.com/grafana/loki/pull/6691) **dannykopping**: Update production-ready Loki cluster in docker-compose * [6317](https://github.com/grafana/loki/pull/6317) **dannykoping**: General: add cache usage statistics * [6444](https://github.com/grafana/loki/pull/6444) **aminesnow** Add TLS config to query frontend. diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md index 2f97cec1139fb..28d27bd7a8af2 100644 --- a/docs/sources/upgrading/_index.md +++ b/docs/sources/upgrading/_index.md @@ -33,6 +33,12 @@ The output is incredibly verbose as it shows the entire internal config struct u ### Loki +#### Fifocache is deprecated + +We introduce new cache called `embedded-cache` which is an in-process cache system that make it possible to run loki without the need for an external cache (like memcached, redis, etc). It can be run in two modes `distributed: false` (default, and same as old `fifocache`) and `distributed: true` which runs cache in distributed fashion sharding keys across peers if Loki is run in microservices or SSD mode. + +Currently `embedde-cache` with `distributed: true` can be enabled only for results cache. + #### Evenly spread queriers across kubernetes nodes We now evenly spread queriers across the available kubernetes nodes, but allowing more than one querier to be scheduled into the same node. @@ -87,7 +93,7 @@ limits_config: split_queries_by_interval: 10m ``` -In 2.5.0 it can only be defined in the `limits_config` section, **Loki will fail to start if you do not remove the `split_queries_by_interval` config from the `query_range` section.** +In 2.5.0 it can only be defined in the `limits_config` section, **Loki will fail to start if you do not remove the `split_queries_by_interval` config from the `query_range` section.** Additionally, it has a new default value of `30m` rather than `0`. @@ -130,7 +136,7 @@ Meanwhile, the legacy format is a string in the following format: * `parallelise_shardable_queries` under the `query_range` config now defaults to `true`. * `split_queries_by_interval` under the `limits_config` config now defaults to `30m`, it was `0s`. * `max_chunk_age` in the `ingester` config now defaults to `2h` previously it was `1h`. -* `query_ingesters_within` under the `querier` config now defaults to `3h`, previously it was `0s`. Any query (or subquery) that has an end time more than `3h` ago will not be sent to the ingesters, this saves work on the ingesters for data they normally don't contain. If you regularly write old data to Loki you may need to return this value to `0s` to always query ingesters. +* `query_ingesters_within` under the `querier` config now defaults to `3h`, previously it was `0s`. Any query (or subquery) that has an end time more than `3h` ago will not be sent to the ingesters, this saves work on the ingesters for data they normally don't contain. If you regularly write old data to Loki you may need to return this value to `0s` to always query ingesters. * `max_concurrent` under the `querier` config now defaults to `10` instead of `20`. * `match_max_concurrent` under the `frontend_worker` config now defaults to true, this supersedes the `parallelism` setting which can now be removed from your config. Controlling query parallelism of a single process can now be done with the `querier` `max_concurrent` setting. * `flush_op_timeout` under the `ingester` configuration block now defaults to `10m`, increased from `10s`. This can help when replaying a large WAL on Loki startup, and avoid `msg="failed to flush" ... context deadline exceeded` errors. @@ -378,7 +384,7 @@ server: grpc_server_ping_without_stream_allowed: true ``` -[This issue](https://github.com/grafana/loki/issues/4375) has some more information on the change. +[This issue](https://github.com/grafana/loki/issues/4375) has some more information on the change. #### Some metric prefixes have changed from `cortex_` to `loki_` diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index 0c89969d27995..b9ad17318d305 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -46,7 +46,7 @@ type Config struct { CompactorAddress string `yaml:"compactor_address"` // Global groupcache config. Independent of what type of cache uses the groupcache, we need some singleton configs like Ring configuration. Shouldn't be exposed to user config - EmbeddedcacheConfig cache.EmbeddedcacheSingletonConfig `yaml:"embedded_cache"` + EmbeddedCacheConfig cache.EmbeddedCacheSingletonConfig `yaml:"embedded_cache"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { @@ -62,7 +62,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port") - c.EmbeddedcacheConfig.RegisterFlagsWithPrefix("common.embedded-cache", "", f) + c.EmbeddedCacheConfig.RegisterFlagsWithPrefix("common.embedded-cache", "", f) } type Storage struct { diff --git a/pkg/loki/config_wrapper.go b/pkg/loki/config_wrapper.go index 0e9c2a7b1e82f..96a8c7db85920 100644 --- a/pkg/loki/config_wrapper.go +++ b/pkg/loki/config_wrapper.go @@ -164,8 +164,8 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { } r.Frontend.FrontendV2.Addr = r.Common.InstanceAddr r.IndexGateway.Ring.InstanceAddr = r.Common.InstanceAddr - if r.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { - r.Common.EmbeddedcacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr + if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() { + r.Common.EmbeddedCacheConfig.Ring.InstanceAddr = r.Common.InstanceAddr } } @@ -175,8 +175,8 @@ func applyInstanceConfigs(r, defaults *ConfigWrapper) { } r.Frontend.FrontendV2.InfNames = r.Common.InstanceInterfaceNames r.IndexGateway.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames - if r.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { - r.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames + if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() { + r.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = r.Common.InstanceInterfaceNames } } } @@ -307,17 +307,17 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc util.RingConfig, mergeWit } // EmbeddedCache distributed ring. - if r.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() && - (mergeWithExisting || reflect.DeepEqual(r.Common.EmbeddedcacheConfig.Ring, defaults.Common.EmbeddedcacheConfig.Ring)) { - r.Common.EmbeddedcacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout - r.Common.EmbeddedcacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod - r.Common.EmbeddedcacheConfig.Ring.InstancePort = rc.InstancePort - r.Common.EmbeddedcacheConfig.Ring.InstanceAddr = rc.InstanceAddr - r.Common.EmbeddedcacheConfig.Ring.InstanceID = rc.InstanceID - r.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames - r.Common.EmbeddedcacheConfig.Ring.InstanceZone = rc.InstanceZone - r.Common.EmbeddedcacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled - r.Common.EmbeddedcacheConfig.Ring.KVStore = rc.KVStore + if r.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() && + (mergeWithExisting || reflect.DeepEqual(r.Common.EmbeddedCacheConfig.Ring, defaults.Common.EmbeddedCacheConfig.Ring)) { + r.Common.EmbeddedCacheConfig.Ring.HeartbeatTimeout = rc.HeartbeatTimeout + r.Common.EmbeddedCacheConfig.Ring.HeartbeatPeriod = rc.HeartbeatPeriod + r.Common.EmbeddedCacheConfig.Ring.InstancePort = rc.InstancePort + r.Common.EmbeddedCacheConfig.Ring.InstanceAddr = rc.InstanceAddr + r.Common.EmbeddedCacheConfig.Ring.InstanceID = rc.InstanceID + r.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = rc.InstanceInterfaceNames + r.Common.EmbeddedCacheConfig.Ring.InstanceZone = rc.InstanceZone + r.Common.EmbeddedCacheConfig.Ring.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled + r.Common.EmbeddedCacheConfig.Ring.KVStore = rc.KVStore } } @@ -353,7 +353,7 @@ func applyTokensFilePath(cfg *ConfigWrapper) error { if err != nil { return err } - cfg.Common.EmbeddedcacheConfig.Ring.TokensFilePath = f + cfg.Common.EmbeddedCacheConfig.Ring.TokensFilePath = f return nil } @@ -432,8 +432,8 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) { cfg.IndexGateway.Ring.InstanceInterfaceNames = append(cfg.IndexGateway.Ring.InstanceInterfaceNames, loopbackIface) } - if reflect.DeepEqual(cfg.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames, defaults.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames) { - cfg.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.EmbeddedcacheConfig.Ring.InstanceInterfaceNames, loopbackIface) + if reflect.DeepEqual(cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames, defaults.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames) { + cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames = append(cfg.Common.EmbeddedCacheConfig.Ring.InstanceInterfaceNames, loopbackIface) } } @@ -448,7 +448,7 @@ func applyMemberlistConfig(r *ConfigWrapper) { r.QueryScheduler.SchedulerRing.KVStore.Store = memberlistStr r.CompactorConfig.CompactorRing.KVStore.Store = memberlistStr r.IndexGateway.Ring.KVStore.Store = memberlistStr - r.Common.EmbeddedcacheConfig.Ring.KVStore.Store = memberlistStr + r.Common.EmbeddedCacheConfig.Ring.KVStore.Store = memberlistStr } var ErrTooManyStorageConfigs = errors.New("too many storage configs provided in the common config, please only define one storage backend") diff --git a/pkg/loki/config_wrapper_test.go b/pkg/loki/config_wrapper_test.go index bcd29dd182fad..d88624edd5ec9 100644 --- a/pkg/loki/config_wrapper_test.go +++ b/pkg/loki/config_wrapper_test.go @@ -824,8 +824,8 @@ ingester: t.Run("embedded-cache setting is applied to result caches", func(t *testing.T) { // ensure they are all false by default config, _, _ := configWrapperFromYAML(t, minimalConfig, nil) - assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.Embeddedcache.Enabled) - assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.Embeddedcache.Distributed) + assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled) + assert.False(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Distributed) configFileString := `--- query_range: @@ -837,8 +837,8 @@ query_range: config, _ = testContext(configFileString, nil) - assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.Embeddedcache.Enabled) - assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.Embeddedcache.Distributed) + assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Enabled) + assert.True(t, config.QueryRange.ResultsCacheConfig.CacheConfig.EmbeddedCache.Distributed) }) } @@ -878,7 +878,7 @@ query_range: distributed: true` config, _, _ := configWrapperFromYAML(t, configFileString, nil) - assert.True(t, config.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed()) + assert.True(t, config.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed()) assert.False(t, config.QueryRange.CacheConfig.EnableFifoCache) }) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index ce91a3e79078a..fa7dbfe305381 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -148,15 +148,15 @@ func (t *Loki) initRing() (_ services.Service, err error) { } func (t *Loki) initEmbeddedCache() (_ services.Service, err error) { - if !t.Cfg.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { + if !t.Cfg.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() { return nil, nil } groupCacheConfig := cache.GroupCacheConfig{ Enabled: true, - Ring: t.Cfg.Common.EmbeddedcacheConfig.Ring, - MaxSizeMB: t.Cfg.Common.EmbeddedcacheConfig.MaxSizeMB, - ListenPort: t.Cfg.Common.EmbeddedcacheConfig.ListenPort, + Ring: t.Cfg.Common.EmbeddedCacheConfig.Ring, + MaxSizeMB: t.Cfg.Common.EmbeddedCacheConfig.MaxSizeMB, + ListenPort: t.Cfg.Common.EmbeddedCacheConfig.ListenPort, } groupCacheConfig.Ring.ListenPort = groupCacheConfig.ListenPort @@ -167,7 +167,7 @@ func (t *Loki) initEmbeddedCache() (_ services.Service, err error) { } t.embeddedcacheRingManager = rm - t.Server.HTTP.Path("/groupcache/ring").Methods("GET", "POST").Handler(t.embeddedcacheRingManager) + t.Server.HTTP.Path("/embedded-cache/ring").Methods("GET", "POST").Handler(t.embeddedcacheRingManager) gc, err := cache.NewGroupCache(rm, groupCacheConfig, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { @@ -175,10 +175,9 @@ func (t *Loki) initEmbeddedCache() (_ services.Service, err error) { } groupConfig := cache.GroupConfig{ - MaxSizeMB: t.Cfg.QueryRange.CacheConfig.Embeddedcache.MaxSizeMB, + MaxSizeMB: t.Cfg.QueryRange.CacheConfig.EmbeddedCache.MaxSizeMB, } - // We support distributed embedded cache only for results currently. t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Cache = gc.NewGroup( t.Cfg.QueryRange.ResultsCacheConfig.CacheConfig.Prefix+"groupcache", &groupConfig, @@ -905,8 +904,8 @@ func (t *Loki) initMemberlistKV() (services.Service, error) { t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.QueryScheduler.SchedulerRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - if t.Cfg.QueryRange.CacheConfig.Embeddedcache.IsEnabledWithDistributed() { - t.Cfg.Common.EmbeddedcacheConfig.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + if t.Cfg.QueryRange.CacheConfig.EmbeddedCache.IsEnabledWithDistributed() { + t.Cfg.Common.EmbeddedCacheConfig.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV } t.Server.HTTP.Handle("/memberlist", t.MemberlistKV) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 19f7a1f20092e..601c133b8eb46 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -34,8 +34,8 @@ type Config struct { Memcache MemcachedConfig `yaml:"memcached"` MemcacheClient MemcachedClientConfig `yaml:"memcached_client"` Redis RedisConfig `yaml:"redis"` - Embeddedcache EmbeddedcacheConfig `yaml:"embedded_cache"` - Fifocache FifoCacheConfig `yaml:"fifocache"` // depreciated + EmbeddedCache EmbeddedCacheConfig `yaml:"embedded_cache"` + Fifocache FifoCacheConfig `yaml:"fifocache"` // deprecated // This is to name the cache metrics properly. Prefix string `yaml:"prefix" doc:"hidden"` @@ -56,11 +56,11 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f cfg.MemcacheClient.RegisterFlagsWithPrefix(prefix, description, f) cfg.Redis.RegisterFlagsWithPrefix(prefix, description, f) cfg.Fifocache.RegisterFlagsWithPrefix(prefix, description, f) - cfg.Embeddedcache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.EmbeddedCache.RegisterFlagsWithPrefix(prefix, description, f) f.IntVar(&cfg.AsyncCacheWriteBackConcurrency, prefix+"max-async-cache-write-back-concurrency", 16, "The maximum number of concurrent asynchronous writeback cache can occur.") f.IntVar(&cfg.AsyncCacheWriteBackBufferSize, prefix+"max-async-cache-write-back-buffer-size", 500, "The maximum number of enqueued asynchronous writeback cache allowed.") f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", time.Hour, description+"The default validity of entries for caches unless overridden.") - f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"(deprecated use embedded-cache instead) Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).") + f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"(deprecated: use embedded-cache instead) Enable in-memory cache (auto-enabled for the chunks & query results cache if no other cache is configured).") cfg.Prefix = prefix } @@ -85,7 +85,7 @@ func IsRedisSet(cfg Config) bool { } func IsEmbeddedCacheSet(cfg Config) bool { - return cfg.Embeddedcache.Enabled + return cfg.EmbeddedCache.Enabled } // IsCacheConfigured determines if memcached, redis, or embedded-cache have been configured @@ -103,10 +103,9 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta // Currently fifocache can be enabled in two ways. // 1. cfg.EnableFifocache (old deprecated way) - // 2. cfg.Embeddedcache.Enabled=true and cfg.Embeddedcache.Distributed=false (new way) + // 2. cfg.EmbeddedCache.Enabled=true and cfg.EmbeddedCache.Distributed=false (new way) - // if cfg.EnableFifoCache || (cfg.Embeddedcache.IsEnabledWithoutDistributed()) { - if cfg.EnableFifoCache || (IsEmbeddedCacheSet(cfg) && !cfg.Embeddedcache.Distributed) { + if cfg.EnableFifoCache || (IsEmbeddedCacheSet(cfg) && !cfg.EmbeddedCache.Distributed) { var fifocfg FifoCacheConfig if cfg.EnableFifoCache { @@ -114,12 +113,12 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta fifocfg = cfg.Fifocache } - if cfg.Embeddedcache.IsEnabledWithoutDistributed() { + if cfg.EmbeddedCache.IsEnabledWithoutDistributed() { fifocfg = FifoCacheConfig{ - MaxSizeBytes: fmt.Sprint(cfg.Embeddedcache.MaxSizeMB * 1e6), - MaxSizeItems: cfg.Embeddedcache.MaxItems, - TTL: cfg.Embeddedcache.TTL, - PurgeInterval: cfg.Embeddedcache.PurgeInterval, + MaxSizeBytes: fmt.Sprint(cfg.EmbeddedCache.MaxSizeMB * 1e6), + MaxSizeItems: cfg.EmbeddedCache.MaxItems, + TTL: cfg.EmbeddedCache.TTL, + PurgeInterval: cfg.EmbeddedCache.PurgeInterval, } } @@ -127,8 +126,8 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta fifocfg.TTL = cfg.DefaultValidity } - if cache := NewFifoCache(cfg.Prefix+"fifocache", fifocfg, reg, logger, cacheType); cache != nil { - caches = append(caches, CollectStats(Instrument(cfg.Prefix+"fifocache", cache, reg))) + if cache := NewFifoCache(cfg.Prefix+"embedded-cache", fifocfg, reg, logger, cacheType); cache != nil { + caches = append(caches, CollectStats(Instrument(cfg.Prefix+"embedded-cache", cache, reg))) } } @@ -161,8 +160,8 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta caches = append(caches, CollectStats(NewBackground(cacheName, cfg.Background, Instrument(cacheName, cache, reg), reg))) } - if IsEmbeddedCacheSet(cfg) && cfg.Embeddedcache.Distributed { - cacheName := cfg.Prefix + "groupcache" + if IsEmbeddedCacheSet(cfg) && cfg.EmbeddedCache.Distributed { + cacheName := cfg.Prefix + "embedded-cache" caches = append(caches, CollectStats(Instrument(cacheName, cfg.Cache, reg))) } diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go index 23b508f0213e0..a153b807c648b 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -9,10 +9,10 @@ const ( DefaultPurgeInterval = 1 * time.Minute ) -// EmbeddedcacheConfig represents in-process embedded cache config. +// EmbeddedCacheConfig represents in-process embedded cache config. // It can also be distributed, sharding keys across peers when run with microservices // or SSD mode. -type EmbeddedcacheConfig struct { +type EmbeddedCacheConfig struct { Distributed bool `yaml:"distributed,omitempty"` Enabled bool `yaml:"enabled,omitempty"` MaxSizeMB int64 `yaml:"max_size_mb"` @@ -24,10 +24,10 @@ type EmbeddedcacheConfig struct { PurgeInterval time.Duration `yaml:"-"` // singleton configs(irrespective of what caches uses it). Mainly useful for distributed caches. - globalConfig EmbeddedcacheSingletonConfig `yaml:"-"` + globalConfig EmbeddedCacheSingletonConfig `yaml:"-"` } -func (cfg *EmbeddedcacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { +func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { f.BoolVar(&cfg.Enabled, prefix+"embedded-cache.enabled", false, description+"Whether embedded cache is enabled.") f.BoolVar(&cfg.Distributed, prefix+"embedded-cache.distributed", false, description+"Whether embedded cache is enabled with distributed mode.") f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.") @@ -36,15 +36,15 @@ func (cfg *EmbeddedcacheConfig) RegisterFlagsWithPrefix(prefix, description stri } -func (cfg *EmbeddedcacheConfig) IsEnabledWithDistributed() bool { +func (cfg *EmbeddedCacheConfig) IsEnabledWithDistributed() bool { return cfg.Enabled && cfg.Distributed } -func (cfg *EmbeddedcacheConfig) IsEnabledWithoutDistributed() bool { +func (cfg *EmbeddedCacheConfig) IsEnabledWithoutDistributed() bool { return cfg.Enabled && !cfg.Distributed } -type EmbeddedcacheSingletonConfig struct { +type EmbeddedCacheSingletonConfig struct { // distributed cache configs. Have no meaning if `Distributed=false`. ListenPort int `yaml:"listen_port,omitempty"` Ring RingCfg `yaml:"ring,omitempty"` @@ -53,7 +53,7 @@ type EmbeddedcacheSingletonConfig struct { MaxSizeMB int64 `yaml:"max_size__mb,omitempty"` } -func (cfg *EmbeddedcacheSingletonConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { +func (cfg *EmbeddedCacheSingletonConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for groupcache communication") cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) f.Int64Var(&cfg.MaxSizeMB, prefix+".max-size-mb", 100, "Maximum memory size of the cache in MB.") From 314e938ac1be9cefe3857c9b39834f4ef7cfe22b Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Aug 2022 11:33:59 +0200 Subject: [PATCH 12/19] Fixing corresponding tests Signed-off-by: Kaviraj --- pkg/loki/common/common.go | 2 +- pkg/storage/chunk/cache/groupcache_test.go | 20 +++++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/pkg/loki/common/common.go b/pkg/loki/common/common.go index b9ad17318d305..0a5459911f7ef 100644 --- a/pkg/loki/common/common.go +++ b/pkg/loki/common/common.go @@ -45,7 +45,7 @@ type Config struct { // CompactorAddress is the http address of the compactor in the form http://host:port CompactorAddress string `yaml:"compactor_address"` - // Global groupcache config. Independent of what type of cache uses the groupcache, we need some singleton configs like Ring configuration. Shouldn't be exposed to user config + // Global embedded-cache config. Independent of what type of cache, we need some singleton configs like Ring configuration when running in distributed fashion. EmbeddedCacheConfig cache.EmbeddedCacheSingletonConfig `yaml:"embedded_cache"` } diff --git a/pkg/storage/chunk/cache/groupcache_test.go b/pkg/storage/chunk/cache/groupcache_test.go index 394da4007a4bc..d0501bae7c94f 100644 --- a/pkg/storage/chunk/cache/groupcache_test.go +++ b/pkg/storage/chunk/cache/groupcache_test.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/ring" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -14,7 +15,7 @@ func TestGroupCache(t *testing.T) { gc, err := setupGroupCache() require.Nil(t, err) - c := gc.NewGroup("test-group", "test") + c := gc.NewGroup("test-group", &GroupConfig{}, "test") defer c.Stop() keys := []string{"key1", "key2", "key3"} @@ -42,12 +43,25 @@ func TestGroupCache(t *testing.T) { for i := 0; i < len(miss); i++ { require.Equal(t, miss[i], missed[i]) } + + // passing empty GroupConfig should use global `CapacityMB`.(which is 1MB). + c1 := gc.NewGroup("test-group1", &GroupConfig{}, "test1") + defer c.Stop() + + assert.Equal(t, c1.(*group).cacheBytes, int64(1*1e6)) + + // pass explicitly capacity per group should take preference. + c2 := gc.NewGroup("test-group2", &GroupConfig{MaxSizeMB: 6}, "test2") + defer c.Stop() + + assert.Equal(t, c2.(*group).cacheBytes, int64(6*1e6)) + } func setupGroupCache() (*GroupCache, error) { return NewGroupCache(&mockRingManager{}, GroupCacheConfig{ - Enabled: true, - CapacityMB: 1, + Enabled: true, + MaxSizeMB: 1, }, log.NewNopLogger(), nil) } From f6ba1937ea7d2184ca74c569b379686b63cd2571 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Aug 2022 11:48:45 +0200 Subject: [PATCH 13/19] Support extra timeouts on embedded-cache Signed-off-by: Kaviraj --- pkg/loki/modules.go | 11 +++++++---- pkg/storage/chunk/cache/embeddedcache.go | 10 +++++++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index fa7dbfe305381..b87e0c9734a17 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -153,10 +153,13 @@ func (t *Loki) initEmbeddedCache() (_ services.Service, err error) { } groupCacheConfig := cache.GroupCacheConfig{ - Enabled: true, - Ring: t.Cfg.Common.EmbeddedCacheConfig.Ring, - MaxSizeMB: t.Cfg.Common.EmbeddedCacheConfig.MaxSizeMB, - ListenPort: t.Cfg.Common.EmbeddedCacheConfig.ListenPort, + Enabled: true, + Ring: t.Cfg.Common.EmbeddedCacheConfig.Ring, + MaxSizeMB: t.Cfg.Common.EmbeddedCacheConfig.MaxSizeMB, + ListenPort: t.Cfg.Common.EmbeddedCacheConfig.ListenPort, + HeartbeatInterval: t.Cfg.Common.EmbeddedCacheConfig.HeartbeatInterval, + HeartbeatTimeout: t.Cfg.Common.EmbeddedCacheConfig.HeartbeatTimeout, + WriteByteTimeout: t.Cfg.Common.EmbeddedCacheConfig.WriteByteTimeout, } groupCacheConfig.Ring.ListenPort = groupCacheConfig.ListenPort diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go index a153b807c648b..bdef8e589ce58 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -51,10 +51,18 @@ type EmbeddedCacheSingletonConfig struct { // Default capacity if none provided while creating each "Group". MaxSizeMB int64 `yaml:"max_size__mb,omitempty"` + + // Different timeouts + HeartbeatInterval time.Duration `yaml:"heartbeat_interval,omitempty"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout,omitempty"` + WriteByteTimeout time.Duration `yaml:"write_timeout,omitempty"` } func (cfg *EmbeddedCacheSingletonConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for groupcache communication") cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) - f.Int64Var(&cfg.MaxSizeMB, prefix+".max-size-mb", 100, "Maximum memory size of the cache in MB.") + f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, "Maximum memory size of the cache in MB.") + f.DurationVar(&cfg.HeartbeatInterval, prefix+"embedded-cache.heartbeat-interval", time.Second, "If the connection is idle, the interval the cache will send heartbeats") + f.DurationVar(&cfg.HeartbeatTimeout, prefix+"embedded-cache.heartbeat-timeout", time.Second, "Timeout for heartbeat responses") + f.DurationVar(&cfg.WriteByteTimeout, prefix+"embeddec-cache.write-timeout", time.Second, "Maximum time for the cache to try writing") } From fa076e4a518c8fa7d36940777221ef814201de05 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Aug 2022 11:51:09 +0200 Subject: [PATCH 14/19] Remove unwanted single struct field Signed-off-by: Kaviraj --- pkg/storage/chunk/cache/embeddedcache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go index bdef8e589ce58..2cd11de4e2dc7 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -23,7 +23,6 @@ type EmbeddedCacheConfig struct { // by default it takes `DefaultPurgeInterval` PurgeInterval time.Duration `yaml:"-"` - // singleton configs(irrespective of what caches uses it). Mainly useful for distributed caches. globalConfig EmbeddedCacheSingletonConfig `yaml:"-"` } @@ -44,6 +43,7 @@ func (cfg *EmbeddedCacheConfig) IsEnabledWithoutDistributed() bool { return cfg.Enabled && !cfg.Distributed } +// EmbeddedCacheSingletonConfig defines global singleton needed by Embedded cache(particularly used in distributed fashion) type EmbeddedCacheSingletonConfig struct { // distributed cache configs. Have no meaning if `Distributed=false`. ListenPort int `yaml:"listen_port,omitempty"` From 5c087cc500e0733b4e425ac941c0f1a674a16faf Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Aug 2022 12:49:16 +0200 Subject: [PATCH 15/19] fix(stats-collector): Let cache.New() wrap stats collector for distributed cache Signed-off-by: Kaviraj --- pkg/querier/queryrange/roundtrip.go | 1 + pkg/storage/chunk/cache/cache.go | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 0f40e79770b30..f254c6b522506 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -52,6 +52,7 @@ func NewTripperware( c cache.Cache err error ) + if cfg.CacheResults { c, err = cache.New(cfg.CacheConfig, registerer, log, stats.ResultCache) if err != nil { diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 601c133b8eb46..73b45887b9d6b 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -95,7 +95,10 @@ func IsCacheConfigured(cfg Config) bool { // New creates a new Cache using Config. func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType stats.CacheType) (Cache, error) { - if cfg.Cache != nil { + + // Have additional check for embeddedcache with distributed mode, because those cache will already be initialized in modules + // but still need stats collector wrapper for it. + if cfg.Cache != nil && !cfg.EmbeddedCache.IsEnabledWithDistributed() { return cfg.Cache, nil } @@ -162,7 +165,6 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta if IsEmbeddedCacheSet(cfg) && cfg.EmbeddedCache.Distributed { cacheName := cfg.Prefix + "embedded-cache" - caches = append(caches, CollectStats(Instrument(cacheName, cfg.Cache, reg))) } From b71e47ed30eabc8204b93201edc4e634693da3c2 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Aug 2022 12:51:05 +0200 Subject: [PATCH 16/19] PR remarks Signed-off-by: Kaviraj --- docs/sources/upgrading/_index.md | 4 ++-- pkg/storage/chunk/cache/embeddedcache.go | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md index 28d27bd7a8af2..c534966be9690 100644 --- a/docs/sources/upgrading/_index.md +++ b/docs/sources/upgrading/_index.md @@ -35,9 +35,9 @@ The output is incredibly verbose as it shows the entire internal config struct u #### Fifocache is deprecated -We introduce new cache called `embedded-cache` which is an in-process cache system that make it possible to run loki without the need for an external cache (like memcached, redis, etc). It can be run in two modes `distributed: false` (default, and same as old `fifocache`) and `distributed: true` which runs cache in distributed fashion sharding keys across peers if Loki is run in microservices or SSD mode. +We introduced a new cache called `embedded-cache` which is an in-process cache system that make it possible to run loki without the need for an external cache (like memcached, redis, etc). It can be run in two modes `distributed: false` (default, and same as old `fifocache`) and `distributed: true` which runs cache in distributed fashion sharding keys across peers if Loki is run in microservices or SSD mode. -Currently `embedde-cache` with `distributed: true` can be enabled only for results cache. +Currently `embedded-cache` with `distributed: true` can be enabled only for results cache. #### Evenly spread queriers across kubernetes nodes diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go index 2cd11de4e2dc7..c3432edcf15ca 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -22,8 +22,6 @@ type EmbeddedCacheConfig struct { // PurgeInterval tell how often should we remove keys that are expired. // by default it takes `DefaultPurgeInterval` PurgeInterval time.Duration `yaml:"-"` - - globalConfig EmbeddedCacheSingletonConfig `yaml:"-"` } func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { From 1d9c80f65f538ed1bd6def8595c8a902a214481e Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Aug 2022 12:55:41 +0200 Subject: [PATCH 17/19] Deprecate max_size_items from fifocache Signed-off-by: Kaviraj --- pkg/storage/chunk/cache/cache.go | 1 - pkg/storage/chunk/cache/embeddedcache.go | 2 -- pkg/storage/chunk/cache/fifo_cache.go | 4 ++-- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/storage/chunk/cache/cache.go b/pkg/storage/chunk/cache/cache.go index 73b45887b9d6b..e884266a64819 100644 --- a/pkg/storage/chunk/cache/cache.go +++ b/pkg/storage/chunk/cache/cache.go @@ -119,7 +119,6 @@ func New(cfg Config, reg prometheus.Registerer, logger log.Logger, cacheType sta if cfg.EmbeddedCache.IsEnabledWithoutDistributed() { fifocfg = FifoCacheConfig{ MaxSizeBytes: fmt.Sprint(cfg.EmbeddedCache.MaxSizeMB * 1e6), - MaxSizeItems: cfg.EmbeddedCache.MaxItems, TTL: cfg.EmbeddedCache.TTL, PurgeInterval: cfg.EmbeddedCache.PurgeInterval, } diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go index c3432edcf15ca..97c2a727c78f0 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -16,7 +16,6 @@ type EmbeddedCacheConfig struct { Distributed bool `yaml:"distributed,omitempty"` Enabled bool `yaml:"enabled,omitempty"` MaxSizeMB int64 `yaml:"max_size_mb"` - MaxItems int `yaml:"max_items"` // TODO(kavi): should we stop supporting it? TTL time.Duration `yaml:"ttl"` // PurgeInterval tell how often should we remove keys that are expired. @@ -28,7 +27,6 @@ func (cfg *EmbeddedCacheConfig) RegisterFlagsWithPrefix(prefix, description stri f.BoolVar(&cfg.Enabled, prefix+"embedded-cache.enabled", false, description+"Whether embedded cache is enabled.") f.BoolVar(&cfg.Distributed, prefix+"embedded-cache.distributed", false, description+"Whether embedded cache is enabled with distributed mode.") f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, description+"Maximum memory size of the cache in MB.") - f.IntVar(&cfg.MaxItems, prefix+"embedded-cache.max-items", 0, description+"Maximum number of entries in the cache.") f.DurationVar(&cfg.TTL, prefix+"embedded-cache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.") } diff --git a/pkg/storage/chunk/cache/fifo_cache.go b/pkg/storage/chunk/cache/fifo_cache.go index f2722466f5365..6871fb328884c 100644 --- a/pkg/storage/chunk/cache/fifo_cache.go +++ b/pkg/storage/chunk/cache/fifo_cache.go @@ -34,7 +34,7 @@ const ( // FifoCacheConfig holds config for the FifoCache. type FifoCacheConfig struct { MaxSizeBytes string `yaml:"max_size_bytes"` - MaxSizeItems int `yaml:"max_size_items"` + MaxSizeItems int `yaml:"max_size_items"` // deprecated TTL time.Duration `yaml:"ttl"` DeprecatedValidity time.Duration `yaml:"validity"` @@ -46,7 +46,7 @@ type FifoCacheConfig struct { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { f.StringVar(&cfg.MaxSizeBytes, prefix+"fifocache.max-size-bytes", "1GB", description+"Maximum memory size of the cache in bytes. A unit suffix (KB, MB, GB) may be applied.") - f.IntVar(&cfg.MaxSizeItems, prefix+"fifocache.max-size-items", 0, description+"Maximum number of entries in the cache.") + f.IntVar(&cfg.MaxSizeItems, prefix+"fifocache.max-size-items", 0, description+"deprecated: Maximum number of entries in the cache.") f.DurationVar(&cfg.TTL, prefix+"fifocache.ttl", time.Hour, description+"The time to live for items in the cache before they get purged.") f.DurationVar(&cfg.DeprecatedValidity, prefix+"fifocache.duration", 0, "Deprecated (use ttl instead): "+description+"The expiry duration for the cache.") From eeb8d77e6b47e889859f09dcdc7e0adcd490dc80 Mon Sep 17 00:00:00 2001 From: Kaviraj Date: Thu, 4 Aug 2022 16:44:18 +0200 Subject: [PATCH 18/19] PR remarks about the flag description Signed-off-by: Kaviraj --- pkg/storage/chunk/cache/embeddedcache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/chunk/cache/embeddedcache.go b/pkg/storage/chunk/cache/embeddedcache.go index 97c2a727c78f0..f383638345586 100644 --- a/pkg/storage/chunk/cache/embeddedcache.go +++ b/pkg/storage/chunk/cache/embeddedcache.go @@ -55,7 +55,7 @@ type EmbeddedCacheSingletonConfig struct { } func (cfg *EmbeddedCacheSingletonConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for groupcache communication") + f.IntVar(&cfg.ListenPort, prefix+"embedded-cache.listen_port", 4100, "The port to use for cache communications across the peers when run in distributed fashion") cfg.Ring.RegisterFlagsWithPrefix(prefix, "", f) f.Int64Var(&cfg.MaxSizeMB, prefix+"embedded-cache.max-size-mb", 100, "Maximum memory size of the cache in MB.") f.DurationVar(&cfg.HeartbeatInterval, prefix+"embedded-cache.heartbeat-interval", time.Second, "If the connection is idle, the interval the cache will send heartbeats") From 800fbdf37ef4417fc4b4b499a14da29d7f4f25f2 Mon Sep 17 00:00:00 2001 From: Kaviraj Kanagaraj Date: Mon, 8 Aug 2022 11:03:08 +0200 Subject: [PATCH 19/19] Update docs/sources/upgrading/_index.md Co-authored-by: Karen Miller <84039272+KMiller-Grafana@users.noreply.github.com> --- docs/sources/upgrading/_index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/upgrading/_index.md b/docs/sources/upgrading/_index.md index c534966be9690..935e1281fdcc0 100644 --- a/docs/sources/upgrading/_index.md +++ b/docs/sources/upgrading/_index.md @@ -35,7 +35,7 @@ The output is incredibly verbose as it shows the entire internal config struct u #### Fifocache is deprecated -We introduced a new cache called `embedded-cache` which is an in-process cache system that make it possible to run loki without the need for an external cache (like memcached, redis, etc). It can be run in two modes `distributed: false` (default, and same as old `fifocache`) and `distributed: true` which runs cache in distributed fashion sharding keys across peers if Loki is run in microservices or SSD mode. +We introduced a new cache called `embedded-cache` which is an in-process cache system that make it possible to run Loki without the need for an external cache (like Memcached, Redis, etc). It can be run in two modes `distributed: false` (default, and same as old `fifocache`) and `distributed: true` which runs cache in distributed fashion sharding keys across peers if Loki is run in microservices or SSD mode. Currently `embedded-cache` with `distributed: true` can be enabled only for results cache.