rpc for querying ingesters to get chunk ids from its store #2601

Merged
merged 22 commits into from
Sep 18, 2020
d8313b8
rpc for querying ingesters to get chunk ids from its store
sandeepsukhani Sep 7, 2020
5351c97
fix broken tests
sandeepsukhani Sep 7, 2020
c4299c1
separate out code from querier to query ingester to a new type called…
sandeepsukhani Sep 10, 2020
22dce5c
add AsyncStore which helps with querying and merging results from bot…
sandeepsukhani Sep 10, 2020
8376f27
query ingesters local store for labels when active index type is bolt…
sandeepsukhani Sep 11, 2020
488b202
better naming
sandeepsukhani Sep 11, 2020
305cf4b
add some tests
sandeepsukhani Sep 11, 2020
88de6c3
add test case for duplicate chunk ids from ingesters
sandeepsukhani Sep 11, 2020
5b23b55
add some logs and spans
sandeepsukhani Sep 11, 2020
f686414
check for nil fetcher
sandeepsukhani Sep 11, 2020
3dc2c32
use require.Same to assert that two pointers are pointing to same object
sandeepsukhani Sep 11, 2020
ad5e044
some more improvements
sandeepsukhani Sep 14, 2020
4e9ee5e
fixes suggested from PR review
sandeepsukhani Sep 15, 2020
242271d
fix mode not being set before cortex_storage.NewStore is being called
sandeepsukhani Sep 15, 2020
678470d
add IngesterQuerier as a dependency for store when target is either i…
sandeepsukhani Sep 15, 2020
9f3bef5
do not cache index from ingester
sandeepsukhani Sep 15, 2020
bc6ee17
rename tests
sandeepsukhani Sep 15, 2020
4f974ac
fix broken tests
sandeepsukhani Sep 16, 2020
4c0732a
update upgrade guide to rollout ingesters before queriers
sandeepsukhani Sep 16, 2020
c05bf1f
upgrade guide changes suggested in PR review
sandeepsukhani Sep 17, 2020
e4450ae
runtime check for whether active index store is boltdb-shipper
sandeepsukhani Sep 17, 2020
2946f52
consider previous schema as well for boltdb-shipper for calculating m…
sandeepsukhani Sep 18, 2020
8 changes: 7 additions & 1 deletion docs/sources/operations/upgrade.md
@@ -17,9 +17,15 @@ On this page we will document any upgrade issues/gotchas/considerations we are a
The `max_freshness` config from `results_cache` has been removed in favour of another flag called `max_cache_freshness_per_query` in `limits_config` which has the same effect.
If you happen to have `results_cache.max_freshness` set please use `limits_config.max_cache_freshness_per_query` YAML config instead.

### Important: Roll out ingesters before queriers when using BoltDB-Shipper

Ingesters now expose a new RPC method that queriers use when the index type is `boltdb-shipper`.
Queriers generally roll out faster than ingesters, so if new queriers call the new RPC on older ingesters, those queries fail.
To avoid query downtime during the upgrade, roll out ingesters before queriers.

## 1.6.0

-### IMPORTANT: Ksonnet Port Change and Removal of NET_BIND_SERVICE Capability from docker image
+### Important: Ksonnet port changed and removed NET_BIND_SERVICE capability from Docker image

In 1.5.0 we changed the Loki user to not run as root which created problems binding to port 80.
To address this we updated the docker image to add the NET_BIND_SERVICE capability to the loki process
8 changes: 8 additions & 0 deletions pkg/ingester/flush_test.go
@@ -237,6 +237,14 @@ func (s *testStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
return nil, nil
}

func (s *testStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
return nil, nil, nil
}

func (s *testStore) GetSchemaConfigs() []chunk.PeriodConfig {
return nil
}

func (s *testStore) Stop() {}

func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
102 changes: 87 additions & 15 deletions pkg/ingester/ingester.go
@@ -8,9 +8,12 @@ import (
"sync"
"time"

"github.com/grafana/loki/pkg/storage"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"

@@ -26,6 +29,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/storage/stores/shipper"
listutil "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -104,7 +108,8 @@ type Ingester struct {
lifecycler *ring.Lifecycler
lifecyclerWatcher *services.FailureWatcher

-store ChunkStore
+store           ChunkStore
+periodicConfigs []chunk.PeriodConfig

loopDone sync.WaitGroup
loopQuit chan struct{}
@@ -124,6 +129,8 @@ type ChunkStore interface {
Put(ctx context.Context, chunks []chunk.Chunk) error
SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error)
SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error)
GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error)
GetSchemaConfigs() []chunk.PeriodConfig
}

// New makes a new Ingester.
@@ -137,13 +144,14 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
}

i := &Ingester{
-cfg:          cfg,
-clientConfig: clientConfig,
-instances:    map[string]*instance{},
-store:        store,
-loopQuit:     make(chan struct{}),
-flushQueues:  make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
-tailersQuit:  make(chan struct{}),
+cfg:             cfg,
+clientConfig:    clientConfig,
+instances:       map[string]*instance{},
+store:           store,
+periodicConfigs: store.GetSchemaConfigs(),
+loopQuit:        make(chan struct{}),
+flushQueues:     make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
+tailersQuit:     make(chan struct{}),
factory: func() chunkenc.Chunk {
return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize)
},
@@ -354,6 +362,64 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
return sendSampleBatches(queryServer.Context(), heapItr, queryServer)
}

// boltdbShipperMaxLookBack returns a max look back period only if active index type is boltdb-shipper.
// max look back is limited to from time of boltdb-shipper config.
// It considers previous periodic config's from time if that also has index type set to boltdb-shipper.
func (i *Ingester) boltdbShipperMaxLookBack() time.Duration {
activePeriodicConfigIndex := storage.ActivePeriodConfig(i.periodicConfigs)
activePeriodicConfig := i.periodicConfigs[activePeriodicConfigIndex]
if activePeriodicConfig.IndexType != shipper.BoltDBShipperType {
return 0
}

startTime := activePeriodicConfig.From
if activePeriodicConfigIndex != 0 && i.periodicConfigs[activePeriodicConfigIndex-1].IndexType == shipper.BoltDBShipperType {
startTime = i.periodicConfigs[activePeriodicConfigIndex-1].From
}

maxLookBack := time.Since(startTime.Time.Time())
return maxLookBack
}

// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper.
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
orgID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}

boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
if boltdbShipperMaxLookBack == 0 {
return nil, nil
}

reqStart := req.Start
reqStart = adjustQueryStartTime(boltdbShipperMaxLookBack, reqStart, time.Now())

// parse the request
start, end := listutil.RoundToMilliseconds(reqStart, req.End)
matchers, err := logql.ParseMatchers(req.Matchers)
if err != nil {
return nil, err
}

// get chunk references
chunksGroups, _, err := i.store.GetChunkRefs(ctx, orgID, start, end, matchers...)
if err != nil {
return nil, err
}

// build the response
resp := logproto.GetChunkIDsResponse{ChunkIDs: []string{}}
for _, chunks := range chunksGroups {
for _, chk := range chunks {
resp.ChunkIDs = append(resp.ChunkIDs, chk.ExternalKey())
}
}

return &resp, nil
}

// Label returns the set of labels for the stream this ingester knows about.
func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
@@ -367,8 +433,9 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return nil, err
}

-// Only continue if we should query the store for labels
-if !i.cfg.QueryStore {
+// Only continue if the active index type is boltdb-shipper or QueryStore flag is true.
+boltdbShipperMaxLookBack := i.boltdbShipperMaxLookBack()
+if boltdbShipperMaxLookBack == 0 && !i.cfg.QueryStore {
return resp, nil
}

@@ -383,8 +450,13 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
if err != nil {
return nil, err
}

maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod
if boltdbShipperMaxLookBack != 0 {
maxLookBackPeriod = boltdbShipperMaxLookBack
}
// Adjust the start time based on QueryStoreMaxLookBackPeriod.
-start := adjustQueryStartTime(i.cfg, *req.Start, time.Now())
+start := adjustQueryStartTime(maxLookBackPeriod, *req.Start, time.Now())
if start.After(*req.End) {
// The request is older than we are allowed to query the store, just return what we have.
return resp, nil
@@ -506,17 +578,17 @@ func buildStoreRequest(cfg Config, start, end, now time.Time) (time.Time, time.T
if !cfg.QueryStore {
return time.Time{}, time.Time{}, false
}
-start = adjustQueryStartTime(cfg, start, now)
+start = adjustQueryStartTime(cfg.QueryStoreMaxLookBackPeriod, start, now)

if start.After(end) {
return time.Time{}, time.Time{}, false
}
return start, end, true
}

-func adjustQueryStartTime(cfg Config, start, now time.Time) time.Time {
-if cfg.QueryStoreMaxLookBackPeriod > 0 {
-oldestStartTime := now.Add(-cfg.QueryStoreMaxLookBackPeriod)
+func adjustQueryStartTime(maxLookBackPeriod time.Duration, start, now time.Time) time.Time {
+if maxLookBackPeriod > 0 {
+oldestStartTime := now.Add(-maxLookBackPeriod)
if oldestStartTime.After(start) {
return oldestStartTime
}
88 changes: 88 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -7,6 +7,9 @@ import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
@@ -271,6 +274,14 @@ func (s *mockStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar
return nil, nil
}

func (s *mockStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
return nil, nil, nil
}

func (s *mockStore) GetSchemaConfigs() []chunk.PeriodConfig {
return nil
}

type mockQuerierServer struct {
ctx context.Context
resps []*logproto.QueryResponse
@@ -356,3 +367,80 @@ func TestIngester_buildStoreRequest(t *testing.T) {
})
}
}

func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
now := model.Now()

for _, tc := range []struct {
name string
periodicConfigs []chunk.PeriodConfig
expectedMaxLookBack time.Duration
}{
{
name: "not using boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "bigtable",
},
},
},
{
name: "just one periodic config with boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
},
},
expectedMaxLookBack: time.Since(now.Add(-24 * time.Hour).Time()),
},
{
name: "active config boltdb-shipper, previous config non boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-48 * time.Hour)},
IndexType: "bigtable",
},
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
},
},
expectedMaxLookBack: time.Since(now.Add(-24 * time.Hour).Time()),
},
{
name: "current and previous config both using boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-48 * time.Hour)},
IndexType: "boltdb-shipper",
},
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "boltdb-shipper",
},
},
expectedMaxLookBack: time.Since(now.Add(-48 * time.Hour).Time()),
},
{
name: "active config non boltdb-shipper, previous config boltdb-shipper",
periodicConfigs: []chunk.PeriodConfig{
{
From: chunk.DayTime{Time: now.Add(-48 * time.Hour)},
IndexType: "boltdb-shipper",
},
{
From: chunk.DayTime{Time: now.Add(-24 * time.Hour)},
IndexType: "bigtable",
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
ingester := Ingester{periodicConfigs: tc.periodicConfigs}
mlb := ingester.boltdbShipperMaxLookBack()
require.InDelta(t, tc.expectedMaxLookBack, mlb, float64(time.Second))
})
}
}
8 changes: 7 additions & 1 deletion pkg/logcli/query/query.go
@@ -12,6 +12,7 @@ import (
"text/tabwriter"
"time"

cortex_storage "github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/util"
"github.com/fatih/color"
json "github.com/json-iterator/go"
@@ -190,7 +191,12 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}

-querier, err := storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, conf.SchemaConfig, limits, prometheus.DefaultRegisterer)
+chunkStore, err := cortex_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util.Logger)
+if err != nil {
+return err
+}
+
+querier, err := storage.NewStore(conf.StorageConfig, conf.SchemaConfig, chunkStore, prometheus.DefaultRegisterer)
if err != nil {
return err
}