From b4e11d896c8bc06bc9b39e13f1e9d46723d349c5 Mon Sep 17 00:00:00 2001 From: Marcin Ginszt Date: Wed, 16 Aug 2023 13:56:30 +0200 Subject: [PATCH 1/5] logging the name of rootSpanName for discarded traces --- modules/compactor/compactor.go | 4 ++-- pkg/util/test/req.go | 5 +++++ tempodb/compactor.go | 4 ++-- tempodb/compactor_test.go | 2 +- tempodb/encoding/common/interfaces.go | 2 +- .../encoding/vparquet/block_findtracebyid.go | 3 ++- tempodb/encoding/vparquet/compactor.go | 15 ++++++++++++--- tempodb/encoding/vparquet/compactor_test.go | 5 +++-- .../encoding/vparquet2/block_findtracebyid.go | 3 ++- tempodb/encoding/vparquet2/compactor.go | 18 ++++++++++++++---- tempodb/encoding/vparquet2/compactor_test.go | 6 ++++-- .../encoding/vparquet3/block_findtracebyid.go | 3 ++- tempodb/encoding/vparquet3/compactor.go | 15 ++++++++++++--- tempodb/encoding/vparquet3/compactor_test.go | 4 ++-- tempodb/tempodb.go | 2 +- 15 files changed, 65 insertions(+), 26 deletions(-) diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go index aee20e10c8b..902a5caca57 100644 --- a/modules/compactor/compactor.go +++ b/modules/compactor/compactor.go @@ -244,8 +244,8 @@ func (c *Compactor) Combine(dataEncoding string, tenantID string, objs ...[]byte } // RecordDiscardedSpans implements tempodb.CompactorSharder -func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string) { - level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID, "discarded_span_count", count) +func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string) { + level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID, "rootSpanName", rootSpanName, "discarded_span_count", count) overrides.RecordDiscardedSpans(count, reasonCompactorDiscardedSpans, tenantID) } diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index 24b3f0253b5..55ff8f01394 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -138,6 +138,11 @@ func MakeBatch(spans int, traceID []byte) *v1_trace.ResourceSpans { } ss.Spans = append(ss.Spans, MakeSpan(traceID)) + + // first span should not have parent + if len(ss.Spans) == 1 { + (*ss.Spans[0]).ParentSpanId = nil //[]byte{} + } } return batch } diff --git a/tempodb/compactor.go b/tempodb/compactor.go index ee04c98acd7..0b121ce3e93 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -223,8 +223,8 @@ func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.Block ObjectsWritten: func(compactionLevel, objs int) { metricCompactionObjectsWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(objs)) }, - SpansDiscarded: func(traceId string, spans int) { - rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId) + SpansDiscarded: func(traceId, rootSpanName string, spans int) { + rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId, rootSpanName) }, DisconnectedTrace: func() { dataquality.WarnDisconnectedTrace(tenantID, dataquality.PhaseTraceCompactorCombine) diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 87bca14f50f..2796173fbd1 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -39,7 +39,7 @@ func (m *mockSharder) Combine(dataEncoding string, _ string, objs ...[]byte) ([] return model.StaticCombiner.Combine(dataEncoding, objs...) } -func (m *mockSharder) RecordDiscardedSpans(int, string, string) {} +func (m *mockSharder) RecordDiscardedSpans(int, string, string, string) {} type mockJobSharder struct{} diff --git a/tempodb/encoding/common/interfaces.go b/tempodb/encoding/common/interfaces.go index ee7115885ec..598d8b2b937 100644 --- a/tempodb/encoding/common/interfaces.go +++ b/tempodb/encoding/common/interfaces.go @@ -75,7 +75,7 @@ type CompactionOptions struct { ObjectsCombined func(compactionLevel, objects int) ObjectsWritten func(compactionLevel, objects int) BytesWritten func(compactionLevel, bytes int) - SpansDiscarded func(traceID string, spans int) + SpansDiscarded func(traceID string, rootSpanName string, rootServiceName string, spans int) DisconnectedTrace func() } diff --git a/tempodb/encoding/vparquet/block_findtracebyid.go b/tempodb/encoding/vparquet/block_findtracebyid.go index 5f37ef39c95..328d7999d88 100644 --- a/tempodb/encoding/vparquet/block_findtracebyid.go +++ b/tempodb/encoding/vparquet/block_findtracebyid.go @@ -24,7 +24,8 @@ const ( SearchNext = -2 NotFound = -3 - TraceIDColumnName = "TraceID" + TraceIDColumnName = "TraceID" + RootSpanNameColumnName = "RootSpanName" ) func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) { diff --git a/tempodb/encoding/vparquet/compactor.go b/tempodb/encoding/vparquet/compactor.go index 4ad4f808a2c..df0cad766df 100644 --- a/tempodb/encoding/vparquet/compactor.go +++ b/tempodb/encoding/vparquet/compactor.go @@ -317,15 +317,20 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) { // countSpans counts the number of spans in the given trace in deconstructed // parquet row format and returns traceId. // It simply counts the number of values for span ID, which is always present. -func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) { +func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, spans int) { traceIDColumn, found := schema.Lookup(TraceIDColumnName) if !found { - return "", 0 + return "", "", 0 + } + + rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName) + if !found { + return "", "", 0 } spanID, found := schema.Lookup("rs", "ils", "Spans", "ID") if !found { - return "", 0 + return "", "", 0 } for _, v := range row { @@ -336,6 +341,10 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans if v.Column() == traceIDColumn.ColumnIndex { traceID = tempoUtil.TraceIDToHexString(v.ByteArray()) } + + if v.Column() == rootSpanNameColumn.ColumnIndex { + rootSpanName = v.String() + } } return diff --git a/tempodb/encoding/vparquet/compactor_test.go b/tempodb/encoding/vparquet/compactor_test.go index 326ecd26b70..efc6b2b9c63 100644 --- a/tempodb/encoding/vparquet/compactor_test.go +++ b/tempodb/encoding/vparquet/compactor_test.go @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) { FlushSizeBytes: 30_000_000, MaxBytesPerTrace: 50_000_000, ObjectsCombined: func(compactionLevel, objects int) {}, - SpansDiscarded: func(traceID string, spans int) {}, + SpansDiscarded: func(traceID, rootSpanName string, spans int) {}, }) _, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs) @@ -161,7 +161,8 @@ func TestCountSpans(t *testing.T) { row := sch.Deconstruct(nil, trp) // count spans for generated rows. - tID, spans := countSpans(sch, row) + tID, rootSpanName, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) + require.Equal(t, rootSpanName, "test") require.Equal(t, spans, batchSize*spansEach) } diff --git a/tempodb/encoding/vparquet2/block_findtracebyid.go b/tempodb/encoding/vparquet2/block_findtracebyid.go index e6b3c01386e..9a359f8002c 100644 --- a/tempodb/encoding/vparquet2/block_findtracebyid.go +++ b/tempodb/encoding/vparquet2/block_findtracebyid.go @@ -24,7 +24,8 @@ const ( SearchNext = -2 NotFound = -3 - TraceIDColumnName = "TraceID" + TraceIDColumnName = "TraceID" + RootSpanNameColumnName = "RootSpanName" ) func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) { diff --git a/tempodb/encoding/vparquet2/compactor.go b/tempodb/encoding/vparquet2/compactor.go index dd2a4ffea5a..3ae4a4935b2 100644 --- a/tempodb/encoding/vparquet2/compactor.go +++ b/tempodb/encoding/vparquet2/compactor.go @@ -317,15 +317,21 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) { // countSpans counts the number of spans in the given trace in deconstructed // parquet row format and returns traceId. // It simply counts the number of values for span ID, which is always present. -func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) { +func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, spans int) { traceIDColumn, found := schema.Lookup(TraceIDColumnName) if !found { - return "", 0 + return "", "", 0 } - spanID, found := schema.Lookup("rs", "list", "element", "ss", "list", "element", "Spans", "list", "element", "SpanID") + rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName) if !found { - return "", 0 + return "", "", 0 + } + + spanID, found := schema.Lookup("rs", "list", "element", "ss", + "list", "element", "Spans", "list", "element", "SpanID") + if !found { + return "", "", 0 } for _, v := range row { @@ -336,6 +342,10 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans if v.Column() == traceIDColumn.ColumnIndex { traceID = tempoUtil.TraceIDToHexString(v.ByteArray()) } + + if v.Column() == rootSpanNameColumn.ColumnIndex { + rootSpanName = v.String() + } } return diff --git a/tempodb/encoding/vparquet2/compactor_test.go b/tempodb/encoding/vparquet2/compactor_test.go index 17cff5b62ce..ff61a480049 100644 --- a/tempodb/encoding/vparquet2/compactor_test.go +++ b/tempodb/encoding/vparquet2/compactor_test.go @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) { FlushSizeBytes: 30_000_000, MaxBytesPerTrace: 50_000_000, ObjectsCombined: func(compactionLevel, objects int) {}, - SpansDiscarded: func(traceID string, spans int) {}, + SpansDiscarded: func(traceID, rootSpanName string, spans int) {}, }) _, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs) @@ -161,7 +161,9 @@ func TestCountSpans(t *testing.T) { row := sch.Deconstruct(nil, trp) // count spans for generated rows. - tID, spans := countSpans(sch, row) + tID, rootSpanName, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) + require.Equal(t, rootSpanName, "test") require.Equal(t, spans, batchSize*spansEach) + } diff --git a/tempodb/encoding/vparquet3/block_findtracebyid.go b/tempodb/encoding/vparquet3/block_findtracebyid.go index d3b0a31663e..afed7166e27 100644 --- a/tempodb/encoding/vparquet3/block_findtracebyid.go +++ b/tempodb/encoding/vparquet3/block_findtracebyid.go @@ -24,7 +24,8 @@ const ( SearchNext = -2 NotFound = -3 - TraceIDColumnName = "TraceID" + TraceIDColumnName = "TraceID" + RootSpanNameColumnName = "RootSpanName" ) func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) { diff --git a/tempodb/encoding/vparquet3/compactor.go b/tempodb/encoding/vparquet3/compactor.go index 6746bf10ecc..4f4c76056f3 100644 --- a/tempodb/encoding/vparquet3/compactor.go +++ b/tempodb/encoding/vparquet3/compactor.go @@ -321,15 +321,20 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) { // countSpans counts the number of spans in the given trace in deconstructed // parquet row format and returns traceId. // It simply counts the number of values for span ID, which is always present. -func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans int) { +func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, traceName string, spans int) { traceIDColumn, found := schema.Lookup(TraceIDColumnName) if !found { - return "", 0 + return "", "", 0 + } + + rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName) + if !found { + return "", "", 0 } spanID, found := schema.Lookup("rs", "list", "element", "ss", "list", "element", "Spans", "list", "element", "SpanID") if !found { - return "", 0 + return "", "", 0 } for _, v := range row { @@ -340,6 +345,10 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, spans if v.Column() == traceIDColumn.ColumnIndex { traceID = tempoUtil.TraceIDToHexString(v.ByteArray()) } + + if v.Column() == rootSpanNameColumn.ColumnIndex { + traceName = v.String() + } } return diff --git a/tempodb/encoding/vparquet3/compactor_test.go b/tempodb/encoding/vparquet3/compactor_test.go index c6720080639..3ad9f3fe9bf 100644 --- a/tempodb/encoding/vparquet3/compactor_test.go +++ b/tempodb/encoding/vparquet3/compactor_test.go @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) { FlushSizeBytes: 30_000_000, MaxBytesPerTrace: 50_000_000, ObjectsCombined: func(compactionLevel, objects int) {}, - SpansDiscarded: func(traceID string, spans int) {}, + SpansDiscarded: func(traceID, rootSpanName string, spans int) {}, }) _, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs) @@ -164,7 +164,7 @@ func TestCountSpans(t *testing.T) { row := sch.Deconstruct(nil, trp) // count spans for generated rows. - tID, spans := countSpans(sch, row) + tID, _, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) require.Equal(t, spans, batchSize*spansEach) } diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 5d8fbf9bcfa..df181344e1e 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -91,7 +91,7 @@ type Compactor interface { type CompactorSharder interface { Combine(dataEncoding string, tenantID string, objs ...[]byte) ([]byte, bool, error) Owns(hash string) bool - RecordDiscardedSpans(count int, tenantID string, traceID string) + RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string) } type CompactorOverrides interface { From 0d5f9cdfd2e9f8f212d84d99dfe721dd1ddf169c Mon Sep 17 00:00:00 2001 From: Marcin Ginszt Date: Mon, 21 Aug 2023 11:45:10 +0200 Subject: [PATCH 2/5] add rootServiceName, ignore tests --- modules/compactor/compactor.go | 5 +++-- pkg/util/test/req.go | 5 ----- tempodb/compactor.go | 4 ++-- tempodb/compactor_test.go | 2 +- .../encoding/vparquet/block_findtracebyid.go | 5 +++-- tempodb/encoding/vparquet/compactor.go | 17 ++++++++++++---- tempodb/encoding/vparquet/compactor_test.go | 5 ++--- .../encoding/vparquet2/block_findtracebyid.go | 5 +++-- tempodb/encoding/vparquet2/compactor.go | 20 +++++++++++++------ tempodb/encoding/vparquet2/compactor_test.go | 5 ++--- .../encoding/vparquet3/block_findtracebyid.go | 5 +++-- tempodb/encoding/vparquet3/compactor.go | 19 +++++++++++++----- tempodb/encoding/vparquet3/compactor_test.go | 4 ++-- tempodb/tempodb.go | 2 +- 14 files changed, 63 insertions(+), 40 deletions(-) diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go index 902a5caca57..bc7b4e6d60a 100644 --- a/modules/compactor/compactor.go +++ b/modules/compactor/compactor.go @@ -244,8 +244,9 @@ func (c *Compactor) Combine(dataEncoding string, tenantID string, objs ...[]byte } // RecordDiscardedSpans implements tempodb.CompactorSharder -func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string) { - level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID, "rootSpanName", rootSpanName, "discarded_span_count", count) +func (c *Compactor) RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string, rootServiceName string) { + level.Warn(log.Logger).Log("msg", "max size of trace exceeded", "tenant", tenantID, "traceId", traceID, + "rootSpanName", rootSpanName, "rootServiceName", rootServiceName, "discarded_span_count", count) overrides.RecordDiscardedSpans(count, reasonCompactorDiscardedSpans, tenantID) } diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index 55ff8f01394..24b3f0253b5 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -138,11 +138,6 @@ func MakeBatch(spans int, traceID []byte) *v1_trace.ResourceSpans { } ss.Spans = append(ss.Spans, MakeSpan(traceID)) - - // first span should not have parent - if len(ss.Spans) == 1 { - (*ss.Spans[0]).ParentSpanId = nil //[]byte{} - } } return batch } diff --git a/tempodb/compactor.go b/tempodb/compactor.go index 0b121ce3e93..051b2fd532f 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -223,8 +223,8 @@ func (rw *readerWriter) compact(ctx context.Context, blockMetas []*backend.Block ObjectsWritten: func(compactionLevel, objs int) { metricCompactionObjectsWritten.WithLabelValues(strconv.Itoa(compactionLevel)).Add(float64(objs)) }, - SpansDiscarded: func(traceId, rootSpanName string, spans int) { - rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId, rootSpanName) + SpansDiscarded: func(traceId, rootSpanName string, rootServiceName string, spans int) { + rw.compactorSharder.RecordDiscardedSpans(spans, tenantID, traceId, rootSpanName, rootServiceName) }, DisconnectedTrace: func() { dataquality.WarnDisconnectedTrace(tenantID, dataquality.PhaseTraceCompactorCombine) diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 2796173fbd1..e277a87563b 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -39,7 +39,7 @@ func (m *mockSharder) Combine(dataEncoding string, _ string, objs ...[]byte) ([] return model.StaticCombiner.Combine(dataEncoding, objs...) } -func (m *mockSharder) RecordDiscardedSpans(int, string, string, string) {} +func (m *mockSharder) RecordDiscardedSpans(int, string, string, string, string) {} type mockJobSharder struct{} diff --git a/tempodb/encoding/vparquet/block_findtracebyid.go b/tempodb/encoding/vparquet/block_findtracebyid.go index 328d7999d88..47d60689736 100644 --- a/tempodb/encoding/vparquet/block_findtracebyid.go +++ b/tempodb/encoding/vparquet/block_findtracebyid.go @@ -24,8 +24,9 @@ const ( SearchNext = -2 NotFound = -3 - TraceIDColumnName = "TraceID" - RootSpanNameColumnName = "RootSpanName" + TraceIDColumnName = "TraceID" + RootSpanNameColumnName = "RootSpanName" + RootServiceNameColumnName = "RootServiceName" ) func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) { diff --git a/tempodb/encoding/vparquet/compactor.go b/tempodb/encoding/vparquet/compactor.go index df0cad766df..1d78babb742 100644 --- a/tempodb/encoding/vparquet/compactor.go +++ b/tempodb/encoding/vparquet/compactor.go @@ -317,20 +317,25 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) { // countSpans counts the number of spans in the given trace in deconstructed // parquet row format and returns traceId. // It simply counts the number of values for span ID, which is always present. -func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, spans int) { +func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, rootServiceName string, spans int) { traceIDColumn, found := schema.Lookup(TraceIDColumnName) if !found { - return "", "", 0 + return "", "", "", 0 } rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName) if !found { - return "", "", 0 + return "", "", "", 0 + } + + rootServiceNameColumn, found := schema.Lookup(RootServiceNameColumnName) + if !found { + return "", "", "", 0 } spanID, found := schema.Lookup("rs", "ils", "Spans", "ID") if !found { - return "", "", 0 + return "", "", "", 0 } for _, v := range row { @@ -345,6 +350,10 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSp if v.Column() == rootSpanNameColumn.ColumnIndex { rootSpanName = v.String() } + + if v.Column() == rootServiceNameColumn.ColumnIndex { + rootServiceName = v.String() + } } return diff --git a/tempodb/encoding/vparquet/compactor_test.go b/tempodb/encoding/vparquet/compactor_test.go index efc6b2b9c63..d310a8dee73 100644 --- a/tempodb/encoding/vparquet/compactor_test.go +++ b/tempodb/encoding/vparquet/compactor_test.go @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) { FlushSizeBytes: 30_000_000, MaxBytesPerTrace: 50_000_000, ObjectsCombined: func(compactionLevel, objects int) {}, - SpansDiscarded: func(traceID, rootSpanName string, spans int) {}, + SpansDiscarded: func(traceID, rootSpanName string, rootServiceName string, spans int) {}, }) _, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs) @@ -161,8 +161,7 @@ func TestCountSpans(t *testing.T) { row := sch.Deconstruct(nil, trp) // count spans for generated rows. - tID, rootSpanName, spans := countSpans(sch, row) + tID, _, _, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) - require.Equal(t, rootSpanName, "test") require.Equal(t, spans, batchSize*spansEach) } diff --git a/tempodb/encoding/vparquet2/block_findtracebyid.go b/tempodb/encoding/vparquet2/block_findtracebyid.go index 9a359f8002c..d324af70ef4 100644 --- a/tempodb/encoding/vparquet2/block_findtracebyid.go +++ b/tempodb/encoding/vparquet2/block_findtracebyid.go @@ -24,8 +24,9 @@ const ( SearchNext = -2 NotFound = -3 - TraceIDColumnName = "TraceID" - RootSpanNameColumnName = "RootSpanName" + TraceIDColumnName = "TraceID" + RootSpanNameColumnName = "RootSpanName" + RootServiceNameColumnName = "RootServiceName" ) func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) { diff --git a/tempodb/encoding/vparquet2/compactor.go b/tempodb/encoding/vparquet2/compactor.go index 3ae4a4935b2..eb634e41bff 100644 --- a/tempodb/encoding/vparquet2/compactor.go +++ b/tempodb/encoding/vparquet2/compactor.go @@ -317,21 +317,25 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) { // countSpans counts the number of spans in the given trace in deconstructed // parquet row format and returns traceId. // It simply counts the number of values for span ID, which is always present. -func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, spans int) { +func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, rootServiceName string, spans int) { traceIDColumn, found := schema.Lookup(TraceIDColumnName) if !found { - return "", "", 0 + return "", "", "", 0 } rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName) if !found { - return "", "", 0 + return "", "", "", 0 } - spanID, found := schema.Lookup("rs", "list", "element", "ss", - "list", "element", "Spans", "list", "element", "SpanID") + rootServiceNameColumn, found := schema.Lookup(RootServiceNameColumnName) if !found { - return "", "", 0 + return "", "", "", 0 + } + + spanID, found := schema.Lookup("rs", "list", "element", "ss", "list", "element", "Spans", "list", "element", "SpanID") + if !found { + return "", "", "", 0 } for _, v := range row { @@ -346,6 +350,10 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSp if v.Column() == rootSpanNameColumn.ColumnIndex { rootSpanName = v.String() } + + if v.Column() == rootServiceNameColumn.ColumnIndex { + rootServiceName = v.String() + } } return diff --git a/tempodb/encoding/vparquet2/compactor_test.go b/tempodb/encoding/vparquet2/compactor_test.go index ff61a480049..288063b788c 100644 --- a/tempodb/encoding/vparquet2/compactor_test.go +++ b/tempodb/encoding/vparquet2/compactor_test.go @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) { FlushSizeBytes: 30_000_000, MaxBytesPerTrace: 50_000_000, ObjectsCombined: func(compactionLevel, objects int) {}, - SpansDiscarded: func(traceID, rootSpanName string, spans int) {}, + SpansDiscarded: func(traceID, rootSpanName string, rootServiceName string, spans int) {}, }) _, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs) @@ -161,9 +161,8 @@ func TestCountSpans(t *testing.T) { row := sch.Deconstruct(nil, trp) // count spans for generated rows. - tID, rootSpanName, spans := countSpans(sch, row) + tID, _, _, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) - require.Equal(t, rootSpanName, "test") require.Equal(t, spans, batchSize*spansEach) } diff --git a/tempodb/encoding/vparquet3/block_findtracebyid.go b/tempodb/encoding/vparquet3/block_findtracebyid.go index afed7166e27..8259f092552 100644 --- a/tempodb/encoding/vparquet3/block_findtracebyid.go +++ b/tempodb/encoding/vparquet3/block_findtracebyid.go @@ -24,8 +24,9 @@ const ( SearchNext = -2 NotFound = -3 - TraceIDColumnName = "TraceID" - RootSpanNameColumnName = "RootSpanName" + TraceIDColumnName = "TraceID" + RootSpanNameColumnName = "RootSpanName" + RootServiceNameColumnName = "RootServiceName" ) func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) { diff --git a/tempodb/encoding/vparquet3/compactor.go b/tempodb/encoding/vparquet3/compactor.go index 4f4c76056f3..f2571770d1c 100644 --- a/tempodb/encoding/vparquet3/compactor.go +++ b/tempodb/encoding/vparquet3/compactor.go @@ -321,20 +321,25 @@ func estimateMarshalledSizeFromParquetRow(row parquet.Row) (size int) { // countSpans counts the number of spans in the given trace in deconstructed // parquet row format and returns traceId. // It simply counts the number of values for span ID, which is always present. -func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, traceName string, spans int) { +func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSpanName string, rootServiceName string, spans int) { traceIDColumn, found := schema.Lookup(TraceIDColumnName) if !found { - return "", "", 0 + return "", "", "", 0 } rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName) if !found { - return "", "", 0 + return "", "", "", 0 + } + + rootServiceNameColumn, found := schema.Lookup(RootServiceNameColumnName) + if !found { + return "", "", "", 0 } spanID, found := schema.Lookup("rs", "list", "element", "ss", "list", "element", "Spans", "list", "element", "SpanID") if !found { - return "", "", 0 + return "", "", "", 0 } for _, v := range row { @@ -347,7 +352,11 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, traceN } if v.Column() == rootSpanNameColumn.ColumnIndex { - traceName = v.String() + rootSpanName = v.String() + } + + if v.Column() == rootServiceNameColumn.ColumnIndex { + rootServiceName = v.String() } } diff --git a/tempodb/encoding/vparquet3/compactor_test.go b/tempodb/encoding/vparquet3/compactor_test.go index 3ad9f3fe9bf..e8e759d12c2 100644 --- a/tempodb/encoding/vparquet3/compactor_test.go +++ b/tempodb/encoding/vparquet3/compactor_test.go @@ -99,7 +99,7 @@ func BenchmarkCompactorDupes(b *testing.B) { FlushSizeBytes: 30_000_000, MaxBytesPerTrace: 50_000_000, ObjectsCombined: func(compactionLevel, objects int) {}, - SpansDiscarded: func(traceID, rootSpanName string, spans int) {}, + SpansDiscarded: func(traceID, rootSpanName string, rootServiceName string, spans int) {}, }) _, err = c.Compact(ctx, l, r, func(*backend.BlockMeta, time.Time) backend.Writer { return w }, inputs) @@ -164,7 +164,7 @@ func TestCountSpans(t *testing.T) { row := sch.Deconstruct(nil, trp) // count spans for generated rows. - tID, _, spans := countSpans(sch, row) + tID, _, _, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) require.Equal(t, spans, batchSize*spansEach) } diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index df181344e1e..0bb585d34b0 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -91,7 +91,7 @@ type Compactor interface { type CompactorSharder interface { Combine(dataEncoding string, tenantID string, objs ...[]byte) ([]byte, bool, error) Owns(hash string) bool - RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string) + RecordDiscardedSpans(count int, tenantID string, traceID string, rootSpanName string, rootServiceName string) } type CompactorOverrides interface { From 7aff0e221f048d6b51121645ace3eadf8fafa40f Mon Sep 17 00:00:00 2001 From: Marcin Ginszt Date: Mon, 21 Aug 2023 13:37:57 +0200 Subject: [PATCH 3/5] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 62ebf830e60..52942e56ac1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [ENHANCEMENT] Add several metrics-generator fields to user-configurable overrides [#2711](https://github.com/grafana/tempo/pull/2711) (@kvrhdn) * [ENHANCEMENT] Update /api/metrics/summary to correctly handle missing attributes and improve performance of TraceQL `select()` queries. [#2765](https://github.com/grafana/tempo/pull/2765) (@mdisibio) * [ENHANCEMENT] Add `TempoUserConfigurableOverridesReloadFailing` alert [#2784](https://github.com/grafana/tempo/pull/2784) (@kvrhdn) +* [ENHANCEMENT] Add RootSpanName and RootServiceName to log about discarded spans [#2816](https://github.com/grafana/tempo/pull/2816) (@marcinginszt) * [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio) * [BUGFIX] Fix node role auth IDMSv1 [#2760](https://github.com/grafana/tempo/pull/2760) (@coufalja) * [BUGFIX] Only search ingester blocks that fall within the request time range. [#2783](https://github.com/grafana/tempo/pull/2783) (@joe-elliott) From a04e9859683194f33ae2647ca4c2518579b5b7fa Mon Sep 17 00:00:00 2001 From: Marcin Ginszt Date: Mon, 21 Aug 2023 15:19:02 +0200 Subject: [PATCH 4/5] format --- tempodb/encoding/vparquet2/compactor_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tempodb/encoding/vparquet2/compactor_test.go b/tempodb/encoding/vparquet2/compactor_test.go index 288063b788c..d0611f51b87 100644 --- a/tempodb/encoding/vparquet2/compactor_test.go +++ b/tempodb/encoding/vparquet2/compactor_test.go @@ -164,5 +164,4 @@ func TestCountSpans(t *testing.T) { tID, _, _, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) require.Equal(t, spans, batchSize*spansEach) - } From c0a524601cfb439f9f3cf826c7dc1531cf93f5ef Mon Sep 17 00:00:00 2001 From: Marcin Ginszt Date: Wed, 23 Aug 2023 10:03:55 +0200 Subject: [PATCH 5/5] use already defined constants with column names, update tests --- tempodb/encoding/vparquet/block_findtracebyid.go | 4 +--- tempodb/encoding/vparquet/compactor.go | 4 ++-- tempodb/encoding/vparquet/compactor_test.go | 9 ++++++++- tempodb/encoding/vparquet2/block_findtracebyid.go | 4 +--- tempodb/encoding/vparquet2/compactor.go | 4 ++-- tempodb/encoding/vparquet2/compactor_test.go | 9 ++++++++- tempodb/encoding/vparquet3/block_findtracebyid.go | 4 +--- tempodb/encoding/vparquet3/compactor.go | 4 ++-- tempodb/encoding/vparquet3/compactor_test.go | 9 ++++++++- 9 files changed, 33 insertions(+), 18 deletions(-) diff --git a/tempodb/encoding/vparquet/block_findtracebyid.go b/tempodb/encoding/vparquet/block_findtracebyid.go index 47d60689736..5f37ef39c95 100644 --- a/tempodb/encoding/vparquet/block_findtracebyid.go +++ b/tempodb/encoding/vparquet/block_findtracebyid.go @@ -24,9 +24,7 @@ const ( SearchNext = -2 NotFound = -3 - TraceIDColumnName = "TraceID" - RootSpanNameColumnName = "RootSpanName" - RootServiceNameColumnName = "RootServiceName" + TraceIDColumnName = "TraceID" ) func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) { diff --git a/tempodb/encoding/vparquet/compactor.go b/tempodb/encoding/vparquet/compactor.go index 1d78babb742..1ab7c84f92b 100644 --- a/tempodb/encoding/vparquet/compactor.go +++ b/tempodb/encoding/vparquet/compactor.go @@ -323,12 +323,12 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSp return "", "", "", 0 } - rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName) + rootSpanNameColumn, found := schema.Lookup(columnPathRootSpanName) if !found { return "", "", "", 0 } - rootServiceNameColumn, found := schema.Lookup(RootServiceNameColumnName) + rootServiceNameColumn, found := schema.Lookup(columnPathRootServiceName) if !found { return "", "", "", 0 } diff --git a/tempodb/encoding/vparquet/compactor_test.go b/tempodb/encoding/vparquet/compactor_test.go index d310a8dee73..8ff9d792c49 100644 --- a/tempodb/encoding/vparquet/compactor_test.go +++ b/tempodb/encoding/vparquet/compactor_test.go @@ -150,6 +150,9 @@ func TestCountSpans(t *testing.T) { batchSize := 300 + rand.Intn(25) spansEach := 250 + rand.Intn(25) + rootSpan := "foo" + rootService := "bar" + sch := parquet.SchemaOf(new(Trace)) traceID := make([]byte, 16) _, err := crand.Read(traceID) @@ -158,10 +161,14 @@ func TestCountSpans(t *testing.T) { // make Trace and convert to parquet.Row tr := test.MakeTraceWithSpanCount(batchSize, spansEach, traceID) trp := traceToParquet(traceID, tr, nil) + trp.RootServiceName = rootService + trp.RootSpanName = rootSpan row := sch.Deconstruct(nil, trp) // count spans for generated rows. - tID, _, _, spans := countSpans(sch, row) + tID, rootSpanName, rootServiceName, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) require.Equal(t, spans, batchSize*spansEach) + require.Equal(t, rootSpan, rootSpanName) + require.Equal(t, rootService, rootServiceName) } diff --git a/tempodb/encoding/vparquet2/block_findtracebyid.go b/tempodb/encoding/vparquet2/block_findtracebyid.go index d324af70ef4..e6b3c01386e 100644 --- a/tempodb/encoding/vparquet2/block_findtracebyid.go +++ b/tempodb/encoding/vparquet2/block_findtracebyid.go @@ -24,9 +24,7 @@ const ( SearchNext = -2 NotFound = -3 - TraceIDColumnName = "TraceID" - RootSpanNameColumnName = "RootSpanName" - RootServiceNameColumnName = "RootServiceName" + TraceIDColumnName = "TraceID" ) func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) { diff --git a/tempodb/encoding/vparquet2/compactor.go b/tempodb/encoding/vparquet2/compactor.go index eb634e41bff..a9df7cda1c1 100644 --- a/tempodb/encoding/vparquet2/compactor.go +++ b/tempodb/encoding/vparquet2/compactor.go @@ -323,12 +323,12 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSp return "", "", "", 0 } - rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName) + rootSpanNameColumn, found := schema.Lookup(columnPathRootSpanName) if !found { return "", "", "", 0 } - rootServiceNameColumn, found := schema.Lookup(RootServiceNameColumnName) + rootServiceNameColumn, found := schema.Lookup(columnPathRootServiceName) if !found { return "", "", "", 0 } diff --git a/tempodb/encoding/vparquet2/compactor_test.go b/tempodb/encoding/vparquet2/compactor_test.go index d0611f51b87..2821969716d 100644 --- a/tempodb/encoding/vparquet2/compactor_test.go +++ b/tempodb/encoding/vparquet2/compactor_test.go @@ -150,6 +150,9 @@ func TestCountSpans(t *testing.T) { batchSize := 300 + rand.Intn(25) spansEach := 250 + rand.Intn(25) + rootSpan := "foo" + rootService := "bar" + sch := parquet.SchemaOf(new(Trace)) traceID := make([]byte, 16) _, err := crand.Read(traceID) @@ -158,10 +161,14 @@ func TestCountSpans(t *testing.T) { // make Trace and convert to parquet.Row tr := test.MakeTraceWithSpanCount(batchSize, spansEach, traceID) trp := traceToParquet(traceID, tr, nil) + trp.RootServiceName = rootService + trp.RootSpanName = rootSpan row := sch.Deconstruct(nil, trp) // count spans for generated rows. - tID, _, _, spans := countSpans(sch, row) + tID, rootSpanName, rootServiceName, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) require.Equal(t, spans, batchSize*spansEach) + require.Equal(t, rootSpan, rootSpanName) + require.Equal(t, rootService, rootServiceName) } diff --git a/tempodb/encoding/vparquet3/block_findtracebyid.go b/tempodb/encoding/vparquet3/block_findtracebyid.go index 8259f092552..d3b0a31663e 100644 --- a/tempodb/encoding/vparquet3/block_findtracebyid.go +++ b/tempodb/encoding/vparquet3/block_findtracebyid.go @@ -24,9 +24,7 @@ const ( SearchNext = -2 NotFound = -3 - TraceIDColumnName = "TraceID" - RootSpanNameColumnName = "RootSpanName" - RootServiceNameColumnName = "RootServiceName" + TraceIDColumnName = "TraceID" ) func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) { diff --git a/tempodb/encoding/vparquet3/compactor.go b/tempodb/encoding/vparquet3/compactor.go index f2571770d1c..f5f476aa553 100644 --- a/tempodb/encoding/vparquet3/compactor.go +++ b/tempodb/encoding/vparquet3/compactor.go @@ -327,12 +327,12 @@ func countSpans(schema *parquet.Schema, row parquet.Row) (traceID string, rootSp return "", "", "", 0 } - rootSpanNameColumn, found := schema.Lookup(RootSpanNameColumnName) + rootSpanNameColumn, found := schema.Lookup(columnPathRootSpanName) if !found { return "", "", "", 0 } - rootServiceNameColumn, found := schema.Lookup(RootServiceNameColumnName) + rootServiceNameColumn, found := schema.Lookup(columnPathRootServiceName) if !found { return "", "", "", 0 } diff --git a/tempodb/encoding/vparquet3/compactor_test.go b/tempodb/encoding/vparquet3/compactor_test.go index e8e759d12c2..2e54f3dc9e8 100644 --- a/tempodb/encoding/vparquet3/compactor_test.go +++ b/tempodb/encoding/vparquet3/compactor_test.go @@ -152,6 +152,9 @@ func TestCountSpans(t *testing.T) { batchSize := 300 + rand.Intn(25) spansEach := 250 + rand.Intn(25) + rootSpan := "foo" + rootService := "bar" + sch := parquet.SchemaOf(new(Trace)) traceID := make([]byte, 16) _, err := crand.Read(traceID) @@ -161,12 +164,16 @@ func TestCountSpans(t *testing.T) { tr := test.MakeTraceWithSpanCount(batchSize, spansEach, traceID) trp, connected := traceToParquet(&backend.BlockMeta{}, traceID, tr, nil) require.False(t, connected) + trp.RootServiceName = rootService + trp.RootSpanName = rootSpan row := sch.Deconstruct(nil, trp) // count spans for generated rows. - tID, _, _, spans := countSpans(sch, row) + tID, rootSpanName, rootServiceName, spans := countSpans(sch, row) require.Equal(t, tID, tempoUtil.TraceIDToHexString(traceID)) require.Equal(t, spans, batchSize*spansEach) + require.Equal(t, rootSpan, rootSpanName) + require.Equal(t, rootService, rootServiceName) } func TestCompact(t *testing.T) {