diff --git a/CHANGELOG.md b/CHANGELOG.md index f9978b37217..f8ecc5102eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * [BUGFIX] Fixes issue where wal was deleted before successful flush and adds exponential backoff for flush errors [#593](https://github.com/grafana/tempo/pull/593) * [BUGFIX] Fixes issue where Tempo would not parse odd length trace ids [#605](https://github.com/grafana/tempo/pull/605) * [BUGFIX] Sort traces on flush to reduce unexpected recombination work by compactors [#606](https://github.com/grafana/tempo/pull/606) +* [BUGFIX] Ingester fully persists blocks locally to reduce amount of work done after restart [#628](https://github.com/grafana/tempo/pull/628) * [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613) ## v0.6.0 diff --git a/modules/compactor/config.go b/modules/compactor/config.go index 2f06f625db7..836bd47a329 100644 --- a/modules/compactor/config.go +++ b/modules/compactor/config.go @@ -21,7 +21,7 @@ type Config struct { func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { cfg.Compactor = tempodb.CompactorConfig{ ChunkSizeBytes: 10 * 1024 * 1024, // 10 MiB - FlushSizeBytes: 30 * 1024 * 1024, // 30 MiB + FlushSizeBytes: tempodb.DefaultFlushSizeBytes, CompactedBlockRetention: time.Hour, RetentionConcurrency: tempodb.DefaultRetentionConcurrency, } diff --git a/modules/ingester/flush.go b/modules/ingester/flush.go index 03ddb3c43de..c18ade317be 100644 --- a/modules/ingester/flush.go +++ b/modules/ingester/flush.go @@ -9,6 +9,7 @@ import ( "time" "github.com/google/uuid" + "github.com/pkg/errors" "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" @@ -173,57 +174,17 @@ func (i *Ingester) flushLoop(j int) { op := o.(*flushOp) op.attempts++ - retry := false + var retry bool + var err error if op.kind == opKindComplete { - // No point in proceeding if shutdown has been initiated since - // we won't be able to queue up the next flush op - if i.flushQueues.IsStopped() { - handleAbandonedOp(op) - continue - } - - level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID) - instance, err := i.getOrCreateInstance(op.userID) - if err != nil { - handleFailedOp(op, err) - continue - } - - err = instance.CompleteBlock(op.blockID) - if err != nil { - handleFailedOp(op, err) - - if op.attempts >= maxCompleteAttempts { - level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. 
POSSIBLE DATA LOSS", - "userID", op.userID, "attempts", op.attempts, "block", op.blockID.String()) - - err = instance.ClearCompletingBlock(op.blockID) - if err != nil { - // Failure to delete the WAL doesn't prevent flushing the bloc - handleFailedOp(op, err) - } - } else { - retry = true - } - } else { - // add a flushOp for the block we just completed - // No delay - i.enqueue(&flushOp{ - kind: opKindFlush, - userID: instance.instanceID, - blockID: op.blockID, - }, false) - } - + retry, err = i.handleComplete(op) } else { - level.Info(log.Logger).Log("msg", "flushing block", "userid", op.userID, "block", op.blockID.String()) + retry, err = i.handleFlush(op.userID, op.blockID) + } - err := i.flushBlock(op.userID, op.blockID) - if err != nil { - handleFailedOp(op, err) - retry = true - } + if err != nil { + handleFailedOp(op, err) } if retry { @@ -245,14 +206,62 @@ func handleAbandonedOp(op *flushOp) { "op", op.kind, "block", op.blockID.String(), "attempts", op.attempts) } -func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error { +func (i *Ingester) handleComplete(op *flushOp) (retry bool, err error) { + // No point in proceeding if shutdown has been initiated since + // we won't be able to queue up the next flush op + if i.flushQueues.IsStopped() { + handleAbandonedOp(op) + return false, nil + } + + level.Debug(log.Logger).Log("msg", "completing block", "userid", op.userID) + instance, err := i.getOrCreateInstance(op.userID) + if err != nil { + return false, err + } + + err = instance.CompleteBlock(op.blockID) + if err != nil { + handleFailedOp(op, err) + + if op.attempts >= maxCompleteAttempts { + level.Error(log.WithUserID(op.userID, log.Logger)).Log("msg", "Block exceeded max completion errors. Deleting. POSSIBLE DATA LOSS", + "userID", op.userID, "attempts", op.attempts, "block", op.blockID.String()) + + // Delete WAL and move on + err = instance.ClearCompletingBlock(op.blockID) + return false, err + } + + return true, nil + } + + err = instance.ClearCompletingBlock(op.blockID) + if err != nil { + return false, errors.Wrap(err, "error clearing completing block") + } + + // add a flushOp for the block we just completed + // No delay + i.enqueue(&flushOp{ + kind: opKindFlush, + userID: instance.instanceID, + blockID: op.blockID, + }, false) + + return false, nil +} + +func (i *Ingester) handleFlush(userID string, blockID uuid.UUID) (retry bool, err error) { + level.Info(log.Logger).Log("msg", "flushing block", "userid", userID, "block", blockID.String()) + instance, err := i.getOrCreateInstance(userID) if err != nil { - return err + return true, err } if instance == nil { - return fmt.Errorf("instance id %s not found", userID) + return false, fmt.Errorf("instance id %s not found", userID) } if block := instance.GetBlockToBeFlushed(blockID); block != nil { @@ -264,23 +273,15 @@ func (i *Ingester) flushBlock(userID string, blockID uuid.UUID) error { err = i.store.WriteBlock(ctx, block) metricFlushDuration.Observe(time.Since(start).Seconds()) if err != nil { - return err - } - - // Delete original wal only after successful flush - err = instance.ClearCompletingBlock(blockID) - if err != nil { - // Error deleting wal doesn't fail the flush - level.Error(log.Logger).Log("msg", "Error clearing wal", "userID", userID, "blockID", blockID.String(), "err", err) - metricFailedFlushes.Inc() + return true, err } metricBlocksFlushed.Inc() } else { - return fmt.Errorf("error getting block to flush") + return false, fmt.Errorf("error getting block to flush") } - return nil + return 
false, nil } func (i *Ingester) enqueue(op *flushOp, jitter bool) { diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index eb20448aba0..b8162ac2403 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -2,7 +2,6 @@ package ingester import ( "context" - "errors" "fmt" "sync" "time" @@ -10,6 +9,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/opentracing/opentracing-go" ot_log "github.com/opentracing/opentracing-go/log" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/user" @@ -17,12 +17,12 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" - "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/storage" "github.com/grafana/tempo/pkg/flushqueues" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/validation" + "github.com/grafana/tempo/tempodb/backend/local" tempodb_wal "github.com/grafana/tempo/tempodb/wal" ) @@ -48,6 +48,7 @@ type Ingester struct { lifecycler *ring.Lifecycler store storage.Store + local *local.Backend flushQueues *flushqueues.ExclusiveQueues flushQueuesDone sync.WaitGroup @@ -66,16 +67,18 @@ func New(cfg Config, store storage.Store, limits *overrides.Overrides) (*Ingeste flushQueues: flushqueues.New(cfg.ConcurrentFlushes, metricFlushQueueLength), } + i.local = store.WAL().LocalBackend() + i.flushQueuesDone.Add(cfg.ConcurrentFlushes) for j := 0; j < cfg.ConcurrentFlushes; j++ { go i.flushLoop(j) } - var err error - i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, prometheus.DefaultRegisterer) + lc, err := ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", cfg.OverrideRingKey, true, prometheus.DefaultRegisterer) if err != nil { return nil, fmt.Errorf("NewLifecycler failed %w", err) } + i.lifecycler = lc // Now that the lifecycler has been created, we can create the limiter // which depends on it. @@ -94,6 +97,11 @@ func (i *Ingester) starting(ctx context.Context) error { return fmt.Errorf("failed to replay wal %w", err) } + err = i.rediscoverLocalBlocks() + if err != nil { + return fmt.Errorf("failed to rediscover local blocks %w", err) + } + // Now that user states have been created, we can start the lifecycler. 
// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context if err := i.lifecycler.StartAsync(context.Background()); err != nil { @@ -236,7 +244,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) (*instance, error) { inst, ok = i.instances[instanceID] if !ok { var err error - inst, err = newInstance(instanceID, i.limiter, i.store) + inst, err = newInstance(instanceID, i.limiter, i.store, i.local) if err != nil { return nil, err } @@ -351,3 +359,39 @@ func (i *Ingester) replayBlock(b *tempodb_wal.ReplayBlock) error { return nil } + +func (i *Ingester) rediscoverLocalBlocks() error { + ctx := context.TODO() + + tenants, err := i.local.Tenants(ctx) + if err != nil { + return errors.Wrap(err, "getting local tenants") + } + + level.Info(log.Logger).Log("msg", "reloading local blocks", "tenants", len(tenants)) + + for _, t := range tenants { + inst, err := i.getOrCreateInstance(t) + if err != nil { + return err + } + + err = inst.rediscoverLocalBlocks(ctx) + if err != nil { + return errors.Wrapf(err, "getting local blocks for tenant %v", t) + } + + // Requeue needed flushes + for _, b := range inst.completeBlocks { + if b.FlushedTime().IsZero() { + i.enqueue(&flushOp{ + kind: opKindFlush, + userID: t, + blockID: b.BlockMeta().BlockID, + }, true) + } + } + } + + return nil +} diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 9365f5b8fcd..47889b2d9d9 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -22,6 +22,8 @@ import ( "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/tempodb" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/grafana/tempo/tempodb/wal" @@ -57,7 +59,7 @@ type instance struct { blocksMtx sync.RWMutex headBlock *wal.AppendBlock completingBlocks []*wal.AppendBlock - completeBlocks []*encoding.CompleteBlock + completeBlocks []*wal.LocalBlock lastBlockCut time.Time @@ -66,11 +68,12 @@ type instance struct { bytesWrittenTotal prometheus.Counter limiter *Limiter writer tempodb.Writer + local *local.Backend hash hash.Hash32 } -func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer) (*instance, error) { +func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer, l *local.Backend) (*instance, error) { i := &instance{ traces: map[uint32]*trace{}, @@ -79,6 +82,7 @@ func newInstance(instanceID string, limiter *Limiter, writer tempodb.Writer) (*i bytesWrittenTotal: metricBytesWrittenTotal.WithLabelValues(instanceID), limiter: limiter, writer: writer, + local: l, hash: fnv.New32(), } @@ -180,16 +184,20 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) error { return fmt.Errorf("error finding completingBlock") } - // potentially long running operation placed outside blocksMtx - completeBlock, err := i.writer.CompleteBlock(completingBlock, i) + ctx := context.Background() + + backendBlock, err := i.writer.CompleteBlockWithBackend(ctx, completingBlock, i, i.local, i.local) if err != nil { - metricFailedFlushes.Inc() - level.Error(log.Logger).Log("msg", "unable to complete block.", "tenantID", i.instanceID, "err", err) - return err + return errors.Wrap(err, "error completing wal block with local backend") + } + + ingesterBlock, err := wal.NewLocalBlock(ctx, backendBlock, i.local) + if err != nil { + 
return errors.Wrap(err, "error creating ingester block") } i.blocksMtx.Lock() - i.completeBlocks = append(i.completeBlocks, completeBlock) + i.completeBlocks = append(i.completeBlocks, ingesterBlock) i.blocksMtx.Unlock() return nil @@ -216,7 +224,7 @@ func (i *instance) ClearCompletingBlock(blockID uuid.UUID) error { } // GetBlockToBeFlushed gets a list of blocks that can be flushed to the backend -func (i *instance) GetBlockToBeFlushed(blockID uuid.UUID) *encoding.CompleteBlock { +func (i *instance) GetBlockToBeFlushed(blockID uuid.UUID) *wal.LocalBlock { i.blocksMtx.Lock() defer i.blocksMtx.Unlock() @@ -243,7 +251,7 @@ func (i *instance) ClearFlushedBlocks(completeBlockTimeout time.Duration) error if flushedTime.Add(completeBlockTimeout).Before(time.Now()) { i.completeBlocks = append(i.completeBlocks[:idx], i.completeBlocks[idx+1:]...) - err = b.Clear() // todo: don't remove from complete blocks slice until after clear succeeds? + err = i.local.ClearBlock(b.BlockMeta().BlockID, i.instanceID) if err == nil { metricBlocksClearedTotal.Inc() } @@ -291,7 +299,7 @@ func (i *instance) FindTraceByID(id []byte) (*tempopb.Trace, error) { // completeBlock for _, c := range i.completeBlocks { - foundBytes, err = c.Find(id, i) + foundBytes, err = c.Find(context.TODO(), id) if err != nil { return nil, fmt.Errorf("completeBlock.Find failed: %w", err) } @@ -404,3 +412,45 @@ func pushRequestTraceID(req *tempopb.PushRequest) ([]byte, error) { return req.Batch.InstrumentationLibrarySpans[0].Spans[0].TraceId, nil } + +func (i *instance) rediscoverLocalBlocks(ctx context.Context) error { + ids, err := i.local.Blocks(ctx, i.instanceID) + if err != nil { + return err + } + + for _, id := range ids { + meta, err := i.local.BlockMeta(ctx, id, i.instanceID) + if err != nil { + if err == backend.ErrMetaDoesNotExist { + // Partial/incomplete block found, remove, it will be recreated from data in the wal. + level.Warn(log.Logger).Log("msg", "Unable to reload meta for local block. 
This indicates an incomplete block and will be deleted", "tenant", i.instanceID, "block", id.String()) + err = i.local.ClearBlock(id, i.instanceID) + if err != nil { + return errors.Wrapf(err, "deleting bad local block tenant %v block %v", i.instanceID, id.String()) + } + continue + } + + return err + } + + b, err := encoding.NewBackendBlock(meta, i.local) + if err != nil { + return err + } + + ib, err := wal.NewLocalBlock(ctx, b, i.local) + if err != nil { + return err + } + + i.blocksMtx.Lock() + i.completeBlocks = append(i.completeBlocks, ib) + i.blocksMtx.Unlock() + + level.Info(log.Logger).Log("msg", "reloaded local block", "tenantID", i.instanceID, "block", id.String(), "flushed", ib.FlushedTime()) + } + + return nil +} diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index cc8254cd8cf..59942e98f77 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -46,7 +46,7 @@ func TestInstance(t *testing.T) { ingester, _, _ := defaultIngester(t, tempDir) request := test.MakeRequest(10, []byte{}) - i, err := newInstance("fake", limiter, ingester.store) + i, err := newInstance("fake", limiter, ingester.store, ingester.local) assert.NoError(t, err, "unexpected error creating new instance") err = i.Push(context.Background(), request) assert.NoError(t, err) @@ -118,7 +118,7 @@ func TestInstanceFind(t *testing.T) { defer os.RemoveAll(tempDir) ingester, _, _ := defaultIngester(t, tempDir) - i, err := newInstance("fake", limiter, ingester.store) + i, err := newInstance("fake", limiter, ingester.store, ingester.local) assert.NoError(t, err, "unexpected error creating new instance") request := test.MakeRequest(10, []byte{}) @@ -151,7 +151,7 @@ func TestInstanceDoesNotRace(t *testing.T) { ingester, _, _ := defaultIngester(t, tempDir) - i, err := newInstance("fake", limiter, ingester.store) + i, err := newInstance("fake", limiter, ingester.store, ingester.local) assert.NoError(t, err, "unexpected error creating new instance") end := make(chan struct{}) @@ -226,7 +226,7 @@ func TestInstanceLimits(t *testing.T) { ingester, _, _ := defaultIngester(t, tempDir) - i, err := newInstance("fake", limiter, ingester.store) + i, err := newInstance("fake", limiter, ingester.store, ingester.local) assert.NoError(t, err, "unexpected error creating new instance") type push struct { @@ -459,11 +459,16 @@ func TestInstanceCutBlockIfReady(t *testing.T) { } } -func defaultInstance(t assert.TestingT, tmpDir string) *instance { +func defaultInstance(t require.TestingT, tmpDir string) *instance { limits, err := overrides.NewOverrides(overrides.Limits{}) assert.NoError(t, err, "unexpected error creating limits") limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + l, err := local.NewBackend(&local.Config{ + Path: tmpDir + "/blocks", + }) + require.NoError(t, err) + s, err := storage.NewStore(storage.Config{ Trace: tempodb.Config{ Backend: "local", @@ -483,7 +488,7 @@ func defaultInstance(t assert.TestingT, tmpDir string) *instance { }, log.NewNopLogger()) assert.NoError(t, err, "unexpected error creating store") - instance, err := newInstance("fake", limiter, s) + instance, err := newInstance("fake", limiter, s, l) assert.NoError(t, err, "unexpected error creating new instance") return instance diff --git a/modules/querier/querier_test.go b/modules/querier/querier_test.go index 48dca1c6c6e..77f3fb7d871 100644 --- a/modules/querier/querier_test.go +++ b/modules/querier/querier_test.go @@ -91,10 +91,7 @@ func TestReturnAllHits(t *testing.T) { err = 
head.Write(testTraceID, bReq) assert.NoError(t, err, "unexpected error writing req") - complete, err := w.CompleteBlock(head, &mockSharder{}) - assert.NoError(t, err) - - err = w.WriteBlock(context.Background(), complete) + _, err = w.CompleteBlock(head, &mockSharder{}) assert.NoError(t, err) } diff --git a/tempodb/backend/azure/azure.go b/tempodb/backend/azure/azure.go index bd4f922c59d..000c898e960 100644 --- a/tempodb/backend/azure/azure.go +++ b/tempodb/backend/azure/azure.go @@ -197,6 +197,10 @@ func (rw *readerWriter) Read(ctx context.Context, name string, blockID uuid.UUID return rw.readAll(derivedCtx, util.ObjectFileName(blockID, tenantID, name)) } +func (rw *readerWriter) ReadReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) { + panic("ReadReader is not yet supported for Azure backend") +} + // ReadRange implements backend.Reader func (rw *readerWriter) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error { span, derivedCtx := opentracing.StartSpanFromContext(ctx, "ReadRange") diff --git a/tempodb/backend/backend.go b/tempodb/backend/backend.go index 674383ce831..b7fc0598c72 100644 --- a/tempodb/backend/backend.go +++ b/tempodb/backend/backend.go @@ -33,6 +33,8 @@ type Writer interface { type Reader interface { // Reader is for reading entire objects from the backend. It is expected that there will be an attempt to retrieve this from cache Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error) + // ReadReader is for streaming entire objects from the backend. It is expected this will _not_ be cached. + ReadReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) // ReadRange is for reading parts of large objects from the backend. It is expected this will _not_ be cached. 
ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error diff --git a/tempodb/backend/cache/cache.go b/tempodb/backend/cache/cache.go index 49d1e419d12..d04fe1e1085 100644 --- a/tempodb/backend/cache/cache.go +++ b/tempodb/backend/cache/cache.go @@ -61,6 +61,10 @@ func (r *readerWriter) Read(ctx context.Context, name string, blockID uuid.UUID, return val, err } +func (r *readerWriter) ReadReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) { + panic("ReadReader is not yet supported for cache") +} + // ReadRange implements backend.Reader func (r *readerWriter) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error { return r.nextReader.ReadRange(ctx, name, blockID, tenantID, offset, buffer) diff --git a/tempodb/backend/gcs/gcs.go b/tempodb/backend/gcs/gcs.go index d990a0a08ae..c17548f54cd 100644 --- a/tempodb/backend/gcs/gcs.go +++ b/tempodb/backend/gcs/gcs.go @@ -233,6 +233,10 @@ func (rw *readerWriter) Read(ctx context.Context, name string, blockID uuid.UUID return bytes, err } +func (rw *readerWriter) ReadReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) { + panic("ReadReader is not yet supported for GCS backend") +} + // ReadRange implements backend.Reader func (rw *readerWriter) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error { span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.ReadRange") diff --git a/tempodb/backend/local/compactor.go b/tempodb/backend/local/compactor.go index 2d321ec0402..4ecc937efce 100644 --- a/tempodb/backend/local/compactor.go +++ b/tempodb/backend/local/compactor.go @@ -12,7 +12,7 @@ import ( "github.com/grafana/tempo/tempodb/backend" ) -func (rw *readerWriter) MarkBlockCompacted(blockID uuid.UUID, tenantID string) error { +func (rw *Backend) MarkBlockCompacted(blockID uuid.UUID, tenantID string) error { // move meta file to a new location metaFilename := rw.metaFileName(blockID, tenantID) compactedMetaFilename := rw.compactedMetaFileName(blockID, tenantID) @@ -20,7 +20,7 @@ func (rw *readerWriter) MarkBlockCompacted(blockID uuid.UUID, tenantID string) e return os.Rename(metaFilename, compactedMetaFilename) } -func (rw *readerWriter) ClearBlock(blockID uuid.UUID, tenantID string) error { +func (rw *Backend) ClearBlock(blockID uuid.UUID, tenantID string) error { if len(tenantID) == 0 { return fmt.Errorf("empty tenant id") } @@ -32,7 +32,7 @@ func (rw *readerWriter) ClearBlock(blockID uuid.UUID, tenantID string) error { return os.RemoveAll(rw.rootPath(blockID, tenantID)) } -func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (*backend.CompactedBlockMeta, error) { +func (rw *Backend) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (*backend.CompactedBlockMeta, error) { filename := rw.compactedMetaFileName(blockID, tenantID) fi, err := os.Stat(filename) @@ -58,6 +58,6 @@ func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) ( return out, err } -func (rw *readerWriter) compactedMetaFileName(blockID uuid.UUID, tenantID string) string { +func (rw *Backend) compactedMetaFileName(blockID uuid.UUID, tenantID string) string { return path.Join(rw.rootPath(blockID, tenantID), "meta.compacted.json") } diff --git a/tempodb/backend/local/local.go b/tempodb/backend/local/local.go index 1c71bc2d254..eab11c172b5 
100644 --- a/tempodb/backend/local/local.go +++ b/tempodb/backend/local/local.go @@ -14,30 +14,39 @@ import ( "github.com/grafana/tempo/tempodb/backend" ) -type readerWriter struct { +type Backend struct { cfg *Config } -func New(cfg *Config) (backend.Reader, backend.Writer, backend.Compactor, error) { +var _ backend.Reader = (*Backend)(nil) +var _ backend.Writer = (*Backend)(nil) +var _ backend.Compactor = (*Backend)(nil) + +func NewBackend(cfg *Config) (*Backend, error) { err := os.MkdirAll(cfg.Path, os.ModePerm) if err != nil { - return nil, nil, nil, err + return nil, err } - rw := &readerWriter{ + l := &Backend{ cfg: cfg, } - return rw, rw, rw, nil + return l, nil +} + +func New(cfg *Config) (backend.Reader, backend.Writer, backend.Compactor, error) { + l, err := NewBackend(cfg) + return l, l, l, err } // Write implements backend.Writer -func (rw *readerWriter) Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte) error { +func (rw *Backend) Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte) error { return rw.WriteReader(ctx, name, blockID, tenantID, bytes.NewBuffer(buffer), int64(len(buffer))) } // WriteReader implements backend.Writer -func (rw *readerWriter) WriteReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, size int64) error { +func (rw *Backend) WriteReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string, data io.Reader, _ int64) error { blockFolder := rw.rootPath(blockID, tenantID) err := os.MkdirAll(blockFolder, os.ModePerm) if err != nil { @@ -59,7 +68,7 @@ func (rw *readerWriter) WriteReader(ctx context.Context, name string, blockID uu } // WriteBlockMeta implements backend.Writer -func (rw *readerWriter) WriteBlockMeta(ctx context.Context, meta *backend.BlockMeta) error { +func (rw *Backend) WriteBlockMeta(ctx context.Context, meta *backend.BlockMeta) error { blockID := meta.BlockID tenantID := meta.TenantID @@ -84,7 +93,7 @@ func (rw *readerWriter) WriteBlockMeta(ctx context.Context, meta *backend.BlockM } // Append implements backend.Writer -func (rw *readerWriter) Append(ctx context.Context, name string, blockID uuid.UUID, tenantID string, tracker backend.AppendTracker, buffer []byte) (backend.AppendTracker, error) { +func (rw *Backend) Append(ctx context.Context, name string, blockID uuid.UUID, tenantID string, tracker backend.AppendTracker, buffer []byte) (backend.AppendTracker, error) { var dst *os.File if tracker == nil { blockFolder := rw.rootPath(blockID, tenantID) @@ -111,7 +120,7 @@ func (rw *readerWriter) Append(ctx context.Context, name string, blockID uuid.UU } // CloseAppend implements backend.Writer -func (rw *readerWriter) CloseAppend(ctx context.Context, tracker backend.AppendTracker) error { +func (rw *Backend) CloseAppend(ctx context.Context, tracker backend.AppendTracker) error { if tracker == nil { return nil } @@ -121,7 +130,7 @@ func (rw *readerWriter) CloseAppend(ctx context.Context, tracker backend.AppendT } // Tenants implements backend.Reader -func (rw *readerWriter) Tenants(ctx context.Context) ([]string, error) { +func (rw *Backend) Tenants(ctx context.Context) ([]string, error) { folders, err := ioutil.ReadDir(rw.cfg.Path) if err != nil { return nil, err @@ -139,7 +148,7 @@ func (rw *readerWriter) Tenants(ctx context.Context) ([]string, error) { } // Blocks implements backend.Reader -func (rw *readerWriter) Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error) { +func (rw *Backend) 
Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error) { var warning error path := path.Join(rw.cfg.Path, tenantID) folders, err := ioutil.ReadDir(path) @@ -164,7 +173,7 @@ func (rw *readerWriter) Blocks(ctx context.Context, tenantID string) ([]uuid.UUI } // BlockMeta implements backend.Reader -func (rw *readerWriter) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*backend.BlockMeta, error) { +func (rw *Backend) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*backend.BlockMeta, error) { filename := rw.metaFileName(blockID, tenantID) bytes, err := ioutil.ReadFile(filename) if os.IsNotExist(err) { @@ -184,13 +193,13 @@ func (rw *readerWriter) BlockMeta(ctx context.Context, blockID uuid.UUID, tenant } // Read implements backend.Reader -func (rw *readerWriter) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error) { +func (rw *Backend) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error) { filename := rw.objectFileName(blockID, tenantID, name) return ioutil.ReadFile(filename) } // ReadRange implements backend.Reader -func (rw *readerWriter) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error { +func (rw *Backend) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error { filename := rw.objectFileName(blockID, tenantID, name) f, err := os.OpenFile(filename, os.O_RDONLY, 0644) @@ -207,19 +216,35 @@ func (rw *readerWriter) ReadRange(ctx context.Context, name string, blockID uuid return nil } +func (rw *Backend) ReadReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) { + filename := rw.objectFileName(blockID, tenantID, name) + + f, err := os.OpenFile(filename, os.O_RDONLY, 0644) + if err != nil { + return nil, -1, err + } + + stat, err := f.Stat() + if err != nil { + return nil, -1, err + } + + return f, stat.Size(), err +} + // Shutdown implements backend.Reader -func (rw *readerWriter) Shutdown() { +func (rw *Backend) Shutdown() { } -func (rw *readerWriter) objectFileName(blockID uuid.UUID, tenantID string, name string) string { +func (rw *Backend) objectFileName(blockID uuid.UUID, tenantID string, name string) string { return filepath.Join(rw.rootPath(blockID, tenantID), name) } -func (rw *readerWriter) metaFileName(blockID uuid.UUID, tenantID string) string { +func (rw *Backend) metaFileName(blockID uuid.UUID, tenantID string) string { return filepath.Join(rw.rootPath(blockID, tenantID), "meta.json") } -func (rw *readerWriter) rootPath(blockID uuid.UUID, tenantID string) string { +func (rw *Backend) rootPath(blockID uuid.UUID, tenantID string) string { return filepath.Join(rw.cfg.Path, tenantID, blockID.String()) } diff --git a/tempodb/backend/s3/s3.go b/tempodb/backend/s3/s3.go index d7ca32861ab..0573de28976 100644 --- a/tempodb/backend/s3/s3.go +++ b/tempodb/backend/s3/s3.go @@ -324,6 +324,10 @@ func (rw *readerWriter) Read(ctx context.Context, name string, blockID uuid.UUID return rw.readAll(derivedCtx, util.ObjectFileName(blockID, tenantID, name)) } +func (rw *readerWriter) ReadReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) { + panic("ReadReader is not yet supported for S3 backend") +} + // ReadRange implements backend.Reader func (rw *readerWriter) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, 
offset uint64, buffer []byte) error { span, derivedCtx := opentracing.StartSpanFromContext(ctx, "ReadRange") diff --git a/tempodb/backend/util/test.go b/tempodb/backend/util/test.go index 3e20c2ccaec..11dcb713a29 100644 --- a/tempodb/backend/util/test.go +++ b/tempodb/backend/util/test.go @@ -33,6 +33,11 @@ func (m *MockReader) Read(ctx context.Context, name string, blockID uuid.UUID, t return m.R, nil } + +func (m *MockReader) ReadReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error) { + panic("ReadReader is not yet supported for mock reader") +} + func (m *MockReader) ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error { copy(buffer, m.Range) diff --git a/tempodb/compactor.go b/tempodb/compactor.go index 97839c45d50..00bd23c488f 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -53,6 +53,8 @@ const ( outputBlocks = 1 compactionCycle = 30 * time.Second + + DefaultFlushSizeBytes uint32 = 30 * 1024 * 1024 // 30 MiB ) // todo: pass a context/chan in to cancel this cleanly @@ -167,7 +169,7 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string recordsPerBlock := (totalRecords / outputBlocks) var newCompactedBlocks []*backend.BlockMeta - var currentBlock *encoding.CompactorBlock + var currentBlock *encoding.StreamingBlock var tracker backend.AppendTracker for !allDone(ctx, bookmarks) { @@ -208,7 +210,7 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string // make a new block if necessary if currentBlock == nil { - currentBlock, err = encoding.NewCompactorBlock(rw.cfg.Block, uuid.New(), tenantID, blockMetas, recordsPerBlock) + currentBlock, err = encoding.NewStreamingBlock(rw.cfg.Block, uuid.New(), tenantID, blockMetas, recordsPerBlock) if err != nil { return errors.Wrap(err, "error making new compacted block") } @@ -259,7 +261,7 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string return nil } -func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.CompactorBlock) (backend.AppendTracker, error) { +func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.StreamingBlock) (backend.AppendTracker, error) { compactionLevelLabel := strconv.Itoa(int(block.BlockMeta().CompactionLevel - 1)) metricCompactionObjectsWritten.WithLabelValues(compactionLevelLabel).Add(float64(block.CurrentBufferedObjects())) @@ -272,7 +274,7 @@ func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *encodin return tracker, nil } -func finishBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.CompactorBlock) error { +func finishBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.StreamingBlock) error { level.Info(rw.logger).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta())) bytesFlushed, err := block.Complete(context.TODO(), tracker, rw.w) diff --git a/tempodb/compactor_bookmark_test.go b/tempodb/compactor_bookmark_test.go index 1030cca9b2e..47ce6046ce7 100644 --- a/tempodb/compactor_bookmark_test.go +++ b/tempodb/compactor_bookmark_test.go @@ -27,7 +27,7 @@ func TestCurrentClear(t *testing.T) { defer os.RemoveAll(tempDir) require.NoError(t, err, "unexpected error creating temp dir") - r, w, c, err := New(&Config{ + _, w, c, err := New(&Config{ Backend: "local", Local: &local.Config{ Path: path.Join(tempDir, "traces"), @@ -73,13 +73,7 @@ func TestCurrentClear(t 
*testing.T) { complete, err := w.CompleteBlock(head, &mockSharder{}) assert.NoError(t, err) - err = w.WriteBlock(context.Background(), complete) - assert.NoError(t, err) - - rw := r.(*readerWriter) - block, err := encoding.NewBackendBlock(complete.BlockMeta(), rw.r) - assert.NoError(t, err) - iter, err := block.Iterator(10) + iter, err := complete.Iterator(10) assert.NoError(t, err) bm := newBookmark(iter) diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 7a1ce4fb51b..a5dda2cc18a 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -114,11 +114,11 @@ func TestCompaction(t *testing.T) { allReqs = append(allReqs, reqs...) allIds = append(allIds, ids...) - complete, err := w.CompleteBlock(head, &mockSharder{}) + _, err = w.CompleteBlock(head, &mockSharder{}) assert.NoError(t, err) - err = w.WriteBlock(context.Background(), complete) - assert.NoError(t, err) + //err = w.WriteBlock(context.Background(), complete) + //assert.NoError(t, err) } rw := r.(*readerWriter) @@ -227,10 +227,7 @@ func TestSameIDCompaction(t *testing.T) { err = head.Write(id, rec) assert.NoError(t, err, "unexpected error writing req") - complete, err := w.CompleteBlock(head, &mockSharder{}) - assert.NoError(t, err) - - err = w.WriteBlock(context.Background(), complete) + _, err = w.CompleteBlock(head, &mockSharder{}) assert.NoError(t, err) } @@ -473,10 +470,7 @@ func cutTestBlocks(t *testing.T, w Writer, tenantID string, blockCount int, reco assert.NoError(t, err, "unexpected error writing rec") } - complete, err := w.CompleteBlock(head, &mockSharder{}) - assert.NoError(t, err) - - err = w.WriteBlock(context.Background(), complete) + _, err = w.CompleteBlock(head, &mockSharder{}) assert.NoError(t, err) } } diff --git a/tempodb/encoding/backend_block.go b/tempodb/encoding/backend_block.go index 3a768b1432c..d61aa3cd27a 100644 --- a/tempodb/encoding/backend_block.go +++ b/tempodb/encoding/backend_block.go @@ -125,3 +125,7 @@ func (b *BackendBlock) NewIndexReader() (common.IndexReader, error) { return reader, nil } + +func (b *BackendBlock) BlockMeta() *backend.BlockMeta { + return b.meta +} diff --git a/tempodb/encoding/block.go b/tempodb/encoding/block.go index 91cbaba55dc..e56a4a1acb9 100644 --- a/tempodb/encoding/block.go +++ b/tempodb/encoding/block.go @@ -3,11 +3,11 @@ package encoding import ( "context" "fmt" - "io" "strconv" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/pkg/errors" ) const ( @@ -54,12 +54,47 @@ func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMe return nil } -// writeBlockData writes the data object from an io.Reader to the backend.Writer -func writeBlockData(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, r io.Reader, size int64) error { - return w.WriteReader(ctx, nameObjects, meta.BlockID, meta.TenantID, r, size) -} - // appendBlockData appends the bytes passed to the block data func appendBlockData(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, tracker backend.AppendTracker, buffer []byte) (backend.AppendTracker, error) { return w.Append(ctx, nameObjects, meta.BlockID, meta.TenantID, tracker, buffer) } + +// CopyBlock copies a block from one backend to another. It is done at a low level, all encoding/formatting is preserved. 
+func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader, dest backend.Writer) error { + blockID := meta.BlockID + tenantID := meta.TenantID + + copy := func(name string) error { + reader, size, err := src.ReadReader(ctx, name, blockID, tenantID) + if err != nil { + return errors.Wrapf(err, "error reading %s", name) + } + defer reader.Close() + + return dest.WriteReader(ctx, name, blockID, tenantID, reader, size) + } + + // Data + err := copy(nameObjects) + if err != nil { + return err + } + + // Bloom + for i := 0; i < common.GetShardNum(); i++ { + err = copy(bloomName(i)) + if err != nil { + return err + } + } + + // Index + err = copy(nameIndex) + if err != nil { + return err + } + + // Meta + err = dest.WriteBlockMeta(ctx, meta) + return err +} diff --git a/tempodb/encoding/complete_block.go b/tempodb/encoding/complete_block.go deleted file mode 100644 index 599ae854803..00000000000 --- a/tempodb/encoding/complete_block.go +++ /dev/null @@ -1,196 +0,0 @@ -package encoding - -import ( - "context" - "fmt" - "os" - "sync" - "time" - - "go.uber.org/atomic" - - "github.com/grafana/tempo/tempodb/backend" - "github.com/grafana/tempo/tempodb/encoding/common" -) - -// CompleteBlock represent a block that has been "cut", is ready to be flushed and is not appendable. -// A CompleteBlock also knows the filepath of the append wal file it was cut from. It is responsible for -// cleaning this block up once it has been flushed to the backend. -type CompleteBlock struct { - encoding versionedEncoding - - meta *backend.BlockMeta - bloom *common.ShardedBloomFilter - records []*common.Record - - flushedTime atomic.Int64 // protecting flushedTime b/c it's accessed from the store on flush and from the ingester instance checking flush time - - filepath string - readFile *os.File - once sync.Once - - cfg *BlockConfig -} - -// NewCompleteBlock creates a new block and takes _ALL_ the parameters necessary to build the ordered, deduped file on disk -func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iterator Iterator, estimatedObjects int, filepath string) (*CompleteBlock, error) { - c := &CompleteBlock{ - encoding: latestEncoding(), - meta: backend.NewBlockMeta(originatingMeta.TenantID, originatingMeta.BlockID, currentVersion, cfg.Encoding), - bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), - records: make([]*common.Record, 0), - filepath: filepath, - cfg: cfg, - } - - appendFile, err := os.OpenFile(c.fullFilename(), os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return nil, err - } - defer appendFile.Close() - - dataWriter, err := c.encoding.newDataWriter(appendFile, cfg.Encoding) - if err != nil { - return nil, err - } - - appender, err := NewBufferedAppender(dataWriter, cfg.IndexDownsampleBytes, estimatedObjects) - if err != nil { - return nil, err - } - - // todo: add a timeout? propagage context from completing? - ctx := context.Background() - for { - bytesID, bytesObject, err := iterator.Next(ctx) - if bytesID == nil { - break - } - if err != nil { - _ = os.Remove(c.fullFilename()) - return nil, err - } - - c.meta.ObjectAdded(bytesID) - c.bloom.Add(bytesID) - // obj gets written to disk immediately but the id escapes the iterator and needs to be copied - writeID := append([]byte(nil), bytesID...) 
- err = appender.Append(writeID, bytesObject) - if err != nil { - _ = os.Remove(c.fullFilename()) - return nil, err - } - } - err = appender.Complete() - if err != nil { - return nil, err - } - c.records = appender.Records() - c.meta.Size = appender.DataLength() // Must be after Complete() - c.meta.StartTime = originatingMeta.StartTime - c.meta.EndTime = originatingMeta.EndTime - - return c, nil -} - -// BlockMeta returns a pointer to this blocks meta -func (c *CompleteBlock) BlockMeta() *backend.BlockMeta { - return c.meta -} - -// Write implements WriteableBlock -func (c *CompleteBlock) Write(ctx context.Context, w backend.Writer) error { - // write object file - src, err := os.Open(c.fullFilename()) - if err != nil { - return err - } - defer src.Close() - - fileStat, err := src.Stat() - if err != nil { - return err - } - - err = writeBlockData(ctx, w, c.meta, src, fileStat.Size()) - if err != nil { - return err - } - - indexWriter := c.encoding.newIndexWriter(c.cfg.IndexPageSizeBytes) - indexBytes, err := indexWriter.Write(c.records) - if err != nil { - return err - } - - c.meta.TotalRecords = uint32(len(c.records)) - c.meta.IndexPageSize = uint32(c.cfg.IndexPageSizeBytes) - - err = writeBlockMeta(ctx, w, c.meta, indexBytes, c.bloom) - if err != nil { - return err - } - - // book keeping - c.flushedTime.Store(time.Now().Unix()) - return nil -} - -// Find searches the for the provided trace id. A CompleteBlock should never -// have multiples of a single id so not sure why this uses a DedupingFinder. -func (c *CompleteBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]byte, error) { - if !c.bloom.Test(id) { - return nil, nil - } - - file, err := c.file() - if err != nil { - return nil, err - } - - dataReader, err := c.encoding.newDataReader(backend.NewContextReaderWithAllReader(file), c.meta.Encoding) - if err != nil { - return nil, err - } - defer dataReader.Close() - - finder := NewPagedFinder(common.Records(c.records), dataReader, combiner, c.encoding.newObjectReaderWriter()) - return finder.Find(context.Background(), id) -} - -// Clear removes the backing file. -func (c *CompleteBlock) Clear() error { - if c.readFile != nil { - _ = c.readFile.Close() - } - - name := c.fullFilename() - return os.Remove(name) -} - -// FlushedTime returns the time the block was flushed. Will return 0 -// if the block was never flushed -func (c *CompleteBlock) FlushedTime() time.Time { - unixTime := c.flushedTime.Load() - if unixTime == 0 { - return time.Time{} // return 0 time. 
0 unix time is jan 1, 1970 - } - return time.Unix(unixTime, 0) -} - -func (c *CompleteBlock) fullFilename() string { - return fmt.Sprintf("%s/%v:%v", c.filepath, c.meta.BlockID, c.meta.TenantID) -} - -func (c *CompleteBlock) file() (*os.File, error) { - var err error - c.once.Do(func() { - if c.readFile == nil { - name := c.fullFilename() - - c.readFile, err = os.OpenFile(name, os.O_RDONLY, 0644) - } - }) - - return c.readFile, err -} diff --git a/tempodb/encoding/complete_block_test.go b/tempodb/encoding/complete_block_test.go deleted file mode 100644 index a7de7966aa7..00000000000 --- a/tempodb/encoding/complete_block_test.go +++ /dev/null @@ -1,315 +0,0 @@ -package encoding - -import ( - "bufio" - "bytes" - "context" - "fmt" - "io" - "io/ioutil" - "math/rand" - "os" - "sort" - "testing" - "time" - - "github.com/golang/protobuf/proto" - "github.com/google/uuid" - "github.com/grafana/tempo/pkg/util" - "github.com/grafana/tempo/pkg/util/test" - "github.com/grafana/tempo/tempodb/backend" - "github.com/grafana/tempo/tempodb/backend/local" - "github.com/grafana/tempo/tempodb/encoding/common" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" - v2 "github.com/grafana/tempo/tempodb/encoding/v2" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type mockCombiner struct { -} - -func (m *mockCombiner) Combine(objA []byte, objB []byte) []byte { - if len(objA) > len(objB) { - return objA - } - - return objB -} - -func TestCompleteBlock(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - require.NoError(t, err, "unexpected error creating temp dir") - - block, ids, reqs := completeBlock(t, &BlockConfig{ - IndexDownsampleBytes: 13, - BloomFP: .01, - Encoding: backend.EncGZIP, - }, tempDir) - - // test Find - for i, id := range ids { - foundBytes, err := block.Find(id, &mockCombiner{}) - assert.NoError(t, err) - - assert.Equal(t, reqs[i], foundBytes) - assert.True(t, block.bloom.Test(id)) - } - - // confirm order - var prev *common.Record - for _, r := range block.records { - if prev != nil { - assert.Greater(t, r.Start, prev.Start) - } - - prev = r - } -} - -func TestCompleteBlockAll(t *testing.T) { - for _, enc := range backend.SupportedEncoding { - t.Run(enc.String(), func(t *testing.T) { - testCompleteBlockToBackendBlock(t, - &BlockConfig{ - IndexDownsampleBytes: 1000, - BloomFP: .01, - Encoding: enc, - IndexPageSizeBytes: 1000, - }, - ) - }) - } -} - -func testCompleteBlockToBackendBlock(t *testing.T, cfg *BlockConfig) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - require.NoError(t, err, "unexpected error creating temp dir") - - block, ids, reqs := completeBlock(t, cfg, tempDir) - - backendTmpDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(backendTmpDir) - require.NoError(t, err, "unexpected error creating temp dir") - - r, w, _, err := local.New(&local.Config{ - Path: backendTmpDir, - }) - require.NoError(t, err, "error creating backend") - - err = block.Write(context.Background(), w) - require.NoError(t, err, "error writing backend") - - // meta? 
- uuids, err := r.Blocks(context.Background(), testTenantID) - require.NoError(t, err, "error listing blocks") - require.Len(t, uuids, 1) - - meta, err := r.BlockMeta(context.Background(), uuids[0], testTenantID) - require.NoError(t, err, "error getting meta") - - backendBlock, err := NewBackendBlock(meta, r) - require.NoError(t, err, "error creating block") - - // test Find - for i, id := range ids { - foundBytes, err := backendBlock.Find(context.Background(), id) - assert.NoError(t, err) - - assert.Equal(t, reqs[i], foundBytes) - } - - // test Iterator - idsToObjs := map[uint32][]byte{} - for i, id := range ids { - idsToObjs[util.TokenForTraceID(id)] = reqs[i] - } - sort.Slice(ids, func(i int, j int) bool { return bytes.Compare(ids[i], ids[j]) == -1 }) - - iterator, err := backendBlock.Iterator(10 * 1024 * 1024) - require.NoError(t, err, "error getting iterator") - i := 0 - for { - id, obj, err := iterator.Next(context.Background()) - if id == nil { - break - } - - assert.NoError(t, err) - assert.Equal(t, ids[i], []byte(id)) - assert.Equal(t, idsToObjs[util.TokenForTraceID(id)], obj) - i++ - } - assert.Equal(t, len(ids), i) -} - -func completeBlock(t *testing.T, cfg *BlockConfig, tempDir string) (*CompleteBlock, [][]byte, [][]byte) { - rand.Seed(time.Now().Unix()) - - buffer := &bytes.Buffer{} - writer := bufio.NewWriter(buffer) - appender := NewAppender(v0.NewDataWriter(writer)) - - numMsgs := 1000 - reqs := make([][]byte, 0, numMsgs) - ids := make([][]byte, 0, numMsgs) - var maxID, minID []byte - for i := 0; i < numMsgs; i++ { - id := make([]byte, 16) - rand.Read(id) - req := test.MakeRequest(rand.Int()%10, id) - ids = append(ids, id) - bReq, err := proto.Marshal(req) - require.NoError(t, err) - reqs = append(reqs, bReq) - - err = appender.Append(id, bReq) - require.NoError(t, err, "unexpected error writing req") - - if len(maxID) == 0 || bytes.Compare(id, maxID) == 1 { - maxID = id - } - if len(minID) == 0 || bytes.Compare(id, minID) == -1 { - minID = id - } - } - err := appender.Complete() - require.NoError(t, err) - err = writer.Flush() - require.NoError(t, err, "unexpected error flushing writer") - - originatingMeta := backend.NewBlockMeta(testTenantID, uuid.New(), "should_be_ignored", backend.EncGZIP) - originatingMeta.StartTime = time.Now().Add(-5 * time.Minute) - originatingMeta.EndTime = time.Now().Add(5 * time.Minute) - - // calc expected records - byteCounter := 0 - expectedRecords := 0 - for _, rec := range appender.Records() { - byteCounter += int(rec.Length) - if byteCounter > cfg.IndexDownsampleBytes { - byteCounter = 0 - expectedRecords++ - } - } - if byteCounter > 0 { - expectedRecords++ - } - - iterator := NewRecordIterator(appender.Records(), bytes.NewReader(buffer.Bytes()), v0.NewObjectReaderWriter()) - block, err := NewCompleteBlock(cfg, originatingMeta, iterator, numMsgs, tempDir) - require.NoError(t, err, "unexpected error completing block") - - // test downsample config - require.Equal(t, expectedRecords, len(block.records)) - require.True(t, block.FlushedTime().IsZero()) - require.True(t, bytes.Equal(block.meta.MinID, minID)) - require.True(t, bytes.Equal(block.meta.MaxID, maxID)) - require.Equal(t, originatingMeta.StartTime, block.meta.StartTime) - require.Equal(t, originatingMeta.EndTime, block.meta.EndTime) - require.Equal(t, originatingMeta.TenantID, block.meta.TenantID) - - // Verify block size was written - require.Greater(t, block.meta.Size, uint64(0)) - - return block, ids, reqs -} - -const benchDownsample = 200 - -func BenchmarkWriteGzip(b *testing.B) { 
- benchmarkCompressBlock(b, backend.EncGZIP, benchDownsample, false) -} -func BenchmarkWriteSnappy(b *testing.B) { - benchmarkCompressBlock(b, backend.EncSnappy, benchDownsample, false) -} -func BenchmarkWriteLZ4256(b *testing.B) { - benchmarkCompressBlock(b, backend.EncLZ4_256k, benchDownsample, false) -} -func BenchmarkWriteLZ41M(b *testing.B) { - benchmarkCompressBlock(b, backend.EncLZ4_1M, benchDownsample, false) -} -func BenchmarkWriteNone(b *testing.B) { - benchmarkCompressBlock(b, backend.EncNone, benchDownsample, false) -} - -func BenchmarkWriteZstd(b *testing.B) { - benchmarkCompressBlock(b, backend.EncZstd, benchDownsample, false) -} - -func BenchmarkReadGzip(b *testing.B) { - benchmarkCompressBlock(b, backend.EncGZIP, benchDownsample, true) -} -func BenchmarkReadSnappy(b *testing.B) { - benchmarkCompressBlock(b, backend.EncSnappy, benchDownsample, true) -} -func BenchmarkReadLZ4256(b *testing.B) { - benchmarkCompressBlock(b, backend.EncLZ4_256k, benchDownsample, true) -} -func BenchmarkReadLZ41M(b *testing.B) { - benchmarkCompressBlock(b, backend.EncLZ4_1M, benchDownsample, true) -} -func BenchmarkReadNone(b *testing.B) { - benchmarkCompressBlock(b, backend.EncNone, benchDownsample, true) -} - -func BenchmarkReadZstd(b *testing.B) { - benchmarkCompressBlock(b, backend.EncZstd, benchDownsample, true) -} - -// Download a block from your backend and place in ./benchmark_block/fake/ -//nolint:unparam -func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsample int, benchRead bool) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - require.NoError(b, err, "unexpected error creating temp dir") - - r, _, _, err := local.New(&local.Config{ - Path: "./benchmark_block", - }) - require.NoError(b, err, "error creating backend") - - backendBlock, err := NewBackendBlock(backend.NewBlockMeta("fake", uuid.MustParse("9f15417a-1242-40e4-9de3-a057d3b176c1"), "v0", backend.EncNone), r) - require.NoError(b, err, "error creating backend block") - - iterator, err := backendBlock.Iterator(10 * 1024 * 1024) - require.NoError(b, err, "error creating iterator") - - if !benchRead { - b.ResetTimer() - } - - originatingMeta := backend.NewBlockMeta(testTenantID, uuid.New(), "should_be_ignored", backend.EncGZIP) - cb, err := NewCompleteBlock(&BlockConfig{ - IndexDownsampleBytes: indexDownsample, - BloomFP: .05, - Encoding: encoding, - }, originatingMeta, iterator, 10000, tempDir) - require.NoError(b, err, "error creating block") - - lastRecord := cb.records[len(cb.records)-1] - fmt.Println("size: ", lastRecord.Start+uint64(lastRecord.Length)) - - if !benchRead { - return - } - - b.ResetTimer() - file, err := os.Open(cb.fullFilename()) - require.NoError(b, err) - pr, err := v2.NewDataReader(v0.NewDataReader(backend.NewContextReaderWithAllReader(file)), encoding) - require.NoError(b, err) - iterator = newPagedIterator(10*1024*1024, common.Records(cb.records), pr, backendBlock.encoding.newObjectReaderWriter()) - - for { - id, _, err := iterator.Next(context.Background()) - if err != io.EOF { - require.NoError(b, err) - } - if id == nil { - break - } - } -} diff --git a/tempodb/encoding/compactor_block.go b/tempodb/encoding/streaming_block.go similarity index 76% rename from tempodb/encoding/compactor_block.go rename to tempodb/encoding/streaming_block.go index 65c0182e69a..10e91c28b40 100644 --- a/tempodb/encoding/compactor_block.go +++ b/tempodb/encoding/streaming_block.go @@ -10,7 +10,7 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" ) -type 
CompactorBlock struct { +type StreamingBlock struct { encoding versionedEncoding compactedMeta *backend.BlockMeta @@ -25,17 +25,13 @@ type CompactorBlock struct { cfg *BlockConfig } -// NewCompactorBlock creates a ... new compactor block! -func NewCompactorBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas []*backend.BlockMeta, estimatedObjects int) (*CompactorBlock, error) { +// NewStreamingBlock creates a ... new streaming block. Objects are appended one at a time to the backend. +func NewStreamingBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas []*backend.BlockMeta, estimatedObjects int) (*StreamingBlock, error) { if len(metas) == 0 { return nil, fmt.Errorf("empty block meta list") } - if estimatedObjects <= 0 { - return nil, fmt.Errorf("must have non-zero positive estimated objects for a reliable bloom filter") - } - - c := &CompactorBlock{ + c := &StreamingBlock{ encoding: latestEncoding(), compactedMeta: backend.NewBlockMeta(tenantID, id, currentVersion, cfg.Encoding), bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), @@ -57,7 +53,7 @@ func NewCompactorBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] return c, nil } -func (c *CompactorBlock) AddObject(id common.ID, object []byte) error { +func (c *StreamingBlock) AddObject(id common.ID, object []byte) error { err := c.appender.Append(id, object) if err != nil { return err @@ -68,20 +64,20 @@ func (c *CompactorBlock) AddObject(id common.ID, object []byte) error { return nil } -func (c *CompactorBlock) CurrentBufferLength() int { +func (c *StreamingBlock) CurrentBufferLength() int { return c.appendBuffer.Len() } -func (c *CompactorBlock) CurrentBufferedObjects() int { +func (c *StreamingBlock) CurrentBufferedObjects() int { return c.bufferedObjects } -func (c *CompactorBlock) Length() int { +func (c *StreamingBlock) Length() int { return c.appender.Length() } // FlushBuffer flushes any existing objects to the backend -func (c *CompactorBlock) FlushBuffer(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (backend.AppendTracker, int, error) { +func (c *StreamingBlock) FlushBuffer(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (backend.AppendTracker, int, error) { if c.appender.Length() == 0 { return tracker, 0, nil } @@ -100,7 +96,7 @@ func (c *CompactorBlock) FlushBuffer(ctx context.Context, tracker backend.Append } // Complete finishes writes the compactor metadata and closes all buffers and appenders -func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (int, error) { +func (c *StreamingBlock) Complete(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (int, error) { err := c.appender.Complete() if err != nil { return 0, err @@ -112,6 +108,12 @@ func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTra return 0, err } + // close data file + err = w.CloseAppend(ctx, tracker) + if err != nil { + return 0, err + } + records := c.appender.Records() meta := c.BlockMeta() @@ -124,15 +126,10 @@ func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTra meta.TotalRecords = uint32(len(records)) // casting meta.IndexPageSize = uint32(c.cfg.IndexPageSizeBytes) - err = writeBlockMeta(ctx, w, meta, indexBytes, c.bloom) - if err != nil { - return 0, err - } - - return bytesFlushed, w.CloseAppend(ctx, tracker) + return bytesFlushed, writeBlockMeta(ctx, w, meta, indexBytes, c.bloom) } -func (c *CompactorBlock) BlockMeta() *backend.BlockMeta 
{
+func (c *StreamingBlock) BlockMeta() *backend.BlockMeta {
 	meta := c.compactedMeta
 
 	meta.StartTime = c.inMetas[0].StartTime
diff --git a/tempodb/encoding/compactor_block_test.go b/tempodb/encoding/streaming_block_test.go
similarity index 95%
rename from tempodb/encoding/compactor_block_test.go
rename to tempodb/encoding/streaming_block_test.go
index c737c5f0063..174753a55e1 100644
--- a/tempodb/encoding/compactor_block_test.go
+++ b/tempodb/encoding/streaming_block_test.go
@@ -18,7 +18,7 @@ const (
 )
 
 func TestCompactorBlockError(t *testing.T) {
-	_, err := NewCompactorBlock(nil, uuid.New(), "", nil, 0)
+	_, err := NewStreamingBlock(nil, uuid.New(), "", nil, 0)
 	assert.Error(t, err)
 }
 
@@ -37,7 +37,7 @@ func TestCompactorBlockAddObject(t *testing.T) {
 	}
 	numObjects := (rand.Int() % 20) + 1
-	cb, err := NewCompactorBlock(&BlockConfig{
+	cb, err := NewStreamingBlock(&BlockConfig{
 		BloomFP:              .01,
 		IndexDownsampleBytes: indexDownsample,
 		Encoding:             backend.EncGZIP,
diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go
index f57c2f8778b..e3fb1c60117 100644
--- a/tempodb/retention_test.go
+++ b/tempodb/retention_test.go
@@ -1,7 +1,6 @@
 package tempodb
 
 import (
-	"context"
 	"io/ioutil"
 	"os"
 	"path"
@@ -60,9 +59,6 @@ func TestRetention(t *testing.T) {
 	assert.NoError(t, err)
 	blockID = complete.BlockMeta().BlockID
 
-	err = w.WriteBlock(context.Background(), complete)
-	assert.NoError(t, err)
-
 	rw := r.(*readerWriter)
 
 	// poll
 	checkBlocklists(t, blockID, 1, 0, rw)
diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go
index f785b6209fa..60aa9bca4d7 100644
--- a/tempodb/tempodb.go
+++ b/tempodb/tempodb.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/hex"
 	"fmt"
+	"io"
 	"sort"
 	"sync"
 	"time"
@@ -12,6 +13,7 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/google/uuid"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -83,7 +85,8 @@ var (
 type Writer interface {
 	WriteBlock(ctx context.Context, block WriteableBlock) error
-	CompleteBlock(block *wal.AppendBlock, combiner common.ObjectCombiner) (*encoding.CompleteBlock, error)
+	CompleteBlock(block *wal.AppendBlock, combiner common.ObjectCombiner) (*encoding.BackendBlock, error)
+	CompleteBlockWithBackend(ctx context.Context, block *wal.AppendBlock, combiner common.ObjectCombiner, r backend.Reader, w backend.Writer) (*encoding.BackendBlock, error)
 	WAL() *wal.WAL
 }
@@ -201,8 +204,70 @@ func (rw *readerWriter) WriteBlock(ctx context.Context, c WriteableBlock) error
 	return c.Write(ctx, rw.w)
 }
 
-func (rw *readerWriter) CompleteBlock(block *wal.AppendBlock, combiner common.ObjectCombiner) (*encoding.CompleteBlock, error) {
-	return block.Complete(rw.cfg.Block, rw.wal, combiner)
+// CompleteBlock iterates the given WAL block and flushes it to the TempoDB backend.
+func (rw *readerWriter) CompleteBlock(block *wal.AppendBlock, combiner common.ObjectCombiner) (*encoding.BackendBlock, error) {
+	return rw.CompleteBlockWithBackend(context.TODO(), block, combiner, rw.r, rw.w)
+}
+
+// CompleteBlockWithBackend iterates the given WAL block but flushes it to the given backend instead of the default TempoDB backend. The
+// new block will have the same ID as the input block.
+
+// CompleteBlockWithBackend iterates the given WAL block but flushes it to the given backend instead of the default
+// TempoDB backend. The new block will have the same ID as the input block.
+func (rw *readerWriter) CompleteBlockWithBackend(ctx context.Context, block *wal.AppendBlock, combiner common.ObjectCombiner, r backend.Reader, w backend.Writer) (*encoding.BackendBlock, error) {
+	meta := block.Meta()
+	blockID := meta.BlockID
+	tenantID := meta.TenantID
+
+	// Default and nil check is primarily to make testing easier.
+	flushSize := DefaultFlushSizeBytes
+	if rw.compactorCfg != nil && rw.compactorCfg.FlushSizeBytes > 0 {
+		flushSize = rw.compactorCfg.FlushSizeBytes
+	}
+
+	iter, err := block.GetIterator(combiner)
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting completing block iterator")
+	}
+	defer iter.Close()
+
+	newBlock, err := encoding.NewStreamingBlock(rw.cfg.Block, blockID, tenantID, []*backend.BlockMeta{meta}, meta.TotalObjects)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating compactor block")
+	}
+
+	var tracker backend.AppendTracker
+	for {
+		id, data, err := iter.Next(ctx)
+		if err != nil && err != io.EOF {
+			return nil, errors.Wrap(err, "error iterating")
+		}
+
+		if id == nil {
+			break
+		}
+
+		err = newBlock.AddObject(id, data)
+		if err != nil {
+			return nil, errors.Wrap(err, "error adding object to compactor block")
+		}
+
+		if newBlock.CurrentBufferLength() > int(flushSize) {
+			tracker, _, err = newBlock.FlushBuffer(ctx, tracker, w)
+			if err != nil {
+				return nil, errors.Wrap(err, "error flushing compactor block")
+			}
+		}
+	}
+
+	_, err = newBlock.Complete(ctx, tracker, w)
+	if err != nil {
+		return nil, errors.Wrap(err, "error completing compactor block")
+	}
+
+	backendBlock, err := encoding.NewBackendBlock(newBlock.BlockMeta(), r)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating backend block")
+	}
+
+	return backendBlock, nil
 }
 
 func (rw *readerWriter) WAL() *wal.WAL {
diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go
index 065a33b946f..2c6bb52f602 100644
--- a/tempodb/tempodb_test.go
+++ b/tempodb/tempodb_test.go
@@ -83,10 +83,7 @@ func TestDB(t *testing.T) {
 		assert.NoError(t, err, "unexpected error writing req")
 	}
 
-	complete, err := w.CompleteBlock(head, &mockSharder{})
-	assert.NoError(t, err)
-
-	err = w.WriteBlock(context.Background(), complete)
+	_, err = w.CompleteBlock(head, &mockSharder{})
 	assert.NoError(t, err)
 
 	// poll
@@ -150,10 +147,7 @@ func TestBlockSharding(t *testing.T) {
 	assert.NoError(t, err, "unexpected error writing req")
 
 	// write block to backend
-	complete, err := w.CompleteBlock(head, &mockSharder{})
-	assert.NoError(t, err)
-
-	err = w.WriteBlock(context.Background(), complete)
+	_, err = w.CompleteBlock(head, &mockSharder{})
 	assert.NoError(t, err)
 
 	// poll
@@ -251,10 +245,7 @@ func TestBlockCleanup(t *testing.T) {
 	head, err := wal.NewBlock(blockID, testTenantID)
 	assert.NoError(t, err)
 
-	complete, err := w.CompleteBlock(head, &mockSharder{})
-	assert.NoError(t, err)
-
-	err = w.WriteBlock(context.Background(), complete)
+	_, err = w.CompleteBlock(head, &mockSharder{})
 	assert.NoError(t, err)
 
 	rw := r.(*readerWriter)
@@ -925,9 +916,6 @@ func TestSearchCompactedBlocks(t *testing.T) {
 	blockID := complete.BlockMeta().BlockID.String()
 
-	err = w.WriteBlock(context.Background(), complete)
-	assert.NoError(t, err)
-
 	rw := r.(*readerWriter)
 
 	// poll
@@ -975,3 +963,63 @@ func TestSearchCompactedBlocks(t *testing.T) {
 		assert.True(t, proto.Equal(out, reqs[i]))
 	}
 }
+
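+// TestCompleteBlock exercises the new WAL -> backend write path end to end: it
+// writes random objects to an append block, completes it through the Writer,
+// and verifies every object can be found in the resulting backend block.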
Backend: "local", + Local: &local.Config{ + Path: path.Join(tempDir, "traces"), + }, + Block: &encoding.BlockConfig{ + IndexDownsampleBytes: 17, + BloomFP: .01, + Encoding: backend.EncLZ4_256k, + IndexPageSizeBytes: 1000, + }, + WAL: &wal.Config{ + Filepath: path.Join(tempDir, "wal"), + }, + BlocklistPoll: time.Minute, + }, log.NewNopLogger()) + assert.NoError(t, err) + + wal := w.WAL() + + blockID := uuid.New() + + block, err := wal.NewBlock(blockID, testTenantID) + assert.NoError(t, err, "unexpected error creating block") + + numMsgs := 100 + reqs := make([]*tempopb.PushRequest, 0, numMsgs) + ids := make([][]byte, 0, numMsgs) + for i := 0; i < numMsgs; i++ { + id := make([]byte, 16) + rand.Read(id) + req := test.MakeRequest(rand.Int()%1000, id) + reqs = append(reqs, req) + ids = append(ids, id) + bReq, err := proto.Marshal(req) + assert.NoError(t, err) + err = block.Write(id, bReq) + assert.NoError(t, err, "unexpected error writing req") + } + + complete, err := w.CompleteBlock(block, &mockSharder{}) + require.NoError(t, err, "unexpected error completing block") + + for i, id := range ids { + out := &tempopb.PushRequest{} + foundBytes, err := complete.Find(context.TODO(), id) + assert.NoError(t, err) + + err = proto.Unmarshal(foundBytes, out) + assert.NoError(t, err) + + assert.True(t, proto.Equal(out, reqs[i])) + } +} diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 8c409256f0d..78e0eb555f6 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -63,11 +63,11 @@ func (h *AppendBlock) DataLength() uint64 { return h.appender.DataLength() } -// Complete should be called when you are done with the block. This method will write and return a new CompleteBlock which -// includes an on disk file containing all objects in order. -// Note that calling this method leaves the original file on disk. This file is still considered to be part of the WAL -// until Write() is successfully called on the CompleteBlock. -func (h *AppendBlock) Complete(cfg *encoding.BlockConfig, w *WAL, combiner common.ObjectCombiner) (*encoding.CompleteBlock, error) { +func (h *AppendBlock) Meta() *backend.BlockMeta { + return h.meta +} + +func (h *AppendBlock) GetIterator(combiner common.ObjectCombiner) (encoding.Iterator, error) { if h.appendFile != nil { err := h.appendFile.Close() if err != nil { @@ -87,14 +87,8 @@ func (h *AppendBlock) Complete(cfg *encoding.BlockConfig, w *WAL, combiner commo if err != nil { return nil, err } - defer iterator.Close() - - orderedBlock, err := encoding.NewCompleteBlock(cfg, h.meta, iterator, len(records), w.c.CompletedFilepath) - if err != nil { - return nil, err - } - return orderedBlock, nil + return iterator, nil } func (h *AppendBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]byte, error) { diff --git a/tempodb/wal/local_block.go b/tempodb/wal/local_block.go new file mode 100644 index 00000000000..366554da026 --- /dev/null +++ b/tempodb/wal/local_block.go @@ -0,0 +1,84 @@ +package wal + +import ( + "context" + "time" + + "go.uber.org/atomic" + + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/backend/local" + "github.com/grafana/tempo/tempodb/encoding" + "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/pkg/errors" +) + +const nameFlushed = "flushed" + +// LocalBlock is a block stored in a local storage. 
+
+// LocalBlock is a block stored in local storage. It can be searched and flushed
+// to a remote backend, and permanently tracks the flushed time with a special
+// file in the block.
+type LocalBlock struct {
+	encoding.BackendBlock
+	local *local.Backend
+
+	flushedTime atomic.Int64 // atomic b/c it's accessed from the store on flush and from the ingester instance checking flush time
+}
+
+func NewLocalBlock(ctx context.Context, existingBlock *encoding.BackendBlock, l *local.Backend) (*LocalBlock, error) {
+	c := &LocalBlock{
+		BackendBlock: *existingBlock,
+		local:        l,
+	}
+
+	flushedBytes, err := l.Read(ctx, nameFlushed, c.BlockMeta().BlockID, c.BlockMeta().TenantID)
+	if err == nil {
+		flushedTime := time.Time{}
+		err = flushedTime.UnmarshalText(flushedBytes)
+		if err == nil {
+			c.flushedTime.Store(flushedTime.Unix())
+		}
+	}
+
+	return c, nil
+}
+
+func (c *LocalBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {
+	return c.BackendBlock.Find(ctx, id)
+}
+
+// FlushedTime returns the time the block was flushed. Returns the zero time
+// if the block was never flushed.
+func (c *LocalBlock) FlushedTime() time.Time {
+	unixTime := c.flushedTime.Load()
+	if unixTime == 0 {
+		return time.Time{} // the zero time; Unix time 0 is Jan 1, 1970
+	}
+	return time.Unix(unixTime, 0)
+}
+
+func (c *LocalBlock) SetFlushed(ctx context.Context) error {
+	flushedTime := time.Now()
+	flushedBytes, err := flushedTime.MarshalText()
+	if err != nil {
+		return errors.Wrap(err, "error marshalling flush time to text")
+	}
+
+	err = c.local.Write(ctx, nameFlushed, c.BlockMeta().BlockID, c.BlockMeta().TenantID, flushedBytes)
+	if err != nil {
+		return errors.Wrap(err, "error writing ingester block flushed file")
+	}
+
+	c.flushedTime.Store(flushedTime.Unix())
+	return nil
+}
+
+func (c *LocalBlock) Write(ctx context.Context, w backend.Writer) error {
+	err := encoding.CopyBlock(ctx, c.BlockMeta(), c.local, w)
+	if err != nil {
+		return errors.Wrap(err, "error copying block from local to remote backend")
+	}
+
+	err = c.SetFlushed(ctx)
+	return err
+}
diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go
index 6f4978e5633..17cf87f96b2 100644
--- a/tempodb/wal/wal.go
+++ b/tempodb/wal/wal.go
@@ -9,19 +9,23 @@ import (
 	"github.com/google/uuid"
 
 	"github.com/grafana/tempo/tempodb/backend"
+	"github.com/grafana/tempo/tempodb/backend/local"
 )
 
 const (
 	completedDir = "completed"
+	blocksDir    = "blocks"
 )
 
 type WAL struct {
 	c *Config
+	l *local.Backend
 }
 
 type Config struct {
 	Filepath          string `yaml:"path"`
 	CompletedFilepath string
+	BlocksFilepath    string
 }
 
 func New(c *Config) (*WAL, error) {
@@ -35,22 +39,37 @@ func New(c *Config) (*WAL, error) {
 		return nil, err
 	}
 
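+	// Resulting WAL directory layout (illustrative):
+	//   <filepath>/           in-progress append blocks
+	//   <filepath>/blocks/    local backend for fully completed blocks
+	//   <filepath>/completed/ obsolete; removed below
+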
+	// The /completed/ folder is now obsolete and no new data is written,
+	// but it needs to be cleared out one last time for any files left
+	// from a previous version.
 	if c.CompletedFilepath == "" {
 		completedFilepath := filepath.Join(c.Filepath, completedDir)
 		err = os.RemoveAll(completedFilepath)
 		if err != nil {
 			return nil, err
 		}
-		err = os.MkdirAll(completedFilepath, os.ModePerm)
-		if err != nil {
-			return nil, err
-		}
 
 		c.CompletedFilepath = completedFilepath
 	}
 
+	// Setup local backend in /blocks/
+	p := filepath.Join(c.Filepath, blocksDir)
+	err = os.MkdirAll(p, os.ModePerm)
+	if err != nil {
+		return nil, err
+	}
+	c.BlocksFilepath = p
+
+	l, err := local.NewBackend(&local.Config{
+		Path: p,
+	})
+	if err != nil {
+		return nil, err
+	}
+
 	return &WAL{
 		c: c,
+		l: l,
 	}, nil
 }
 
@@ -87,6 +106,10 @@ func (w *WAL) NewBlock(id uuid.UUID, tenantID string) (*AppendBlock, error) {
 	return newAppendBlock(id, tenantID, w.c.Filepath)
 }
 
+func (w *WAL) LocalBackend() *local.Backend {
+	return w.l
+}
+
 func parseFilename(name string) (uuid.UUID, string, error) {
 	i := strings.Index(name, ":")
diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go
index 6efd8c05587..93e46cf7ea3 100644
--- a/tempodb/wal/wal_test.go
+++ b/tempodb/wal/wal_test.go
@@ -14,7 +14,6 @@ import (
 	"github.com/grafana/tempo/pkg/tempopb"
 	"github.com/grafana/tempo/pkg/util/test"
 
-	"github.com/grafana/tempo/tempodb/backend"
 	"github.com/grafana/tempo/tempodb/encoding"
 	v0 "github.com/grafana/tempo/tempodb/encoding/v0"
 )
@@ -135,56 +134,9 @@ func TestAppend(t *testing.T) {
 	assert.Equal(t, numMsgs, i)
 }
 
-func TestAppendBlockComplete(t *testing.T) {
-	tempDir, err := ioutil.TempDir("/tmp", "")
-	defer os.RemoveAll(tempDir)
-	assert.NoError(t, err, "unexpected error creating temp dir")
-
-	wal, err := New(&Config{
-		Filepath: tempDir,
-	})
-	assert.NoError(t, err, "unexpected error creating temp wal")
-
-	blockID := uuid.New()
-
-	block, err := wal.NewBlock(blockID, testTenantID)
-	assert.NoError(t, err, "unexpected error creating block")
-
-	numMsgs := 100
-	reqs := make([]*tempopb.PushRequest, 0, numMsgs)
-	ids := make([][]byte, 0, numMsgs)
-	for i := 0; i < numMsgs; i++ {
-		id := make([]byte, 16)
-		rand.Read(id)
-		req := test.MakeRequest(rand.Int()%1000, id)
-		reqs = append(reqs, req)
-		ids = append(ids, id)
-		bReq, err := proto.Marshal(req)
-		assert.NoError(t, err)
-		err = block.Write(id, bReq)
-		assert.NoError(t, err, "unexpected error writing req")
-	}
-
-	complete, err := block.Complete(&encoding.BlockConfig{
-		IndexDownsampleBytes: 13,
-		BloomFP:              .01,
-		Encoding:             backend.EncGZIP,
-	}, wal, &mockCombiner{})
-	assert.NoError(t, err, "unexpected error completing block")
-
-	for i, id := range ids {
-		out := &tempopb.PushRequest{}
-		foundBytes, err := complete.Find(id, &mockCombiner{})
-		assert.NoError(t, err)
-
-		err = proto.Unmarshal(foundBytes, out)
-		assert.NoError(t, err)
-
-		assert.True(t, proto.Equal(out, reqs[i]))
-	}
-}
+func TestCompletedDirIsRemoved(t *testing.T) {
+	// Create /completed/testfile and verify it is removed.
-func TestWorkDir(t *testing.T) {
 	tempDir, err := ioutil.TempDir("/tmp", "")
 	defer os.RemoveAll(tempDir)
 	assert.NoError(t, err, "unexpected error creating temp dir")
@@ -201,12 +153,7 @@ func TestWorkDir(t *testing.T) {
 	assert.NoError(t, err, "unexpected error creating temp wal")
 
 	_, err = os.Stat(path.Join(tempDir, completedDir))
-	assert.NoError(t, err, "work folder should exist")
-
-	files, err := ioutil.ReadDir(path.Join(tempDir, completedDir))
-	assert.NoError(t, err, "unexpected reading work dir")
-
-	assert.Len(t, files, 0, "work dir should be empty")
+	assert.Error(t, err, "completedDir should not exist")
 }
 
 func BenchmarkWriteRead(b *testing.B) {