Skip to content

Commit

Permalink
Merge instrument cache to inserter
Browse files Browse the repository at this point in the history
The current pipeline resolution path will only add the resolved
aggregators to the pipeline when it creates one (cache miss). It will
not add it if there is a cache hit. This means (since we cache
instruments at the meter level, not the pipeline level) the first reader
in a multiple-reader setup is the only one that will collect data for
that aggregator. All other readers will have a cache hit and nothing is
added to the pipeline. This is causing open-telemetry#3720.

This resolves open-telemetry#3720 by moving the instrument caching into the inserter.
This means aggregators are cached at the reader level, not the meter.
  • Loading branch information
MrAlias committed Feb 15, 2023
1 parent 0062bb6 commit b26380c
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 95 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- [bridge/ot] Fall-back to TextMap carrier when it's not ot.HttpHeaders. (#3679)

### Fixed

- Multi-reader `MeterProvider`s now export metrics for all readers, instead of just the first reader. (#3720, #3724)

## [1.13.0/0.36.0] 2023-02-07

### Added
Expand Down
56 changes: 0 additions & 56 deletions sdk/metric/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"sync"

"go.opentelemetry.io/otel/sdk/metric/internal"
)

// cache is a locking storage used to quickly return already computed values.
Expand Down Expand Up @@ -54,57 +52,3 @@ func (c *cache[K, V]) Lookup(key K, f func() V) V {
c.data[key] = val
return val
}

// instrumentCache is a cache of instruments. It is scoped at the Meter level
// along with a number type. Meaning all instruments it contains need to belong
// to the same instrumentation.Scope (implicitly) and number type (explicitly).
type instrumentCache[N int64 | float64] struct {
// aggregators is used to ensure duplicate creations of the same instrument
// return the same instance of that instrument's aggregator.
aggregators *cache[instrumentID, aggVal[N]]
// views is used to ensure if instruments with the same name are created,
// but do not have the same identifying properties, a warning is logged.
views *cache[string, instrumentID]
}

// newInstrumentCache returns a new instrumentCache that uses ac as the
// underlying cache for aggregators and vc as the cache for views. If ac or vc
// are nil, a new empty cache will be used.
func newInstrumentCache[N int64 | float64](ac *cache[instrumentID, aggVal[N]], vc *cache[string, instrumentID]) instrumentCache[N] {
if ac == nil {
ac = &cache[instrumentID, aggVal[N]]{}
}
if vc == nil {
vc = &cache[string, instrumentID]{}
}
return instrumentCache[N]{aggregators: ac, views: vc}
}

// LookupAggregator returns the Aggregator and error for a cached instrument if
// it exist in the cache. Otherwise, f is called and its returned value is set
// in the cache and returned.
//
// LookupAggregator is safe to call concurrently.
func (c instrumentCache[N]) LookupAggregator(id instrumentID, f func() (internal.Aggregator[N], error)) (agg internal.Aggregator[N], err error) {
v := c.aggregators.Lookup(id, func() aggVal[N] {
a, err := f()
return aggVal[N]{Aggregator: a, Err: err}
})
return v.Aggregator, v.Err
}

// aggVal is the cached value of an instrumentCache's aggregators cache.
type aggVal[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// Unique returns if id is unique or a duplicate instrument. If an instrument
// with the same name has already been created, that instrumentID will be
// returned along with false. Otherwise, id is returned with true.
//
// Unique is safe to call concurrently.
func (c instrumentCache[N]) Unique(id instrumentID) (instrumentID, bool) {
got := c.views.Lookup(id.Name, func() instrumentID { return id })
return got, id == got
}
13 changes: 4 additions & 9 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,11 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter {
// meter is asked to create are logged to the user.
var viewCache cache[string, instrumentID]

// Passing nil as the ac parameter to newInstrumentCache will have each
// create its own aggregator cache.
ic := newInstrumentCache[int64](nil, &viewCache)
fc := newInstrumentCache[float64](nil, &viewCache)

return &meter{
scope: s,
pipes: p,
int64IP: newInstProvider(s, p, ic),
float64IP: newInstProvider(s, p, fc),
int64IP: newInstProvider[int64](s, p, &viewCache),
float64IP: newInstProvider[float64](s, p, &viewCache),
}
}

Expand Down Expand Up @@ -375,8 +370,8 @@ type instProvider[N int64 | float64] struct {
resolve resolver[N]
}

func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c instrumentCache[N]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver(p, c)}
func newInstProvider[N int64 | float64](s instrumentation.Scope, p pipelines, c *cache[string, instrumentID]) *instProvider[N] {
return &instProvider[N]{scope: s, pipes: p, resolve: newResolver[N](p, c)}
}

func (p *instProvider[N]) aggs(kind InstrumentKind, name, desc string, u unit.Unit) ([]internal.Aggregator[N], error) {
Expand Down
49 changes: 38 additions & 11 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,32 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err
// inserter facilitates inserting of new instruments from a single scope into a
// pipeline.
type inserter[N int64 | float64] struct {
cache instrumentCache[N]
// aggregators is a cache that holds Aggregators inserted into the
// underlying reader pipeline. This cache ensures no duplicate Aggregators
// are inserted into the reader pipeline and if a new request during an
// instrument creation asks for the same Aggregator the same instance is
// returned.
aggregators *cache[instrumentID, aggCV[N]]

// views is a cache that holds instrument identifiers for all the
// instruments a Meter has created, it is provided from the Meter that owns
// this inserter. This cache ensures during the creation of instruments
// with the same name but different options (e.g. description, unit) a
// warning message is logged.
views *cache[string, instrumentID]

pipeline *pipeline
}

func newInserter[N int64 | float64](p *pipeline, c instrumentCache[N]) *inserter[N] {
return &inserter[N]{cache: c, pipeline: p}
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instrumentID]) *inserter[N] {
if vc == nil {
vc = &cache[string, instrumentID]{}
}
return &inserter[N]{
aggregators: &cache[instrumentID, aggCV[N]]{},
views: vc,
pipeline: p,
}
}

// Instrument inserts the instrument inst with instUnit into a pipeline. All
Expand Down Expand Up @@ -261,6 +281,12 @@ func (i *inserter[N]) Instrument(inst Instrument) ([]internal.Aggregator[N], err
return aggs, errs.errorOrNil()
}

// aggCV is the cached value in an aggregators cache.
type aggCV[N int64 | float64] struct {
Aggregator internal.Aggregator[N]
Err error
}

// cachedAggregator returns the appropriate Aggregator for an instrument
// configuration. If the exact instrument has been created within the
// inst.Scope, that Aggregator instance will be returned. Otherwise, a new
Expand Down Expand Up @@ -292,13 +318,13 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
// If there is a conflict, the specification says the view should
// still be applied and a warning should be logged.
i.logConflict(id)
return i.cache.LookupAggregator(id, func() (internal.Aggregator[N], error) {
cv := i.aggregators.Lookup(id, func() aggCV[N] {
agg, err := i.aggregator(stream.Aggregation, kind, id.Temporality, id.Monotonic)
if err != nil {
return nil, err
return aggCV[N]{nil, err}
}
if agg == nil { // Drop aggregator.
return nil, nil
return aggCV[N]{nil, nil}
}
if stream.AttributeFilter != nil {
agg = internal.NewFilter(agg, stream.AttributeFilter)
Expand All @@ -310,15 +336,16 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
unit: stream.Unit,
aggregator: agg,
})
return agg, err
return aggCV[N]{agg, err}
})
return cv.Aggregator, cv.Err
}

// logConflict validates if an instrument with the same name as id has already
// been created. If that instrument conflicts with id, a warning is logged.
func (i *inserter[N]) logConflict(id instrumentID) {
existing, unique := i.cache.Unique(id)
if unique {
existing := i.views.Lookup(id.Name, func() instrumentID { return id })
if id == existing {
return
}

Expand Down Expand Up @@ -491,10 +518,10 @@ type resolver[N int64 | float64] struct {
inserters []*inserter[N]
}

func newResolver[N int64 | float64](p pipelines, c instrumentCache[N]) resolver[N] {
func newResolver[N int64 | float64](p pipelines, vc *cache[string, instrumentID]) resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter(p[i], c)
in[i] = newInserter[N](p[i], vc)
}
return resolver[N]{in}
}
Expand Down
55 changes: 38 additions & 17 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
c := newInstrumentCache[N](nil, nil)
i := newInserter(newPipeline(nil, tt.reader, tt.views), c)
var c cache[string, instrumentID]
i := newInserter[N](newPipeline(nil, tt.reader, tt.views), &c)
got, err := i.Instrument(tt.inst)
assert.ErrorIs(t, err, tt.wantErr)
require.Len(t, got, tt.wantLen)
Expand All @@ -227,9 +227,14 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
}

func TestCreateAggregators(t *testing.T) {
t.Run("Int64", testCreateAggregators[int64])
t.Run("Float64", testCreateAggregators[float64])
}

func testInvalidInstrumentShouldPanic[N int64 | float64]() {
c := newInstrumentCache[N](nil, nil)
i := newInserter(newPipeline(nil, NewManualReader(), []View{defaultView}), c)
var c cache[string, instrumentID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
inst := Instrument{
Name: "foo",
Kind: InstrumentKind(255),
Expand All @@ -242,9 +247,25 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) {
assert.Panics(t, testInvalidInstrumentShouldPanic[float64])
}

func TestCreateAggregators(t *testing.T) {
t.Run("Int64", testCreateAggregators[int64])
t.Run("Float64", testCreateAggregators[float64])
func TestPipelinesAggregatorForEachReader(t *testing.T) {
r0, r1 := NewManualReader(), NewManualReader()
pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil)
require.Len(t, pipes, 2, "created pipelines")

inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, instrumentID]
r := newResolver[int64](pipes, &c)
aggs, err := r.Aggregators(inst)
require.NoError(t, err, "resolved Aggregators error")
require.Len(t, aggs, 2, "instrument aggregators")

for i, p := range pipes {
var aggN int
for _, is := range p.aggregations {
aggN += len(is)
}
assert.Equalf(t, 1, aggN, "pipeline %d: number of instrumentSync", i)
}
}

func TestPipelineRegistryCreateAggregators(t *testing.T) {
Expand Down Expand Up @@ -309,8 +330,8 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
c := newInstrumentCache[int64](nil, nil)
r := newResolver(p, c)
var c cache[string, instrumentID]
r := newResolver[int64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)

Expand All @@ -319,8 +340,8 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo

func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
c := newInstrumentCache[float64](nil, nil)
r := newResolver(p, c)
var c cache[string, instrumentID]
r := newResolver[float64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)

Expand All @@ -346,13 +367,13 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
p := newPipelines(resource.Empty(), readers, views)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}

vc := cache[string, instrumentID]{}
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
var vc cache[string, instrumentID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(inst)
assert.Error(t, err)
assert.Len(t, intAggs, 0)

rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
rf := newResolver[float64](p, &vc)
floatAggs, err := rf.Aggregators(inst)
assert.Error(t, err)
assert.Len(t, floatAggs, 0)
Expand Down Expand Up @@ -397,8 +418,8 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {

p := newPipelines(resource.Empty(), readers, views)

vc := cache[string, instrumentID]{}
ri := newResolver(p, newInstrumentCache[int64](nil, &vc))
var vc cache[string, instrumentID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(fooInst)
assert.NoError(t, err)
assert.Equal(t, 0, l.InfoN(), "no info logging should happen")
Expand All @@ -413,7 +434,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {

// Creating a float foo instrument should log a warning because there is an
// int foo instrument.
rf := newResolver(p, newInstrumentCache[float64](nil, &vc))
rf := newResolver[float64](p, &vc)
floatAggs, err := rf.Aggregators(fooInst)
assert.NoError(t, err)
assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged")
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c := newInstrumentCache[N](nil, nil)
i := newInserter(test.pipe, c)
var c cache[string, instrumentID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
require.NoError(t, err)
assert.Len(t, got, 1, "default view not applied")
Expand Down

0 comments on commit b26380c

Please sign in to comment.