Skip to content

Commit

Permalink
implement go.schedule.duration
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Aug 7, 2024
1 parent 3920b31 commit 91d530d
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `go.opentelemetry.io/contrib/config` package supports configuring `with_resource_constant_labels` for the prometheus exporter. (#5890)
- Add new runtime metrics to `go.opentelemetry.io/contrib/instrumentation/runtime`, which are still disabled by default. (#5870)
- Support for the `OTEL_HTTP_CLIENT_COMPATIBILITY_MODE=http/dup` environment variable in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp` to emit attributes for both the v1.20.0 and v1.24.0 semantic conventions. (#5401)
- Add `NewProducer` to `go.opentelemetry.io/contrib/instrumentation/runtime`, which allows collecting the `go.schedule.duration` histogram metric from the Go runtime. (#5991)

### Removed

Expand Down
8 changes: 6 additions & 2 deletions instrumentation/runtime/example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ var res = resource.NewWithAttributes(
)

func main() {
exp, err := stdoutmetric.New()
exp, err := stdoutmetric.New(stdoutmetric.WithPrettyPrint())
if err != nil {
log.Fatal(err)
}

// Register the exporter with an SDK via a periodic reader.
read := metric.NewPeriodicReader(exp, metric.WithInterval(1*time.Second))
read := metric.NewPeriodicReader(
exp,
metric.WithInterval(1*time.Second),
metric.WithProducer(runtime.NewProducer()),
)
provider := metric.NewMeterProvider(metric.WithResource(res), metric.WithReader(read))
defer func() {
err := provider.Shutdown(context.Background())
Expand Down
4 changes: 4 additions & 0 deletions instrumentation/runtime/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ require (
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/metric v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/sdk/metric v1.28.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/sys v0.21.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
8 changes: 8 additions & 0 deletions instrumentation/runtime/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand All @@ -15,8 +17,14 @@ go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/sdk/metric v1.28.0 h1:OkuaKgKrgAbYrrY0t92c+cC+2F6hsFNnCQArXCKlg08=
go.opentelemetry.io/otel/sdk/metric v1.28.0/go.mod h1:cWPjykihLAPvXKi4iZc1dpER3Jdq2Z0YLse3moQUCpg=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
29 changes: 26 additions & 3 deletions instrumentation/runtime/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ type Option interface {
apply(*config)
}

// ProducerOption supports configuring optional settings for runtime metrics using a
// metric producer in addition to standard instrumentation.
type ProducerOption interface {
Option
applyProducer(*config)
}

// DefaultMinimumReadMemStatsInterval is the default minimum interval
// between calls to runtime.ReadMemStats(). Use the
// WithMinimumReadMemStatsInterval() option to modify this setting in
Expand All @@ -36,7 +43,7 @@ const DefaultMinimumReadMemStatsInterval time.Duration = 15 * time.Second
// WithMinimumReadMemStatsInterval sets a minimum interval between calls to
// runtime.ReadMemStats(), which is a relatively expensive call to make
// frequently. This setting is ignored when `d` is negative.
func WithMinimumReadMemStatsInterval(d time.Duration) Option {
func WithMinimumReadMemStatsInterval(d time.Duration) ProducerOption {
return minimumReadMemStatsIntervalOption(d)
}

Expand All @@ -48,6 +55,8 @@ func (o minimumReadMemStatsIntervalOption) apply(c *config) {
}
}

func (o minimumReadMemStatsIntervalOption) applyProducer(c *config) { o.apply(c) }

// WithMeterProvider sets the Metric implementation to use for
// reporting. If this option is not used, the global metric.MeterProvider
// will be used. `provider` must be non-nil.
Expand All @@ -66,11 +75,25 @@ func (o metricProviderOption) apply(c *config) {
// newConfig computes a config from the supplied Options.
func newConfig(opts ...Option) config {
c := config{
MeterProvider: otel.GetMeterProvider(),
MinimumReadMemStatsInterval: DefaultMinimumReadMemStatsInterval,
MeterProvider: otel.GetMeterProvider(),
}
for _, opt := range opts {
opt.apply(&c)
}
if c.MinimumReadMemStatsInterval <= 0 {
c.MinimumReadMemStatsInterval = DefaultMinimumReadMemStatsInterval
}
return c
}

// newConfig computes a config from the supplied ProducerOptions.
func newProducerConfig(opts ...ProducerOption) config {
c := config{}
for _, opt := range opts {
opt.applyProducer(&c)
}
if c.MinimumReadMemStatsInterval <= 0 {
c.MinimumReadMemStatsInterval = DefaultMinimumReadMemStatsInterval
}
return c
}
112 changes: 112 additions & 0 deletions instrumentation/runtime/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package runtime // import "go.opentelemetry.io/contrib/instrumentation/runtime"

import (
"context"
"fmt"
"math"
"runtime/metrics"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

var startTime time.Time

func init() {
startTime = time.Now()
}

var histogramMetrics = []string{goSchedLatencies}

type Producer struct {
collector *goCollector
}

var _ metric.Producer = (*Producer)(nil)

func NewProducer(opts ...ProducerOption) *Producer {
c := newProducerConfig(opts...)
return &Producer{
collector: newCollector(c.MinimumReadMemStatsInterval, histogramMetrics),
}
}

func (p *Producer) Produce(context.Context) ([]metricdata.ScopeMetrics, error) {
p.collector.refresh()
// Use the last collection time (which may or may not be now) for the timestamp.
histDp := convertRuntimeHistogram(p.collector.getHistogram(goSchedLatencies), p.collector.lastCollect)
if len(histDp) == 0 {
return nil, fmt.Errorf("unable to obtain go.schedule.duration metric from the runtime")
}
return []metricdata.ScopeMetrics{
{
Scope: instrumentation.Scope{
Name: ScopeName,
Version: Version(),
},
Metrics: []metricdata.Metrics{
{
Name: "go.schedule.duration",
Description: "The time goroutines have spent in the scheduler in a runnable state before actually running.",
Unit: "s",
Data: metricdata.Histogram[float64]{
Temporality: metricdata.CumulativeTemporality,
DataPoints: histDp,
},
},
},
},
}, nil
}

var emptySet = attribute.EmptySet()

func convertRuntimeHistogram(runtimeHist *metrics.Float64Histogram, ts time.Time) []metricdata.HistogramDataPoint[float64] {
if runtimeHist == nil {
return nil
}
bounds := runtimeHist.Buckets
counts := runtimeHist.Counts
if len(bounds) < 2 {
// runtime histograms are guaranteed to have at least two bucket boundaries.
return nil
}
// trim the first bucket since it is a lower bound. OTel histogram boundaries only have an upper bound.
bounds = bounds[1:]
if bounds[len(bounds)-1] == math.Inf(1) {
// trim the last bucket if it is +Inf, since the +Inf boundary is implicit in OTel.
bounds = bounds[:len(bounds)-1]
} else {
// if the last bucket is not +Inf, append an extra zero count since
// the implicit +Inf bucket won't have any observations.
counts = append(counts, 0)
}
count := uint64(0)
sum := float64(0)
for i, c := range counts {
count += c
// This computed sum is an underestimate, since it assumes each
// observation happens at the bucket's lower bound.
if i > 0 && count != 0 {
sum += bounds[i-1] * float64(count)
}
}

return []metricdata.HistogramDataPoint[float64]{
{
StartTime: startTime,
Count: count,
Sum: sum,
Time: ts,
Bounds: bounds,
BucketCounts: counts,
Attributes: *emptySet,
},
}
}
50 changes: 25 additions & 25 deletions instrumentation/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

Expand All @@ -32,17 +31,12 @@ const (
goGoroutines = "/sched/goroutines:goroutines"
goMaxProcs = "/sched/gomaxprocs:threads"
goConfigGC = "/gc/gogc:percent"
goSchedLatencies = "/sched/latencies:seconds"
)

// Start initializes reporting of runtime metrics using the supplied config.
func Start(opts ...Option) error {
c := newConfig(opts...)
if c.MinimumReadMemStatsInterval < 0 {
c.MinimumReadMemStatsInterval = DefaultMinimumReadMemStatsInterval
}
if c.MeterProvider == nil {
c.MeterProvider = otel.GetMeterProvider()
}
meter := c.MeterProvider.Meter(
ScopeName,
metric.WithInstrumentationVersion(Version()),
Expand Down Expand Up @@ -121,28 +115,28 @@ func Start(opts ...Option) error {
stackMemoryOpt := metric.WithAttributeSet(
attribute.NewSet(attribute.String("go.memory.type", "stack")),
)
collector := newCollector(c.MinimumReadMemStatsInterval)
collector := newCollector(c.MinimumReadMemStatsInterval, runtimeMetrics)
var lock sync.Mutex
_, err = meter.RegisterCallback(
func(ctx context.Context, o metric.Observer) error {
lock.Lock()
defer lock.Unlock()
collector.refresh()
stackMemory := collector.get(goHeapMemory)
stackMemory := collector.getInt(goHeapMemory)
o.ObserveInt64(memoryUsedInstrument, stackMemory, stackMemoryOpt)
totalMemory := collector.get(goTotalMemory) - collector.get(goMemoryReleased)
totalMemory := collector.getInt(goTotalMemory) - collector.getInt(goMemoryReleased)
otherMemory := totalMemory - stackMemory
o.ObserveInt64(memoryUsedInstrument, otherMemory, otherMemoryOpt)
// Only observe the limit metric if a limit exists
if limit := collector.get(goMemoryLimit); limit != math.MaxInt64 {
if limit := collector.getInt(goMemoryLimit); limit != math.MaxInt64 {
o.ObserveInt64(memoryLimitInstrument, limit)
}
o.ObserveInt64(memoryAllocatedInstrument, collector.get(goMemoryAllocated))
o.ObserveInt64(memoryAllocationsInstrument, collector.get(goMemoryAllocations))
o.ObserveInt64(memoryGCGoalInstrument, collector.get(goMemoryGoal))
o.ObserveInt64(goroutineCountInstrument, collector.get(goGoroutines))
o.ObserveInt64(processorLimitInstrument, collector.get(goMaxProcs))
o.ObserveInt64(gogcConfigInstrument, collector.get(goConfigGC))
o.ObserveInt64(memoryAllocatedInstrument, collector.getInt(goMemoryAllocated))
o.ObserveInt64(memoryAllocationsInstrument, collector.getInt(goMemoryAllocations))
o.ObserveInt64(memoryGCGoalInstrument, collector.getInt(goMemoryGoal))
o.ObserveInt64(goroutineCountInstrument, collector.getInt(goGoroutines))
o.ObserveInt64(processorLimitInstrument, collector.getInt(goMaxProcs))
o.ObserveInt64(gogcConfigInstrument, collector.getInt(goConfigGC))
return nil
},
memoryUsedInstrument,
Expand All @@ -157,7 +151,6 @@ func Start(opts ...Option) error {
if err != nil {
return err
}
// TODO (#5655) support go.schedule.duration
return nil
}

Expand Down Expand Up @@ -188,19 +181,19 @@ type goCollector struct {
sampleMap map[string]*metrics.Sample
}

func newCollector(minimumInterval time.Duration) *goCollector {
func newCollector(minimumInterval time.Duration, metricNames []string) *goCollector {
g := &goCollector{
sampleBuffer: make([]metrics.Sample, 0, len(runtimeMetrics)),
sampleMap: make(map[string]*metrics.Sample, len(runtimeMetrics)),
sampleBuffer: make([]metrics.Sample, 0, len(metricNames)),
sampleMap: make(map[string]*metrics.Sample, len(metricNames)),
minimumInterval: minimumInterval,
now: time.Now,
}
for _, runtimeMetric := range runtimeMetrics {
g.sampleBuffer = append(g.sampleBuffer, metrics.Sample{Name: runtimeMetric})
for _, metricName := range metricNames {
g.sampleBuffer = append(g.sampleBuffer, metrics.Sample{Name: metricName})
// sampleMap references a position in the sampleBuffer slice. If an
// element is appended to sampleBuffer, it must be added to sampleMap
// for the sample to be accessible in sampleMap.
g.sampleMap[runtimeMetric] = &g.sampleBuffer[len(g.sampleBuffer)-1]
g.sampleMap[metricName] = &g.sampleBuffer[len(g.sampleBuffer)-1]
}
return g
}
Expand All @@ -216,9 +209,16 @@ func (g *goCollector) refresh() {
g.lastCollect = now
}

func (g *goCollector) get(name string) int64 {
func (g *goCollector) getInt(name string) int64 {
if s, ok := g.sampleMap[name]; ok && s.Value.Kind() == metrics.KindUint64 {
return int64(s.Value.Uint64())
}
return 0
}

func (g *goCollector) getHistogram(name string) *metrics.Float64Histogram {
if s, ok := g.sampleMap[name]; ok && s.Value.Kind() == metrics.KindFloat64Histogram {
return s.Value.Float64Histogram()
}
return nil
}
10 changes: 5 additions & 5 deletions instrumentation/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@ import (
func TestRefreshGoCollector(t *testing.T) {
// buffer for allocating memory
var buffer [][]byte
collector := newCollector(10 * time.Second)
collector := newCollector(10*time.Second, runtimeMetrics)
testClock := newClock()
collector.now = testClock.now
// before the first refresh, all counters are zero
assert.Zero(t, collector.get(goMemoryAllocations))
assert.Zero(t, collector.getInt(goMemoryAllocations))
// after the first refresh, counters are non-zero
buffer = allocateMemory(buffer)
collector.refresh()
initialAllocations := collector.get(goMemoryAllocations)
initialAllocations := collector.getInt(goMemoryAllocations)
assert.NotZero(t, initialAllocations)
// if less than the refresh time has elapsed, the value is not updated
// on refresh.
testClock.increment(9 * time.Second)
collector.refresh()
buffer = allocateMemory(buffer)
assert.Equal(t, initialAllocations, collector.get(goMemoryAllocations))
assert.Equal(t, initialAllocations, collector.getInt(goMemoryAllocations))
// if greater than the refresh time has elapsed, the value changes.
testClock.increment(2 * time.Second)
collector.refresh()
_ = allocateMemory(buffer)
assert.NotEqual(t, initialAllocations, collector.get(goMemoryAllocations))
assert.NotEqual(t, initialAllocations, collector.getInt(goMemoryAllocations))
}

func allocateMemory(buffer [][]byte) [][]byte {
Expand Down
Loading

0 comments on commit 91d530d

Please sign in to comment.