Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1/2 Add receiverhelper functions for creating flexible "scraper" metrics receiver #1886

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 265 additions & 0 deletions receiver/receiverhelper/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package receiverhelper

import (
"context"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
)

// Start specifies the function invoked when the receiver is being started.
type Start func(context.Context, component.Host) error

// Shutdown specifies the function invoked when the receiver is being shutdown.
type Shutdown func(context.Context) error

// Option apply changes to internal options.
type Option func(*baseReceiver)

// WithStart overrides the default Start function for a receiver.
// The default shutdown function does nothing and always returns nil.
func WithStart(start Start) Option {
return func(o *baseReceiver) {
o.start = start
}
}

// WithShutdown overrides the default Shutdown function for a receiver.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown Shutdown) Option {
return func(o *baseReceiver) {
o.shutdown = shutdown
}
}

type baseReceiver struct {
fullName string
start Start
shutdown Shutdown
}

// Construct the internalOptions from multiple Option.
func newBaseReceiver(fullName string, options ...Option) baseReceiver {
br := baseReceiver{fullName: fullName}

for _, op := range options {
op(&br)
}

return br
}

// Start the receiver, invoked during service start.
func (br *baseReceiver) Start(ctx context.Context, host component.Host) error {
if br.start != nil {
return br.start(ctx, host)
}
return nil
}

// Shutdown the receiver, invoked during service shutdown.
func (br *baseReceiver) Shutdown(ctx context.Context) error {
if br.shutdown != nil {
return br.shutdown(ctx)
}
return nil
}

// MetricOption apply changes to internal options.
type MetricOption func(*metricsReceiver)

// WithBaseOptions applies any base options to a metrics receiver.
func WithBaseOptions(options ...Option) MetricOption {
Copy link
Member Author

@james-bebbington james-bebbington Oct 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm interested if anyone can think of a better way of doing this btw. This seems slightly nicer than changing the NewMetricReceiver definition to:

func NewMetricReceiver(config configmodels.Receiver, nextConsumer consumer.MetricsConsumer, baseOptions []Option, options ...MetricOption) (component.Receiver, error)

return func(o *metricsReceiver) {
for _, option := range options {
option(&o.baseReceiver)
}
}
}

// WithDefaultCollectionInterval overrides the default collection
// interval (1 minute) that will be applied to all scrapers if not
// overridden by the individual scraper.
func WithDefaultCollectionInterval(defaultCollectionInterval time.Duration) MetricOption {
return func(o *metricsReceiver) {
o.defaultCollectionInterval = defaultCollectionInterval
}
}

// AddScraper configures the provided scrape function to be called with
// the specified options, and at the specified collection interval (one
// minute by default).
//
// Observability information will be reported, and the scraped metrics
// will be passed to the next consumer.
func AddScraper(cfg ScraperConfig, scrape Scrape, options ...ScraperOption) MetricOption {
return func(o *metricsReceiver) {
o.scrapers = append(o.scrapers, newScraper(cfg, scrape, options...))
}
}

type metricsReceiver struct {
baseReceiver
defaultCollectionInterval time.Duration
nextConsumer consumer.MetricsConsumer

scrapers []*scraper
done chan struct{}
}

// NewMetricReceiver creates a Receiver with the configured options.
func NewMetricReceiver(config configmodels.Receiver, nextConsumer consumer.MetricsConsumer, options ...MetricOption) (component.Receiver, error) {
if nextConsumer == nil {
return nil, componenterror.ErrNilNextConsumer
}

mr := &metricsReceiver{
baseReceiver: newBaseReceiver(config.Name()),
defaultCollectionInterval: time.Minute,
nextConsumer: nextConsumer,
done: make(chan struct{}),
}

for _, op := range options {
op(mr)
}

// wrap the start function with a call to initialize scrapers
// and start scraping
start := mr.start
mr.start = func(ctx context.Context, host component.Host) error {
if start != nil {
if err := start(ctx, host); err != nil {
return err
}
}

if err := mr.initializeScrapers(ctx); err != nil {
return err
}

mr.startScraping()
return nil
}

// wrap the shutdown function with a call to close scrapers
// and stop scraping
shutdown := mr.shutdown
mr.shutdown = func(ctx context.Context) error {
mr.stopScraping()

var errors []error

if err := mr.closeScrapers(ctx); err != nil {
errors = append(errors, err)
}

if shutdown != nil {
if err := shutdown(ctx); err != nil {
errors = append(errors, err)
}
}

return componenterror.CombineErrors(errors)
}

return mr, nil
}

// initializeScrapers initializes all the scrapers
func (mr *metricsReceiver) initializeScrapers(ctx context.Context) error {
for _, scraper := range mr.scrapers {
if scraper.initialize == nil {
continue
}

if err := scraper.initialize(ctx); err != nil {
return err
}
}

return nil
}

// startScraping initiates a ticker that calls Scrape based on the configured
// collection interval.
func (mr *metricsReceiver) startScraping() {
// TODO1: use one ticker for each set of scrapers that have the same collection interval.
// TODO2: consider allowing different "Scrape" functions to be configured, i.e. functions
// that return MetricsSlice or ResourceMetricsSlice (similar to the existing Scraper
// & ResourceScraper interfaces in the host metrics receiver). That will allow data
// from multiple scrapers (that have the same collection interval) to be batched.

for i := 0; i < len(mr.scrapers); i++ {
scraper := mr.scrapers[i]
go func() {
collectionInterval := mr.defaultCollectionInterval
if scraper.cfg.CollectionInterval() != 0 {
collectionInterval = scraper.cfg.CollectionInterval()
}

ticker := time.NewTicker(collectionInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
mr.scrapeAndReport(context.Background(), scraper)
case <-mr.done:
return
}
}
}()
}
}

// scrapeAndReport calls the Scrape function of the provided Scraper, records
// observability information, and passes the scraped metrics to the next component.
func (mr *metricsReceiver) scrapeAndReport(ctx context.Context, scraper *scraper) {
// TODO: Add observability metrics support
metrics, err := scraper.scrape(ctx)
if err != nil {
return
}

mr.nextConsumer.ConsumeMetrics(ctx, metrics)
}

// stopScraping stops the ticker
func (mr *metricsReceiver) stopScraping() {
close(mr.done)
}

// closeScrapers closes all the scrapers
func (mr *metricsReceiver) closeScrapers(ctx context.Context) error {
var errors []error

for _, scraper := range mr.scrapers {
if scraper.close == nil {
continue
}

if err := scraper.close(ctx); err != nil {
errors = append(errors, err)
}
}

return componenterror.CombineErrors(errors)
}
Copy link
Member Author

@james-bebbington james-bebbington Oct 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we would want to add / extend NewTraceReceiver, NewMetricReceiver, & NewLogsReceiver functions here at some point that implement standard HTTP and/or gRPC servers for "push" type receivers.

I haven't implemented any of those types of receivers so I'm not sure if it would be straightforward to factor out that code, or worth the effort. For now, these helper functions are only intended to support the NewMetricReceiver function where you are configuring at least one scrape target, i.e. for "pull" type receivers.

Loading