Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor/otel: index span events as logs #6122

Merged
merged 1 commit into from
Sep 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
esoutput "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

"github.com/elastic/apm-server/beater/config"
Expand Down Expand Up @@ -356,10 +357,20 @@ func (s *serverRunner) run(listener net.Listener) error {

// Send config to telemetry.
recordAPMServerConfig(s.config)

publisherConfig := &publish.PublisherConfig{
Pipeline: s.config.Pipeline,
Namespace: s.namespace,
}
if !s.config.DataStreams.Enabled {
// Logs are only supported with data streams;
// add a beat.Processor which drops them.
dropLogsProcessor, err := newDropLogsBeatProcessor()
if err != nil {
return err
}
publisherConfig.Processor = dropLogsProcessor
}

var kibanaClient kibana_client.Client
if s.config.Kibana.Enabled {
Expand Down Expand Up @@ -755,3 +766,17 @@ type transformerFunc func(context.Context) []beat.Event
func (f transformerFunc) Transform(ctx context.Context) []beat.Event {
return f(ctx)
}

func newDropLogsBeatProcessor() (beat.ProcessorList, error) {
return processors.New(processors.PluginConfig{
common.MustNewConfigFrom(map[string]interface{}{
"drop_event": map[string]interface{}{
"when": map[string]interface{}{
"contains": map[string]interface{}{
"processor.event": "log",
},
},
},
}),
})
}
1 change: 1 addition & 0 deletions changelogs/head.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ https://github.com/elastic/apm-server/compare/7.13\...master[View commits]
- The `error.log.message` or `error.exception.message` field of errors will be copied to the ECS field `message` {pull}5974[5974]
- Define index sorting for internal metrics data stream {pull}6116[6116]
- Add histogram dynamic_template to app metrics data stream {pull}6043[6043]
- Index OpenTelemetry span events and Jaeger logs into a log data stream {pull}6122[6122]

[float]
==== Deprecated
7 changes: 2 additions & 5 deletions docs/guide/opentelemetry-elastic.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -371,23 +371,20 @@ Here is an example of AWS Lambda Node.js function managed with Terraform and the
* https://github.com/michaelhyatt/terraform-aws-nodejs-api-worker-otel/tree/v0.23[Sample Terraform code]

[[elastic-open-telemetry-known-limitations]]
==== Limitations in 7.13
==== Limitations


[[elastic-open-telemetry-traces-limitations]]
===== OpenTelemetry traces

* Traces of applications using `messaging` semantics might be wrongly displayed or not shown in the APM UI. You may only see `spans` coming from such services, but no `transaction` https://github.com/elastic/apm-server/issues/5094[#5094]
* Inability to see Stack traces in spans or, in general, arbitrary span events for applications instrumented with OpenTelemetry https://github.com/elastic/apm-server/issues/4715[#4715]
* Inability to see Stack traces in spans
* Inability in APM views to view the "Time Spent by Span Type" https://github.com/elastic/apm-server/issues/5747[#5747]
* Metrics derived from traces (throughput, latency, and errors) are not accurate when traces are sampled before being ingested by Elastic Observability (ie by an OpenTelemetry Collector or OpenTelemetry APM agent or SDK) https://github.com/elastic/apm/issues/472[#472]

[[elastic-open-telemetry-metrics-limitations]]
===== OpenTelemetry metrics

* OpenTelemetry histogram metrics, https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-api/latest/io/opentelemetry/api/metrics/DoubleValueRecorder.html[`DoubleValueRecorder`]
and https://www.javadoc.io/doc/io.opentelemetry/opentelemetry-api/latest/io/opentelemetry/api/metrics/LongValueObserver.html[`LongValueRecorder`], are not yet supported https://github.com/elastic/apm-server/issues/3195[#3195]
* Inability to see JVM metrics in Elastic APM UI for Java applications instrumented with OpenTelemetry https://github.com/elastic/apm-server/issues/4919[#4919]
* Inability to see host metrics in Elastic Metrics Infrastructure view when using the OpenTelemetry Collector host metrics receiver https://github.com/elastic/apm-server/issues/5310[#5310]

[[elastic-open-telemetry-logs-limitations]]
Expand Down
1 change: 0 additions & 1 deletion docs/jaeger-reference.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ There are some limitations and differences between Elastic APM and Jaeger that y
* Because Jaeger has its own trace context header, and does not currently support W3C trace context headers,
it is not possible to mix and match the use of Elastic's APM agents and Jaeger's clients.
* Elastic APM only supports probabilistic sampling.
* We currently only support exception logging. Span logs are not supported.

*Differences between APM Agents and Jaeger Clients:*

Expand Down
27 changes: 27 additions & 0 deletions model/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package model

const (
AppLogsDataset = "apm.app"
)

var (
// LogProcessor is the Processor value that should be assigned to log events.
LogProcessor = Processor{Name: "log", Event: "log"}
)
3 changes: 3 additions & 0 deletions model/modelprocessor/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (s *SetDataStream) setDataStream(event *model.APMEvent) {
case model.ErrorProcessor:
event.DataStream.Type = datastreams.LogsType
event.DataStream.Dataset = model.ErrorsDataset
case model.LogProcessor:
event.DataStream.Type = datastreams.LogsType
event.DataStream.Dataset = model.AppLogsDataset
case model.MetricsetProcessor:
event.DataStream.Type = datastreams.MetricsType
// Metrics that include well-defined transaction/span fields
Expand Down
3 changes: 3 additions & 0 deletions model/modelprocessor/datastream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ func TestSetDataStream(t *testing.T) {
}, {
input: model.APMEvent{Processor: model.ErrorProcessor},
output: model.DataStream{Type: "logs", Dataset: "apm.error", Namespace: "custom"},
}, {
input: model.APMEvent{Processor: model.LogProcessor},
output: model.DataStream{Type: "logs", Dataset: "apm.app", Namespace: "custom"},
}, {
input: model.APMEvent{
Processor: model.MetricsetProcessor,
Expand Down
10 changes: 8 additions & 2 deletions processor/otel/exceptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
semconv "go.opentelemetry.io/collector/model/semconv/v1.5.0"

"github.com/elastic/apm-server/model"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestEncodeSpanEventsNonExceptions(t *testing.T) {
Expand All @@ -57,8 +58,10 @@ func TestEncodeSpanEventsNonExceptions(t *testing.T) {
semconv.AttributeExceptionStacktrace: pdata.NewAttributeValueString("stacktrace"),
})

_, errors := transformTransactionSpanEvents(t, "java", nonExceptionEvent, incompleteExceptionEvent)
require.Empty(t, errors)
_, events := transformTransactionSpanEvents(t, "java", nonExceptionEvent, incompleteExceptionEvent)
require.Len(t, events, 2)
assert.Equal(t, model.LogProcessor, events[0].Processor)
assert.Equal(t, model.LogProcessor, events[1].Processor)
}

func TestEncodeSpanEventsJavaExceptions(t *testing.T) {
Expand Down Expand Up @@ -114,6 +117,7 @@ Caused by: LowLevelException
Service: service,
Agent: agent,
Timestamp: timestamp,
Labels: common.MapStr{},
Processor: model.ErrorProcessor,
Trace: transactionEvent.Trace,
Parent: model.Parent{ID: transactionEvent.Transaction.ID},
Expand Down Expand Up @@ -164,6 +168,7 @@ Caused by: LowLevelException
Service: service,
Agent: agent,
Timestamp: timestamp,
Labels: common.MapStr{},
Processor: model.ErrorProcessor,
Trace: transactionEvent.Trace,
Parent: model.Parent{ID: transactionEvent.Transaction.ID},
Expand Down Expand Up @@ -323,6 +328,7 @@ func TestEncodeSpanEventsNonJavaExceptions(t *testing.T) {
Service: service,
Agent: agent,
Timestamp: timestamp,
Labels: common.MapStr{},
Processor: model.ErrorProcessor,
Trace: transactionEvent.Trace,
Parent: model.Parent{ID: transactionEvent.Transaction.ID},
Expand Down
101 changes: 101 additions & 0 deletions processor/otel/test_approved/span_jaeger_http.approved.json
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,107 @@
"url": {
"original": "http://foo.bar.com?a=12"
}
},
{
"@timestamp": "2019-12-16T12:46:58.000Z",
"agent": {
"name": "Jaeger",
"version": "unknown"
},
"data_stream.type": "logs",
"host": {
"hostname": "host-abc"
},
"labels": {
"event": "baggage",
"isValid": false
},
"parent": {
"id": "0000000058585858"
},
"processor": {
"event": "log",
"name": "log"
},
"service": {
"language": {
"name": "unknown"
},
"name": "unknown"
},
"trace": {
"id": "00000000000000000000000046467830"
},
"url": {
"original": "http://foo.bar.com?a=12"
}
},
{
"@timestamp": "2019-12-16T12:46:58.000Z",
"agent": {
"name": "Jaeger",
"version": "unknown"
},
"data_stream.type": "logs",
"host": {
"hostname": "host-abc"
},
"labels": {
"level": "info"
},
"message": "retrying connection",
"parent": {
"id": "0000000058585858"
},
"processor": {
"event": "log",
"name": "log"
},
"service": {
"language": {
"name": "unknown"
},
"name": "unknown"
},
"trace": {
"id": "00000000000000000000000046467830"
},
"url": {
"original": "http://foo.bar.com?a=12"
}
},
{
"@timestamp": "2019-12-16T12:46:58.000Z",
"agent": {
"name": "Jaeger",
"version": "unknown"
},
"data_stream.type": "logs",
"host": {
"hostname": "host-abc"
},
"labels": {
"level": "error"
},
"parent": {
"id": "0000000058585858"
},
"processor": {
"event": "log",
"name": "log"
},
"service": {
"language": {
"name": "unknown"
},
"name": "unknown"
},
"trace": {
"id": "00000000000000000000000046467830"
},
"url": {
"original": "http://foo.bar.com?a=12"
}
}
]
}
Loading