Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rhythm] Multiple fixes in block-builder #4364

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ func (b *BlockBuilder) consumePartitionSection(ctx context.Context, partition in
"lag", lag.Lag,
)

// TODO - Review what ts is used here
writer := newPartitionSectionWriter(b.logger, sectionEndTime.UnixMilli(), b.cfg.blockConfig, b.overrides, b.wal, b.enc)
// TODO - Review what endTimestamp is used here
writer := newPartitionSectionWriter(b.logger, int64(partition), sectionEndTime.UnixMilli(), b.cfg.blockConfig, b.overrides, b.wal, b.enc)

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
// This is so the cycle started exactly at the commit offset, and not at what was (potentially over-) consumed previously.
Expand Down
2 changes: 2 additions & 0 deletions modules/blockbuilder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/grafana/tempo/pkg/ingest"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
)

Expand All @@ -23,6 +24,7 @@ func (c *BlockConfig) RegisterFlags(f *flag.FlagSet) {
func (c *BlockConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
f.Uint64Var(&c.MaxBlockBytes, prefix+".max-block-bytes", 20*1024*1024, "Maximum size of a block.") // TODO - Review default

c.BlockCfg.Version = encoding.DefaultEncoding().Version()
c.BlockCfg.RegisterFlagsAndApplyDefaults(prefix, f)
}

Expand Down
18 changes: 12 additions & 6 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blockbuilder
import (
"context"
"fmt"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -22,25 +23,27 @@ type partitionSectionWriter interface {
type writer struct {
logger log.Logger

blockCfg BlockConfig
cycleEndTs int64
blockCfg BlockConfig
partition, cycleEndTs int64

overrides Overrides
wal *wal.WAL
enc encoding.VersionedEncoding

// TODO - Lock
m map[string]*tenantStore
mtx sync.Mutex
m map[string]*tenantStore
}

func newPartitionSectionWriter(logger log.Logger, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
func newPartitionSectionWriter(logger log.Logger, partition, cycleEndTs int64, blockCfg BlockConfig, overrides Overrides, wal *wal.WAL, enc encoding.VersionedEncoding) *writer {
return &writer{
logger: logger,
partition: partition,
cycleEndTs: cycleEndTs,
blockCfg: blockCfg,
overrides: overrides,
wal: wal,
enc: enc,
mtx: sync.Mutex{},
m: make(map[string]*tenantStore),
}
}
Expand Down Expand Up @@ -84,11 +87,14 @@ func (p *writer) flush(ctx context.Context, store tempodb.Writer) error {
}

func (p *writer) instanceForTenant(tenant string) (*tenantStore, error) {
p.mtx.Lock()
defer p.mtx.Unlock()

if i, ok := p.m[tenant]; ok {
return i, nil
}

i, err := newTenantStore(tenant, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
i, err := newTenantStore(tenant, p.partition, p.cycleEndTs, p.blockCfg, p.logger, p.wal, p.enc, p.overrides)
if err != nil {
return nil, err
}
Expand Down
97 changes: 61 additions & 36 deletions modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package blockbuilder

import (
"context"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/tempo/modules/blockbuilder/util"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
Expand All @@ -28,53 +30,69 @@ var metricBlockBuilderFlushedBlocks = promauto.NewCounterVec(

// TODO - This needs locking
type tenantStore struct {
tenantID string
ts int64

cfg BlockConfig
logger log.Logger
tenantID string
idGenerator util.IDGenerator

cfg BlockConfig
logger log.Logger
overrides Overrides
wal *wal.WAL
headBlock common.WALBlock
walBlocks []common.WALBlock
enc encoding.VersionedEncoding

wal *wal.WAL

headBlockMtx sync.Mutex
headBlock common.WALBlock

blocksMtx sync.Mutex
walBlocks []common.WALBlock
}

func newTenantStore(tenantID string, ts int64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
func newTenantStore(tenantID string, partitionID, endTimestamp int64, cfg BlockConfig, logger log.Logger, wal *wal.WAL, enc encoding.VersionedEncoding, o Overrides) (*tenantStore, error) {
s := &tenantStore{
tenantID: tenantID,
ts: ts,
cfg: cfg,
logger: logger,
overrides: o,
wal: wal,
enc: enc,
tenantID: tenantID,
idGenerator: util.NewDeterministicIDGenerator(partitionID, endTimestamp),
cfg: cfg,
logger: logger,
overrides: o,
wal: wal,
headBlockMtx: sync.Mutex{},
blocksMtx: sync.Mutex{},
enc: enc,
}

return s, s.resetHeadBlock()
}

func (s *tenantStore) cutHeadBlock() error {
// Flush the current head block if it exists
if s.headBlock != nil {
if err := s.headBlock.Flush(); err != nil {
return err
}
s.walBlocks = append(s.walBlocks, s.headBlock)
s.headBlock = nil
// TODO - periodically flush
func (s *tenantStore) cutHeadBlock(immediate bool) error {
s.headBlockMtx.Lock()
defer s.headBlockMtx.Unlock()

dataLen := s.headBlock.DataLength()

if s.headBlock == nil || dataLen == 0 {
return nil
}

return nil
}
if !immediate && dataLen < s.cfg.MaxBlockBytes {
return nil
}

s.blocksMtx.Lock()
defer s.blocksMtx.Unlock()

func (s *tenantStore) newUUID() backend.UUID {
return backend.UUID(util.NewDeterministicID(s.ts, int64(len(s.walBlocks))))
if err := s.headBlock.Flush(); err != nil {
return err
}
s.walBlocks = append(s.walBlocks, s.headBlock)
s.headBlock = nil

return s.resetHeadBlock()
}

func (s *tenantStore) resetHeadBlock() error {
meta := &backend.BlockMeta{
BlockID: s.newUUID(),
BlockID: s.idGenerator.NewID(),
TenantID: s.tenantID,
DedicatedColumns: s.overrides.DedicatedColumns(s.tenantID),
ReplicationFactor: backend.MetricsGeneratorReplicationFactor,
Expand All @@ -88,11 +106,9 @@ func (s *tenantStore) resetHeadBlock() error {
}

func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, start, end uint32) error {
// TODO - Do this async? This slows down consumption, but we need to be precise
if s.headBlock.DataLength() > s.cfg.MaxBlockBytes {
if err := s.resetHeadBlock(); err != nil {
return err
}
// TODO - Do this async, it slows down consumption
if err := s.cutHeadBlock(false); err != nil {
return err
}

return s.headBlock.AppendTrace(traceID, tr, start, end)
Expand All @@ -102,9 +118,13 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
// TODO - Advance some of this work if possible

// Cut head block
if err := s.cutHeadBlock(); err != nil {
if err := s.cutHeadBlock(true); err != nil {
return err
}

s.blocksMtx.Lock()
defer s.blocksMtx.Unlock()

completeBlocks := make([]tempodb.WriteableBlock, 0, len(s.walBlocks))
// Write all blocks
for _, block := range s.walBlocks {
Expand All @@ -126,9 +146,14 @@ func (s *tenantStore) Flush(ctx context.Context, store tempodb.Writer) error {
}

// Clear the blocks
for _, block := range s.walBlocks {
if err := s.wal.LocalBackend().ClearBlock((uuid.UUID)(block.BlockMeta().BlockID), s.tenantID); err != nil {
return err
}
}
s.walBlocks = s.walBlocks[:0]

return s.resetHeadBlock()
return nil
}

func (s *tenantStore) buildWriteableBlock(ctx context.Context, b common.WALBlock) (tempodb.WriteableBlock, error) {
Expand Down
41 changes: 34 additions & 7 deletions modules/blockbuilder/util/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,53 @@ import (
"encoding/binary"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"go.uber.org/atomic"
)

var (
ns = uuid.MustParse("28840903-6eb5-4ffb-8880-93a4fa98dbcb") // Random UUID
hash = sha1.New()
)

func NewDeterministicID(ts, seq int64) uuid.UUID {
b := int64ToBytes(ts, seq)
type IDGenerator interface {
NewID() backend.UUID
}

var _ IDGenerator = (*DeterministicIDGenerator)(nil)

type DeterministicIDGenerator struct {
seeds []int64
seq *atomic.Int64
}

func NewDeterministicIDGenerator(seeds ...int64) *DeterministicIDGenerator {
return &DeterministicIDGenerator{
seeds: seeds,
seq: atomic.NewInt64(0),
}
}

func (d *DeterministicIDGenerator) NewID() backend.UUID {
seq := d.seq.Inc()
seeds := append(d.seeds, seq)
return backend.UUID(newDeterministicID(seeds))
}

func newDeterministicID(seeds []int64) uuid.UUID {
b := int64ToBytes(seeds...)

return uuid.NewHash(hash, ns, b, 5)
}

func int64ToBytes(val1, val2 int64) []byte {
// 16 bytes = 8 bytes (int64) + 8 bytes (int64)
bytes := make([]byte, 16)
func int64ToBytes(seeds ...int64) []byte {
l := len(seeds)
bytes := make([]byte, l*8)

// Use binary.LittleEndian or binary.BigEndian depending on your requirement
binary.LittleEndian.PutUint64(bytes[0:8], uint64(val1))
binary.LittleEndian.PutUint64(bytes[8:16], uint64(val2))
for i, seed := range seeds {
binary.LittleEndian.PutUint64(bytes[i*8:], uint64(seed))
}

return bytes
}
15 changes: 10 additions & 5 deletions modules/blockbuilder/util/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ import (
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
)

func TestDeterministicID(t *testing.T) {
func TestDeterministicIDGenerator(t *testing.T) {
ts := time.Now().UnixMilli()

firstPassIDs := make(map[uuid.UUID]struct{})
gen := NewDeterministicIDGenerator(ts)

firstPassIDs := make(map[backend.UUID]struct{})
for seq := int64(0); seq < 10; seq++ {
id := NewDeterministicID(ts, seq)
id := gen.NewID()
firstPassIDs[id] = struct{}{}
}

gen = NewDeterministicIDGenerator(ts)
for seq := int64(0); seq < 10; seq++ {
id := NewDeterministicID(ts, seq)
id := gen.NewID()
if _, ok := firstPassIDs[id]; !ok {
t.Errorf("ID %s not found in first pass IDs", id)
}
Expand All @@ -26,8 +30,9 @@ func TestDeterministicID(t *testing.T) {

func BenchmarkDeterministicID(b *testing.B) {
ts := time.Now().UnixMilli()
gen := NewDeterministicIDGenerator(ts)
for i := 0; i < b.N; i++ {
_ = NewDeterministicID(ts, int64(i))
_ = gen.NewID()
}
}

Expand Down
Loading