From 91d530d90bfc3db9f9bce32a54d3bf36c295fc13 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 7 Aug 2024 02:39:20 +0000 Subject: [PATCH] implement go.schedule.duration --- CHANGELOG.md | 1 + instrumentation/runtime/example/main.go | 8 +- instrumentation/runtime/go.mod | 4 + instrumentation/runtime/go.sum | 8 ++ instrumentation/runtime/options.go | 29 ++++- instrumentation/runtime/producer.go | 112 ++++++++++++++++++ instrumentation/runtime/runtime.go | 50 ++++---- instrumentation/runtime/runtime_test.go | 10 +- instrumentation/runtime/test/producer_test.go | 48 ++++++++ 9 files changed, 235 insertions(+), 35 deletions(-) create mode 100644 instrumentation/runtime/producer.go create mode 100644 instrumentation/runtime/test/producer_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c7a01b70bb7..1557b6904ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `go.opentelemetry.io/contrib/config` package supports configuring `with_resource_constant_labels` for the prometheus exporter. (#5890) - Add new runtime metrics to `go.opentelemetry.io/contrib/instrumentation/runtime`, which are still disabled by default. (#5870) - Support for the `OTEL_HTTP_CLIENT_COMPATIBILITY_MODE=http/dup` environment variable in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp` to emit attributes for both the v1.20.0 and v1.24.0 semantic conventions. (#5401) +- Add `NewProducer` to `go.opentelemetry.io/contrib/instrumentation/runtime`, which allows collecting the `go.schedule.duration` histogram metric from the Go runtime. (#5991) ### Removed diff --git a/instrumentation/runtime/example/main.go b/instrumentation/runtime/example/main.go index faebd5dd431..4068dd39287 100644 --- a/instrumentation/runtime/example/main.go +++ b/instrumentation/runtime/example/main.go @@ -27,13 +27,17 @@ var res = resource.NewWithAttributes( ) func main() { - exp, err := stdoutmetric.New() + exp, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint()) if err != nil { log.Fatal(err) } // Register the exporter with an SDK via a periodic reader. - read := metric.NewPeriodicReader(exp, metric.WithInterval(1*time.Second)) + read := metric.NewPeriodicReader( + exp, + metric.WithInterval(1*time.Second), + metric.WithProducer(runtime.NewProducer()), + ) provider := metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(read)) defer func() { err := provider.Shutdown(context.Background()) diff --git a/instrumentation/runtime/go.mod b/instrumentation/runtime/go.mod index df529525c79..fbca6a21890 100644 --- a/instrumentation/runtime/go.mod +++ b/instrumentation/runtime/go.mod @@ -6,13 +6,17 @@ require ( github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/metric v1.28.0 + go.opentelemetry.io/otel/sdk v1.28.0 + go.opentelemetry.io/otel/sdk/metric v1.28.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/otel/trace v1.28.0 // indirect + golang.org/x/sys v0.21.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/instrumentation/runtime/go.sum b/instrumentation/runtime/go.sum index e172f54e06c..eeb337d9b05 100644 --- a/instrumentation/runtime/go.sum +++ b/instrumentation/runtime/go.sum @@ -7,6 +7,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= @@ -15,8 +17,14 @@ go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= +go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE= +go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg= +go.opentelemetry.io/otel/sdk/metric v1.28.0 h1:OkuaKgKrgAbYrrY0t92c+cC+2F6hsFNnCQArXCKlg08= +go.opentelemetry.io/otel/sdk/metric v1.28.0/go.mod h1:cWPjykihLAPvXKi4iZc1dpER3Jdq2Z0YLse3moQUCpg= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/instrumentation/runtime/options.go b/instrumentation/runtime/options.go index 30046ab3509..9973b769f71 100644 --- a/instrumentation/runtime/options.go +++ b/instrumentation/runtime/options.go @@ -27,6 +27,13 @@ type Option interface { apply(*config) } +// ProducerOption supports configuring optional settings for runtime metrics using a +// metric producer in addition to standard instrumentation. +type ProducerOption interface { + Option + applyProducer(*config) +} + // DefaultMinimumReadMemStatsInterval is the default minimum interval // between calls to runtime.ReadMemStats(). Use the // WithMinimumReadMemStatsInterval() option to modify this setting in @@ -36,7 +43,7 @@ const DefaultMinimumReadMemStatsInterval time.Duration = 15 * time.Second // WithMinimumReadMemStatsInterval sets a minimum interval between calls to // runtime.ReadMemStats(), which is a relatively expensive call to make // frequently. This setting is ignored when `d` is negative. -func WithMinimumReadMemStatsInterval(d time.Duration) Option { +func WithMinimumReadMemStatsInterval(d time.Duration) ProducerOption { return minimumReadMemStatsIntervalOption(d) } @@ -48,6 +55,8 @@ func (o minimumReadMemStatsIntervalOption) apply(c *config) { } } +func (o minimumReadMemStatsIntervalOption) applyProducer(c *config) { o.apply(c) } + // WithMeterProvider sets the Metric implementation to use for // reporting. If this option is not used, the global metric.MeterProvider // will be used. `provider` must be non-nil. @@ -66,11 +75,25 @@ func (o metricProviderOption) apply(c *config) { // newConfig computes a config from the supplied Options. func newConfig(opts ...Option) config { c := config{ - MeterProvider: otel.GetMeterProvider(), - MinimumReadMemStatsInterval: DefaultMinimumReadMemStatsInterval, + MeterProvider: otel.GetMeterProvider(), } for _, opt := range opts { opt.apply(&c) } + if c.MinimumReadMemStatsInterval <= 0 { + c.MinimumReadMemStatsInterval = DefaultMinimumReadMemStatsInterval + } + return c +} + +// newConfig computes a config from the supplied ProducerOptions. +func newProducerConfig(opts ...ProducerOption) config { + c := config{} + for _, opt := range opts { + opt.applyProducer(&c) + } + if c.MinimumReadMemStatsInterval <= 0 { + c.MinimumReadMemStatsInterval = DefaultMinimumReadMemStatsInterval + } return c } diff --git a/instrumentation/runtime/producer.go b/instrumentation/runtime/producer.go new file mode 100644 index 00000000000..37900d5b14d --- /dev/null +++ b/instrumentation/runtime/producer.go @@ -0,0 +1,112 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package runtime // import "go.opentelemetry.io/contrib/instrumentation/runtime" + +import ( + "context" + "fmt" + "math" + "runtime/metrics" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +var startTime time.Time + +func init() { + startTime = time.Now() +} + +var histogramMetrics = []string{goSchedLatencies} + +type Producer struct { + collector *goCollector +} + +var _ metric.Producer = (*Producer)(nil) + +func NewProducer(opts ...ProducerOption) *Producer { + c := newProducerConfig(opts...) + return &Producer{ + collector: newCollector(c.MinimumReadMemStatsInterval, histogramMetrics), + } +} + +func (p *Producer) Produce(context.Context) ([]metricdata.ScopeMetrics, error) { + p.collector.refresh() + // Use the last collection time (which may or may not be now) for the timestamp. + histDp := convertRuntimeHistogram(p.collector.getHistogram(goSchedLatencies), p.collector.lastCollect) + if len(histDp) == 0 { + return nil, fmt.Errorf("unable to obtain go.schedule.duration metric from the runtime") + } + return []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{ + Name: ScopeName, + Version: Version(), + }, + Metrics: []metricdata.Metrics{ + { + Name: "go.schedule.duration", + Description: "The time goroutines have spent in the scheduler in a runnable state before actually running.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: histDp, + }, + }, + }, + }, + }, nil +} + +var emptySet = attribute.EmptySet() + +func convertRuntimeHistogram(runtimeHist *metrics.Float64Histogram, ts time.Time) []metricdata.HistogramDataPoint[float64] { + if runtimeHist == nil { + return nil + } + bounds := runtimeHist.Buckets + counts := runtimeHist.Counts + if len(bounds) < 2 { + // runtime histograms are guaranteed to have at least two bucket boundaries. + return nil + } + // trim the first bucket since it is a lower bound. OTel histogram boundaries only have an upper bound. + bounds = bounds[1:] + if bounds[len(bounds)-1] == math.Inf(1) { + // trim the last bucket if it is +Inf, since the +Inf boundary is implicit in OTel. + bounds = bounds[:len(bounds)-1] + } else { + // if the last bucket is not +Inf, append an extra zero count since + // the implicit +Inf bucket won't have any observations. + counts = append(counts, 0) + } + count := uint64(0) + sum := float64(0) + for i, c := range counts { + count += c + // This computed sum is an underestimate, since it assumes each + // observation happens at the bucket's lower bound. + if i > 0 && count != 0 { + sum += bounds[i-1] * float64(count) + } + } + + return []metricdata.HistogramDataPoint[float64]{ + { + StartTime: startTime, + Count: count, + Sum: sum, + Time: ts, + Bounds: bounds, + BucketCounts: counts, + Attributes: *emptySet, + }, + } +} diff --git a/instrumentation/runtime/runtime.go b/instrumentation/runtime/runtime.go index be7911bb450..4ea0b43a75c 100644 --- a/instrumentation/runtime/runtime.go +++ b/instrumentation/runtime/runtime.go @@ -10,7 +10,6 @@ import ( "sync" "time" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -32,17 +31,12 @@ const ( goGoroutines = "/sched/goroutines:goroutines" goMaxProcs = "/sched/gomaxprocs:threads" goConfigGC = "/gc/gogc:percent" + goSchedLatencies = "/sched/latencies:seconds" ) // Start initializes reporting of runtime metrics using the supplied config. func Start(opts ...Option) error { c := newConfig(opts...) - if c.MinimumReadMemStatsInterval < 0 { - c.MinimumReadMemStatsInterval = DefaultMinimumReadMemStatsInterval - } - if c.MeterProvider == nil { - c.MeterProvider = otel.GetMeterProvider() - } meter := c.MeterProvider.Meter( ScopeName, metric.WithInstrumentationVersion(Version()), @@ -121,28 +115,28 @@ func Start(opts ...Option) error { stackMemoryOpt := metric.WithAttributeSet( attribute.NewSet(attribute.String("go.memory.type", "stack")), ) - collector := newCollector(c.MinimumReadMemStatsInterval) + collector := newCollector(c.MinimumReadMemStatsInterval, runtimeMetrics) var lock sync.Mutex _, err = meter.RegisterCallback( func(ctx context.Context, o metric.Observer) error { lock.Lock() defer lock.Unlock() collector.refresh() - stackMemory := collector.get(goHeapMemory) + stackMemory := collector.getInt(goHeapMemory) o.ObserveInt64(memoryUsedInstrument, stackMemory, stackMemoryOpt) - totalMemory := collector.get(goTotalMemory) - collector.get(goMemoryReleased) + totalMemory := collector.getInt(goTotalMemory) - collector.getInt(goMemoryReleased) otherMemory := totalMemory - stackMemory o.ObserveInt64(memoryUsedInstrument, otherMemory, otherMemoryOpt) // Only observe the limit metric if a limit exists - if limit := collector.get(goMemoryLimit); limit != math.MaxInt64 { + if limit := collector.getInt(goMemoryLimit); limit != math.MaxInt64 { o.ObserveInt64(memoryLimitInstrument, limit) } - o.ObserveInt64(memoryAllocatedInstrument, collector.get(goMemoryAllocated)) - o.ObserveInt64(memoryAllocationsInstrument, collector.get(goMemoryAllocations)) - o.ObserveInt64(memoryGCGoalInstrument, collector.get(goMemoryGoal)) - o.ObserveInt64(goroutineCountInstrument, collector.get(goGoroutines)) - o.ObserveInt64(processorLimitInstrument, collector.get(goMaxProcs)) - o.ObserveInt64(gogcConfigInstrument, collector.get(goConfigGC)) + o.ObserveInt64(memoryAllocatedInstrument, collector.getInt(goMemoryAllocated)) + o.ObserveInt64(memoryAllocationsInstrument, collector.getInt(goMemoryAllocations)) + o.ObserveInt64(memoryGCGoalInstrument, collector.getInt(goMemoryGoal)) + o.ObserveInt64(goroutineCountInstrument, collector.getInt(goGoroutines)) + o.ObserveInt64(processorLimitInstrument, collector.getInt(goMaxProcs)) + o.ObserveInt64(gogcConfigInstrument, collector.getInt(goConfigGC)) return nil }, memoryUsedInstrument, @@ -157,7 +151,6 @@ func Start(opts ...Option) error { if err != nil { return err } - // TODO (#5655) support go.schedule.duration return nil } @@ -188,19 +181,19 @@ type goCollector struct { sampleMap map[string]*metrics.Sample } -func newCollector(minimumInterval time.Duration) *goCollector { +func newCollector(minimumInterval time.Duration, metricNames []string) *goCollector { g := &goCollector{ - sampleBuffer: make([]metrics.Sample, 0, len(runtimeMetrics)), - sampleMap: make(map[string]*metrics.Sample, len(runtimeMetrics)), + sampleBuffer: make([]metrics.Sample, 0, len(metricNames)), + sampleMap: make(map[string]*metrics.Sample, len(metricNames)), minimumInterval: minimumInterval, now: time.Now, } - for _, runtimeMetric := range runtimeMetrics { - g.sampleBuffer = append(g.sampleBuffer, metrics.Sample{Name: runtimeMetric}) + for _, metricName := range metricNames { + g.sampleBuffer = append(g.sampleBuffer, metrics.Sample{Name: metricName}) // sampleMap references a position in the sampleBuffer slice. If an // element is appended to sampleBuffer, it must be added to sampleMap // for the sample to be accessible in sampleMap. - g.sampleMap[runtimeMetric] = &g.sampleBuffer[len(g.sampleBuffer)-1] + g.sampleMap[metricName] = &g.sampleBuffer[len(g.sampleBuffer)-1] } return g } @@ -216,9 +209,16 @@ func (g *goCollector) refresh() { g.lastCollect = now } -func (g *goCollector) get(name string) int64 { +func (g *goCollector) getInt(name string) int64 { if s, ok := g.sampleMap[name]; ok && s.Value.Kind() == metrics.KindUint64 { return int64(s.Value.Uint64()) } return 0 } + +func (g *goCollector) getHistogram(name string) *metrics.Float64Histogram { + if s, ok := g.sampleMap[name]; ok && s.Value.Kind() == metrics.KindFloat64Histogram { + return s.Value.Float64Histogram() + } + return nil +} diff --git a/instrumentation/runtime/runtime_test.go b/instrumentation/runtime/runtime_test.go index 8b27e347d27..4b5863bb600 100644 --- a/instrumentation/runtime/runtime_test.go +++ b/instrumentation/runtime/runtime_test.go @@ -13,27 +13,27 @@ import ( func TestRefreshGoCollector(t *testing.T) { // buffer for allocating memory var buffer [][]byte - collector := newCollector(10 * time.Second) + collector := newCollector(10*time.Second, runtimeMetrics) testClock := newClock() collector.now = testClock.now // before the first refresh, all counters are zero - assert.Zero(t, collector.get(goMemoryAllocations)) + assert.Zero(t, collector.getInt(goMemoryAllocations)) // after the first refresh, counters are non-zero buffer = allocateMemory(buffer) collector.refresh() - initialAllocations := collector.get(goMemoryAllocations) + initialAllocations := collector.getInt(goMemoryAllocations) assert.NotZero(t, initialAllocations) // if less than the refresh time has elapsed, the value is not updated // on refresh. testClock.increment(9 * time.Second) collector.refresh() buffer = allocateMemory(buffer) - assert.Equal(t, initialAllocations, collector.get(goMemoryAllocations)) + assert.Equal(t, initialAllocations, collector.getInt(goMemoryAllocations)) // if greater than the refresh time has elapsed, the value changes. testClock.increment(2 * time.Second) collector.refresh() _ = allocateMemory(buffer) - assert.NotEqual(t, initialAllocations, collector.get(goMemoryAllocations)) + assert.NotEqual(t, initialAllocations, collector.getInt(goMemoryAllocations)) } func allocateMemory(buffer [][]byte) [][]byte { diff --git a/instrumentation/runtime/test/producer_test.go b/instrumentation/runtime/test/producer_test.go new file mode 100644 index 00000000000..622022731a9 --- /dev/null +++ b/instrumentation/runtime/test/producer_test.go @@ -0,0 +1,48 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package runtime // import "go.opentelemetry.io/contrib/instrumentation/runtime/test" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/contrib/instrumentation/runtime" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +func TestProducer(t *testing.T) { + reader := metric.NewManualReader(metric.WithProducer(runtime.NewProducer())) + _ = metric.NewMeterProvider(metric.WithReader(reader)) + rm := metricdata.ResourceMetrics{} + err := reader.Collect(context.Background(), &rm) + assert.NoError(t, err) + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + + expectedScopeMetric := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: "go.opentelemetry.io/contrib/instrumentation/runtime", + Version: runtime.Version(), + }, + Metrics: []metricdata.Metrics{ + { + Name: "go.schedule.duration", + Description: "The time goroutines have spent in the scheduler in a runnable state before actually running.", + Unit: "s", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {}, + }, + }, + }, + }, + } + metricdatatest.AssertEqual(t, expectedScopeMetric, rm.ScopeMetrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) +}