Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Parquet] Add traceid index to vparquet2 and vparquet3 and use it when finding trace by id #2697

Merged
merged 15 commits into from
Aug 24, 2023
Merged
165 changes: 100 additions & 65 deletions tempodb/encoding/vparquet2/block_findtracebyid.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool
return filter.Test(id), nil
}

// checkIndex consults the block's optional trace-ID index and reports the
// row group that should contain id. It returns -1 when the block has no
// index object in the backend (blocks written before the index existed) or
// when the index places id outside every row group.
func (b *backendBlock) checkIndex(ctx context.Context, id common.ID) (rowGroup int, err error) {
	span, derivedCtx := opentracing.StartSpanFromContext(ctx, "parquet.backendBlock.checkIndex",
		opentracing.Tags{
			"blockID":  b.meta.BlockID,
			"tenantID": b.meta.TenantID,
		})
	defer span.Finish()

	indexBytes, readErr := b.r.Read(derivedCtx, common.NameIndex, b.meta.BlockID, b.meta.TenantID, true)
	switch {
	case readErr == backend.ErrDoesNotExist:
		// A missing index is not an error; the caller falls back to
		// binary-searching the row groups.
		return -1, nil
	case readErr != nil:
		return -1, fmt.Errorf("error retrieving index (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, readErr)
	}

	idx, parseErr := unmarshalIndex(indexBytes)
	if parseErr != nil {
		return -1, fmt.Errorf("error parsing index (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, parseErr)
	}

	return idx.Find(id), nil
}

func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opts common.SearchOptions) (_ *tempopb.Trace, err error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "parquet.backendBlock.FindTraceByID",
opentracing.Tags{
Expand All @@ -70,6 +94,11 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt
return nil, nil
}

rowGroup, err := b.checkIndex(ctx, traceID)
if err != nil {
return nil, err
}

pf, rr, err := b.openForSearch(derivedCtx, opts)
if err != nil {
return nil, fmt.Errorf("unexpected error opening parquet file: %w", err)
Expand All @@ -78,85 +107,91 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt
span.SetTag("inspectedBytes", rr.BytesRead())
}()

return findTraceByID(derivedCtx, traceID, b.meta, pf)
return findTraceByID(derivedCtx, traceID, b.meta, pf, rowGroup)
}

func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File) (*tempopb.Trace, error) {
func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File, rowGroup int) (*tempopb.Trace, error) {
// traceID column index
colIndex, _ := pq.GetColumnIndexByPath(pf, TraceIDColumnName)
if colIndex == -1 {
return nil, fmt.Errorf("unable to get index for column: %s", TraceIDColumnName)
}

numRowGroups := len(pf.RowGroups())
buf := make(parquet.Row, 1)

// Cache of row group bounds
rowGroupMins := make([]common.ID, numRowGroups+1)
// todo: restore using meta min/max id once it works
// https://github.com/grafana/tempo/issues/1903
rowGroupMins[0] = bytes.Repeat([]byte{0}, 16)
rowGroupMins[numRowGroups] = bytes.Repeat([]byte{255}, 16) // This is actually inclusive and the logic is special for the last row group below
// If no index then fallback to binary searching the rowgroups.
if rowGroup == -1 {
var (
numRowGroups = len(pf.RowGroups())
buf = make(parquet.Row, 1)
err error
)

// Gets the minimum trace ID within the row group. Since the column is sorted
// ascending we just read the first value from the first page.
getRowGroupMin := func(rgIdx int) (common.ID, error) {
min := rowGroupMins[rgIdx]
if len(min) > 0 {
// Already loaded
// Cache of row group bounds
rowGroupMins := make([]common.ID, numRowGroups+1)
// todo: restore using meta min/max id once it works
// https://github.com/grafana/tempo/issues/1903
rowGroupMins[0] = bytes.Repeat([]byte{0}, 16)
rowGroupMins[numRowGroups] = bytes.Repeat([]byte{255}, 16) // This is actually inclusive and the logic is special for the last row group below

// Gets the minimum trace ID within the row group. Since the column is sorted
// ascending we just read the first value from the first page.
getRowGroupMin := func(rgIdx int) (common.ID, error) {
min := rowGroupMins[rgIdx]
if len(min) > 0 {
// Already loaded
return min, nil
}

pages := pf.RowGroups()[rgIdx].ColumnChunks()[colIndex].Pages()
defer pages.Close()

page, err := pages.ReadPage()
if err != nil {
return nil, err
}

c, err := page.Values().ReadValues(buf)
if err != nil && err != io.EOF {
return nil, err
}
if c < 1 {
return nil, fmt.Errorf("failed to read value from page: traceID: %s blockID:%v rowGroupIdx:%d", util.TraceIDToHexString(traceID), meta.BlockID, rgIdx)
}

min = buf[0].ByteArray()
rowGroupMins[rgIdx] = min
return min, nil
}

pages := pf.RowGroups()[rgIdx].ColumnChunks()[colIndex].Pages()
defer pages.Close()

page, err := pages.ReadPage()
if err != nil {
return nil, err
}

c, err := page.Values().ReadValues(buf)
if err != nil && err != io.EOF {
return nil, err
}
if c < 1 {
return nil, fmt.Errorf("failed to read value from page: traceID: %s blockID:%v rowGroupIdx:%d", util.TraceIDToHexString(traceID), meta.BlockID, rgIdx)
}

min = buf[0].ByteArray()
rowGroupMins[rgIdx] = min
return min, nil
}

rowGroup, err := binarySearch(numRowGroups, func(rgIdx int) (int, error) {
min, err := getRowGroupMin(rgIdx)
if err != nil {
return 0, err
}

if check := bytes.Compare(traceID, min); check <= 0 {
// Trace is before or in this group
return check, nil
}

max, err := getRowGroupMin(rgIdx + 1)
rowGroup, err = binarySearch(numRowGroups, func(rgIdx int) (int, error) {
min, err := getRowGroupMin(rgIdx)
if err != nil {
return 0, err
}

if check := bytes.Compare(traceID, min); check <= 0 {
// Trace is before or in this group
return check, nil
}

max, err := getRowGroupMin(rgIdx + 1)
if err != nil {
return 0, err
}

// This is actually the min of the next group, so check is exclusive not inclusive like min
// Except for the last group, it is inclusive
check := bytes.Compare(traceID, max)
if check > 0 || (check == 0 && rgIdx < (numRowGroups-1)) {
// Trace is after this group
return 1, nil
}

// Must be in this group
return 0, nil
})
if err != nil {
return 0, err
}

// This is actually the min of the next group, so check is exclusive not inclusive like min
// Except for the last group, it is inclusive
check := bytes.Compare(traceID, max)
if check > 0 || (check == 0 && rgIdx < (numRowGroups-1)) {
// Trace is after this group
return 1, nil
return nil, errors.Wrap(err, "error binary searching row groups")
}

// Must be in this group
return 0, nil
})
if err != nil {
return nil, errors.Wrap(err, "error binary searching row groups")
}

if rowGroup == -1 {
Expand Down
32 changes: 30 additions & 2 deletions tempodb/encoding/vparquet2/block_findtracebyid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,43 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) {
}
}

/*func genIndex(b *testing.B, block *backendBlock) *index {
pf, _, err := block.openForSearch(context.TODO(), common.DefaultSearchOptions())
require.NoError(b, err)

i := &index{}

for j := range pf.RowGroups() {
iter := parquetquery.NewSyncIterator(context.TODO(), pf.RowGroups()[j:j+1], 0, "", 1000, nil, "TraceID")
defer iter.Close()

for {
v, err := iter.Next()
require.NoError(b, err)
if v == nil {
break
}

i.Add(v.Entries[0].Value.ByteArray())
}
i.Flush()
}

return i
}*/
mdisibio marked this conversation as resolved.
Show resolved Hide resolved

func BenchmarkFindTraceByID(b *testing.B) {
ctx := context.TODO()
tenantID := "1"
blockID := uuid.MustParse("3685ee3d-cbbf-4f36-bf28-93447a19dea6")
// blockID := uuid.MustParse("1a2d50d7-f10e-41f0-850d-158b19ead23d")
blockID := uuid.MustParse("1f38c153-b798-4bba-b4f4-cc60633e5cab")

r, _, _, err := local.New(&local.Config{
Path: path.Join("/Users/marty/src/tmp/"),
})
require.NoError(b, err)

rr := backend.NewReader(r)
// ww := backend.NewWriter(w)

meta, err := rr.BlockMeta(ctx, blockID, tenantID)
require.NoError(b, err)
Expand All @@ -165,6 +190,9 @@ func BenchmarkFindTraceByID(b *testing.B) {

block := newBackendBlock(meta, rr)

// idx := genIndex(b, block)
// writeBlockMeta(ctx, ww, block.meta, &common.ShardedBloomFilter{}, idx)

b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand Down
18 changes: 17 additions & 1 deletion tempodb/encoding/vparquet2/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ func CopyBlock(ctx context.Context, fromMeta, toMeta *backend.BlockMeta, from ba
}
}

// Index (may not exist)
err = cpy(common.NameIndex)
if err != nil && err != backend.ErrDoesNotExist {
return err
}

// Meta
err = to.WriteBlockMeta(ctx, toMeta)
return err
}

func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, bloom *common.ShardedBloomFilter) error {
func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, bloom *common.ShardedBloomFilter, index *index) error {
// bloom
blooms, err := bloom.Marshal()
if err != nil {
Expand All @@ -65,6 +71,16 @@ func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMe
}
}

// Index
i, err := index.Marshal()
if err != nil {
return err
}
err = w.Write(ctx, common.NameIndex, meta.BlockID, meta.TenantID, i, true)
if err != nil {
return err
}

// meta
err = w.WriteBlockMeta(ctx, meta)
if err != nil {
Expand Down
8 changes: 7 additions & 1 deletion tempodb/encoding/vparquet2/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type streamingBlock struct {
w *backendWriter
r backend.Reader
to backend.Writer
index *index

currentBufferedTraces int
currentBufferedBytes int
Expand Down Expand Up @@ -126,6 +127,7 @@ func newStreamingBlock(ctx context.Context, cfg *common.BlockConfig, meta *backe
w: w,
r: r,
to: to,
index: &index{},
}
}

Expand All @@ -136,6 +138,7 @@ func (b *streamingBlock) Add(tr *Trace, start, end uint32) error {
}
id := tr.TraceID

b.index.Add(id)
b.bloom.Add(id)
b.meta.ObjectAdded(id, start, end)
b.currentBufferedTraces++
Expand All @@ -150,6 +153,7 @@ func (b *streamingBlock) AddRaw(id []byte, row parquet.Row, start, end uint32) e
return err
}

b.index.Add(id)
b.bloom.Add(id)
b.meta.ObjectAdded(id, start, end)
b.currentBufferedTraces++
Expand All @@ -168,6 +172,7 @@ func (b *streamingBlock) CurrentBufferedObjects() int {

func (b *streamingBlock) Flush() (int, error) {
// Flush row group
b.index.Flush()
err := b.pw.Flush()
if err != nil {
return 0, err
Expand All @@ -185,6 +190,7 @@ func (b *streamingBlock) Flush() (int, error) {

func (b *streamingBlock) Complete() (int, error) {
// Flush final row group
b.index.Flush()
b.meta.TotalRecords++
err := b.pw.Flush()
if err != nil {
Expand Down Expand Up @@ -228,7 +234,7 @@ func (b *streamingBlock) Complete() (int, error) {

b.meta.BloomShardCount = uint16(b.bloom.GetShardCount())

return n, writeBlockMeta(b.ctx, b.to, b.meta, b.bloom)
return n, writeBlockMeta(b.ctx, b.to, b.meta, b.bloom, b.index)
}

// estimateMarshalledSizeFromTrace attempts to estimate the size of trace in bytes. This is used to make choose
Expand Down
41 changes: 41 additions & 0 deletions tempodb/encoding/vparquet2/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package vparquet2

import (
"bytes"
"encoding/json"
"sort"

"github.com/grafana/tempo/tempodb/encoding/common"
)

// index maps row groups to the trace IDs they contain. RowGroups holds, in
// file order, the last (maximum) trace ID of each row group; because the
// TraceID column is sorted ascending, these maxima are enough to binary
// search for the single row group that can hold a given ID.
type index struct {
	lastID    common.ID   // most recent ID passed to Add; becomes a row group's max on Flush
	RowGroups []common.ID `json:"rowGroups"`
}

// Add records id as the most recently seen trace ID. IDs are added in
// ascending order (the TraceID column is sorted), so the last one added
// before a Flush is that row group's maximum.
func (i *index) Add(id common.ID) {
	i.lastID = id
}

// Flush closes out the current row group by appending the last added ID,
// i.e. the row group's maximum trace ID, to the index.
func (i *index) Flush() {
	i.RowGroups = append(i.RowGroups, i.lastID)
}

// Marshal serializes the index as JSON for storage in the backend.
// Only the exported RowGroups field is encoded.
func (i *index) Marshal() ([]byte, error) {
	return json.Marshal(i)
}

// Find returns the index of the row group whose ID range covers id, or -1
// when id is greater than every row group's maximum and therefore cannot be
// in this block.
func (i *index) Find(id common.ID) int {
	// RowGroups stores each row group's maximum ID in ascending order, so the
	// candidate is the first entry that is >= id.
	pos := sort.Search(len(i.RowGroups), func(j int) bool {
		return bytes.Compare(id, i.RowGroups[j]) <= 0
	})
	if pos == len(i.RowGroups) {
		// id is beyond the last row group's maximum.
		return -1
	}
	return pos
}

// unmarshalIndex decodes a JSON-serialized index previously produced by
// (*index).Marshal. On decode failure it returns a nil index (the original
// returned a partially-populated value alongside the error, which Go
// convention says callers must not rely on).
func unmarshalIndex(b []byte) (*index, error) {
	i := &index{}
	if err := json.Unmarshal(b, i); err != nil {
		return nil, err
	}
	return i, nil
}