Skip to content

Commit

Permalink
[exporterhelper] Add data_type attribute to internal queue metric
Browse files Browse the repository at this point in the history
Add data_type attribute to the internal otelcol_exporter_queue_size metric to report the type of data being processed.

All other metrics have the data type reported as part of their names. We could've done the same for queue metrics, but that would introduce a significant breaking change. We want to avoid that until we have all the metrics standardized with OpenTelemetry semantic conventions.
  • Loading branch information
dmitryax committed Jul 22, 2024
1 parent 9ef6356 commit 6e55752
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 25 deletions.
20 changes: 20 additions & 0 deletions .chloggen/exporterhelper-report-data-type-in-queue-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add data_type attribute to `otelcol_exporter_queue_size` metric to report the type of data being processed.

# One or more tracking issues or pull requests related to the change
issues: [9943]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
14 changes: 13 additions & 1 deletion component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,26 @@ func (tts *TestTelemetry) CheckExporterEnqueueFailedLogs(enqueueFailed int64) er
return tts.prometheusChecker.checkExporterEnqueueFailed(tts.id, "log_records", enqueueFailed)
}

func (tts *TestTelemetry) CheckExporterQueueSizeMetric(size int64) error {
return tts.prometheusChecker.checkExporterQueueSize(tts.id, component.DataTypeMetrics, size)

Check warning on line 70 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}

func (tts *TestTelemetry) CheckExporterQueueSizeTraces(size int64) error {
return tts.prometheusChecker.checkExporterQueueSize(tts.id, component.DataTypeTraces, size)

Check warning on line 74 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L73-L74

Added lines #L73 - L74 were not covered by tests
}

func (tts *TestTelemetry) CheckExporterQueueSizeLogs(size int64) error {
return tts.prometheusChecker.checkExporterQueueSize(tts.id, component.DataTypeLogs, size)

Check warning on line 78 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L77-L78

Added lines #L77 - L78 were not covered by tests
}

// CheckExporterLogs checks that for the current exported values for logs exporter metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func (tts *TestTelemetry) CheckExporterLogs(sentLogRecords, sendFailedLogRecords int64) error {
return tts.prometheusChecker.checkExporterLogs(tts.id, sentLogRecords, sendFailedLogRecords)
}

func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) error {
return tts.prometheusChecker.checkExporterMetricGauge(tts.id, metric, val)
return tts.prometheusChecker.checkGauge(metric, val, attributesForExporterMetrics(tts.id))

Check warning on line 88 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L88

Added line #L88 was not covered by tests
}

// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
Expand Down
12 changes: 8 additions & 4 deletions component/componenttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func (pc *prometheusChecker) checkExporter(exporter component.ID, datatype strin
return errs
}

func (pc *prometheusChecker) checkExporterQueueSize(exporter component.ID, datatype component.DataType, size int64) error {
attrs := attributesForExporterMetrics(exporter)
attrs = append(attrs, attribute.String("data_type", datatype.String()))
return pc.checkGauge("otelcol_exporter_queue_size", size, attrs)

Check warning on line 98 in component/componenttest/otelprometheuschecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelprometheuschecker.go#L95-L98

Added lines #L95 - L98 were not covered by tests
}

func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, datatype string, enqueueFailed int64) error {
if enqueueFailed == 0 {
return nil
Expand All @@ -100,10 +106,8 @@ func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, d
return pc.checkCounter(fmt.Sprintf("exporter_enqueue_failed_%s", datatype), enqueueFailed, exporterAttrs)
}

func (pc *prometheusChecker) checkExporterMetricGauge(exporter component.ID, metric string, val int64) error {
exporterAttrs := attributesForExporterMetrics(exporter)

ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, exporterAttrs)
func (pc *prometheusChecker) checkGauge(metric string, val int64, attrs []attribute.KeyValue) error {
ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, attrs)

Check warning on line 110 in component/componenttest/otelprometheuschecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelprometheuschecker.go#L109-L110

Added lines #L109 - L110 were not covered by tests
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func WithQueue(config QueueSettings) Option {
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
}
}
Expand All @@ -132,7 +132,7 @@ func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Facto
DataType: o.signal,
ExporterSettings: o.set,
}
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep.telemetryBuilder)
o.queueSender = newQueueSender(queueFactory(context.Background(), set, cfg), o.set, cfg.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
}
}
Expand Down Expand Up @@ -250,7 +250,7 @@ type baseExporter struct {
}

func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set, DataType: signal})
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion exporter/exporterhelper/obsexporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ObsReport struct {
level configtelemetry.Level
spanNamePrefix string
tracer trace.Tracer
dataType component.DataType

otelAttrs []attribute.KeyValue
telemetryBuilder *metadata.TelemetryBuilder
Expand All @@ -38,6 +39,7 @@ type ObsReport struct {
type ObsReportSettings struct {
ExporterID component.ID
ExporterCreateSettings exporter.Settings
DataType component.DataType
}

// NewObsReport creates a new Exporter.
Expand All @@ -58,7 +60,7 @@ func newExporter(cfg ObsReportSettings) (*ObsReport, error) {
level: cfg.ExporterCreateSettings.TelemetrySettings.MetricsLevel,
spanNamePrefix: obsmetrics.ExporterPrefix + cfg.ExporterID.String(),
tracer: cfg.ExporterCreateSettings.TracerProvider.Tracer(cfg.ExporterID.String()),

dataType: cfg.DataType,
otelAttrs: []attribute.KeyValue{
attribute.String(obsmetrics.ExporterKey, cfg.ExporterID.String()),
},
Expand Down
25 changes: 13 additions & 12 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal/queue"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
Expand Down Expand Up @@ -74,18 +73,18 @@ type queueSender struct {
traceAttribute attribute.KeyValue
consumers *queue.Consumers[Request]

telemetryBuilder *metadata.TelemetryBuilder
exporterID component.ID
obsrep *ObsReport
exporterID component.ID
}

func newQueueSender(q exporterqueue.Queue[Request], set exporter.Settings, numConsumers int,
exportFailureMessage string, telemetryBuilder *metadata.TelemetryBuilder) *queueSender {
exportFailureMessage string, obsrep *ObsReport) *queueSender {
qs := &queueSender{
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
telemetryBuilder: telemetryBuilder,
exporterID: set.ID,
queue: q,
numConsumers: numConsumers,
traceAttribute: attribute.String(obsmetrics.ExporterKey, set.ID.String()),
obsrep: obsrep,
exporterID: set.ID,
}
consumeFunc := func(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
Expand All @@ -105,10 +104,12 @@ func (qs *queueSender) Start(ctx context.Context, host component.Host) error {
return err
}

opts := metric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.exporterID.String())))
dataTypeAttr := attribute.String(obsmetrics.DataTypeKey, qs.obsrep.dataType.String())
return multierr.Append(
qs.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) }, opts),
qs.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) }, opts),
qs.obsrep.telemetryBuilder.InitExporterQueueSize(func() int64 { return int64(qs.queue.Size()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute, dataTypeAttr))),
qs.obsrep.telemetryBuilder.InitExporterQueueCapacity(func() int64 { return int64(qs.queue.Capacity()) },
metric.WithAttributeSet(attribute.NewSet(qs.traceAttribute))),
)
}

Expand Down
10 changes: 6 additions & 4 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadata"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal/queue"
Expand Down Expand Up @@ -221,7 +220,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
for i := 0; i < 7; i++ {
require.NoError(t, be.send(context.Background(), newErrorRequest()))
}
require.NoError(t, tt.CheckExporterMetricGauge("otelcol_exporter_queue_size", int64(7)))
require.NoError(t, tt.CheckExporterQueueSizeMetric(int64(7)))

assert.NoError(t, be.Shutdown(context.Background()))
}
Expand Down Expand Up @@ -426,9 +425,12 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
func TestQueueSenderNoStartShutdown(t *testing.T) {
queue := queue.NewBoundedMemoryQueue[Request](queue.MemoryQueueSettings[Request]{})
set := exportertest.NewNopSettings()
builder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
obsrep, err := NewObsReport(ObsReportSettings{
ExporterID: exporterID,
ExporterCreateSettings: exportertest.NewNopSettings(),
})
assert.NoError(t, err)
qs := newQueueSender(queue, set, 1, "", builder)
qs := newQueueSender(queue, set, 1, "", obsrep)
assert.NoError(t, qs.Shutdown(context.Background()))
}

Expand Down
3 changes: 3 additions & 0 deletions internal/obsreportconfig/obsmetrics/obs_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ const (
// ExporterKey used to identify exporters in metrics and traces.
ExporterKey = "exporter"

// DataTypeKey used to identify the data type in the queue size metric.
DataTypeKey = "data_type"

// SentSpansKey used to track spans sent by exporters.
SentSpansKey = "sent_spans"
// FailedToSendSpansKey used to track spans that failed to be sent by exporters.
Expand Down

0 comments on commit 6e55752

Please sign in to comment.