Skip to content

Commit

Permalink
logging the name of rootSpanName for discarded traces
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcinGinszt committed Aug 21, 2023
1 parent c2a7c17 commit b4e11d8
Show file tree
Hide file tree
Showing 15 changed files with 65 additions and 26 deletions.
4 changes: 2 additions & 2 deletions modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ func (c *Compactor) Combine(dataEncoding string, tenantID string, objs ...[]byte
}

// RecordDiscardedSpans implements tempodb.CompactorSharder
func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string) {
level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID, "discarded_span_count", count)
func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string) {
level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID, "rootSpanName", rootSpanName, "discarded_span_count", count)
overrides.RecordDiscardedSpans(count, reasonCompactorDiscardedSpans, tenantID)
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/util/test/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ func MakeBatch(spans int, traceID []byte) *v1_trace.ResourceSpans {
}

ss.Spans = append(ss.Spans, MakeSpan(traceID))

// first span should not have parent
if len(ss.Spans) == 1 {
(*ss.Spans[0]).ParentSpanId = nil //[]byte{}
}
}
return batch
}
Expand Down
4 changes: 2 additions & 2 deletions tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.Block
ObjectsWritten: func(compactionLevel, objs int) {
metricCompactionObjectsWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(objs))
},
SpansDiscarded: func(traceId string, spans int) {
rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId)
SpansDiscarded: func(traceId, rootSpanName string, spans int) {
rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId, rootSpanName)
},
DisconnectedTrace: func() {
dataquality.WarnDisconnectedTrace(tenantID, dataquality.PhaseTraceCompactorCombine)
Expand Down
2 changes: 1 addition & 1 deletion tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m *mockSharder) Combine(dataEncoding string, _ string, objs ...[]byte) ([]
return model.StaticCombiner.Combine(dataEncoding, objs...)
}

func (m *mockSharder) RecordDiscardedSpans(int, string, string) {}
func (m *mockSharder) RecordDiscardedSpans(int, string, string, string) {}

type mockJobSharder struct{}

Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type CompactionOptions struct {
ObjectsCombined func(compactionLevel, objects int)
ObjectsWritten func(compactionLevel, objects int)
BytesWritten func(compactionLevel, bytes int)
SpansDiscarded func(traceID string, spans int)
SpansDiscarded func(traceID string, rootSpanName string, rootServiceName string, spans int)
DisconnectedTrace func()
}

Expand Down
3 changes: 2 additions & 1 deletion tempodb/encoding/vparquet/block_findtracebyid.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
SearchNext = -2
NotFound = -3

TraceIDColumnName = "TraceID"
TraceIDColumnName = "TraceID"
RootSpanNameColumnName = "RootSpanName"
)

func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) {
Expand Down
15 changes: 12 additions & 3 deletions tempodb/encoding/vparquet/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,20 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) {
// countSpans counts the number of spans in the given trace in deconstructed
// parquet row format and returns traceId.
// It simply counts the number of values for span ID, which is always present.
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) {
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, spans int) {
traceIDColumn, found := schema.Lookup(TraceIDColumnName)
if !found {
return "", 0
return "", "", 0
}

rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName)
if !found {
return "", "", 0
}

spanID, found := schema.Lookup("rs", "ils", "Spans", "ID")
if !found {
return "", 0
return "", "", 0
}

for _, v := range row {
Expand All @@ -336,6 +341,10 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans
if v.Column() == traceIDColumn.ColumnIndex {
traceID = tempoUtil.TraceIDToHexString(v.ByteArray())
}

if v.Column() == rootSpanNameColumn.ColumnIndex {
rootSpanName = v.String()
}
}

return
Expand Down
5 changes: 3 additions & 2 deletions tempodb/encoding/vparquet/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) {
FlushSizeBytes: 30_000_000,
MaxBytesPerTrace: 50_000_000,
ObjectsCombined: func(compactionLevel, objects int) {},
SpansDiscarded: func(traceID string, spans int) {},
SpansDiscarded: func(traceID, rootSpanName string, spans int) {},
})

_, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs)
Expand Down Expand Up @@ -161,7 +161,8 @@ func TestCountSpans(t *testing.T) {
row := sch.Deconstruct(nil, trp)

// count spans for generated rows.
tID, spans := countSpans(sch, row)
tID, rootSpanName, spans := countSpans(sch, row)
require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID))
require.Equal(t, rootSpanName, "test")
require.Equal(t, spans, batchSize*spansEach)
}
3 changes: 2 additions & 1 deletion tempodb/encoding/vparquet2/block_findtracebyid.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
SearchNext = -2
NotFound = -3

TraceIDColumnName = "TraceID"
TraceIDColumnName = "TraceID"
RootSpanNameColumnName = "RootSpanName"
)

func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) {
Expand Down
18 changes: 14 additions & 4 deletions tempodb/encoding/vparquet2/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,21 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) {
// countSpans counts the number of spans in the given trace in deconstructed
// parquet row format and returns traceId.
// It simply counts the number of values for span ID, which is always present.
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) {
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, spans int) {
traceIDColumn, found := schema.Lookup(TraceIDColumnName)
if !found {
return "", 0
return "", "", 0
}

spanID, found := schema.Lookup("rs", "list", "element", "ss", "list", "element", "Spans", "list", "element", "SpanID")
rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName)
if !found {
return "", 0
return "", "", 0
}

spanID, found := schema.Lookup("rs", "list", "element", "ss",
"list", "element", "Spans", "list", "element", "SpanID")
if !found {
return "", "", 0
}

for _, v := range row {
Expand All @@ -336,6 +342,10 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans
if v.Column() == traceIDColumn.ColumnIndex {
traceID = tempoUtil.TraceIDToHexString(v.ByteArray())
}

if v.Column() == rootSpanNameColumn.ColumnIndex {
rootSpanName = v.String()
}
}

return
Expand Down
6 changes: 4 additions & 2 deletions tempodb/encoding/vparquet2/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) {
FlushSizeBytes: 30_000_000,
MaxBytesPerTrace: 50_000_000,
ObjectsCombined: func(compactionLevel, objects int) {},
SpansDiscarded: func(traceID string, spans int) {},
SpansDiscarded: func(traceID, rootSpanName string, spans int) {},
})

_, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs)
Expand Down Expand Up @@ -161,7 +161,9 @@ func TestCountSpans(t *testing.T) {
row := sch.Deconstruct(nil, trp)

// count spans for generated rows.
tID, spans := countSpans(sch, row)
tID, rootSpanName, spans := countSpans(sch, row)
require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID))
require.Equal(t, rootSpanName, "test")
require.Equal(t, spans, batchSize*spansEach)

}
3 changes: 2 additions & 1 deletion tempodb/encoding/vparquet3/block_findtracebyid.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
SearchNext = -2
NotFound = -3

TraceIDColumnName = "TraceID"
TraceIDColumnName = "TraceID"
RootSpanNameColumnName = "RootSpanName"
)

func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) {
Expand Down
15 changes: 12 additions & 3 deletions tempodb/encoding/vparquet3/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,20 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) {
// countSpans counts the number of spans in the given trace in deconstructed
// parquet row format and returns traceId.
// It simply counts the number of values for span ID, which is always present.
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) {
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, traceName string, spans int) {
traceIDColumn, found := schema.Lookup(TraceIDColumnName)
if !found {
return "", 0
return "", "", 0
}

rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName)
if !found {
return "", "", 0
}

spanID, found := schema.Lookup("rs", "list", "element", "ss", "list", "element", "Spans", "list", "element", "SpanID")
if !found {
return "", 0
return "", "", 0
}

for _, v := range row {
Expand All @@ -340,6 +345,10 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans
if v.Column() == traceIDColumn.ColumnIndex {
traceID = tempoUtil.TraceIDToHexString(v.ByteArray())
}

if v.Column() == rootSpanNameColumn.ColumnIndex {
traceName = v.String()
}
}

return
Expand Down
4 changes: 2 additions & 2 deletions tempodb/encoding/vparquet3/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) {
FlushSizeBytes: 30_000_000,
MaxBytesPerTrace: 50_000_000,
ObjectsCombined: func(compactionLevel, objects int) {},
SpansDiscarded: func(traceID string, spans int) {},
SpansDiscarded: func(traceID, rootSpanName string, spans int) {},
})

_, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs)
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestCountSpans(t *testing.T) {
row := sch.Deconstruct(nil, trp)

// count spans for generated rows.
tID, spans := countSpans(sch, row)
tID, _, spans := countSpans(sch, row)
require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID))
require.Equal(t, spans, batchSize*spansEach)
}
Expand Down
2 changes: 1 addition & 1 deletion tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Compactor interface {
type CompactorSharder interface {
Combine(dataEncoding string, tenantID string, objs ...[]byte) ([]byte, bool, error)
Owns(hash string) bool
RecordDiscardedSpans(count int, tenantID string, traceID string)
RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string)
}

type CompactorOverrides interface {
Expand Down

0 comments on commit b4e11d8

Please sign in to comment.