From d27b48904605159a6ceb723c58981ee130960e71 Mon Sep 17 00:00:00 2001 From: Mario Date: Thu, 21 Nov 2024 17:43:25 +0100 Subject: [PATCH] Multiple fixes in block-builder (#4364) --- modules/blockbuilder/blockbuilder.go | 4 +- modules/blockbuilder/config.go | 2 + modules/blockbuilder/partition_writer.go | 18 +++-- modules/blockbuilder/tenant_store.go | 97 +++++++++++++++--------- modules/blockbuilder/util/id.go | 41 ++++++++-- modules/blockbuilder/util/id_test.go | 15 ++-- 6 files changed, 121 insertions(+), 56 deletions(-) diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index 68fbca4c22f..a4c320ddd7e 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -227,8 +227,8 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in "lag", lag.Lag, ) - // TODO - Review what ts is used here - writer := newPartitionSectionWriter(b.logger, sectionEndTime.UnixMilli(), b.cfg.blockConfig, b.overrides, b.wal, b.enc) + // TODO - Review what endTimestamp is used here + writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime.UnixMilli(), b.cfg.blockConfig, b.overrides, b.wal, b.enc) // We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment). // This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously. diff --git a/modules/blockbuilder/config.go b/modules/blockbuilder/config.go index 8fd79cf9cbc..d8b4a328802 100644 --- a/modules/blockbuilder/config.go +++ b/modules/blockbuilder/config.go @@ -7,6 +7,7 @@ import ( "time" "github.com/grafana/tempo/pkg/ingest" + "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" ) @@ -23,6 +24,7 @@ func (c *BlockConfig) RegisterFlags(f *flag.FlagSet) { func (c *BlockConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { f.Uint64Var(&c.MaxBlockBytes, prefix+".max-block-bytes", 20*1024*1024, "Maximum size of a block.") // TODO - Review default + c.BlockCfg.Version = encoding.DefaultEncoding().Version() c.BlockCfg.RegisterFlagsAndApplyDefaults(prefix, f) } diff --git a/modules/blockbuilder/partition_writer.go b/modules/blockbuilder/partition_writer.go index d21963be29d..f35b24a44d3 100644 --- a/modules/blockbuilder/partition_writer.go +++ b/modules/blockbuilder/partition_writer.go @@ -3,6 +3,7 @@ package blockbuilder import ( "context" "fmt" + "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -22,25 +23,27 @@ type partitionSectionWriter interface { type writer struct { logger log.Logger - blockCfg BlockConfig - cycleEndTs int64 + blockCfg BlockConfig + partition, cycleEndTs int64 overrides Overrides wal *wal.WAL enc encoding.VersionedEncoding - // TODO - Lock - m map[string]*tenantStore + mtx sync.Mutex + m map[string]*tenantStore } -func newPartitionSectionWriter(logger log.Logger, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { +func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer { return &writer{ logger: logger, + partition: partition, cycleEndTs: cycleEndTs, blockCfg: blockCfg, overrides: overrides, wal: wal, enc: enc, + mtx: sync.Mutex{}, m: make(map[string]*tenantStore), } } @@ -84,11 +87,14 @@ func (p *writer) flush(ctx context.Context, store tempodb.Writer) error { } func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) { + p.mtx.Lock() + defer p.mtx.Unlock() + if i, ok := p.m[tenant]; ok { return i, nil } - i, err := newTenantStore(tenant, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides) + i, err := newTenantStore(tenant, p.partition, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides) if err != nil { return nil, err } diff --git a/modules/blockbuilder/tenant_store.go b/modules/blockbuilder/tenant_store.go index 431933213ad..b49bf56acc2 100644 --- a/modules/blockbuilder/tenant_store.go +++ b/modules/blockbuilder/tenant_store.go @@ -2,9 +2,11 @@ package blockbuilder import ( "context" + "sync" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/google/uuid" "github.com/grafana/tempo/modules/blockbuilder/util" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" @@ -28,53 +30,69 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec( // TODO - This needs locking type tenantStore struct { - tenantID string - ts int64 - - cfg BlockConfig - logger log.Logger + tenantID string + idGenerator util.IDGenerator + cfg BlockConfig + logger log.Logger overrides Overrides - wal *wal.WAL - headBlock common.WALBlock - walBlocks []common.WALBlock enc encoding.VersionedEncoding + + wal *wal.WAL + + headBlockMtx sync.Mutex + headBlock common.WALBlock + + blocksMtx sync.Mutex + walBlocks []common.WALBlock } -func newTenantStore(tenantID string, ts int64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) { +func newTenantStore(tenantID string, partitionID, endTimestamp int64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) { s := &tenantStore{ - tenantID: tenantID, - ts: ts, - cfg: cfg, - logger: logger, - overrides: o, - wal: wal, - enc: enc, + tenantID: tenantID, + idGenerator: util.NewDeterministicIDGenerator(partitionID, endTimestamp), + cfg: cfg, + logger: logger, + overrides: o, + wal: wal, + headBlockMtx: sync.Mutex{}, + blocksMtx: sync.Mutex{}, + enc: enc, } return s, s.resetHeadBlock() } -func (s *tenantStore) cutHeadBlock() error { - // Flush the current head block if it exists - if s.headBlock != nil { - if err := s.headBlock.Flush(); err != nil { - return err - } - s.walBlocks = append(s.walBlocks, s.headBlock) - s.headBlock = nil +// TODO - periodically flush +func (s *tenantStore) cutHeadBlock(immediate bool) error { + s.headBlockMtx.Lock() + defer s.headBlockMtx.Unlock() + + dataLen := s.headBlock.DataLength() + + if s.headBlock == nil || dataLen == 0 { + return nil } - return nil -} + if !immediate && dataLen < s.cfg.MaxBlockBytes { + return nil + } + + s.blocksMtx.Lock() + defer s.blocksMtx.Unlock() -func (s *tenantStore) newUUID() backend.UUID { - return backend.UUID(util.NewDeterministicID(s.ts, int64(len(s.walBlocks)))) + if err := s.headBlock.Flush(); err != nil { + return err + } + s.walBlocks = append(s.walBlocks, s.headBlock) + s.headBlock = nil + + return s.resetHeadBlock() } func (s *tenantStore) resetHeadBlock() error { meta := &backend.BlockMeta{ - BlockID: s.newUUID(), + BlockID: s.idGenerator.NewID(), TenantID: s.tenantID, DedicatedColumns: s.overrides.DedicatedColumns(s.tenantID), ReplicationFactor: backend.MetricsGeneratorReplicationFactor, @@ -88,11 +106,9 @@ func (s *tenantStore) resetHeadBlock() error { } func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, start, end uint32) error { - // TODO - Do this async? This slows down consumption, but we need to be precise - if s.headBlock.DataLength() > s.cfg.MaxBlockBytes { - if err := s.resetHeadBlock(); err != nil { - return err - } + // TODO - Do this async, it slows down consumption + if err := s.cutHeadBlock(false); err != nil { + return err } return s.headBlock.AppendTrace(traceID, tr, start, end) @@ -102,9 +118,13 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error { // TODO - Advance some of this work if possible // Cut head block - if err := s.cutHeadBlock(); err != nil { + if err := s.cutHeadBlock(true); err != nil { return err } + + s.blocksMtx.Lock() + defer s.blocksMtx.Unlock() + completeBlocks := make([]tempodb.WriteableBlock, 0, len(s.walBlocks)) // Write all blocks for _, block := range s.walBlocks { @@ -126,9 +146,14 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error { } // Clear the blocks + for _, block := range s.walBlocks { + if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(block.BlockMeta().BlockID), s.tenantID); err != nil { + return err + } + } s.walBlocks = s.walBlocks[:0] - return s.resetHeadBlock() + return nil } func (s *tenantStore) buildWriteableBlock(ctx context.Context, b common.WALBlock) (tempodb.WriteableBlock, error) { diff --git a/modules/blockbuilder/util/id.go b/modules/blockbuilder/util/id.go index 660146fe41c..1223412d8d4 100644 --- a/modules/blockbuilder/util/id.go +++ b/modules/blockbuilder/util/id.go @@ -5,6 +5,8 @@ import ( "encoding/binary" "github.com/google/uuid" + "github.com/grafana/tempo/tempodb/backend" + "go.uber.org/atomic" ) var ( @@ -12,19 +14,44 @@ var ( hash = sha1.New() ) -func NewDeterministicID(ts, seq int64) uuid.UUID { - b := int64ToBytes(ts, seq) +type IDGenerator interface { + NewID() backend.UUID +} + +var _ IDGenerator = (*DeterministicIDGenerator)(nil) + +type DeterministicIDGenerator struct { + seeds []int64 + seq *atomic.Int64 +} + +func NewDeterministicIDGenerator(seeds ...int64) *DeterministicIDGenerator { + return &DeterministicIDGenerator{ + seeds: seeds, + seq: atomic.NewInt64(0), + } +} + +func (d *DeterministicIDGenerator) NewID() backend.UUID { + seq := d.seq.Inc() + seeds := append(d.seeds, seq) + return backend.UUID(newDeterministicID(seeds)) +} + +func newDeterministicID(seeds []int64) uuid.UUID { + b := int64ToBytes(seeds...) return uuid.NewHash(hash, ns, b, 5) } -func int64ToBytes(val1, val2 int64) []byte { - // 16 bytes = 8 bytes (int64) + 8 bytes (int64) - bytes := make([]byte, 16) +func int64ToBytes(seeds ...int64) []byte { + l := len(seeds) + bytes := make([]byte, l*8) // Use binary.LittleEndian or binary.BigEndian depending on your requirement - binary.LittleEndian.PutUint64(bytes[0:8], uint64(val1)) - binary.LittleEndian.PutUint64(bytes[8:16], uint64(val2)) + for i, seed := range seeds { + binary.LittleEndian.PutUint64(bytes[i*8:], uint64(seed)) + } return bytes } diff --git a/modules/blockbuilder/util/id_test.go b/modules/blockbuilder/util/id_test.go index 6e927f89bff..e767b96c632 100644 --- a/modules/blockbuilder/util/id_test.go +++ b/modules/blockbuilder/util/id_test.go @@ -5,19 +5,23 @@ import ( "time" "github.com/google/uuid" + "github.com/grafana/tempo/tempodb/backend" ) -func TestDeterministicID(t *testing.T) { +func TestDeterministicIDGenerator(t *testing.T) { ts := time.Now().UnixMilli() - firstPassIDs := make(map[uuid.UUID]struct{}) + gen := NewDeterministicIDGenerator(ts) + + firstPassIDs := make(map[backend.UUID]struct{}) for seq := int64(0); seq < 10; seq++ { - id := NewDeterministicID(ts, seq) + id := gen.NewID() firstPassIDs[id] = struct{}{} } + gen = NewDeterministicIDGenerator(ts) for seq := int64(0); seq < 10; seq++ { - id := NewDeterministicID(ts, seq) + id := gen.NewID() if _, ok := firstPassIDs[id]; !ok { t.Errorf("ID %s not found in first pass IDs", id) } @@ -26,8 +30,9 @@ func TestDeterministicID(t *testing.T) { func BenchmarkDeterministicID(b *testing.B) { ts := time.Now().UnixMilli() + gen := NewDeterministicIDGenerator(ts) for i := 0; i < b.N; i++ { - _ = NewDeterministicID(ts, int64(i)) + _ = gen.NewID() } }