Skip to content

Commit

Permalink
enforce requirment for periodic config for index tables to be 24h whe…
Browse files Browse the repository at this point in the history
…n using boltdb shipper (#2166)
  • Loading branch information
sandeepsukhani authored Jul 30, 2020
1 parent 2a596a7 commit d5c840b
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 107 deletions.
21 changes: 12 additions & 9 deletions docs/sources/operations/storage/boltdb-shipper.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ It locally stores the index in BoltDB files instead and keeps shipping those fil
It also keeps syncing BoltDB files from shared object store to a configured local directory for getting index entries created by other services of same Loki cluster.
This helps run Loki with one less dependency and also saves costs in storage since object stores are likely to be much cheaper compared to cost of a hosted NoSQL store or running a self hosted instance of Cassandra.

**Note:** BoltDB shipper works best with 24h periodic index files. It is a requirement to have index period set to 24h for either active or upcoming usage of boltdb-shipper.
If boltdb-shipper already has created index files with 7 days period, and you want to retain previous data then just add a new schema config using boltdb-shipper with a future date and index files period set to 24h.

## Example Configuration

Example configuration with GCS:
Expand All @@ -23,7 +26,7 @@ schema_config:
schema: v11
index:
prefix: loki_index_
period: 168h
period: 24h

storage_config:
gcs:
Expand All @@ -45,20 +48,20 @@ Loki can be configured to run as just a single vertically scaled instance or as
When it comes to reads and writes, Ingesters are the ones which writes the index and chunks to stores and Queriers are the ones which reads index and chunks from the store for serving requests.

Before we get into more details, it is important to understand how Loki manages index in stores. Loki shards index as per configured period which defaults to 7 days i.e when it comes to table based stores like Bigtable/Cassandra/DynamoDB there would be separate table per week containing index for that week.
In case of BoltDB files there is no concept of tables so it creates a BoltDB file per week. Files/Tables created per week are identified by a configured `prefix_` + `<period-number-since-epoch>`.
Here `<period-number-since-epoch>` in case of default config would be week number since epoch.
For example, if you have prefix set to `loki_index_` and a write requests comes in on 20th April 2020, it would be stored in table/file named `loki_index_2624` because it has been `2623` weeks since epoch and we are in `2624`th week.
Since sharding of index creates multiple files when using BoltDB, BoltDB Shipper would create a folder per week and add files for that week in that folder and names those files after ingesters which created them.
In case of BoltDB files there is no concept of tables, so it creates a BoltDB file per period(i.e day in case of boltdb-shipper store). Files/Tables created per day are identified by a configured `prefix_` + `<period-number-since-epoch>`.
Here `<period-number-since-epoch>` in case of boltdb-shipper would be day number since epoch.
For example, if you have prefix set to `loki_index_` and a write request comes in on 20th April 2020, it would be stored in table/file named `loki_index_18372` because it has been `18371` days since epoch, and we are in `18372`th day.
Since sharding of index creates multiple files when using BoltDB, BoltDB Shipper would create a folder per day and add files for that day in that folder and names those files after ingesters which created them.

To show how BoltDB files in shared object store would look like, let us consider 2 ingesters named `ingester-0` and `ingester-1` running in a Loki cluster and
they both having shipped files for week `2623` and `2624` with prefix `loki_index_`, here is how the files would look like:
To show how BoltDB files in shared object store would look like, let us consider 2 ingesters named `ingester-0` and `ingester-1` running in a Loki cluster, and
they both having shipped files for day `18371` and `18372` with prefix `loki_index_`, here is how the files would look like:

```
└── index
├── loki_index_2623
├── loki_index_18371
│ ├── ingester-0
│ └── ingester-1
└── loki_index_2624
└── loki_index_18372
├── ingester-0
└── ingester-1
```
Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Config struct {
Ingester ingester.Config `yaml:"ingester,omitempty"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig chunk.StoreConfig `yaml:"chunk_store_config,omitempty"`
SchemaConfig chunk.SchemaConfig `yaml:"schema_config,omitempty"`
SchemaConfig storage.SchemaConfig `yaml:"schema_config,omitempty"`
LimitsConfig validation.Limits `yaml:"limits_config,omitempty"`
TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"`
Worker frontend.WorkerConfig `yaml:"frontend_worker,omitempty"`
Expand Down
36 changes: 5 additions & 31 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http/httputil"
"net/url"
"os"
"sort"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
Expand All @@ -25,7 +24,6 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
Expand Down Expand Up @@ -187,7 +185,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort

// We want ingester to also query the store when using boltdb-shipper
pc := t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)]
pc := t.cfg.SchemaConfig.Configs[loki_storage.ActivePeriodConfig(t.cfg.SchemaConfig)]
if pc.IndexType == shipper.BoltDBShipperType {
t.cfg.Ingester.QueryStore = true
mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
Expand Down Expand Up @@ -240,7 +238,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig.Config)
util.CheckFatal("initializing bucket client", err)

t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer)
t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand All @@ -249,7 +247,7 @@ func (t *Loki) initTableManager() (services.Service, error) {
}

func (t *Loki) initStore() (_ services.Service, err error) {
if t.cfg.SchemaConfig.Configs[activePeriodConfig(t.cfg.SchemaConfig)].IndexType == shipper.BoltDBShipperType {
if t.cfg.SchemaConfig.Configs[loki_storage.ActivePeriodConfig(t.cfg.SchemaConfig)].IndexType == shipper.BoltDBShipperType {
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
switch t.cfg.Target {
case Ingester:
Expand All @@ -265,7 +263,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {

// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
if t.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && usingBoltdbShipper(t.cfg.SchemaConfig) {
if t.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig) {
t.cfg.ChunkStoreConfig.DisableIndexDeduplication = true
t.cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
}
Expand Down Expand Up @@ -295,7 +293,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
t.cfg.QueryRange,
util.Logger,
t.overrides,
t.cfg.SchemaConfig,
t.cfg.SchemaConfig.SchemaConfig,
t.cfg.Querier.QueryIngestersWithin,
prometheus.DefaultRegisterer,
)
Expand Down Expand Up @@ -360,30 +358,6 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
return t.memberlistKV, nil
}

// activePeriodConfig returns index of active PeriodicConfig which would be applicable to logs that would be pushed starting now.
// Note: Another PeriodicConfig might be applicable for future logs which can change index type.
func activePeriodConfig(cfg chunk.SchemaConfig) int {
now := model.Now()
i := sort.Search(len(cfg.Configs), func(i int) bool {
return cfg.Configs[i].From.Time > now
})
if i > 0 {
i--
}
return i
}

// usingBoltdbShipper check whether current or the next index type is boltdb-shipper, returns true if yes.
func usingBoltdbShipper(cfg chunk.SchemaConfig) bool {
activePCIndex := activePeriodConfig(cfg)
if cfg.Configs[activePCIndex].IndexType == shipper.BoltDBShipperType ||
(len(cfg.Configs)-1 > activePCIndex && cfg.Configs[activePCIndex+1].IndexType == shipper.BoltDBShipperType) {
return true
}

return false
}

func calculateMaxLookBack(pc chunk.PeriodConfig, maxLookBackConfig, maxChunkAge time.Duration) (time.Duration, error) {
if pc.ObjectType != shipper.FilesystemObjectStoreType && maxLookBackConfig.Nanoseconds() != 0 {
return 0, errors.New("it is an error to specify a non zero `query_store_max_look_back_period` value when using any object store other than `filesystem`")
Expand Down
50 changes: 0 additions & 50 deletions pkg/loki/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,8 @@ import (
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

func TestActiveIndexType(t *testing.T) {
var cfg chunk.SchemaConfig

// just one PeriodConfig in the past
cfg.Configs = []chunk.PeriodConfig{{
From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
IndexType: "first",
}}

assert.Equal(t, 0, activePeriodConfig(cfg))

// add a newer PeriodConfig in the past which should be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(-12 * time.Hour)},
IndexType: "second",
})
assert.Equal(t, 1, activePeriodConfig(cfg))

// add a newer PeriodConfig in the future which should not be considered
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "third",
})
assert.Equal(t, 1, activePeriodConfig(cfg))
}

func TestUsingBoltdbShipper(t *testing.T) {
var cfg chunk.SchemaConfig

// just one PeriodConfig in the past using boltdb-shipper
cfg.Configs = []chunk.PeriodConfig{{
From: chunk.DayTime{Time: model.Now().Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
}}
assert.Equal(t, true, usingBoltdbShipper(cfg))

// just one PeriodConfig in the past not using boltdb-shipper
cfg.Configs[0].IndexType = "boltdb"
assert.Equal(t, false, usingBoltdbShipper(cfg))

// add a newer PeriodConfig in the future using boltdb-shipper
cfg.Configs = append(cfg.Configs, chunk.PeriodConfig{
From: chunk.DayTime{Time: model.Now().Add(time.Hour)},
IndexType: "boltdb-shipper",
})
assert.Equal(t, true, usingBoltdbShipper(cfg))
}

func Test_calculateMaxLookBack(t *testing.T) {
type args struct {
pc chunk.PeriodConfig
Expand Down
22 changes: 12 additions & 10 deletions pkg/storage/hack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,18 @@ func getStore() (lstore.Store, error) {
},
},
chunk.StoreConfig{},
chunk.SchemaConfig{
Configs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: start},
IndexType: "boltdb",
ObjectType: "filesystem",
Schema: "v9",
IndexTables: chunk.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
lstore.SchemaConfig{
SchemaConfig: chunk.SchemaConfig{
Configs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: start},
IndexType: "boltdb",
ObjectType: "filesystem",
Schema: "v9",
IndexTables: chunk.PeriodicTableConfig{
Prefix: "index_",
Period: time.Hour * 168,
},
},
},
},
Expand Down
57 changes: 55 additions & 2 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package storage

import (
"context"
"errors"
"flag"
"sort"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
cortex_local "github.com/cortexproject/cortex/pkg/chunk/local"
Expand All @@ -22,6 +24,11 @@ import (
"github.com/grafana/loki/pkg/util"
)

var (
currentBoltdbShipperNon24HoursErr = errors.New("boltdb-shipper works best with 24h periodic index config. Either add a new config with future date set to 24h to retain the existing index or change the existing config to use 24h period")
upcomingBoltdbShipperNon24HoursErr = errors.New("boltdb-shipper with future date must always have periodic config for index set to 24h")
)

// Config is the loki storage configuration
type Config struct {
storage.Config `yaml:",inline"`
Expand All @@ -36,6 +43,28 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxChunkBatchSize, "store.max-chunk-batch-size", 50, "The maximum number of chunks to fetch per batch.")
}

// SchemaConfig contains the config for our chunk index schemas
type SchemaConfig struct {
chunk.SchemaConfig `yaml:",inline"`
}

// Validate the schema config and returns an error if the validation doesn't pass
func (cfg *SchemaConfig) Validate() error {
activePCIndex := ActivePeriodConfig(*cfg)

// if current index type is boltdb-shipper and there are no upcoming index types then it should be set to 24 hours.
if cfg.Configs[activePCIndex].IndexType == shipper.BoltDBShipperType && cfg.Configs[activePCIndex].IndexTables.Period != 24*time.Hour && len(cfg.Configs)-1 == activePCIndex {
return currentBoltdbShipperNon24HoursErr
}

// if upcoming index type is boltdb-shipper, it should always be set to 24 hours.
if len(cfg.Configs)-1 > activePCIndex && (cfg.Configs[activePCIndex+1].IndexType == shipper.BoltDBShipperType && cfg.Configs[activePCIndex+1].IndexTables.Period != 24*time.Hour) {
return upcomingBoltdbShipperNon24HoursErr
}

return cfg.SchemaConfig.Validate()
}

// Store is the Loki chunk store to retrieve and save chunks.
type Store interface {
chunk.Store
Expand All @@ -50,8 +79,8 @@ type store struct {
}

// NewStore creates a new Loki Store using configuration supplied.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits storage.StoreLimits, registerer prometheus.Registerer) (Store, error) {
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg, limits, registerer, nil)
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg SchemaConfig, limits storage.StoreLimits, registerer prometheus.Registerer) (Store, error) {
s, err := storage.NewStore(cfg.Config, storeCfg, schemaCfg.SchemaConfig, limits, registerer, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -315,3 +344,27 @@ func RegisterCustomIndexClients(cfg *Config, registerer prometheus.Registerer) {
return shipper.NewBoltDBShipperTableClient(objectClient), nil
})
}

// ActivePeriodConfig returns index of active PeriodicConfig which would be applicable to logs that would be pushed starting now.
// Note: Another PeriodicConfig might be applicable for future logs which can change index type.
func ActivePeriodConfig(cfg SchemaConfig) int {
now := model.Now()
i := sort.Search(len(cfg.Configs), func(i int) bool {
return cfg.Configs[i].From.Time > now
})
if i > 0 {
i--
}
return i
}

// UsingBoltdbShipper checks whether current or the next index type is boltdb-shipper, returns true if yes.
func UsingBoltdbShipper(cfg SchemaConfig) bool {
activePCIndex := ActivePeriodConfig(cfg)
if cfg.Configs[activePCIndex].IndexType == shipper.BoltDBShipperType ||
(len(cfg.Configs)-1 > activePCIndex && cfg.Configs[activePCIndex+1].IndexType == shipper.BoltDBShipperType) {
return true
}

return false
}
Loading

0 comments on commit d5c840b

Please sign in to comment.