Skip to content

Commit

Permalink
Add replication factor to block meta [1 of 3] (grafana#3628)
Browse files Browse the repository at this point in the history
* Add replication factor to block meta

* Refactor CreateWAL func

* chlog

* Fixes

* Add data encoding to WAL constructor

* Implement in vParquet4

* Different fixes

* Add test and fix cutting traces
  • Loading branch information
mapno authored May 20, 2024
1 parent f514853 commit 89b0080
Show file tree
Hide file tree
Showing 28 changed files with 250 additions and 122 deletions.
4 changes: 2 additions & 2 deletions modules/generator/processor/localblocks/livetraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) b
return true
}

func (l *liveTraces) CutIdle(idleSince time.Time) []*liveTrace {
func (l *liveTraces) CutIdle(idleSince time.Time, immediate bool) []*liveTrace {
res := []*liveTrace{}

for k, tr := range l.traces {
if tr.timestamp.Before(idleSince) {
if tr.timestamp.Before(idleSince) || immediate {
res = append(res, tr)
delete(l.traces, k)
}
Expand Down
14 changes: 8 additions & 6 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,11 +577,7 @@ func (p *Processor) cutIdleTraces(immediate bool) error {
metricLiveTraces.WithLabelValues(p.tenant).Set(float64(len(p.liveTraces.traces)))

since := time.Now().Add(-p.Cfg.TraceIdlePeriod)
if immediate {
since = time.Time{}
}

tracesToCut := p.liveTraces.CutIdle(since)
tracesToCut := p.liveTraces.CutIdle(since, immediate)

p.liveTracesMtx.Unlock()

Expand Down Expand Up @@ -634,7 +630,13 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
}

func (p *Processor) resetHeadBlock() error {
block, err := p.wal.NewBlockWithDedicatedColumns(uuid.New(), p.tenant, model.CurrentEncoding, p.overrides.DedicatedColumns(p.tenant))
meta := &backend.BlockMeta{
BlockID: uuid.New(),
TenantID: p.tenant,
DedicatedColumns: p.overrides.DedicatedColumns(p.tenant),
ReplicationFactor: 1,
}
block, err := p.wal.NewBlock(meta, model.CurrentEncoding)
if err != nil {
return err
}
Expand Down
52 changes: 50 additions & 2 deletions modules/generator/processor/localblocks/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/require"
)

type mockOverrides struct{}
Expand Down Expand Up @@ -153,3 +152,52 @@ func TestProcessorDoesNotRace(t *testing.T) {
wg.Wait()
p.Shutdown(ctx)
}

// TestReplicationFactor verifies that every block produced by the local-blocks
// processor — the head block, cut WAL blocks, and completed blocks — carries
// ReplicationFactor == 1 in its metadata, since generator-produced blocks are
// written by a single instance rather than replicated RF3-style.
func TestReplicationFactor(t *testing.T) {
	// Use `w`, not `wal`, so the local variable does not shadow the wal package.
	w, err := wal.New(&wal.Config{
		Filepath: t.TempDir(),
		Version:  encoding.DefaultEncoding().Version(),
	})
	require.NoError(t, err)

	cfg := Config{
		FlushCheckPeriod:     time.Minute,
		TraceIdlePeriod:      time.Minute,
		CompleteBlockTimeout: time.Minute,
		Block: &common.BlockConfig{
			BloomShardSizeBytes: 100_000,
			BloomFP:             0.05,
			Version:             encoding.DefaultEncoding().Version(),
		},
		Metrics: MetricsConfig{
			ConcurrentBlocks:  10,
			TimeOverlapCutoff: 0.2,
		},
		FilterServerSpans: false,
	}

	p, err := New(cfg, "fake", w, &mockOverrides{})
	require.NoError(t, err)

	// Push one trace so every block stage below has data to carry.
	tr := test.MakeTrace(10, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
	p.PushSpans(context.TODO(), &tempopb.PushSpansRequest{
		Batches: tr.Batches,
	})

	// Stage 1: cut live traces into the head block.
	require.NoError(t, p.cutIdleTraces(true))
	verifyReplicationFactor(t, p.headBlock)

	// Stage 2: cut the head block into WAL blocks.
	require.NoError(t, p.cutBlocks(true))
	for _, b := range p.walBlocks {
		verifyReplicationFactor(t, b)
	}

	// Stage 3: complete the WAL blocks into backend blocks.
	require.NoError(t, p.completeBlock())
	for _, b := range p.completeBlocks {
		verifyReplicationFactor(t, b)
	}
}

// verifyReplicationFactor asserts that the block's metadata records a
// replication factor of 1, as expected for generator-produced blocks.
func verifyReplicationFactor(t *testing.T, b common.BackendBlock) {
	t.Helper() // report failures at the caller's line, not here
	require.Equal(t, 1, int(b.BlockMeta().ReplicationFactor))
}
7 changes: 6 additions & 1 deletion modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,12 @@ func (i *instance) resetHeadBlock() error {

dedicatedColumns := i.getDedicatedColumns()

newHeadBlock, err := i.writer.WAL().NewBlockWithDedicatedColumns(uuid.New(), i.instanceID, model.CurrentEncoding, dedicatedColumns)
meta := &backend.BlockMeta{
BlockID: uuid.New(),
TenantID: i.instanceID,
DedicatedColumns: dedicatedColumns,
}
newHeadBlock, err := i.writer.WAL().NewBlock(meta, model.CurrentEncoding)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions tempodb/backend/block_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type BlockMeta struct {
FooterSize uint32 `json:"footerSize"`
// DedicatedColumns configuration for attributes (used by vParquet3)
DedicatedColumns DedicatedColumns `json:"dedicatedColumns,omitempty"`
// ReplicationFactor is the number of times the data written in this block has been replicated.
// It's left unset if replication factor is 3. Default is 0 (RF3).
ReplicationFactor uint32 `json:"replicationFactor,omitempty"`
}

// DedicatedColumn contains the configuration for a single attribute with the given name that should
Expand Down
8 changes: 4 additions & 4 deletions tempodb/compaction_block_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,25 @@ func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRan
// inside active window.
// Group by compaction level and window.
// Choose lowest compaction level and most recent windows first.
entry.group = fmt.Sprintf("A-%v-%016X", b.CompactionLevel, age)
entry.group = fmt.Sprintf("A-%v-%016X-%v", b.CompactionLevel, age, b.ReplicationFactor)

// Within group choose smallest blocks first.
// update after parquet: we want to make sure blocks of the same version end up together
// update after vParquet3: we want to make sure blocks of the same dedicated columns end up together
entry.order = fmt.Sprintf("%016X-%v-%016X", entry.meta.TotalObjects, entry.meta.Version, entry.meta.DedicatedColumnsHash())

entry.hash = fmt.Sprintf("%v-%v-%v", b.TenantID, b.CompactionLevel, w)
entry.hash = fmt.Sprintf("%v-%v-%v-%v", b.TenantID, b.CompactionLevel, w, b.ReplicationFactor)
} else {
// outside active window.
// Group by window only. Choose most recent windows first.
entry.group = fmt.Sprintf("B-%016X", age)
entry.group = fmt.Sprintf("B-%016X-%v", age, b.ReplicationFactor)

// Within group chose lowest compaction lvl and smallest blocks first.
// update after parquet: we want to make sure blocks of the same version end up together
// update after vParquet3: we want to make sure blocks of the same dedicated columns end up together
entry.order = fmt.Sprintf("%v-%016X-%v-%016X", b.CompactionLevel, entry.meta.TotalObjects, entry.meta.Version, entry.meta.DedicatedColumnsHash())

entry.hash = fmt.Sprintf("%v-%v", b.TenantID, w)
entry.hash = fmt.Sprintf("%v-%v-%v", b.TenantID, w, b.ReplicationFactor)
}

twbs.entries = append(twbs.entries, entry)
Expand Down
Loading

0 comments on commit 89b0080

Please sign in to comment.