diff --git a/CHANGELOG.md b/CHANGELOG.md index 472a8fce616..10ac79452ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - [bridge/ot] Fall-back to TextMap carrier when it's not ot.HttpHeaders. (#3679) +- The `Collect` method of the `"go.opentelemetry.io/otel/sdk/metric".Reader` interface is updated to accept the `metricdata.ResourceMetrics` value the collection will be made into. This change is made to enable memory reuse by SDK users. (#3732) ### Fixed diff --git a/exporters/prometheus/exporter.go b/exporters/prometheus/exporter.go index 734d6315b4c..feb8d0c5acc 100644 --- a/exporters/prometheus/exporter.go +++ b/exporters/prometheus/exporter.go @@ -113,7 +113,9 @@ func (c *collector) Describe(ch chan<- *prometheus.Desc) { // Collect implements prometheus.Collector. func (c *collector) Collect(ch chan<- prometheus.Metric) { - metrics, err := c.reader.Collect(context.TODO()) + // TODO (#3047): Use a sync.Pool instead of allocating metrics every Collect. + metrics := metricdata.ResourceMetrics{} + err := c.reader.Collect(context.TODO(), &metrics) if err != nil { otel.Handle(err) if err == metric.ErrReaderNotRegistered { diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 0a75079d275..9a30cd1749d 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -93,7 +93,7 @@ func BenchmarkCounterCollectOneAttr(b *testing.B) { for i := 0; i < b.N; i++ { cntr.Add(ctx, 1, attribute.Int("K", 1)) - _, _ = rdr.Collect(ctx) + _ = rdr.Collect(ctx, nil) } } @@ -104,7 +104,7 @@ func BenchmarkCounterCollectTenAttrs(b *testing.B) { for j := 0; j < 10; j++ { cntr.Add(ctx, 1, attribute.Int("K", j)) } - _, _ = rdr.Collect(ctx) + _ = rdr.Collect(ctx, nil) } } @@ -140,7 +140,7 @@ func benchCollectHistograms(count int) func(*testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - collectedMetrics, _ = r.Collect(ctx) + _ = r.Collect(ctx, &collectedMetrics) if len(collectedMetrics.ScopeMetrics[0].Metrics) != count { b.Fatalf("got %d metrics, want %d", len(collectedMetrics.ScopeMetrics[0].Metrics), count) } diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index d5dd6e4af6a..6e48a4599ca 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -32,7 +32,7 @@ type reader struct { externalProducers []Producer temporalityFunc TemporalitySelector aggregationFunc AggregationSelector - collectFunc func(context.Context) (metricdata.ResourceMetrics, error) + collectFunc func(context.Context, *metricdata.ResourceMetrics) error forceFlushFunc func(context.Context) error shutdownFunc func(context.Context) error } @@ -48,8 +48,8 @@ func (r *reader) RegisterProducer(p Producer) { r.externalProducers = append(r.e func (r *reader) temporality(kind InstrumentKind) metricdata.Temporality { return r.temporalityFunc(kind) } -func (r *reader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - return r.collectFunc(ctx) +func (r *reader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error { + return r.collectFunc(ctx, rm) } func (r *reader) ForceFlush(ctx context.Context) error { return r.forceFlushFunc(ctx) } func (r *reader) Shutdown(ctx context.Context) error { return r.shutdownFunc(ctx) } diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 48a8b291e77..f9b405915fc 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "errors" "fmt" "sync" "sync/atomic" @@ -112,11 +113,17 @@ func (mr *manualReader) Shutdown(context.Context) error { } // Collect gathers all metrics from the SDK and other Producers, calling any -// callbacks necessary. Collect will return an error if called after shutdown. -func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { +// callbacks necessary and stores the result in rm. +// +// Collect will return an error if called after shutdown. +// Collect will return an error if rm is a nil ResourceMetrics. +func (mr *manualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error { + if rm == nil { + return errors.New("manual reader: *metricdata.ResourceMetrics is nil") + } p := mr.sdkProducer.Load() if p == nil { - return metricdata.ResourceMetrics{}, ErrReaderNotRegistered + return ErrReaderNotRegistered } ph, ok := p.(produceHolder) @@ -126,12 +133,13 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics // happen, return an error instead of panicking so a users code does // not halt in the processes. err := fmt.Errorf("manual reader: invalid producer: %T", p) - return metricdata.ResourceMetrics{}, err + return err } - - rm, err := ph.produce(ctx) + // TODO (#3047): When produce is updated to accept output as param, pass rm. + rmTemp, err := ph.produce(ctx) + *rm = rmTemp if err != nil { - return metricdata.ResourceMetrics{}, err + return err } var errs []error for _, producer := range mr.externalProducers.Load().([]Producer) { @@ -141,7 +149,7 @@ func (mr *manualReader) Collect(ctx context.Context) (metricdata.ResourceMetrics } rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...) } - return rm, unifyErrors(errs) + return unifyErrors(errs) } // manualReaderConfig contains configuration options for a ManualReader. diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 154ae924fd5..d86be80c308 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -472,7 +472,8 @@ func TestMeterCreatesInstruments(t *testing.T) { tt.fn(t, m) - rm, err := rdr.Collect(context.Background()) + rm := metricdata.ResourceMetrics{} + err := rdr.Collect(context.Background(), &rm) assert.NoError(t, err) require.Len(t, rm.ScopeMetrics, 1) @@ -566,7 +567,7 @@ func TestCallbackObserverNonRegistered(t *testing.T) { var got metricdata.ResourceMetrics assert.NotPanics(t, func() { - got, err = rdr.Collect(context.Background()) + err = rdr.Collect(context.Background(), &got) }) assert.NoError(t, err) @@ -660,7 +661,8 @@ func TestGlobalInstRegisterCallback(t *testing.T) { _, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr) assert.NoError(t, err) - got, err := rdr.Collect(context.Background()) + got := metricdata.ResourceMetrics{} + err = rdr.Collect(context.Background(), &got) assert.NoError(t, err) assert.Lenf(t, l.messages, 0, "Warnings and errors logged:\n%s", l) metricdatatest.AssertEqual(t, metricdata.ResourceMetrics{ @@ -772,7 +774,8 @@ func TestMetersProvideScope(t *testing.T) { }, } - got, err := rdr.Collect(context.Background()) + got := metricdata.ResourceMetrics{} + err = rdr.Collect(context.Background(), &got) assert.NoError(t, err) metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) } @@ -816,14 +819,14 @@ func TestUnregisterUnregisters(t *testing.T) { require.NoError(t, err) ctx := context.Background() - _, err = r.Collect(ctx) + err = r.Collect(ctx, &metricdata.ResourceMetrics{}) require.NoError(t, err) assert.True(t, called, "callback not called for registered callback") called = false require.NoError(t, reg.Unregister(), "unregister") - _, err = r.Collect(ctx) + err = r.Collect(ctx, &metricdata.ResourceMetrics{}) require.NoError(t, err) assert.False(t, called, "callback called for unregistered callback") } @@ -869,7 +872,8 @@ func TestRegisterCallbackDropAggregations(t *testing.T) { ) require.NoError(t, err) - data, err := r.Collect(context.Background()) + data := metricdata.ResourceMetrics{} + err = r.Collect(context.Background(), &data) require.NoError(t, err) assert.False(t, called, "callback called for all drop instruments") @@ -1238,7 +1242,8 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) { ).Meter("TestAttributeFilter") require.NoError(t, tt.register(t, mtr)) - m, err := rdr.Collect(context.Background()) + m := metricdata.ResourceMetrics{} + err := rdr.Collect(context.Background(), &m) assert.NoError(t, err) require.Len(t, m.ScopeMetrics, 1) @@ -1331,7 +1336,8 @@ func TestAsynchronousExample(t *testing.T) { collect := func(t *testing.T) { t.Helper() - got, err := reader.Collect(context.Background()) + got := metricdata.ResourceMetrics{} + err := reader.Collect(context.Background(), &got) require.NoError(t, err) require.Len(t, got.ScopeMetrics, 1) metricdatatest.AssertEqual(t, *want, got.ScopeMetrics[0], metricdatatest.IgnoreTimestamp()) diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 8425e42e16a..2c1d00bed93 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -16,6 +16,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" + "errors" "fmt" "sync" "sync/atomic" @@ -206,21 +207,29 @@ func (r *periodicReader) aggregation(kind InstrumentKind) aggregation.Aggregatio // collectAndExport gather all metric data related to the periodicReader r from // the SDK and exports it with r's exporter. func (r *periodicReader) collectAndExport(ctx context.Context) error { - m, err := r.Collect(ctx) + // TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect. + rm := metricdata.ResourceMetrics{} + err := r.Collect(ctx, &rm) if err == nil { - err = r.export(ctx, m) + err = r.export(ctx, rm) } return err } // Collect gathers and returns all metric data related to the Reader from -// the SDK and other Producers. The returned metric data is not exported -// to the configured exporter, it is left to the caller to handle that if -// desired. +// the SDK and other Producers and stores the result in rm. The returned metric +// data is not exported to the configured exporter, it is left to the caller to +// handle that if desired. // -// An error is returned if this is called after Shutdown. -func (r *periodicReader) Collect(ctx context.Context) (metricdata.ResourceMetrics, error) { - return r.collect(ctx, r.sdkProducer.Load()) +// An error is returned if this is called after Shutdown. An error is return if rm is nil. +func (r *periodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error { + if rm == nil { + return errors.New("periodic reader: *metricdata.ResourceMetrics is nil") + } + // TODO (#3047): When collect is updated to accept output as param, pass rm. + rmTemp, err := r.collect(ctx, r.sdkProducer.Load()) + *rm = rmTemp + return err } // collect unwraps p as a produceHolder and returns its produce results. diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index 63de3b1bac1..9ba6c867d5b 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -65,8 +65,9 @@ type Reader interface { aggregation(InstrumentKind) aggregation.Aggregation // nolint:revive // import-shadow for method scoped by type. // Collect gathers and returns all metric data related to the Reader from - // the SDK. An error is returned if this is called after Shutdown. - Collect(context.Context) (metricdata.ResourceMetrics, error) + // the SDK and stores it in out. An error is returned if this is called + // after Shutdown or if out is nil. + Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error // ForceFlush flushes all metric measurements held in an export pipeline. // diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 26ac13f5dd7..5397fec3b18 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -53,13 +53,14 @@ func (ts *readerTestSuite) TearDownTest() { } func (ts *readerTestSuite) TestErrorForNotRegistered() { - _, err := ts.Reader.Collect(context.Background()) + err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{}) ts.ErrorIs(err, ErrReaderNotRegistered) } func (ts *readerTestSuite) TestSDKProducer() { ts.Reader.register(testSDKProducer{}) - m, err := ts.Reader.Collect(context.Background()) + m := metricdata.ResourceMetrics{} + err := ts.Reader.Collect(context.Background(), &m) ts.NoError(err) ts.Equal(testResourceMetricsA, m) } @@ -67,7 +68,8 @@ func (ts *readerTestSuite) TestSDKProducer() { func (ts *readerTestSuite) TestExternalProducer() { ts.Reader.register(testSDKProducer{}) ts.Reader.RegisterProducer(testExternalProducer{}) - m, err := ts.Reader.Collect(context.Background()) + m := metricdata.ResourceMetrics{} + err := ts.Reader.Collect(context.Background(), &m) ts.NoError(err) ts.Equal(testResourceMetricsAB, m) } @@ -78,7 +80,8 @@ func (ts *readerTestSuite) TestCollectAfterShutdown() { ts.Reader.RegisterProducer(testExternalProducer{}) ts.Require().NoError(ts.Reader.Shutdown(ctx)) - m, err := ts.Reader.Collect(ctx) + m := metricdata.ResourceMetrics{} + err := ts.Reader.Collect(ctx, &m) ts.ErrorIs(err, ErrReaderShutdown) ts.Equal(metricdata.ResourceMetrics{}, m) } @@ -113,7 +116,7 @@ func (ts *readerTestSuite) TestMultipleRegister() { // This should be ignored. ts.Reader.register(p1) - _, err := ts.Reader.Collect(context.Background()) + err := ts.Reader.Collect(context.Background(), &metricdata.ResourceMetrics{}) ts.Equal(assert.AnError, err) } @@ -134,7 +137,8 @@ func (ts *readerTestSuite) TestExternalProducerPartialSuccess() { }, ) - m, err := ts.Reader.Collect(context.Background()) + m := metricdata.ResourceMetrics{} + err := ts.Reader.Collect(context.Background(), &m) ts.Equal(assert.AnError, err) ts.Equal(testResourceMetricsAB, m) } @@ -146,7 +150,8 @@ func (ts *readerTestSuite) TestSDKFailureBlocksExternalProducer() { }}) ts.Reader.RegisterProducer(testExternalProducer{}) - m, err := ts.Reader.Collect(context.Background()) + m := metricdata.ResourceMetrics{} + err := ts.Reader.Collect(context.Background(), &m) ts.Equal(assert.AnError, err) ts.Equal(metricdata.ResourceMetrics{}, m) } @@ -165,7 +170,7 @@ func (ts *readerTestSuite) TestMethodConcurrency() { wg.Add(1) go func() { defer wg.Done() - _, _ = ts.Reader.Collect(ctx) + _ = ts.Reader.Collect(ctx, nil) }() wg.Add(1) @@ -190,11 +195,17 @@ func (ts *readerTestSuite) TestShutdownBeforeRegister() { ts.Reader.register(testSDKProducer{}) ts.Reader.RegisterProducer(testExternalProducer{}) - m, err := ts.Reader.Collect(ctx) + m := metricdata.ResourceMetrics{} + err := ts.Reader.Collect(ctx, &m) ts.ErrorIs(err, ErrReaderShutdown) ts.Equal(metricdata.ResourceMetrics{}, m) } +func (ts *readerTestSuite) TestCollectNilResourceMetricError() { + ctx := context.Background() + ts.Assert().Error(ts.Reader.Collect(ctx, nil)) +} + var testScopeMetricsA = metricdata.ScopeMetrics{ Scope: instrumentation.Scope{Name: "sdk/metric/test/reader"}, Metrics: []metricdata.Metrics{{ @@ -279,7 +290,7 @@ func benchReaderCollectFunc(r Reader) func(*testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - collectedMetrics, err = r.Collect(ctx) + err = r.Collect(ctx, &collectedMetrics) assert.Equalf(b, testResourceMetricsA, collectedMetrics, "unexpected Collect response: (%#v, %v)", collectedMetrics, err) } }