Skip to content

Commit

Permalink
processor/otel: index span events as logs
Browse files Browse the repository at this point in the history
Produce log events for non-exception span events,
capturing the span event name as the log message,
and all other attributes as labels.

We only index logs when data streams are enabled;
they are dropped when classic indices are in use.
  • Loading branch information
axw committed Sep 4, 2021
1 parent 31893d2 commit a70b1c0
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 26 deletions.
25 changes: 25 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

"github.com/elastic/apm-server/beater/config"
Expand Down Expand Up @@ -356,10 +357,20 @@ func (s *serverRunner) run(listener net.Listener) error {

// Send config to telemetry.
recordAPMServerConfig(s.config)

publisherConfig := &publish.PublisherConfig{
Pipeline: s.config.Pipeline,
Namespace: s.namespace,
}
if !s.config.DataStreams.Enabled {
// Logs are only supported with data streams;
// add a beat.Processor which drops them.
dropLogsProcessor, err := newDropLogsBeatProcessor()
if err != nil {
return err
}
publisherConfig.Processor = dropLogsProcessor
}

var kibanaClient kibana_client.Client
if s.config.Kibana.Enabled {
Expand Down Expand Up @@ -755,3 +766,17 @@ type transformerFunc func(context.Context) []beat.Event
func (f transformerFunc) Transform(ctx context.Context) []beat.Event {
return f(ctx)
}

func newDropLogsBeatProcessor() (beat.ProcessorList, error) {
return processors.New(processors.PluginConfig{
common.MustNewConfigFrom(map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"contains": map[string]interface{}{
"processor.event": "log",
},
},
},
}),
})
}
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
- Data streams now define a default `dynamic` mapping parameter, overridable in the `<data-stream>@custom` template {pull}5947[5947]
- The `error.log.message` or `error.exception.message` field of errors will be copied to the ECS field `message` {pull}5974[5974]
- Define index sorting for internal metrics data stream {pull}6116[6116]
- Index OpenTelemetry span events and Jaeger logs into a log data stream {pull}6122[6122]

[float]
==== Deprecated
27 changes: 27 additions & 0 deletions model/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package model

const (
AppLogsDataset = "apm.app"
)

var (
// LogProcessor is the Processor value that should be assigned to log events.
LogProcessor = Processor{Name: "log", Event: "log"}
)
3 changes: 3 additions & 0 deletions model/modelprocessor/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (s *SetDataStream) setDataStream(event *model.APMEvent) {
case model.ErrorProcessor:
event.DataStream.Type = datastreams.LogsType
event.DataStream.Dataset = model.ErrorsDataset
case model.LogProcessor:
event.DataStream.Type = datastreams.LogsType
event.DataStream.Dataset = model.AppLogsDataset
case model.MetricsetProcessor:
event.DataStream.Type = datastreams.MetricsType
// Metrics that include well-defined transaction/span fields
Expand Down
3 changes: 3 additions & 0 deletions model/modelprocessor/datastream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func TestSetDataStream(t *testing.T) {
}, {
input: model.APMEvent{Processor: model.ErrorProcessor},
output: model.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"},
}, {
input: model.APMEvent{Processor: model.LogProcessor},
output: model.DataStream{Type: "logs", Dataset: "apm.app", Namespace: "custom"},
}, {
input: model.APMEvent{
Processor: model.MetricsetProcessor,
Expand Down
58 changes: 33 additions & 25 deletions processor/otel/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/apm-server/datastreams"
logs "github.com/elastic/apm-server/log"
"github.com/elastic/apm-server/model"
)
Expand Down Expand Up @@ -230,6 +231,7 @@ func (c *Consumer) convertSpan(
*out = append(*out, event)

events := otelSpan.Events()
event.Processor = model.Processor{}
event.Labels = baseEvent.Labels // only copy common labels to span events
event.Event.Outcome = "" // don't set event.outcome for span events
event.Destination = model.Destination{} // don't set destination for span events
Expand Down Expand Up @@ -826,22 +828,21 @@ func convertSpanEvent(
timeDelta time.Duration,
out *model.Batch,
) {
var e *model.Error
event := parent
event.Labels = initEventLabels(event.Labels)
event.Transaction = nil
event.Span = nil
event.Timestamp = spanEvent.Timestamp().AsTime().Add(timeDelta)

isJaeger := strings.HasPrefix(parent.Agent.Name, "Jaeger")
if isJaeger {
e = convertJaegerErrorSpanEvent(logger, spanEvent)
} else {
event.Error = convertJaegerErrorSpanEvent(logger, spanEvent, event.Labels)
} else if spanEvent.Name() == "exception" {
// Translate exception span events to errors.
//
// If it's not Jaeger, we assume OpenTelemetry semantic semconv.
//
// TODO(axw) we don't currently support arbitrary events, we only look
// for exceptions and convert those to Elastic APM error events.
if spanEvent.Name() != "exception" {
// Per OpenTelemetry semantic conventions:
// `The name of the event MUST be "exception"`
return
}
// Per OpenTelemetry semantic conventions:
// `The name of the event MUST be "exception"`
var exceptionEscaped bool
var exceptionMessage, exceptionStacktrace, exceptionType string
spanEvent.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
Expand All @@ -854,34 +855,39 @@ func convertSpanEvent(
exceptionType = v.StringVal()
case "exception.escaped":
exceptionEscaped = v.BoolVal()
default:
event.Labels[replaceDots(k)] = ifaceAttributeValue(v)
}
return true
})
if exceptionMessage == "" && exceptionType == "" {
if exceptionMessage != "" || exceptionType != "" {
// Per OpenTelemetry semantic conventions:
// `At least one of the following sets of attributes is required:
// - exception.type
// - exception.message`
return
event.Error = convertOpenTelemetryExceptionSpanEvent(
exceptionType, exceptionMessage, exceptionStacktrace,
exceptionEscaped, parent.Service.Language.Name,
)
}
e = convertOpenTelemetryExceptionSpanEvent(
exceptionType, exceptionMessage, exceptionStacktrace,
exceptionEscaped, parent.Service.Language.Name,
)
}
if e != nil {
event := parent
event.Transaction = nil
event.Span = nil

if event.Error != nil {
event.Processor = model.ErrorProcessor
event.Error = e
event.Timestamp = spanEvent.Timestamp().AsTime().Add(timeDelta)
setErrorContext(&event, parent)
*out = append(*out, event)
} else {
event.Processor = model.LogProcessor
event.DataStream.Type = datastreams.LogsType
event.Message = spanEvent.Name()
spanEvent.Attributes().Range(func(k string, v pdata.AttributeValue) bool {
event.Labels[replaceDots(k)] = ifaceAttributeValue(v)
return true
})
}
*out = append(*out, event)
}

func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent) *model.Error {
func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent, labels common.MapStr) *model.Error {
var isError bool
var exMessage, exType string
logMessage := event.Name()
Expand Down Expand Up @@ -913,6 +919,8 @@ func convertJaegerErrorSpanEvent(logger *logp.Logger, event pdata.SpanEvent) *mo
isError = true
case "level":
isError = stringval == "error"
default:
labels[replaceDots(k)] = ifaceAttributeValue(v)
}
return true
})
Expand Down
2 changes: 1 addition & 1 deletion processor/otel/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ func testJaegerLogs() []jaegermodel.Log {
}, {
Timestamp: testStartTime().Add(65 * time.Nanosecond),
Fields: jaegerKeyValues(
"event", "retrying connection",
"message", "retrying connection",
"level", "info",
),
}, {
Expand Down

0 comments on commit a70b1c0

Please sign in to comment.