Skip to content

Commit

Permalink
Log trace data for discarded traces (#2816)
Browse files Browse the repository at this point in the history
* logging the name of rootSpanName for discarded traces

* add rootServiceName, ignore tests

* changelog

* format

* use already defined constants with column names, update tests
  • Loading branch information
MarcinGinszt authored Aug 23, 2023
1 parent 3d0c013 commit ef29986
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 22 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [ENHANCEMENT] Add several metrics-generator fields to user-configurable overrides [#2711](https://github.com/grafana/tempo/pull/2711) (@kvrhdn)
* [ENHANCEMENT] Update /api/metrics/summary to correctly handle missing attributes and improve performance of TraceQL `select()` queries. [#2765](https://github.com/grafana/tempo/pull/2765) (@mdisibio)
* [ENHANCEMENT] Add `TempoUserConfigurableOverridesReloadFailing` alert [#2784](https://github.com/grafana/tempo/pull/2784) (@kvrhdn)
* [ENHANCEMENT] Add RootSpanName and RootServiceName to the log message about discarded spans [#2816](https://github.com/grafana/tempo/pull/2816) (@marcinginszt)
* [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio)
* [BUGFIX] Fix node role auth IDMSv1 [#2760](https://github.com/grafana/tempo/pull/2760) (@coufalja)
* [BUGFIX] Only search ingester blocks that fall within the request time range. [#2783](https://github.com/grafana/tempo/pull/2783) (@joe-elliott)
Expand Down
5 changes: 3 additions & 2 deletions modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,9 @@ func (c *Compactor) Combine(dataEncoding string, tenantID string, objs ...[]byte
}

// RecordDiscardedSpans implements tempodb.CompactorSharder
func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string) {
level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID, "discarded_span_count", count)
// RecordDiscardedSpans implements tempodb.CompactorSharder. It emits a warning
// log identifying the oversized trace (tenant, trace ID, root span name, root
// service name, and the number of spans dropped) and forwards the discarded
// count to the overrides package under the compactor-discard reason.
func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string, rootServiceName string) {
	level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID,
		"rootSpanName", rootSpanName, "rootServiceName", rootServiceName, "discarded_span_count", count)
	overrides.RecordDiscardedSpans(count, reasonCompactorDiscardedSpans, tenantID)
}

Expand Down
4 changes: 2 additions & 2 deletions tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.Block
ObjectsWritten: func(compactionLevel, objs int) {
metricCompactionObjectsWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(objs))
},
SpansDiscarded: func(traceId string, spans int) {
rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId)
SpansDiscarded: func(traceId, rootSpanName string, rootServiceName string, spans int) {
rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId, rootSpanName, rootServiceName)
},
DisconnectedTrace: func() {
dataquality.WarnDisconnectedTrace(tenantID, dataquality.PhaseTraceCompactorCombine)
Expand Down
2 changes: 1 addition & 1 deletion tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (m *mockSharder) Combine(dataEncoding string, _ string, objs ...[]byte) ([]
return model.StaticCombiner.Combine(dataEncoding, objs...)
}

func (m *mockSharder) RecordDiscardedSpans(int, string, string) {}
// RecordDiscardedSpans satisfies the tempodb.CompactorSharder interface;
// it is a no-op in tests because discard accounting is not under test here.
func (m *mockSharder) RecordDiscardedSpans(int, string, string, string, string) {}

type mockJobSharder struct{}

Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type CompactionOptions struct {
ObjectsCombined func(compactionLevel, objects int)
ObjectsWritten func(compactionLevel, objects int)
BytesWritten func(compactionLevel, bytes int)
SpansDiscarded func(traceID string, spans int)
SpansDiscarded func(traceID string, rootSpanName string, rootServiceName string, spans int)
DisconnectedTrace func()
}

Expand Down
24 changes: 21 additions & 3 deletions tempodb/encoding/vparquet/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,25 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) {
// countSpans counts the number of spans in the given trace in deconstructed
// parquet row format and returns the traceID, rootSpanName and rootServiceName.
// It simply counts the number of values for span ID, which is always present.
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) {
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, rootServiceName string, spans int) {
traceIDColumn, found := schema.Lookup(TraceIDColumnName)
if !found {
return "", 0
return "", "", "", 0
}

rootSpanNameColumn, found := schema.Lookup(columnPathRootSpanName)
if !found {
return "", "", "", 0
}

rootServiceNameColumn, found := schema.Lookup(columnPathRootServiceName)
if !found {
return "", "", "", 0
}

spanID, found := schema.Lookup("rs", "ils", "Spans", "ID")
if !found {
return "", 0
return "", "", "", 0
}

for _, v := range row {
Expand All @@ -336,6 +346,14 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans
if v.Column() == traceIDColumn.ColumnIndex {
traceID = tempoUtil.TraceIDToHexString(v.ByteArray())
}

if v.Column() == rootSpanNameColumn.ColumnIndex {
rootSpanName = v.String()
}

if v.Column() == rootServiceNameColumn.ColumnIndex {
rootServiceName = v.String()
}
}

return
Expand Down
11 changes: 9 additions & 2 deletions tempodb/encoding/vparquet/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) {
FlushSizeBytes: 30_000_000,
MaxBytesPerTrace: 50_000_000,
ObjectsCombined: func(compactionLevel, objects int) {},
SpansDiscarded: func(traceID string, spans int) {},
SpansDiscarded: func(traceID, rootSpanName string, rootServiceName string, spans int) {},
})

_, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs)
Expand Down Expand Up @@ -150,6 +150,9 @@ func TestCountSpans(t *testing.T) {
batchSize := 300 + rand.Intn(25)
spansEach := 250 + rand.Intn(25)

rootSpan := "foo"
rootService := "bar"

sch := parquet.SchemaOf(new(Trace))
traceID := make([]byte, 16)
_, err := crand.Read(traceID)
Expand All @@ -158,10 +161,14 @@ func TestCountSpans(t *testing.T) {
// make Trace and convert to parquet.Row
tr := test.MakeTraceWithSpanCount(batchSize, spansEach, traceID)
trp := traceToParquet(traceID, tr, nil)
trp.RootServiceName = rootService
trp.RootSpanName = rootSpan
row := sch.Deconstruct(nil, trp)

// count spans for generated rows.
tID, spans := countSpans(sch, row)
tID, rootSpanName, rootServiceName, spans := countSpans(sch, row)
require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID))
require.Equal(t, spans, batchSize*spansEach)
require.Equal(t, rootSpan, rootSpanName)
require.Equal(t, rootService, rootServiceName)
}
24 changes: 21 additions & 3 deletions tempodb/encoding/vparquet2/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,25 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) {
// countSpans counts the number of spans in the given trace in deconstructed
// parquet row format and returns the traceID, rootSpanName and rootServiceName.
// It simply counts the number of values for span ID, which is always present.
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) {
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, rootServiceName string, spans int) {
traceIDColumn, found := schema.Lookup(TraceIDColumnName)
if !found {
return "", 0
return "", "", "", 0
}

rootSpanNameColumn, found := schema.Lookup(columnPathRootSpanName)
if !found {
return "", "", "", 0
}

rootServiceNameColumn, found := schema.Lookup(columnPathRootServiceName)
if !found {
return "", "", "", 0
}

spanID, found := schema.Lookup("rs", "list", "element", "ss", "list", "element", "Spans", "list", "element", "SpanID")
if !found {
return "", 0
return "", "", "", 0
}

for _, v := range row {
Expand All @@ -336,6 +346,14 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans
if v.Column() == traceIDColumn.ColumnIndex {
traceID = tempoUtil.TraceIDToHexString(v.ByteArray())
}

if v.Column() == rootSpanNameColumn.ColumnIndex {
rootSpanName = v.String()
}

if v.Column() == rootServiceNameColumn.ColumnIndex {
rootServiceName = v.String()
}
}

return
Expand Down
11 changes: 9 additions & 2 deletions tempodb/encoding/vparquet2/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) {
FlushSizeBytes: 30_000_000,
MaxBytesPerTrace: 50_000_000,
ObjectsCombined: func(compactionLevel, objects int) {},
SpansDiscarded: func(traceID string, spans int) {},
SpansDiscarded: func(traceID, rootSpanName string, rootServiceName string, spans int) {},
})

_, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs)
Expand Down Expand Up @@ -150,6 +150,9 @@ func TestCountSpans(t *testing.T) {
batchSize := 300 + rand.Intn(25)
spansEach := 250 + rand.Intn(25)

rootSpan := "foo"
rootService := "bar"

sch := parquet.SchemaOf(new(Trace))
traceID := make([]byte, 16)
_, err := crand.Read(traceID)
Expand All @@ -158,10 +161,14 @@ func TestCountSpans(t *testing.T) {
// make Trace and convert to parquet.Row
tr := test.MakeTraceWithSpanCount(batchSize, spansEach, traceID)
trp := traceToParquet(traceID, tr, nil)
trp.RootServiceName = rootService
trp.RootSpanName = rootSpan
row := sch.Deconstruct(nil, trp)

// count spans for generated rows.
tID, spans := countSpans(sch, row)
tID, rootSpanName, rootServiceName, spans := countSpans(sch, row)
require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID))
require.Equal(t, spans, batchSize*spansEach)
require.Equal(t, rootSpan, rootSpanName)
require.Equal(t, rootService, rootServiceName)
}
24 changes: 21 additions & 3 deletions tempodb/encoding/vparquet3/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,25 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) {
// countSpans counts the number of spans in the given trace in deconstructed
// parquet row format and returns the traceID, rootSpanName and rootServiceName.
// It simply counts the number of values for span ID, which is always present.
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) {
func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, rootServiceName string, spans int) {
traceIDColumn, found := schema.Lookup(TraceIDColumnName)
if !found {
return "", 0
return "", "", "", 0
}

rootSpanNameColumn, found := schema.Lookup(columnPathRootSpanName)
if !found {
return "", "", "", 0
}

rootServiceNameColumn, found := schema.Lookup(columnPathRootServiceName)
if !found {
return "", "", "", 0
}

spanID, found := schema.Lookup("rs", "list", "element", "ss", "list", "element", "Spans", "list", "element", "SpanID")
if !found {
return "", 0
return "", "", "", 0
}

for _, v := range row {
Expand All @@ -340,6 +350,14 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans
if v.Column() == traceIDColumn.ColumnIndex {
traceID = tempoUtil.TraceIDToHexString(v.ByteArray())
}

if v.Column() == rootSpanNameColumn.ColumnIndex {
rootSpanName = v.String()
}

if v.Column() == rootServiceNameColumn.ColumnIndex {
rootServiceName = v.String()
}
}

return
Expand Down
11 changes: 9 additions & 2 deletions tempodb/encoding/vparquet3/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) {
FlushSizeBytes: 30_000_000,
MaxBytesPerTrace: 50_000_000,
ObjectsCombined: func(compactionLevel, objects int) {},
SpansDiscarded: func(traceID string, spans int) {},
SpansDiscarded: func(traceID, rootSpanName string, rootServiceName string, spans int) {},
})

_, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs)
Expand Down Expand Up @@ -152,6 +152,9 @@ func TestCountSpans(t *testing.T) {
batchSize := 300 + rand.Intn(25)
spansEach := 250 + rand.Intn(25)

rootSpan := "foo"
rootService := "bar"

sch := parquet.SchemaOf(new(Trace))
traceID := make([]byte, 16)
_, err := crand.Read(traceID)
Expand All @@ -161,12 +164,16 @@ func TestCountSpans(t *testing.T) {
tr := test.MakeTraceWithSpanCount(batchSize, spansEach, traceID)
trp, connected := traceToParquet(&backend.BlockMeta{}, traceID, tr, nil)
require.False(t, connected)
trp.RootServiceName = rootService
trp.RootSpanName = rootSpan
row := sch.Deconstruct(nil, trp)

// count spans for generated rows.
tID, spans := countSpans(sch, row)
tID, rootSpanName, rootServiceName, spans := countSpans(sch, row)
require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID))
require.Equal(t, spans, batchSize*spansEach)
require.Equal(t, rootSpan, rootSpanName)
require.Equal(t, rootService, rootServiceName)
}

func TestCompact(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type Compactor interface {
type CompactorSharder interface {
Combine(dataEncoding string, tenantID string, objs ...[]byte) ([]byte, bool, error)
Owns(hash string) bool
RecordDiscardedSpans(count int, tenantID string, traceID string)
RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string, rootServiceName string)
}

type CompactorOverrides interface {
Expand Down

0 comments on commit ef29986

Please sign in to comment.