Skip to content

Commit

Permalink
Loki: check new Read target when initializing boltdb-shipper store (#…
Browse files Browse the repository at this point in the history
…4681)

* make sure to check for new `Read` target when setting up boltdb-shipper

* also need to make sure ingester querier is added as a dep for Read target

* improve the comment on why we shorten the cache validity.

* fix shellcheck errors

* apply smart defaults to chunk retain period to avoid query gaps.

* typo
  • Loading branch information
slim-bean authored Nov 7, 2021
1 parent c538171 commit fbebf67
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 14 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,9 @@ fmt-jsonnet:
xargs -n 1 -- jsonnetfmt -i

lint-scripts:
# Ignore https://github.com/koalaman/shellcheck/wiki/SC2312
@find . -name '*.sh' -not -path '*/vendor/*' -print0 | \
xargs -0 -n1 shellcheck -x -o all
xargs -0 -n1 shellcheck -e SC2312 -x -o all


# search for dead link in our documentation.
Expand Down
14 changes: 14 additions & 0 deletions pkg/loki/config_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (c *ConfigWrapper) ApplyDynamicConfig() cfg.Source {

applyFIFOCacheConfig(r)
applyIngesterFinalSleep(r)
applyChunkRetain(r, &defaults)

return nil
}
Expand Down Expand Up @@ -468,3 +469,16 @@ func isMemcacheSet(cfg cortexcache.Config) bool {
func applyIngesterFinalSleep(cfg *ConfigWrapper) {
cfg.Ingester.LifecyclerConfig.FinalSleep = 0 * time.Second
}

// applyChunkRetain is used to set chunk retain based on having an index query cache configured
// We retain chunks for at least as long as the index queries cache TTL. When an index entry is
// cached, any chunks flushed after that won't be in the cached entry. To make sure their data is
// available the RetainPeriod keeps them available in the ingesters live data. We want to retain them
// for at least as long as the TTL on the index queries cache.
func applyChunkRetain(cfg, defaults *ConfigWrapper) {
if !reflect.DeepEqual(cfg.StorageConfig.IndexQueriesCacheConfig, defaults.StorageConfig.IndexQueriesCacheConfig) {
// Set the retain period to the cache validity plus one minute. One minute is arbitrary but leaves some
// buffer to make sure the chunks are there until the index entries expire.
cfg.Ingester.RetainPeriod = cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
}
}
28 changes: 28 additions & 0 deletions pkg/loki/config_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,3 +1303,31 @@ common:
assert.Equal(t, "etcd", config.CompactorConfig.CompactorRing.KVStore.Store)
})
}

func Test_applyChunkRetain(t *testing.T) {
t.Run("chunk retain is unchanged if no index queries cache is defined", func(t *testing.T) {
yamlContent := ``
config, defaults, err := configWrapperFromYAML(t, yamlContent, nil)
assert.NoError(t, err)
assert.Equal(t, defaults.Ingester.RetainPeriod, config.Ingester.RetainPeriod)
})

t.Run("chunk retain is set to IndexCacheValidity + 1 minute", func(t *testing.T) {
yamlContent := `
storage_config:
index_cache_validity: 10m
index_queries_cache_config:
memcached:
batch_size: 256
parallelism: 10
memcached_client:
consistent_hash: true
host: memcached-index-queries.loki-bigtable.svc.cluster.local
service: memcached-client
`
config, _, err := configWrapperFromYAML(t, yamlContent, nil)
assert.NoError(t, err)
assert.Equal(t, 11*time.Minute, config.Ingester.RetainPeriod)
})

}
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (t *Loki) setupModuleManager() error {
}

// Add IngesterQuerier as a dependency for store when target is either ingester or querier.
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) {
if t.Cfg.isModuleEnabled(Querier) || t.Cfg.isModuleEnabled(Ruler) || t.Cfg.isModuleEnabled(Read) {
deps[Store] = append(deps[Store], IngesterQuerier)
}

Expand Down
21 changes: 13 additions & 8 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,23 +337,28 @@ func (t *Loki) initStore() (_ services.Service, err error) {
if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID
switch true {
case t.Cfg.isModuleEnabled(Ingester):
case t.Cfg.isModuleEnabled(Ingester), t.Cfg.isModuleEnabled(Write):
// We do not want ingester to unnecessarily keep downloading files
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
// Use fifo cache for caching index in memory.
// Use fifo cache for caching index in memory, this also significantly helps performance.
t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
EnableFifoCache: true,
Fifocache: cache.FifoCacheConfig{
MaxSizeBytes: "200 MB",
// We snapshot the index in ingesters every minute for reads so reduce the index cache validity by a minute.
// This is usually set in StorageConfig.IndexCacheValidity but since this is exclusively used for caching the index entries,
// I(Sandeep) am setting it here which also helps reduce some CPU cycles and allocations required for
// unmarshalling the cached data to check the expiry.
// This is a small hack to save some CPU cycles.
// We check if the object is still valid after pulling it from cache using the IndexCacheValidity value
// however it has to be deserialized to do so, setting the cache validity to some arbitrary amount less than the
// IndexCacheValidity guarantees the FIFO cache will expire the object first which can be done without
// having to deserialize the object.
Validity: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
},
}
// Force the retain period to be longer than the IndexCacheValidity used in the store, this guarantees we don't
// have query gaps on chunks flushed after an index entry is cached by keeping them retained in the ingester
// and queried as part of live data until the cache TTL expires on the index entry.
t.Cfg.Ingester.RetainPeriod = t.Cfg.StorageConfig.IndexCacheValidity + 1*time.Minute
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
default:
Expand All @@ -370,7 +375,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
switch true {
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler):
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read):
// Do not use the AsyncStore if the querier is configured with QueryStoreOnly set to true
if t.Cfg.Querier.QueryStoreOnly {
break
Expand Down
8 changes: 4 additions & 4 deletions tools/increment_version.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,28 +51,28 @@ a=( "${version//./ }" )

# If version string is missing or has the wrong number of members, show usage message.

if [ ${#a[@]} -ne 3 ]
if [[ ${#a[@]} -ne 3 ]]
then
echo "usage: $(basename "$0") [-Mmp] major.minor.patch"
exit 1
fi

# Increment version numbers as requested.

if [ -n "${major}" ]
if [[ -n "${major}" ]]
then
((a[0]++))
a[1]=0
a[2]=0
fi

if [ -n "${minor}" ]
if [[ -n "${minor}" ]]
then
((a[1]++))
a[2]=0
fi

if [ -n "${patch}" ]
if [[ -n "${patch}" ]]
then
((a[2]++))
fi
Expand Down

0 comments on commit fbebf67

Please sign in to comment.