diff --git a/cmd/agent/app/reporter/grpc/reporter.go b/cmd/agent/app/reporter/grpc/reporter.go index 217c5ae42f2..cb14cf0e3e4 100644 --- a/cmd/agent/app/reporter/grpc/reporter.go +++ b/cmd/agent/app/reporter/grpc/reporter.go @@ -44,7 +44,7 @@ func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap collector: api_v2.NewCollectorServiceClient(conn), agentTags: makeModelKeyValue(agentTags), logger: logger, - sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...), + sanitizer: zipkin2.NewChainedSanitizer(zipkin2.NewStandardSanitizers()...), } } diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index abc390869d3..2003a40575d 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -37,7 +37,7 @@ type options struct { logger *zap.Logger serviceMetrics metrics.Factory hostMetrics metrics.Factory - preProcessSpans ProcessSpans + preProcessSpans ProcessSpans // see docs in PreProcessSpans option. sanitizer sanitizer.SanitizeSpan preSave ProcessSpan spanFilter FilterSpan @@ -78,7 +78,9 @@ func (options) HostMetrics(hostMetrics metrics.Factory) Option { } } -// PreProcessSpans creates an Option that initializes the preProcessSpans function +// PreProcessSpans creates an Option that initializes the preProcessSpans function. +// This function can implement non-standard pre-processing of the spans when extending +// the collector from source. Jaeger itself does not define any pre-processing. func (options) PreProcessSpans(preProcessSpans ProcessSpans) Option { return func(b *options) { b.preProcessSpans = preProcessSpans diff --git a/cmd/collector/app/sanitizer/empty_service_name_sanitizer.go b/cmd/collector/app/sanitizer/empty_service_name_sanitizer.go new file mode 100644 index 00000000000..9e94acfdd79 --- /dev/null +++ b/cmd/collector/app/sanitizer/empty_service_name_sanitizer.go @@ -0,0 +1,41 @@ +// Copyright (c) 2022 The Jaeger Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sanitizer + +import ( + "github.com/jaegertracing/jaeger/model" +) + +const ( + serviceNameReplacement = "empty-service-name" + nullProcessServiceName = "null-process-and-service-name" +) + +// NewEmptyServiceNameSanitizer returns a function that replaces an empty service name +// with the string "empty-service-name". +// If the whole span.Process is nil, it creates one with "null-process-and-service-name". +func NewEmptyServiceNameSanitizer() SanitizeSpan { + return sanitizeEmptyServiceName +} + +// sanitizeEmptyServiceName sets a placeholder service name when span.Process is nil or its ServiceName is empty, and returns the same span. +func sanitizeEmptyServiceName(span *model.Span) *model.Span { + if span.Process == nil { + span.Process = &model.Process{ServiceName: nullProcessServiceName} + } else if span.Process.ServiceName == "" { + span.Process.ServiceName = serviceNameReplacement + } + return span +} diff --git a/cmd/collector/app/sanitizer/empty_service_name_sanitizer_test.go b/cmd/collector/app/sanitizer/empty_service_name_sanitizer_test.go new file mode 100644 index 00000000000..c79ede4adfe --- /dev/null +++ b/cmd/collector/app/sanitizer/empty_service_name_sanitizer_test.go @@ -0,0 +1,32 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sanitizer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/model" +) + +func TestEmptyServiceNameSanitizer(t *testing.T) { + s := NewEmptyServiceNameSanitizer() + s1 := s(&model.Span{}) + assert.NotNil(t, s1.Process) + assert.Equal(t, nullProcessServiceName, s1.Process.ServiceName) + s2 := s(&model.Span{Process: &model.Process{}}) + assert.Equal(t, serviceNameReplacement, s2.Process.ServiceName) +} diff --git a/cmd/collector/app/sanitizer/sanitizer.go b/cmd/collector/app/sanitizer/sanitizer.go index 4d89af8bdd0..3ccebda82d4 100644 --- a/cmd/collector/app/sanitizer/sanitizer.go +++ b/cmd/collector/app/sanitizer/sanitizer.go @@ -23,8 +23,19 @@ import ( // span should implement this interface. type SanitizeSpan func(span *model.Span) *model.Span -// NewChainedSanitizer creates a Sanitizer from the variadic list of passed Sanitizers +// NewStandardSanitizers returns the sanitizers that are automatically applied by SpanProcessor. +func NewStandardSanitizers() []SanitizeSpan { + return []SanitizeSpan{ + NewEmptyServiceNameSanitizer(), + } +} + +// NewChainedSanitizer creates a SanitizeSpan that applies the passed sanitizers in order. +// If the list only has one element, it is returned directly to minimize indirection. 
func NewChainedSanitizer(sanitizers ...SanitizeSpan) SanitizeSpan { + if len(sanitizers) == 1 { + return sanitizers[0] + } return func(span *model.Span) *model.Span { for _, s := range sanitizers { span = s(span) diff --git a/cmd/collector/app/sanitizer/sanitizer_test.go b/cmd/collector/app/sanitizer/sanitizer_test.go new file mode 100644 index 00000000000..e370e7a7747 --- /dev/null +++ b/cmd/collector/app/sanitizer/sanitizer_test.go @@ -0,0 +1,44 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sanitizer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/model" +) + +func TestNewStandardSanitizers(t *testing.T) { + NewStandardSanitizers() +} + +func TestChainedSanitizer(t *testing.T) { + var s1 SanitizeSpan = func(span *model.Span) *model.Span { + span.Process = &model.Process{ServiceName: "s1"} + return span + } + var s2 SanitizeSpan = func(span *model.Span) *model.Span { + span.Process = &model.Process{ServiceName: "s2"} + return span + } + c1 := NewChainedSanitizer(s1) + sp1 := c1(&model.Span{}) + assert.Equal(t, "s1", sp1.Process.ServiceName) + c2 := NewChainedSanitizer(s1, s2) + sp2 := c2(&model.Span{}) + assert.Equal(t, "s2", sp2.Process.ServiceName) +} diff --git a/cmd/collector/app/sanitizer/zipkin/span_sanitizer.go b/cmd/collector/app/sanitizer/zipkin/span_sanitizer.go index 61e36a67f1a..71b707f90b2 100644 --- a/cmd/collector/app/sanitizer/zipkin/span_sanitizer.go +++ b/cmd/collector/app/sanitizer/zipkin/span_sanitizer.go @@ -27,11 +27,17 @@ const ( zeroParentIDTag = "errZeroParentID" ) -var ( - defaultDuration = int64(1) - // StandardSanitizers is a list of standard zipkin sanitizers. - StandardSanitizers = []Sanitizer{NewSpanStartTimeSanitizer(), NewSpanDurationSanitizer(), NewParentIDSanitizer(), NewErrorTagSanitizer()} -) +var defaultDuration = int64(1) // not a const because we take its address + +// NewStandardSanitizers returns a freshly built list of the standard zipkin sanitizers. +func NewStandardSanitizers() []Sanitizer { + return []Sanitizer{ + NewSpanStartTimeSanitizer(), + NewSpanDurationSanitizer(), + NewParentIDSanitizer(), + NewErrorTagSanitizer(), + } +} // Sanitizer interface for sanitizing spans. Any business logic that needs to be applied to normalize the contents of a // span should implement this interface. 
diff --git a/cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go b/cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go index 04e63997ec5..137d3478d5a 100644 --- a/cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go +++ b/cmd/collector/app/sanitizer/zipkin/span_sanitizer_test.go @@ -28,6 +28,10 @@ var ( positiveDuration = int64(1) ) +func TestNewStandardSanitizers(t *testing.T) { + NewStandardSanitizers() +} + func TestChainedSanitizer(t *testing.T) { sanitizer := NewChainedSanitizer(NewSpanDurationSanitizer()) diff --git a/cmd/collector/app/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go index 301cecfbf5d..69e5aac865e 100644 --- a/cmd/collector/app/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -67,7 +67,11 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) proce // BuildHandlers builds span handlers (Zipkin, Jaeger) func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor) *SpanHandlers { return &SpanHandlers{ - handler.NewZipkinSpanHandler(b.Logger, spanProcessor, zs.NewChainedSanitizer(zs.StandardSanitizers...)), + handler.NewZipkinSpanHandler( + b.Logger, + spanProcessor, + zs.NewChainedSanitizer(zs.NewStandardSanitizers()...), + ), handler.NewJaegerSpanHandler(b.Logger, spanProcessor), handler.NewGRPCHandler(b.Logger, spanProcessor), } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 906b3930aca..abb7a322a9b 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -63,7 +63,7 @@ type queueItem struct { span *model.Span } -// NewSpanProcessor returns a SpanProcessor that preProcesses, filters, queues, sanitizes, and processes spans +// NewSpanProcessor returns a SpanProcessor that preProcesses, filters, queues, sanitizes, and processes spans. 
func NewSpanProcessor( spanWriter spanstore.Writer, additional []ProcessSpan, @@ -96,13 +96,18 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt } boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler) + sanitizers := sanitizer.NewStandardSanitizers() + if options.sanitizer != nil { + sanitizers = append(sanitizers, options.sanitizer) + } + sp := spanProcessor{ queue: boundedQueue, metrics: handlerMetrics, logger: options.logger, preProcessSpans: options.preProcessSpans, filterSpan: options.spanFilter, - sanitizer: options.sanitizer, + sanitizer: sanitizer.NewChainedSanitizer(sanitizers...), reportBusy: options.reportBusy, numWorkers: options.numWorkers, spanWriter: spanWriter, diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index ab781f80818..419d0a76326 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -165,10 +165,15 @@ func isSpanAllowed(span *model.Span) bool { } type fakeSpanWriter struct { - err error + spansLock sync.Mutex + spans []*model.Span + err error } func (n *fakeSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error { + n.spansLock.Lock() + defer n.spansLock.Unlock() + n.spans = append(n.spans, span) return n.err } @@ -222,16 +227,15 @@ func TestSpanProcessor(t *testing.T) { w := &fakeSpanWriter{} p := NewSpanProcessor(w, nil, Options.QueueSize(1)).(*spanProcessor) - res, err := p.ProcessSpans([]*model.Span{ - { - Process: &model.Process{ - ServiceName: "x", - }, - }, - }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + res, err := p.ProcessSpans( + []*model.Span{{}}, // empty span should be enriched by sanitizers + processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) assert.NoError(t, p.Close()) + assert.Len(t, w.spans, 1) + assert.NotNil(t, w.spans[0].Process) + assert.NotEmpty(t, 
w.spans[0].Process.ServiceName) } func TestSpanProcessorErrors(t *testing.T) { diff --git a/scripts/es-integration-test.sh b/scripts/es-integration-test.sh index 63727f9ffa8..9a7a229bc55 100755 --- a/scripts/es-integration-test.sh +++ b/scripts/es-integration-test.sh @@ -1,5 +1,6 @@ #!/bin/bash +PS4='T$(date "+%H:%M:%S") ' set -euxf -o pipefail usage() { @@ -68,7 +69,7 @@ wait_for_it() { ''%{http_code}'' ) local counter=0 - while [[ "$(curl ${params[@]} ${url})" != "200" && ${counter} -le 30 ]]; do + while [[ "$(curl ${params[@]} ${url})" != "200" && ${counter} -le 60 ]]; do sleep 2 counter=$((counter+1)) echo "waiting for ${url} to be up..."