[exporterhelper] fix bug with queue size and capacity metrics #8716

Merged
25 changes: 25 additions & 0 deletions .chloggen/codeboten_fix-queue-size-metric.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: fix bug with queue size and capacity metrics

# One or more tracking issues or pull requests related to the change
issues: [8682]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
49 changes: 47 additions & 2 deletions exporter/exporterhelper/queue_sender.go
@@ -11,18 +11,24 @@ import (

"go.opencensus.io/metric/metricdata"
"go.opentelemetry.io/otel/attribute"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

const defaultQueueSize = 1000

var errSendingQueueIsFull = errors.New("sending_queue is full")
var (
errSendingQueueIsFull = errors.New("sending_queue is full")
scopeName = "go.opentelemetry.io/collector/exporterhelper"
)

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
@@ -74,6 +80,9 @@ type queueSender struct {
traceAttribute attribute.KeyValue
logger *zap.Logger
requeuingEnabled bool

metricCapacity otelmetric.Int64ObservableGauge
metricSize otelmetric.Int64ObservableGauge
}

func newQueueSender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, logger *zap.Logger) *queueSender {
@@ -131,8 +140,44 @@ func (qs *queueSender) start(ctx context.Context, host component.Host, set expor
return err
}

if obsreportconfig.UseOtelForInternalMetricsfeatureGate.IsEnabled() {
return qs.recordWithOtel(set.MeterProvider.Meter(scopeName))
}
return qs.recordWithOC()
}

func (qs *queueSender) recordWithOtel(meter otelmetric.Meter) error {
var err, errs error

attrs := otelmetric.WithAttributeSet(attribute.NewSet(attribute.String(obsmetrics.ExporterKey, qs.fullName)))

qs.metricSize, err = meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_size",
otelmetric.WithDescription("Current size of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error {
o.Observe(int64(qs.queue.Size()), attrs)
return nil
}),
)
errs = multierr.Append(errs, err)

qs.metricCapacity, err = meter.Int64ObservableGauge(
obsmetrics.ExporterKey+"/queue_capacity",
otelmetric.WithDescription("Fixed capacity of the retry queue (in batches)"),
otelmetric.WithUnit("1"),
otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error {
o.Observe(int64(qs.queue.Capacity()), attrs)
return nil
}))

errs = multierr.Append(errs, err)
return errs
}

func (qs *queueSender) recordWithOC() error {
// Start reporting queue length metric
err = globalInstruments.queueSize.UpsertEntry(func() int64 {
err := globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(qs.queue.Size())
}, metricdata.NewLabelValue(qs.fullName))
if err != nil {
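Note on the asynchronous instrument pattern used above: the two gauges are never written to directly; the SDK invokes the registered callbacks at each collection. A minimal, self-contained sketch of the same pattern, assuming a stand-in queueSize variable and the OTel Go SDK's ManualReader rather than the collector's own telemetry wiring:

```go
package main

import (
	"context"
	"fmt"

	otelmetric "go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	// ManualReader lets the caller trigger collection explicitly.
	reader := sdkmetric.NewManualReader()
	meter := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)).Meter("example")

	queueSize := 7 // stand-in for qs.queue.Size()

	// The gauge is observed lazily: the callback runs on every Collect.
	_, err := meter.Int64ObservableGauge(
		"exporter/queue_size",
		otelmetric.WithDescription("Current size of the retry queue (in batches)"),
		otelmetric.WithUnit("1"),
		otelmetric.WithInt64Callback(func(_ context.Context, o otelmetric.Int64Observer) error {
			o.Observe(int64(queueSize))
			return nil
		}),
	)
	if err != nil {
		panic(err)
	}

	var rm metricdata.ResourceMetrics
	if err := reader.Collect(context.Background(), &rm); err != nil {
		panic(err)
	}
	fmt.Println(rm.ScopeMetrics[0].Metrics[0].Name) // exporter/queue_size
}
```

Because the callback closes over the queue state and reports its value at collection time, the fix only needs to register the instruments once in start rather than record a value on every send.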
42 changes: 41 additions & 1 deletion exporter/exporterhelper/queue_sender_test.go
@@ -19,6 +19,8 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)
@@ -132,11 +134,24 @@ func TestQueuedRetryHappyPath(t *testing.T) {
ocs.checkDroppedItemsCount(t, 0)
}

// setFeatureGateForTest forces the state of a feature gate for a test and returns a function that restores the original value.
func setFeatureGateForTest(t testing.TB, gate *featuregate.Gate, enabled bool) func() {
originalValue := gate.IsEnabled()
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), enabled))
return func() {
require.NoError(t, featuregate.GlobalRegistry().Set(gate.ID(), originalValue))
}
}

func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := NewDefaultRetrySettings()
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

@@ -150,6 +165,31 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
checkValueForGlobalManager(t, defaultExporterTags, int64(0), "exporter/queue_size")
}

func TestQueuedRetry_QueueMetricsReportedUsingOTel(t *testing.T) {
resetFlag := setFeatureGateForTest(t, obsreportconfig.UseOtelForInternalMetricsfeatureGate, true)
defer resetFlag()

tt, err := obsreporttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := NewDefaultRetrySettings()
set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings, BuildInfo: component.NewDefaultBuildInfo()}
be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))

require.NoError(t, tt.CheckExporterMetricGauge("exporter_queue_capacity", int64(defaultQueueSize)))

for i := 0; i < 7; i++ {
require.NoError(t, be.send(newErrorRequest(context.Background())))
}
require.NoError(t, tt.CheckExporterMetricGauge("exporter_queue_size", int64(7)))

assert.NoError(t, be.Shutdown(context.Background()))
}

func TestNoCancellationContext(t *testing.T) {
deadline := time.Now().Add(1 * time.Second)
ctx, cancelFunc := context.WithDeadline(context.Background(), deadline)
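For context on the gate being flipped here: collector feature gates are registered once in a global registry and toggled by ID, which is what setFeatureGateForTest relies on. A rough sketch of that flow, using a made-up gate ID (the real UseOtelForInternalMetrics gate is registered inside internal/obsreportconfig and only referenced by these tests):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/featuregate"
)

// exampleGate is a hypothetical gate used only for illustration.
var exampleGate = featuregate.GlobalRegistry().MustRegister(
	"example.useOtelForInternalMetrics",
	featuregate.StageAlpha,
	featuregate.WithRegisterDescription("Example gate used only for illustration."),
)

func main() {
	fmt.Println(exampleGate.IsEnabled()) // false: alpha gates default to disabled

	// Tests flip gates through the registry by ID, as setFeatureGateForTest does.
	if err := featuregate.GlobalRegistry().Set(exampleGate.ID(), true); err != nil {
		panic(err)
	}
	fmt.Println(exampleGate.IsEnabled()) // true
}
```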
4 changes: 4 additions & 0 deletions obsreport/obsreporttest/obsreporttest.go
@@ -78,6 +78,10 @@ func (tts *TestTelemetry) CheckExporterLogs(sentLogRecords, sendFailedLogRecords
return tts.prometheusChecker.checkExporterLogs(tts.id, sentLogRecords, sendFailedLogRecords)
}

// CheckExporterMetricGauge checks that the current value of the given exporter gauge metric matches the expected value.
// When this function is called it is required to also call SetupTelemetry as first thing.
func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) error {
return tts.prometheusChecker.checkExporterMetricGauge(tts.id, metric, val)
}

// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error {
18 changes: 18 additions & 0 deletions obsreport/obsreporttest/otelprometheuschecker.go
@@ -101,6 +101,24 @@ func (pc *prometheusChecker) checkExporterEnqueueFailed(exporter component.ID, d
return pc.checkCounter(fmt.Sprintf("exporter_enqueue_failed_%s", datatype), enqueueFailed, exporterAttrs)
}

func (pc *prometheusChecker) checkExporterMetricGauge(exporter component.ID, metric string, val int64) error {
exporterAttrs := attributesForExporterMetrics(exporter)
// Forces a flush for the opencensus view data.
_, _ = view.RetrieveData(metric)

ts, err := pc.getMetric(metric, io_prometheus_client.MetricType_GAUGE, exporterAttrs)
if err != nil {
return err
}

expected := float64(val)
if math.Abs(ts.GetGauge().GetValue()-expected) > 0.0001 {
return fmt.Errorf("values for metric '%s' did not match, expected '%f' got '%f'", metric, expected, ts.GetGauge().GetValue())
}

return nil
}

func (pc *prometheusChecker) checkCounter(expectedMetric string, value int64, attrs []attribute.KeyValue) error {
// Forces a flush for the opencensus view data.
_, _ = view.RetrieveData(expectedMetric)