diff --git a/modules/generator/processor/localblocks/livetraces.go b/modules/generator/processor/localblocks/livetraces.go index 68c580384ec..468025758db 100644 --- a/modules/generator/processor/localblocks/livetraces.go +++ b/modules/generator/processor/localblocks/livetraces.go @@ -59,11 +59,11 @@ func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) b return true } -func (l *liveTraces) CutIdle(idleSince time.Time) []*liveTrace { +func (l *liveTraces) CutIdle(idleSince time.Time, immediate bool) []*liveTrace { res := []*liveTrace{} for k, tr := range l.traces { - if tr.timestamp.Before(idleSince) { + if tr.timestamp.Before(idleSince) || immediate { res = append(res, tr) delete(l.traces, k) } diff --git a/modules/generator/processor/localblocks/processor.go b/modules/generator/processor/localblocks/processor.go index 0a872253e1c..97ec30c25e9 100644 --- a/modules/generator/processor/localblocks/processor.go +++ b/modules/generator/processor/localblocks/processor.go @@ -577,11 +577,7 @@ func (p *Processor) cutIdleTraces(immediate bool) error { metricLiveTraces.WithLabelValues(p.tenant).Set(float64(len(p.liveTraces.traces))) since := time.Now().Add(-p.Cfg.TraceIdlePeriod) - if immediate { - since = time.Time{} - } - - tracesToCut := p.liveTraces.CutIdle(since) + tracesToCut := p.liveTraces.CutIdle(since, immediate) p.liveTracesMtx.Unlock() @@ -634,7 +630,13 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error { } func (p *Processor) resetHeadBlock() error { - block, err := p.wal.NewBlockWithDedicatedColumns(uuid.New(), p.tenant, model.CurrentEncoding, p.overrides.DedicatedColumns(p.tenant)) + meta := &backend.BlockMeta{ + BlockID: uuid.New(), + TenantID: p.tenant, + DedicatedColumns: p.overrides.DedicatedColumns(p.tenant), + ReplicationFactor: 1, + } + block, err := p.wal.NewBlock(meta, model.CurrentEncoding) if err != nil { return err } diff --git a/modules/generator/processor/localblocks/processor_test.go b/modules/generator/processor/localblocks/processor_test.go index 26e4c41406f..b9a5553a4b9 100644 --- a/modules/generator/processor/localblocks/processor_test.go +++ b/modules/generator/processor/localblocks/processor_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" - "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/util/test" @@ -15,6 +13,7 @@ import ( "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/grafana/tempo/tempodb/wal" + "github.com/stretchr/testify/require" ) type mockOverrides struct{} @@ -153,3 +152,52 @@ func TestProcessorDoesNotRace(t *testing.T) { wg.Wait() p.Shutdown(ctx) } + +func TestReplicationFactor(t *testing.T) { + wal, err := wal.New(&wal.Config{ + Filepath: t.TempDir(), + Version: encoding.DefaultEncoding().Version(), + }) + require.NoError(t, err) + + cfg := Config{ + FlushCheckPeriod: time.Minute, + TraceIdlePeriod: time.Minute, + CompleteBlockTimeout: time.Minute, + Block: &common.BlockConfig{ + BloomShardSizeBytes: 100_000, + BloomFP: 0.05, + Version: encoding.DefaultEncoding().Version(), + }, + Metrics: MetricsConfig{ + ConcurrentBlocks: 10, + TimeOverlapCutoff: 0.2, + }, + FilterServerSpans: false, + } + + p, err := New(cfg, "fake", wal, &mockOverrides{}) + require.NoError(t, err) + + tr := test.MakeTrace(10, []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + p.PushSpans(context.TODO(), &tempopb.PushSpansRequest{ + Batches: tr.Batches, + }) + + require.NoError(t, p.cutIdleTraces(true)) + verifyReplicationFactor(t, p.headBlock) + + require.NoError(t, p.cutBlocks(true)) + for _, b := range p.walBlocks { + verifyReplicationFactor(t, b) + } + + require.NoError(t, p.completeBlock()) + for _, b := range p.completeBlocks { + verifyReplicationFactor(t, b) + } +} + +func verifyReplicationFactor(t *testing.T, b common.BackendBlock) { + require.Equal(t, 1, int(b.BlockMeta().ReplicationFactor)) +} diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 5db973aa11a..1b1d2309c3c 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -521,7 +521,12 @@ func (i *instance) resetHeadBlock() error { dedicatedColumns := i.getDedicatedColumns() - newHeadBlock, err := i.writer.WAL().NewBlockWithDedicatedColumns(uuid.New(), i.instanceID, model.CurrentEncoding, dedicatedColumns) + meta := &backend.BlockMeta{ + BlockID: uuid.New(), + TenantID: i.instanceID, + DedicatedColumns: dedicatedColumns, + } + newHeadBlock, err := i.writer.WAL().NewBlock(meta, model.CurrentEncoding) if err != nil { return err } diff --git a/tempodb/backend/block_meta.go b/tempodb/backend/block_meta.go index ced9e9d7b5d..ac68c113956 100644 --- a/tempodb/backend/block_meta.go +++ b/tempodb/backend/block_meta.go @@ -126,6 +126,9 @@ type BlockMeta struct { FooterSize uint32 `json:"footerSize"` // DedicatedColumns configuration for attributes (used by vParquet3) DedicatedColumns DedicatedColumns `json:"dedicatedColumns,omitempty"` + // ReplicationFactor is the number of times the data written in this block has been replicated. + // It's left unset if replication factor is 3. Default is 0 (RF3). + ReplicationFactor uint32 `json:"replicationFactor,omitempty"` } // DedicatedColumn contains the configuration for a single attribute with the given name that should diff --git a/tempodb/compaction_block_selector.go b/tempodb/compaction_block_selector.go index ed0f147165a..febf6b27d76 100644 --- a/tempodb/compaction_block_selector.go +++ b/tempodb/compaction_block_selector.go @@ -78,25 +78,25 @@ func newTimeWindowBlockSelector(blocklist []*backend.BlockMeta, maxCompactionRan // inside active window. // Group by compaction level and window. // Choose lowest compaction level and most recent windows first. - entry.group = fmt.Sprintf("A-%v-%016X", b.CompactionLevel, age) + entry.group = fmt.Sprintf("A-%v-%016X-%v", b.CompactionLevel, age, b.ReplicationFactor) // Within group choose smallest blocks first. // update after parquet: we want to make sure blocks of the same version end up together // update afert vParquet3: we want to make sure blocks of the same dedicated columns end up together entry.order = fmt.Sprintf("%016X-%v-%016X", entry.meta.TotalObjects, entry.meta.Version, entry.meta.DedicatedColumnsHash()) - entry.hash = fmt.Sprintf("%v-%v-%v", b.TenantID, b.CompactionLevel, w) + entry.hash = fmt.Sprintf("%v-%v-%v-%v", b.TenantID, b.CompactionLevel, w, b.ReplicationFactor) } else { // outside active window. // Group by window only. Choose most recent windows first. - entry.group = fmt.Sprintf("B-%016X", age) + entry.group = fmt.Sprintf("B-%016X-%v", age, b.ReplicationFactor) // Within group chose lowest compaction lvl and smallest blocks first. // update after parquet: we want to make sure blocks of the same version end up together // update afert vParquet3: we want to make sure blocks of the same dedicated columns end up together entry.order = fmt.Sprintf("%v-%016X-%v-%016X", b.CompactionLevel, entry.meta.TotalObjects, entry.meta.Version, entry.meta.DedicatedColumnsHash()) - entry.hash = fmt.Sprintf("%v-%v", b.TenantID, w) + entry.hash = fmt.Sprintf("%v-%v-%v", b.TenantID, w, b.ReplicationFactor) } twbs.entries = append(twbs.entries, entry) diff --git a/tempodb/compaction_block_selector_test.go b/tempodb/compaction_block_selector_test.go index 6b15dc81682..ffba3f5b7de 100644 --- a/tempodb/compaction_block_selector_test.go +++ b/tempodb/compaction_block_selector_test.go @@ -58,7 +58,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), }, { name: "choose smallest two", @@ -92,7 +92,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), }, { name: "different windows", @@ -124,7 +124,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: []*backend.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000000"), @@ -135,7 +135,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now.Add(-timeWindow), }, }, - expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Add(-timeWindow).Unix()), + expectedHash2: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Add(-timeWindow).Unix(), 0), }, { name: "different sizes", @@ -174,7 +174,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { TotalObjects: 3, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: []*backend.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), @@ -187,7 +187,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { TotalObjects: 15, }, }, - expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash2: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), }, { name: "different compaction lvls", @@ -221,7 +221,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: []*backend.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), @@ -234,7 +234,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { CompactionLevel: 1, }, }, - expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 1, now.Unix()), + expectedHash2: fmt.Sprintf("%v-%v-%v-%v", tenantID, 1, now.Unix(), 0), }, { name: "active time window vs not", @@ -273,7 +273,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: []*backend.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), @@ -286,7 +286,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { CompactionLevel: 1, }, }, - expectedHash2: fmt.Sprintf("%v-%v", tenantID, now.Add(-activeWindowDuration-time.Minute).Unix()), + expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, now.Add(-activeWindowDuration-time.Minute).Unix(), 0), }, { name: "choose lowest compaction level", @@ -328,7 +328,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: []*backend.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"), @@ -339,7 +339,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now.Add(-timeWindow), }, }, - expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Add(-timeWindow).Unix()), + expectedHash2: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Add(-timeWindow).Unix(), 0), }, { name: "doesn't choose across time windows", @@ -428,7 +428,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: nil, expectedHash2: "", }, @@ -464,7 +464,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: nil, expectedHash2: "", }, @@ -516,7 +516,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { TotalObjects: 3, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: []*backend.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"), @@ -529,7 +529,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { TotalObjects: 5, }, }, - expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash2: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), }, { name: "honors minimum block count", @@ -593,7 +593,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { CompactionLevel: 1, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 1, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 1, now.Unix(), 0), expectedSecond: nil, expectedHash2: "", }, @@ -683,7 +683,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { Version: "v2", }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: []*backend.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), @@ -696,7 +696,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { Version: "vParquet3", }, }, - expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash2: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), }, { name: "blocks with different dedicated columns are not selected together", @@ -746,7 +746,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { }, }, }, - expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), expectedSecond: []*backend.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), @@ -763,7 +763,58 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { }, }, }, - expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedHash2: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 0), + }, + { + name: "blocks are grouped by replication factor", + blocklist: []*backend.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + EndTime: now, + ReplicationFactor: 1, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + EndTime: now, + ReplicationFactor: 3, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"), + EndTime: now, + ReplicationFactor: 1, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"), + EndTime: now, + ReplicationFactor: 3, + }, + }, + expected: []*backend.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + EndTime: now, + ReplicationFactor: 1, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"), + EndTime: now, + ReplicationFactor: 1, + }, + }, + expectedHash: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 1), + expectedSecond: []*backend.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + EndTime: now, + ReplicationFactor: 3, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"), + EndTime: now, + ReplicationFactor: 3, + }, + }, + expectedHash2: fmt.Sprintf("%v-%v-%v-%v", tenantID, 0, now.Unix(), 3), }, } diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 1206118acfe..1dd8034e8be 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -126,7 +126,8 @@ func testCompactionRoundtrip(t *testing.T, targetBlockVersion string) { for i := 0; i < blockCount; i++ { blockID := uuid.New() - head, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID, DataEncoding: model.CurrentEncoding} + head, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err) for j := 0; j < recordCount; j++ { @@ -297,7 +298,8 @@ func testSameIDCompaction(t *testing.T, targetBlockVersion string) { // and write them to different blocks for i := 0; i < blockCount; i++ { blockID := uuid.New() - head, err := wal.NewBlock(blockID, testTenantID, v1.Encoding) + meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID, DataEncoding: v1.Encoding} + head, err := wal.NewBlock(meta, v1.Encoding) require.NoError(t, err) for j := 0; j < recordCount; j++ { @@ -659,7 +661,8 @@ func cutTestBlockWithTraces(t testing.TB, w Writer, tenantID string, data []test wal := w.WAL() - head, err := wal.NewBlock(uuid.New(), tenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: uuid.New(), TenantID: testTenantID} + head, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err) for _, d := range data { @@ -678,7 +681,8 @@ func cutTestBlocks(t testing.TB, w Writer, tenantID string, blockCount int, reco wal := w.WAL() for i := 0; i < blockCount; i++ { - head, err := wal.NewBlock(uuid.New(), tenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: uuid.New(), TenantID: tenantID} + head, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err) for j := 0; j < recordCount; j++ { diff --git a/tempodb/encoding/v2/encoding.go b/tempodb/encoding/v2/encoding.go index 9815771ac59..3dda0316575 100644 --- a/tempodb/encoding/v2/encoding.go +++ b/tempodb/encoding/v2/encoding.go @@ -5,8 +5,6 @@ import ( "io/fs" "time" - "github.com/google/uuid" - "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" @@ -47,12 +45,12 @@ func (v Encoding) OpenWALBlock(filename string, path string, ingestionSlack time } // CreateWALBlock creates a new appendable block -func (v Encoding) CreateWALBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration, _ backend.DedicatedColumns) (common.WALBlock, error) { +func (v Encoding) CreateWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) { // Default data encoding if needed if dataEncoding == "" { dataEncoding = model.CurrentEncoding } - return createWALBlock(id, tenantID, filepath, e, dataEncoding, ingestionSlack) + return createWALBlock(meta, filepath, dataEncoding, ingestionSlack) } func (v Encoding) OwnsWALBlock(entry fs.DirEntry) bool { diff --git a/tempodb/encoding/v2/wal_block.go b/tempodb/encoding/v2/wal_block.go index 967c88a8885..d509a0b454f 100644 --- a/tempodb/encoding/v2/wal_block.go +++ b/tempodb/encoding/v2/wal_block.go @@ -43,7 +43,7 @@ type walBlock struct { once sync.Once } -func createWALBlock(id uuid.UUID, tenantID, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) { +func createWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) { if strings.ContainsRune(dataEncoding, ':') || strings.ContainsRune(dataEncoding, '+') || len([]rune(dataEncoding)) > maxDataEncodingLength { return nil, fmt.Errorf("dataEncoding %s is invalid", dataEncoding) @@ -55,7 +55,7 @@ func createWALBlock(id uuid.UUID, tenantID, filepath string, e backend.Encoding, } h := &walBlock{ - meta: backend.NewBlockMeta(tenantID, id, VersionString, e, dataEncoding), + meta: backend.NewBlockMeta(meta.TenantID, meta.BlockID, meta.Version, meta.Encoding, dataEncoding), filepath: filepath, ingestionSlack: ingestionSlack, encoder: enc, @@ -69,7 +69,7 @@ func createWALBlock(id uuid.UUID, tenantID, filepath string, e backend.Encoding, } h.appendFile = f - dataWriter, err := NewDataWriter(f, e) + dataWriter, err := NewDataWriter(f, meta.Encoding) if err != nil { return nil, err } diff --git a/tempodb/encoding/v2/wal_block_test.go b/tempodb/encoding/v2/wal_block_test.go index c0d8cfd761c..0fa5d40958f 100644 --- a/tempodb/encoding/v2/wal_block_test.go +++ b/tempodb/encoding/v2/wal_block_test.go @@ -170,7 +170,8 @@ func TestAdjustTimeRangeForSlack(t *testing.T) { func TestPartialBlock(t *testing.T) { blockID := uuid.New() - block, err := createWALBlock(blockID, testTenantID, t.TempDir(), backend.EncSnappy, "v2", 0) + meta := backend.NewBlockMeta(testTenantID, blockID, "v2", backend.EncSnappy, model.CurrentEncoding) + block, err := createWALBlock(meta, t.TempDir(), model.CurrentEncoding, 0) require.NoError(t, err, "unexpected error creating block") enc := model.MustNewSegmentDecoder(model.CurrentEncoding) diff --git a/tempodb/encoding/versioned.go b/tempodb/encoding/versioned.go index 6af6219d3ba..833db7a47c5 100644 --- a/tempodb/encoding/versioned.go +++ b/tempodb/encoding/versioned.go @@ -6,8 +6,6 @@ import ( "io/fs" "time" - "github.com/google/uuid" - "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" v2 "github.com/grafana/tempo/tempodb/encoding/v2" @@ -49,7 +47,14 @@ type VersionedEncoding interface { OpenWALBlock(filename, path string, ingestionSlack, additionalStartSlack time.Duration) (common.WALBlock, error, error) // CreateWALBlock creates a new appendable block for the WAL - CreateWALBlock(id uuid.UUID, tenantID, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration, dedicatedColumns backend.DedicatedColumns) (common.WALBlock, error) + // BlockMeta is used as a container for many options. Required fields: + // * BlockID + // * TenantID + // * Encoding + // * DataEncoding (of the file - v2) + // * DedicatedColumns (vParquet3) + // * ReplicationFactor (Optional) + CreateWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) // OwnsWALBlock indicates if this encoding owns the WAL block OwnsWALBlock(entry fs.DirEntry) bool diff --git a/tempodb/encoding/vparquet2/encoding.go b/tempodb/encoding/vparquet2/encoding.go index 702b4e763eb..45a0def1354 100644 --- a/tempodb/encoding/vparquet2/encoding.go +++ b/tempodb/encoding/vparquet2/encoding.go @@ -5,8 +5,6 @@ import ( "io/fs" "time" - "github.com/google/uuid" - "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" ) @@ -45,8 +43,8 @@ func (v Encoding) OpenWALBlock(filename string, path string, ingestionSlack time } // CreateWALBlock creates a new appendable block -func (v Encoding) CreateWALBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration, _ backend.DedicatedColumns) (common.WALBlock, error) { - return createWALBlock(id, tenantID, filepath, e, dataEncoding, ingestionSlack) +func (v Encoding) CreateWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) { + return createWALBlock(meta, filepath, dataEncoding, ingestionSlack) } func (v Encoding) OwnsWALBlock(entry fs.DirEntry) bool { diff --git a/tempodb/encoding/vparquet2/wal_block.go b/tempodb/encoding/vparquet2/wal_block.go index 23b1cb5a6ea..46b732498d2 100644 --- a/tempodb/encoding/vparquet2/wal_block.go +++ b/tempodb/encoding/vparquet2/wal_block.go @@ -152,12 +152,13 @@ func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (commo } // createWALBlock creates a new appendable block -func createWALBlock(id uuid.UUID, tenantID, filepath string, _ backend.Encoding, dataEncoding string, ingestionSlack time.Duration) (*walBlock, error) { +func createWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, ingestionSlack time.Duration) (*walBlock, error) { b := &walBlock{ meta: &backend.BlockMeta{ - Version: VersionString, - BlockID: id, - TenantID: tenantID, + Version: VersionString, + BlockID: meta.BlockID, + TenantID: meta.TenantID, + ReplicationFactor: meta.ReplicationFactor, }, path: filepath, ids: common.NewIDMap[int64](), diff --git a/tempodb/encoding/vparquet2/wal_block_test.go b/tempodb/encoding/vparquet2/wal_block_test.go index 09938e2afb5..a730c0baa52 100644 --- a/tempodb/encoding/vparquet2/wal_block_test.go +++ b/tempodb/encoding/vparquet2/wal_block_test.go @@ -64,7 +64,8 @@ func TestPartialReplay(t *testing.T) { blockID := uuid.New() basePath := t.TempDir() - w, err := createWALBlock(blockID, "fake", basePath, backend.EncNone, model.CurrentEncoding, 0) + meta := backend.NewBlockMeta("fake", blockID, VersionString, backend.EncNone, "") + w, err := createWALBlock(meta, basePath, model.CurrentEncoding, 0) require.NoError(t, err) // Flush a set of traces across 2 pages @@ -291,7 +292,8 @@ func TestRowIterator(t *testing.T) { } func testWalBlock(t *testing.T, f func(w *walBlock, ids []common.ID, trs []*tempopb.Trace)) { - w, err := createWALBlock(uuid.New(), "fake", t.TempDir(), backend.EncNone, model.CurrentEncoding, 0) + meta := backend.NewBlockMeta("fake", uuid.New(), VersionString, backend.EncNone, "") + w, err := createWALBlock(meta, t.TempDir(), model.CurrentEncoding, 0) require.NoError(t, err) decoder := model.MustNewSegmentDecoder(model.CurrentEncoding) diff --git a/tempodb/encoding/vparquet3/create.go b/tempodb/encoding/vparquet3/create.go index 692e3b8aeb1..5eb82a7035f 100644 --- a/tempodb/encoding/vparquet3/create.go +++ b/tempodb/encoding/vparquet3/create.go @@ -110,6 +110,7 @@ func newStreamingBlock(ctx context.Context, cfg *common.BlockConfig, meta *backe newMeta := backend.NewBlockMetaWithDedicatedColumns(meta.TenantID, meta.BlockID, VersionString, backend.EncNone, "", meta.DedicatedColumns) newMeta.StartTime = meta.StartTime newMeta.EndTime = meta.EndTime + newMeta.ReplicationFactor = meta.ReplicationFactor // TotalObjects is used here an an estimated count for the bloom filter. // The real number of objects is tracked below. diff --git a/tempodb/encoding/vparquet3/encoding.go b/tempodb/encoding/vparquet3/encoding.go index f5cceca35d0..9ad204a3dd5 100644 --- a/tempodb/encoding/vparquet3/encoding.go +++ b/tempodb/encoding/vparquet3/encoding.go @@ -5,8 +5,6 @@ import ( "io/fs" "time" - "github.com/google/uuid" - "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" ) @@ -45,8 +43,8 @@ func (v Encoding) OpenWALBlock(filename, path string, ingestionSlack, additional } // CreateWALBlock creates a new appendable block -func (v Encoding) CreateWALBlock(id uuid.UUID, tenantID, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration, dedicatedColumns backend.DedicatedColumns) (common.WALBlock, error) { - return createWALBlock(id, tenantID, filepath, e, dataEncoding, ingestionSlack, dedicatedColumns) +func (v Encoding) CreateWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) { + return createWALBlock(meta, filepath, dataEncoding, ingestionSlack) } func (v Encoding) OwnsWALBlock(entry fs.DirEntry) bool { diff --git a/tempodb/encoding/vparquet3/wal_block.go b/tempodb/encoding/vparquet3/wal_block.go index 4831dfd1bba..239610680f5 100644 --- a/tempodb/encoding/vparquet3/wal_block.go +++ b/tempodb/encoding/vparquet3/wal_block.go @@ -149,13 +149,14 @@ func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (commo } // createWALBlock creates a new appendable block -func createWALBlock(id uuid.UUID, tenantID, filepath string, _ backend.Encoding, dataEncoding string, ingestionSlack time.Duration, dedicatedColumns backend.DedicatedColumns) (*walBlock, error) { +func createWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, ingestionSlack time.Duration) (*walBlock, error) { b := &walBlock{ meta: &backend.BlockMeta{ - Version: VersionString, - BlockID: id, - TenantID: tenantID, - DedicatedColumns: dedicatedColumns, + Version: VersionString, + BlockID: meta.BlockID, + TenantID: meta.TenantID, + DedicatedColumns: meta.DedicatedColumns, + ReplicationFactor: meta.ReplicationFactor, }, path: filepath, ids: common.NewIDMap[int64](), diff --git a/tempodb/encoding/vparquet3/wal_block_test.go b/tempodb/encoding/vparquet3/wal_block_test.go index c07ad378cc2..67462a4a542 100644 --- a/tempodb/encoding/vparquet3/wal_block_test.go +++ b/tempodb/encoding/vparquet3/wal_block_test.go @@ -64,7 +64,8 @@ func TestPartialReplay(t *testing.T) { blockID := uuid.New() basePath := t.TempDir() - w, err := createWALBlock(blockID, "fake", basePath, backend.EncNone, model.CurrentEncoding, 0, nil) + meta := backend.NewBlockMeta("fake", blockID, VersionString, backend.EncNone, "") + w, err := createWALBlock(meta, basePath, model.CurrentEncoding, 0) require.NoError(t, err) // Flush a set of traces across 2 pages @@ -291,7 +292,8 @@ func TestRowIterator(t *testing.T) { } func testWalBlock(t *testing.T, f func(w *walBlock, ids []common.ID, trs []*tempopb.Trace)) { - w, err := createWALBlock(uuid.New(), "fake", t.TempDir(), backend.EncNone, model.CurrentEncoding, 0, nil) + meta := backend.NewBlockMeta("fake", uuid.New(), VersionString, backend.EncNone, "") + w, err := createWALBlock(meta, t.TempDir(), model.CurrentEncoding, 0) require.NoError(t, err) decoder := model.MustNewSegmentDecoder(model.CurrentEncoding) diff --git a/tempodb/encoding/vparquet4/create.go b/tempodb/encoding/vparquet4/create.go index b7c2b560e60..f0e9472c305 100644 --- a/tempodb/encoding/vparquet4/create.go +++ b/tempodb/encoding/vparquet4/create.go @@ -110,6 +110,7 @@ func newStreamingBlock(ctx context.Context, cfg *common.BlockConfig, meta *backe newMeta := backend.NewBlockMetaWithDedicatedColumns(meta.TenantID, meta.BlockID, VersionString, backend.EncNone, "", meta.DedicatedColumns) newMeta.StartTime = meta.StartTime newMeta.EndTime = meta.EndTime + newMeta.ReplicationFactor = meta.ReplicationFactor // TotalObjects is used here an an estimated count for the bloom filter. // The real number of objects is tracked below. diff --git a/tempodb/encoding/vparquet4/encoding.go b/tempodb/encoding/vparquet4/encoding.go index 1c76c7bac65..027b0567b78 100644 --- a/tempodb/encoding/vparquet4/encoding.go +++ b/tempodb/encoding/vparquet4/encoding.go @@ -5,8 +5,6 @@ import ( "io/fs" "time" - "github.com/google/uuid" - "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" ) @@ -45,8 +43,8 @@ func (v Encoding) OpenWALBlock(filename, path string, ingestionSlack, additional } // CreateWALBlock creates a new appendable block -func (v Encoding) CreateWALBlock(id uuid.UUID, tenantID, filepath string, e backend.Encoding, dataEncoding string, ingestionSlack time.Duration, dedicatedColumns backend.DedicatedColumns) (common.WALBlock, error) { - return createWALBlock(id, tenantID, filepath, e, dataEncoding, ingestionSlack, dedicatedColumns) +func (v Encoding) CreateWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, ingestionSlack time.Duration) (common.WALBlock, error) { + return createWALBlock(meta, filepath, dataEncoding, ingestionSlack) } func (v Encoding) OwnsWALBlock(entry fs.DirEntry) bool { diff --git a/tempodb/encoding/vparquet4/wal_block.go b/tempodb/encoding/vparquet4/wal_block.go index 81028c078f1..321bacd2dde 100644 --- a/tempodb/encoding/vparquet4/wal_block.go +++ b/tempodb/encoding/vparquet4/wal_block.go @@ -149,13 +149,14 @@ func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (commo } // createWALBlock creates a new appendable block -func createWALBlock(id uuid.UUID, tenantID, filepath string, _ backend.Encoding, dataEncoding string, ingestionSlack time.Duration, dedicatedColumns backend.DedicatedColumns) (*walBlock, error) { +func createWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, ingestionSlack time.Duration) (*walBlock, error) { b := &walBlock{ meta: &backend.BlockMeta{ - Version: VersionString, - BlockID: id, - TenantID: tenantID, - DedicatedColumns: dedicatedColumns, + Version: VersionString, + BlockID: meta.BlockID, + TenantID: meta.TenantID, + DedicatedColumns: meta.DedicatedColumns, + ReplicationFactor: meta.ReplicationFactor, }, path: filepath, ids: common.NewIDMap[int64](), diff --git a/tempodb/encoding/vparquet4/wal_block_test.go b/tempodb/encoding/vparquet4/wal_block_test.go index 14650ba6570..387aec40d28 100644 --- a/tempodb/encoding/vparquet4/wal_block_test.go +++ b/tempodb/encoding/vparquet4/wal_block_test.go @@ -64,7 +64,8 @@ func TestPartialReplay(t *testing.T) { blockID := uuid.New() basePath := t.TempDir() - w, err := createWALBlock(blockID, "fake", basePath, backend.EncNone, model.CurrentEncoding, 0, nil) + meta := backend.NewBlockMeta("fake", blockID, VersionString, backend.EncNone, "") + w, err := createWALBlock(meta, basePath, model.CurrentEncoding, 0) require.NoError(t, err) // Flush a set of traces across 2 pages @@ -291,7 +292,8 @@ func TestRowIterator(t *testing.T) { } func testWalBlock(t *testing.T, f func(w *walBlock, ids []common.ID, trs []*tempopb.Trace)) { - w, err := createWALBlock(uuid.New(), "fake", t.TempDir(), backend.EncNone, model.CurrentEncoding, 0, nil) + meta := backend.NewBlockMeta("fake", uuid.New(), VersionString, backend.EncNone, "") + w, err := createWALBlock(meta, t.TempDir(), model.CurrentEncoding, 0) require.NoError(t, err) decoder := model.MustNewSegmentDecoder(model.CurrentEncoding) diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index 59783b85e43..adbe4f79d1c 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -58,7 +58,8 @@ func TestRetention(t *testing.T) { wal := w.WAL() assert.NoError(t, err) - head, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID} + head, err := wal.NewBlock(meta, model.CurrentEncoding) assert.NoError(t, err) complete, err := w.CompleteBlock(context.Background(), head) @@ -121,7 +122,8 @@ func TestRetentionUpdatesBlocklistImmediately(t *testing.T) { blockID := uuid.New() - head, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID} + head, err := wal.NewBlock(meta, model.CurrentEncoding) assert.NoError(t, err) complete, err := w.CompleteBlock(ctx, head) diff --git a/tempodb/tempodb_search_test.go b/tempodb/tempodb_search_test.go index ecaa9eed75f..4d6bc7273ce 100644 --- a/tempodb/tempodb_search_test.go +++ b/tempodb/tempodb_search_test.go @@ -1531,7 +1531,9 @@ func runCompleteBlockSearchTest(t *testing.T, blockVersion string, runners ...ru // Write to wal wal := w.WAL() - head, err := wal.NewBlockWithDedicatedColumns(uuid.New(), testTenantID, model.CurrentEncoding, dc) + + meta := &backend.BlockMeta{BlockID: uuid.New(), TenantID: testTenantID, DedicatedColumns: dc} + head, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err) dec := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -1559,10 +1561,10 @@ func runCompleteBlockSearchTest(t *testing.T, blockVersion string, runners ...ru // Complete block block, err := w.CompleteBlock(context.Background(), head) require.NoError(t, err) - meta := block.BlockMeta() + blockMeta := block.BlockMeta() for _, r := range runners { - r(t, wantTr, wantMeta, searchesThatMatch, searchesThatDontMatch, meta, rw, block) + r(t, wantTr, wantMeta, searchesThatMatch, searchesThatDontMatch, blockMeta, rw, block) } // todo: do some compaction and then call runner again @@ -1956,7 +1958,8 @@ func TestWALBlockGetMetrics(t *testing.T) { r.EnablePolling(ctx, &mockJobSharder{}) wal := w.WAL() - head, err := wal.NewBlock(uuid.New(), testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: uuid.New(), TenantID: testTenantID} + head, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err) // Write to wal @@ -2014,7 +2017,8 @@ func TestSearchForTagsAndTagValues(t *testing.T) { wal := w.WAL() - head, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID} + head, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err) dec := model.MustNewSegmentDecoder(model.CurrentEncoding) diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 0a1d61acf79..44f4b1b3921 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/log" "github.com/golang/protobuf/proto" //nolint:all "github.com/google/uuid" + v2 "github.com/grafana/tempo/tempodb/encoding/v2" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -90,7 +91,8 @@ func TestDB(t *testing.T) { wal := w.WAL() - head, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID} + head, err := wal.NewBlock(meta, model.CurrentEncoding) assert.NoError(t, err) dec := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -142,7 +144,8 @@ func TestBlockSharding(t *testing.T) { wal := w.WAL() dec := model.MustNewSegmentDecoder(model.CurrentEncoding) - head, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID, DataEncoding: model.CurrentEncoding} + head, err := wal.NewBlock(meta, model.CurrentEncoding) assert.NoError(t, err) // add a trace to the block @@ -205,7 +208,8 @@ func TestBlockCleanup(t *testing.T) { wal := w.WAL() - head, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: blockID, TenantID: testTenantID} + head, err := wal.NewBlock(meta, model.CurrentEncoding) assert.NoError(t, err) _, err = w.CompleteBlock(context.Background(), head) @@ -533,7 +537,8 @@ func TestSearchCompactedBlocks(t *testing.T) { wal := w.WAL() - head, err := wal.NewBlock(uuid.New(), testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: uuid.New(), TenantID: testTenantID} + head, err := wal.NewBlock(meta, model.CurrentEncoding) assert.NoError(t, err) dec := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -615,7 +620,12 @@ func testCompleteBlock(t *testing.T, from, to string) { blockID := uuid.New() - block, err := wal.NewBlock(blockID, testTenantID, model.CurrentEncoding) + var dataEncoding string + if from == v2.VersionString { + dataEncoding = model.CurrentEncoding + } + meta := backend.NewBlockMeta(testTenantID, blockID, from, backend.EncNone, dataEncoding) + block, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err, "unexpected error creating block") require.Equal(t, block.BlockMeta().Version, from) @@ -688,7 +698,8 @@ func testCompleteBlockHonorsStartStopTimes(t *testing.T, targetBlockVersion stri oneHourAgo := now.Add(-1 * time.Hour).Unix() oneHour := now.Add(time.Hour).Unix() - block, err := wal.NewBlock(uuid.New(), testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: uuid.New(), TenantID: testTenantID} + block, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err, "unexpected error creating block") // Write a trace from 1 hour ago. @@ -756,7 +767,8 @@ func benchmarkCompleteBlock(b *testing.B, e encoding.VersionedEncoding) { dec := model.MustNewSegmentDecoder(model.CurrentEncoding) wal := w.WAL() - blk, err := wal.NewBlock(uuid.New(), testTenantID, model.CurrentEncoding) + meta := &backend.BlockMeta{BlockID: uuid.New(), TenantID: testTenantID} + blk, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(b, err) for i := 0; i < traceCount; i++ { diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 9421dbafcb6..bba29a82f1d 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -8,7 +8,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" @@ -153,28 +152,12 @@ func (w *WAL) RescanBlocks(additionalStartSlack time.Duration, log log.Logger) ( return blocks, nil } -func (w *WAL) NewBlock(id uuid.UUID, tenantID, dataEncoding string) (common.WALBlock, error) { - return w.NewBlockWithDedicatedColumns(id, tenantID, dataEncoding, nil) -} - -// TODO(mapno): NewBlock and NewBlockWithDedicatedColumns should be consolidated into a single method -// They're currently separate because the dedicated columns feature is vParquet3-only, -// and we prefer to avoid leaking vParquet3-specific code where possible. -// There are a couple of ways to do this: -// 1. Add a dedicatedColumns parameter to NewBlock, and have it default to nil. -// 2. Have encoding-specific config be part of the encoding itself -// 3. Pass the meta file path to the WAL constructor - -func (w *WAL) NewBlockWithDedicatedColumns(id uuid.UUID, tenantID, dataEncoding string, dedicatedColumns backend.DedicatedColumns) (common.WALBlock, error) { - return w.newBlock(id, tenantID, dataEncoding, w.c.Version, dedicatedColumns) -} - -func (w *WAL) newBlock(id uuid.UUID, tenantID string, dataEncoding string, blockVersion string, dedicatedColumns backend.DedicatedColumns) (common.WALBlock, error) { - v, err := encoding.FromVersion(blockVersion) +func (w *WAL) NewBlock(meta *backend.BlockMeta, dataEncoding string) (common.WALBlock, error) { + v, err := encoding.FromVersion(w.c.Version) if err != nil { return nil, err } - return v.CreateWALBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding, w.c.IngestionSlack, dedicatedColumns) + return v.CreateWALBlock(meta, w.c.Filepath, dataEncoding, w.c.IngestionSlack) } func (w *WAL) GetFilepath() string { diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index c0dfad6f7f3..bb39d3ccb89 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -71,7 +71,8 @@ func testAppendBlockStartEnd(t *testing.T, e encoding.VersionedEncoding) { require.NoError(t, err, "unexpected error creating temp wal") blockID := uuid.New() - block, err := wal.newBlock(blockID, testTenantID, model.CurrentEncoding, e.Version(), nil) + meta := backend.NewBlockMeta("fake", blockID, e.Version(), backend.EncNone, "") + block, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err, "unexpected error creating block") enc := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -128,7 +129,8 @@ func testIngestionSlack(t *testing.T, e encoding.VersionedEncoding) { require.NoError(t, err, "unexpected error creating temp wal") blockID := uuid.New() - block, err := wal.newBlock(blockID, testTenantID, model.CurrentEncoding, e.Version(), nil) + meta := backend.NewBlockMeta("fake", blockID, e.Version(), backend.EncNone, "") + block, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err, "unexpected error creating block") enc := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -340,7 +342,8 @@ func TestInvalidFilesAndFoldersAreHandled(t *testing.T) { // create all valid blocks for _, e := range encoding.AllEncodings() { - block, err := wal.newBlock(uuid.New(), testTenantID, model.CurrentEncoding, e.Version(), nil) + meta := backend.NewBlockMeta("fake", uuid.New(), e.Version(), backend.EncNone, "") + block, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err) id := make([]byte, 16) @@ -394,7 +397,8 @@ func runWALTestWithAppendMode(t testing.TB, encoding string, appendTrace bool, r blockID := uuid.New() - block, err := wal.newBlock(blockID, testTenantID, model.CurrentEncoding, encoding, nil) + meta := backend.NewBlockMeta("fake", blockID, encoding, backend.EncNone, "") + block, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err, "unexpected error creating block") enc := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -541,7 +545,8 @@ func runWALBenchmarkWithAppendMode(b *testing.B, encoding string, flushCount int blockID := uuid.New() - block, err := wal.newBlock(blockID, testTenantID, model.CurrentEncoding, encoding, nil) + meta := backend.NewBlockMeta("fake", blockID, encoding, backend.EncNone, "") + block, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(b, err, "unexpected error creating block") dec := model.MustNewSegmentDecoder(model.CurrentEncoding)