Meter provider doesn't work with multiple readers #3720
Verified with this modified version of the prometheus example:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"os"
	"os/signal"
	"time"

	"github.com/prometheus/client_golang/prometheus/promhttp"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/prometheus"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	api "go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/instrument"
	"go.opentelemetry.io/otel/sdk/metric"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {
	ctx := context.Background()

	// The exporter embeds a default OpenTelemetry Reader and
	// implements prometheus.Collector, allowing it to be used as
	// both a Reader and Collector.
	exporter, err := prometheus.New()
	if err != nil {
		log.Fatal(err)
	}

	stdExporter, err := stdoutmetric.New()
	if err != nil {
		log.Fatal(err)
	}

	provider := metric.NewMeterProvider(
		metric.WithReader(
			metric.NewPeriodicReader(stdExporter, metric.WithInterval(8*time.Second)),
		),
		metric.WithReader(exporter),
	)
	meter := provider.Meter("github.com/open-telemetry/opentelemetry-go/example/prometheus")

	// Start the prometheus HTTP server and pass the exporter Collector to it
	go serveMetrics()

	attrs := []attribute.KeyValue{
		attribute.Key("A").String("B"),
		attribute.Key("C").String("D"),
	}

	// This is the equivalent of prometheus.NewCounterVec
	counter, err := meter.Float64Counter("foo", instrument.WithDescription("a simple counter"))
	if err != nil {
		log.Fatal(err)
	}
	counter.Add(ctx, 5, attrs...)

	gauge, err := meter.Float64ObservableGauge("bar", instrument.WithDescription("a fun little gauge"))
	if err != nil {
		log.Fatal(err)
	}
	_, err = meter.RegisterCallback(func(_ context.Context, o api.Observer) error {
		n := -10. + rand.Float64()*(90.) // [-10, 100)
		o.ObserveFloat64(gauge, n, attrs...)
		return nil
	}, gauge)
	if err != nil {
		log.Fatal(err)
	}

	// This is the equivalent of prometheus.NewHistogramVec
	histogram, err := meter.Float64Histogram("baz", instrument.WithDescription("a very nice histogram"))
	if err != nil {
		log.Fatal(err)
	}
	histogram.Record(ctx, 23, attrs...)
	histogram.Record(ctx, 7, attrs...)
	histogram.Record(ctx, 101, attrs...)
	histogram.Record(ctx, 105, attrs...)

	ctx, _ = signal.NotifyContext(ctx, os.Interrupt)
	<-ctx.Done()
}

func serveMetrics() {
	log.Printf("serving metrics at localhost:2223/metrics")
	http.Handle("/metrics", promhttp.Handler())
	err := http.ListenAndServe(":2223", nil)
	if err != nil {
		fmt.Printf("error serving http: %v", err)
		return
	}
}
```

Running that code and querying http://localhost:2223/metrics returns the Prometheus system metrics, but not the OTel metrics defined above. The stdout exporter, however, correctly outputs the metrics. Switching the order of the registered readers reverses the behavior: the Prometheus endpoint exposes the metrics, but the stdout exporter does not.
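For reference, the missing series can also be checked programmatically rather than with curl. The following is a minimal sketch of mine (not part of the original report); it assumes the server above is listening on :2223 and that the counter shows up under a name containing "foo" (the Prometheus exporter typically appends a _total suffix to counters):

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
)

func main() {
	// Scrape the endpoint served by the example above.
	resp, err := http.Get("http://localhost:2223/metrics")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}

	// With the reader order shown above, "foo" never appears in the scrape;
	// swapping the readers makes it appear (and silences the stdout exporter).
	if strings.Contains(string(body), "foo") {
		fmt.Println("counter exported via Prometheus")
	} else {
		fmt.Println("counter missing from Prometheus output")
	}
}
```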
The example's go.mod:

```
module go.opentelemetry.io/otel/example/prometheus

go 1.18

require (
	github.com/prometheus/client_golang v1.14.0
	go.opentelemetry.io/otel v1.13.0
	go.opentelemetry.io/otel/exporters/prometheus v0.36.0
	go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.36.0
	go.opentelemetry.io/otel/metric v0.36.0
	go.opentelemetry.io/otel/sdk/metric v0.36.0
)

require (
	github.com/beorn7/perks v1.0.1 // indirect
	github.com/cespare/xxhash/v2 v2.1.2 // indirect
	github.com/go-logr/logr v1.2.3 // indirect
	github.com/go-logr/stdr v1.2.2 // indirect
	github.com/golang/protobuf v1.5.2 // indirect
	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
	github.com/prometheus/client_model v0.3.0 // indirect
	github.com/prometheus/common v0.37.0 // indirect
	github.com/prometheus/procfs v0.8.0 // indirect
	go.opentelemetry.io/otel/sdk v1.13.0 // indirect
	go.opentelemetry.io/otel/trace v1.13.0 // indirect
	golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
	google.golang.org/protobuf v1.28.1 // indirect
)

replace go.opentelemetry.io/otel => ../..

replace go.opentelemetry.io/otel/exporters/prometheus => ../../exporters/prometheus

replace go.opentelemetry.io/otel/sdk => ../../sdk

replace go.opentelemetry.io/otel/sdk/metric => ../../sdk/metric

replace go.opentelemetry.io/otel/metric => ../../metric

replace go.opentelemetry.io/otel/trace => ../../trace
```
This does not appear to be unique to the combination of a Prometheus and a stdout exporter. Adding another reader for the stdout exporter also produces blank metrics:

```go
// ...
provider := metric.NewMeterProvider(
	metric.WithReader(
		metric.NewPeriodicReader(stdExporter, metric.WithInterval(8*time.Second)),
	),
	metric.WithReader(
		metric.NewPeriodicReader(stdExporter, metric.WithInterval(8*time.Second)),
	),
	metric.WithReader(exporter),
)
// ...
```
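For what it's worth, the symptom can also be observed without any exporter at all. Below is a minimal sketch of mine (not from this thread) using two ManualReader instances; note that the ManualReader.Collect signature has changed across SDK versions, so this may need adjusting against the v0.36.0 module pinned above. With the caching bug, only the first reader's collection contains the counter:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	// Two manual readers registered with the same provider.
	r1 := metric.NewManualReader()
	r2 := metric.NewManualReader()
	provider := metric.NewMeterProvider(
		metric.WithReader(r1),
		metric.WithReader(r2),
	)

	meter := provider.Meter("multi-reader-check")
	counter, err := meter.Float64Counter("foo")
	if err != nil {
		log.Fatal(err)
	}
	counter.Add(ctx, 5)

	// With the bug, only reader 1 reports any scope metrics; reader 2 is empty.
	for i, r := range []*metric.ManualReader{r1, r2} {
		var rm metricdata.ResourceMetrics
		if err := r.Collect(ctx, &rm); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("reader %d collected %d scope(s)\n", i+1, len(rm.ScopeMetrics))
	}
}
```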
This looks to be a bug with instrument caching. The creation of a new aggregator in a cache is linked to the addition of that aggregator to a pipeline (see opentelemetry-go/sdk/metric/pipeline.go, lines 295 to 314 at commit 441a173).

What happens is that the first pipeline, for the first reader, correctly resolves the aggregator and adds it to its pipeline. The second pipeline (and any subsequent one) resolves the already cached aggregator, and because it was already cached it is never added to the new pipeline.
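To make the failure mode concrete, here is a simplified, self-contained illustration of mine (deliberately not the SDK's real types): registering the aggregator with the pipeline only inside the cache-miss callback means every pipeline after the first gets a cache hit and never sees the aggregator.

```go
package main

import "fmt"

// cache is a minimal stand-in for the SDK's instrument cache: Lookup runs the
// create function only on a miss and returns the cached value on a hit.
type cache map[string]string

func (c cache) Lookup(key string, create func() string) string {
	if v, ok := c[key]; ok {
		return v // cache hit: create (and its side effects) never runs
	}
	v := create()
	c[key] = v
	return v
}

// pipeline is a stand-in for a per-reader pipeline that aggregators must be
// registered with in order to be collected.
type pipeline struct {
	name string
	aggs []string
}

func main() {
	// One cache shared across readers, but one pipeline per reader.
	shared := cache{}
	pipes := []*pipeline{{name: "reader-1"}, {name: "reader-2"}}

	for _, p := range pipes {
		// The aggregator is created AND added to the pipeline inside the
		// cache-miss callback, so only the first pipeline ever receives it.
		shared.Lookup("foo", func() string {
			a := "sum-aggregator(foo)"
			p.aggs = append(p.aggs, a)
			return a
		})
	}

	for _, p := range pipes {
		fmt.Printf("%s has %d aggregator(s)\n", p.name, len(p.aggs))
	}
	// Prints: reader-1 has 1 aggregator(s), reader-2 has 0 aggregator(s).
}
```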
I was able to verify that the following change resolves the bug:

```go
agg, err := i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
	agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic)
	if err != nil {
		return nil, err
	}
	if agg == nil { // Drop aggregator.
		return nil, nil
	}
	if stream.AttributeFilter != nil {
		agg = internal.NewFilter(agg, stream.AttributeFilter)
	}
	return agg, err
})
if err == nil {
	i.pipeline.addSync(scope, instrumentSync{
		name:        stream.Name,
		description: stream.Description,
		unit:        stream.Unit,
		aggregator:  agg,
	})
}
return agg, err
```

This still needs to handle the case where the same instrument is asked for multiple times: the aggregator should not be added to the pipeline again after the first time in that situation. That probably needs a cache here as well.
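Continuing the toy model from above, one way to handle that repeated-instrument case is to remember, per pipeline, which instrument IDs have already been registered and skip the add on repeats. The real fix in the SDK ended up scoping the whole instrument cache per inserter instead; this sketch of mine only illustrates the "register once per pipeline" idea:

```go
package main

import "fmt"

// pipeline registers an aggregator for a given instrument ID at most once,
// even if the same instrument is resolved multiple times.
type pipeline struct {
	aggs []string
	seen map[string]bool
}

func (p *pipeline) register(id, agg string) {
	if p.seen == nil {
		p.seen = map[string]bool{}
	}
	if p.seen[id] {
		return // same instrument requested again: reuse, don't re-add
	}
	p.seen[id] = true
	p.aggs = append(p.aggs, agg)
}

func main() {
	p := &pipeline{}
	// Resolving the same instrument ("foo") twice against the same pipeline
	// still results in a single registered aggregator.
	for i := 0; i < 2; i++ {
		p.register("foo", "sum-aggregator(foo)")
	}
	fmt.Printf("pipeline has %d aggregator(s)\n", len(p.aggs)) // prints 1
}
```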
Planning to build this into a fix for open-telemetry#3720.
How about moving the instrument cache construction down to the per-pipeline level? In the newMeter method:

```go
func newMeter(s instrumentation.Scope, p pipelines) *meter {
	// viewCache ensures instrument conflicts, including number conflicts, this
	// meter is asked to create are logged to the user.
	var viewCache cache[string, instrumentID]

	return &meter{
		scope:     s,
		pipes:     p,
		int64IP:   newInstProvider[int64](s, p, &viewCache),
		float64IP: newInstProvider[float64](s, p, &viewCache),
	}
}
```

And add:

```go
func newResolver[N int64 | float64](p pipelines, vc *cache[string, instrumentID]) resolver[N] {
	in := make([]*inserter[N], len(p))
	for i := range in {
		c := newInstrumentCache[N](nil, vc)
		in[i] = newInserter(p[i], c)
	}
	return resolver[N]{in}
}
```

I have already tested this change; it works with multiple readers.
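If I read this proposal right, the view/conflict cache (viewCache) remains shared across the whole meter, so duplicate-name and number conflicts are still detected and logged once per meter, while each pipeline (and therefore each reader) gets its own instrument cache and its own aggregators via newInstrumentCache. That lines up with the per-reader aggregation described in the comments below.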
I think I might have misspoken. The instrument cache needs to be scoped so that duplicated aggregators are not used to calculate the same aggregation, but I think you were correct in stating that, per reader, there should be independent aggregators. I'm updating #3724.
The current pipeline resolution path will only add the resolved aggregators to the pipeline when it creates one (cache miss). It will not add it if there is a cache hit. This means (since we cache instruments at the meter level, not the pipeline level) the first reader in a multiple-reader setup is the only one that will collect data for that aggregator. All other readers will have a cache hit and nothing is added to the pipeline. This is causing open-telemetry#3720. This resolves open-telemetry#3720 by moving the instrument caching into the inserter. This means aggregators are cached at the reader level, not the meter.
* Merge instrument cache to inserter
* Rename aggCV to aggVal

Co-authored-by: Chester Cheung <[email protected]>
Description
I upgraded the metric SDK from v0.32.0 to v0.36.0 and found that the MeterProvider doesn't support registering multiple readers; it only pushes data to the first one.
Environment
Steps To Reproduce
Run the modified prometheus example shown above (note: the stdout metric exporter is the first registered reader), then:

curl localhost:2223/metrics | grep baz_bucket

The output is empty.

Expected behavior
Both the stdout output and the Prometheus HTTP response include the metrics.