Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingester block lifetime changes #628

Merged
merged 20 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
b2fbcbf
WIP: working first draft
mdisibio Apr 5, 2021
5c44a25
Moved wal iteration/completion to tempodb and honor existing block co…
mdisibio Apr 5, 2021
9e4e4ea
Get and specify length when flushing ingester blocks to backend, to a…
mdisibio Apr 5, 2021
e14b5f5
Rename local.LocalBackend to local.Backend
mdisibio Apr 5, 2021
41b4894
Rename local.LocalBackend to local.Backend
mdisibio Apr 5, 2021
2e50481
Rename local.LocalBackend to local.Backend
mdisibio Apr 5, 2021
432e7d6
Delete obsolete complete block
mdisibio Apr 5, 2021
634ceff
Rename ingester.IngesterBlock to ingester.LocalBlock
mdisibio Apr 5, 2021
e39fca8
Update changelog
mdisibio Apr 5, 2021
25f23fe
lint
mdisibio Apr 5, 2021
b173629
Rename CompactorBlock to StreamingBlock
mdisibio Apr 6, 2021
c8e9de3
Break complete flush op into own method, remove dead code
mdisibio Apr 7, 2021
3179298
Fix StreamingBlock to finish writing data file before writing meta to…
mdisibio Apr 7, 2021
603b5e5
Don't recreate obsolete /wal/completed folder after clearing it out o…
mdisibio Apr 7, 2021
573552a
Move ReadReader to the main backend.Reader interface, leave unimpleme…
mdisibio Apr 7, 2021
d21a065
Move block copy functionality from BackendBlock to shared method
mdisibio Apr 7, 2021
d50bbeb
Move LocalBlock to wal package
mdisibio Apr 7, 2021
366b561
lint
mdisibio Apr 7, 2021
ce940bd
Restore test for wal->backend block completion after moving to /tempo…
mdisibio Apr 8, 2021
c719a69
Move local backend ownership from ingester to wal, code cleanup
mdisibio Apr 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* [BUGFIX] Fixes issue where wal was deleted before successful flush and adds exponential backoff for flush errors [#593](https://github.com/grafana/tempo/pull/593)
* [BUGFIX] Fixes issue where Tempo would not parse odd length trace ids [#605](https://github.com/grafana/tempo/pull/605)
* [BUGFIX] Sort traces on flush to reduce unexpected recombination work by compactors [#606](https://github.com/grafana/tempo/pull/606)
* [BUGFIX] Ingester fully persists blocks locally to reduce amount of work done after restart [#628](https://github.com/grafana/tempo/pull/628)
* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613)

## v0.6.0
Expand Down
2 changes: 1 addition & 1 deletion modules/compactor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Config struct {
func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
cfg.Compactor = tempodb.CompactorConfig{
ChunkSizeBytes: 10 * 1024 * 1024, // 10 MiB
FlushSizeBytes: 30 * 1024 * 1024, // 30 MiB
FlushSizeBytes: tempodb.DefaultFlushSizeBytes,
CompactedBlockRetention: time.Hour,
RetentionConcurrency: tempodb.DefaultRetentionConcurrency,
}
Expand Down
35 changes: 21 additions & 14 deletions modules/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
Expand Down Expand Up @@ -200,20 +201,26 @@ func (i *Ingester) flushLoop(j int) {

err = instance.ClearCompletingBlock(op.blockID)
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
// Failure to delete the WAL doesn't prevent flushing the bloc
// Failure to delete the WAL doesn't prevent flushing the block
handleFailedOp(op, err)
}
} else {
retry = true
}
} else {
// add a flushOp for the block we just completed
// No delay
i.enqueue(&flushOp{
kind: opKindFlush,
userID: instance.instanceID,
blockID: op.blockID,
}, false)
err = instance.ClearCompletingBlock(op.blockID)
if err != nil {
err = errors.Wrap(err, "error clearing completing block")
handleFailedOp(op, err)
} else {
// add a flushOp for the block we just completed
// No delay
i.enqueue(&flushOp{
kind: opKindFlush,
userID: instance.instanceID,
blockID: op.blockID,
}, false)
}
}

} else {
Expand Down Expand Up @@ -268,12 +275,12 @@ func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error {
}

// Delete original wal only after successful flush
err = instance.ClearCompletingBlock(blockID)
if err != nil {
// Error deleting wal doesn't fail the flush
level.Error(log.Logger).Log("msg", "Error clearing wal", "userID", userID, "blockID", blockID.String(), "err", err)
metricFailedFlushes.Inc()
}
//err = instance.ClearCompletingBlock(blockID)
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
//if err != nil {
// // Error deleting wal doesn't fail the flush
// level.Error(log.Logger).Log("msg", "Error clearing wal", "userID", userID, "blockID", blockID.String(), "err", err)
// metricFailedFlushes.Inc()
//}

metricBlocksFlushed.Inc()
} else {
Expand Down
60 changes: 56 additions & 4 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,27 @@ package ingester

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/storage"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/validation"
"github.com/grafana/tempo/tempodb/backend/local"
tempodb_wal "github.com/grafana/tempo/tempodb/wal"
)

Expand All @@ -48,6 +48,10 @@ type Ingester struct {

lifecycler *ring.Lifecycler
store storage.Store
local *local.Backend
//localReader backend.Reader
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
//localWriter backend.Writer
//localCompactor backend.Compactor

flushQueues *flushqueues.ExclusiveQueues
flushQueuesDone sync.WaitGroup
Expand All @@ -66,12 +70,19 @@ func New(cfg Config, store storage.Store, limits *overrides.Overrides) (*Ingeste
flushQueues: flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength),
}

l, err := local.NewBackend(&local.Config{
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
Path: store.WAL().BlocksFilePath(),
})
if err != nil {
return nil, err
}
i.local = l

i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
for j := 0; j < cfg.ConcurrentFlushes; j++ {
go i.flushLoop(j)
}

var err error
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, prometheus.DefaultRegisterer)
if err != nil {
return nil, fmt.Errorf("NewLifecycler failed %w", err)
Expand All @@ -94,6 +105,11 @@ func (i *Ingester) starting(ctx context.Context) error {
return fmt.Errorf("failed to replay wal %w", err)
}

err = i.rediscoverLocalBlocks()
if err != nil {
return fmt.Errorf("failed to rediscover local blocks %w", err)
}

// Now that user states have been created, we can start the lifecycler.
// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
Expand Down Expand Up @@ -236,7 +252,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) (*instance, error) {
inst, ok = i.instances[instanceID]
if !ok {
var err error
inst, err = newInstance(instanceID, i.limiter, i.store)
inst, err = newInstance(instanceID, i.limiter, i.store, i.local)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -351,3 +367,39 @@ func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock) error {

return nil
}

func (i *Ingester) rediscoverLocalBlocks() error {
ctx := context.TODO()

tenants, err := i.local.Tenants(ctx)
if err != nil {
return errors.Wrap(err, "getting local tenants")
}

level.Info(log.Logger).Log("msg", "reloading local blocks", "tenants", len(tenants))

for _, t := range tenants {
inst, err := i.getOrCreateInstance(t)
if err != nil {
return err
}

err = inst.rediscoverLocalBlocks(ctx)
if err != nil {
return errors.Wrapf(err, "getting local blocks for tenant %v", t)
}

// Requeue needed flushes
for _, b := range inst.completeBlocks {
if b.FlushedTime().IsZero() {
i.enqueue(&flushOp{
kind: opKindFlush,
userID: t,
blockID: b.BlockMeta().BlockID,
}, true)
}
}
}

return nil
}
73 changes: 62 additions & 11 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/wal"
Expand Down Expand Up @@ -57,7 +59,7 @@ type instance struct {
blocksMtx sync.RWMutex
headBlock *wal.AppendBlock
completingBlocks []*wal.AppendBlock
completeBlocks []*encoding.CompleteBlock
completeBlocks []*LocalBlock

lastBlockCut time.Time

Expand All @@ -66,11 +68,12 @@ type instance struct {
bytesWrittenTotal prometheus.Counter
limiter *Limiter
writer tempodb.Writer
local *local.Backend

hash hash.Hash32
}

func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer) (*instance, error) {
func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l *local.Backend) (*instance, error) {
i := &instance{
traces: map[uint32]*trace{},

Expand All @@ -79,6 +82,7 @@ func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer) (*i
bytesWrittenTotal: metricBytesWrittenTotal.WithLabelValues(instanceID),
limiter: limiter,
writer: writer,
local: l,

hash: fnv.New32(),
}
Expand Down Expand Up @@ -180,16 +184,20 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) error {
return fmt.Errorf("error finding completingBlock")
}

// potentially long running operation placed outside blocksMtx
completeBlock, err := i.writer.CompleteBlock(completingBlock, i)
ctx := context.Background()

backendBlock, err := i.writer.CompleteBlockWithBackend(ctx, completingBlock, i, i.local, i.local)
if err != nil {
metricFailedFlushes.Inc()
level.Error(log.Logger).Log("msg", "unable to complete block.", "tenantID", i.instanceID, "err", err)
return err
return errors.Wrap(err, "error completing wal block with local backend")
}

ingesterBlock, err := NewIngesterBlock(ctx, backendBlock, i.local)
if err != nil {
return errors.Wrap(err, "error creating ingester block")
}

i.blocksMtx.Lock()
i.completeBlocks = append(i.completeBlocks, completeBlock)
i.completeBlocks = append(i.completeBlocks, ingesterBlock)
i.blocksMtx.Unlock()

return nil
Expand All @@ -216,7 +224,7 @@ func (i *instance) ClearCompletingBlock(blockID uuid.UUID) error {
}

// GetBlockToBeFlushed gets a list of blocks that can be flushed to the backend
func (i *instance) GetBlockToBeFlushed(blockID uuid.UUID) *encoding.CompleteBlock {
func (i *instance) GetBlockToBeFlushed(blockID uuid.UUID) *LocalBlock {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

Expand All @@ -243,7 +251,8 @@ func (i *instance) ClearFlushedBlocks(completeBlockTimeout time.Duration) error

if flushedTime.Add(completeBlockTimeout).Before(time.Now()) {
i.completeBlocks = append(i.completeBlocks[:idx], i.completeBlocks[idx+1:]...)
err = b.Clear() // todo: don't remove from complete blocks slice until after clear succeeds?
//err = b.Clear() // todo: don't remove from complete blocks slice until after clear succeeds?
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
err = i.local.ClearBlock(b.BlockMeta().BlockID, i.instanceID)
if err == nil {
metricBlocksClearedTotal.Inc()
}
Expand Down Expand Up @@ -291,7 +300,7 @@ func (i *instance) FindTraceByID(id []byte) (*tempopb.Trace, error) {

// completeBlock
for _, c := range i.completeBlocks {
foundBytes, err = c.Find(id, i)
foundBytes, err = c.Find(context.TODO(), id)
if err != nil {
return nil, fmt.Errorf("completeBlock.Find failed: %w", err)
}
Expand Down Expand Up @@ -404,3 +413,45 @@ func pushRequestTraceID(req *tempopb.PushRequest) ([]byte, error) {

return req.Batch.InstrumentationLibrarySpans[0].Spans[0].TraceId, nil
}

func (i *instance) rediscoverLocalBlocks(ctx context.Context) error {
ids, err := i.local.Blocks(ctx, i.instanceID)
if err != nil {
return err
}

for _, id := range ids {
meta, err := i.local.BlockMeta(ctx, id, i.instanceID)
if err != nil {
if err == backend.ErrMetaDoesNotExist {
// Partial/incomplete block found, remove, it will be recreated from data in the wal.
level.Warn(log.Logger).Log("msg", "Unable to reload meta for local block. This indicates an incomplete block and will be deleted", "tenant", i.instanceID, "block", id.String())
err = i.local.ClearBlock(id, i.instanceID)
if err != nil {
return errors.Wrapf(err, "deleting bad local block tenant %v block %v", i.instanceID, id.String())
}
continue
}

return err
}

b, err := encoding.NewBackendBlock(meta, i.local)
if err != nil {
return err
}

ib, err := NewIngesterBlock(ctx, b, i.local)
if err != nil {
return err
}

i.blocksMtx.Lock()
i.completeBlocks = append(i.completeBlocks, ib)
i.blocksMtx.Unlock()

level.Info(log.Logger).Log("msg", "reloaded local block", "tenantID", i.instanceID, "block", id.String(), "flushed", ib.FlushedTime())
}

return nil
}
Loading