Skip to content

Commit

Permalink
(otelarrowexporter): Create exporter per unique value of configured m…
Browse files Browse the repository at this point in the history
…etadataKeys (#34827)

**Description:** 

This PR forks
#34235
and adds unit tests.

**Link to tracking Issue:** 

#34178

---------

Co-authored-by: kristina.pathak <[email protected]>
Co-authored-by: Joshua MacDonald <[email protected]>
  • Loading branch information
3 people authored Sep 19, 2024
1 parent 36b603d commit 5601dfb
Show file tree
Hide file tree
Showing 9 changed files with 547 additions and 43 deletions.
27 changes: 27 additions & 0 deletions .chloggen/arrow_exporter_by_metadata.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allow separate arrow exporter per unique value of configured metadataKeys.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34178]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
11 changes: 11 additions & 0 deletions exporter/otelarrowexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ streams.

- `prioritizer` (default: "leastloaded"): policy for distributing load across multiple streams.

### Matching Metadata Per Stream

The following configuration values allow for separate streams per unique
metadata combinations:
- `metadata_keys` (default = empty): When set, this exporter will create one
arrow exporter instance per distinct combination of values in the
client.Metadata.
- `metadata_cardinality_limit` (default = 1000): When metadata_keys is not empty,
this setting limits the number of unique combinations of metadata key values
that will be processed over the lifetime of the exporter.

### Network Configuration

This component uses `round_robin` by default as the gRPC load
Expand Down
36 changes: 36 additions & 0 deletions exporter/otelarrowexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-col

import (
"fmt"
"strings"
"time"

"github.com/open-telemetry/otel-arrow/pkg/config"
Expand Down Expand Up @@ -45,6 +46,23 @@ type Config struct {
// exporter is built and configured via code instead of yaml.
// Uses include custom dialer, custom user-agent, etc.
UserDialOptions []grpc.DialOption `mapstructure:"-"`

// MetadataKeys is a list of client.Metadata keys that will be
// used to form distinct exporters. If this setting is empty,
// a single exporter instance will be used. When this setting
// is not empty, one exporter will be used per distinct
// combination of values for the listed metadata keys.
//
// Empty value and unset metadata are treated as distinct cases.
//
// Entries are case-insensitive. Duplicated entries will
// trigger a validation error.
MetadataKeys []string `mapstructure:"metadata_keys"`

// MetadataCardinalityLimit indicates the maximum number of
// exporter instances that will be created through a distinct
// combination of MetadataKeys.
MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"`
}

// ArrowConfig includes whether Arrow is enabled and the number of
Expand Down Expand Up @@ -90,6 +108,24 @@ var _ component.Config = (*Config)(nil)

var _ component.ConfigValidator = (*ArrowConfig)(nil)

func (cfg *Config) Validate() error {
err := cfg.Arrow.Validate()
if err != nil {
return err
}

uniq := map[string]bool{}
for _, k := range cfg.MetadataKeys {
l := strings.ToLower(k)
if _, has := uniq[l]; has {
return fmt.Errorf("duplicate entry in metadata_keys: %q (case-insensitive)", l)
}
uniq[l] = true
}

return nil
}

// Validate returns an error when the number of streams is less than 1.
func (cfg *ArrowConfig) Validate() error {
if cfg.NumStreams < 1 {
Expand Down
39 changes: 20 additions & 19 deletions exporter/otelarrowexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,16 @@ func createDefaultConfig() component.Config {
}
}

func (exp *baseExporter) helperOptions() []exporterhelper.Option {
func helperOptions(e exp) []exporterhelper.Option {
cfg := e.getConfig().(*Config)
return []exporterhelper.Option{
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(exp.config.TimeoutSettings),
exporterhelper.WithRetry(exp.config.RetryConfig),
exporterhelper.WithQueue(exp.config.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithBatcher(exp.config.BatcherConfig),
exporterhelper.WithShutdown(exp.shutdown),
exporterhelper.WithTimeout(cfg.TimeoutSettings),
exporterhelper.WithRetry(cfg.RetryConfig),
exporterhelper.WithQueue(cfg.QueueSettings),
exporterhelper.WithStart(e.start),
exporterhelper.WithBatcher(cfg.BatcherConfig),
exporterhelper.WithShutdown(e.shutdown),
}
}

Expand All @@ -103,13 +104,13 @@ func createTracesExporter(
set exporter.Settings,
cfg component.Config,
) (exporter.Traces, error) {
exp, err := newExporter(cfg, set, createArrowTracesStream)
e, err := newMetadataExporter(cfg, set, createArrowTracesStream)
if err != nil {
return nil, err
}
return exporterhelper.NewTracesExporter(ctx, exp.settings, exp.config,
exp.pushTraces,
exp.helperOptions()...,
return exporterhelper.NewTracesExporter(ctx, e.getSettings(), e.getConfig(),
e.pushTraces,
helperOptions(e)...,
)
}

Expand All @@ -122,13 +123,13 @@ func createMetricsExporter(
set exporter.Settings,
cfg component.Config,
) (exporter.Metrics, error) {
exp, err := newExporter(cfg, set, createArrowMetricsStream)
e, err := newMetadataExporter(cfg, set, createArrowMetricsStream)
if err != nil {
return nil, err
}
return exporterhelper.NewMetricsExporter(ctx, exp.settings, exp.config,
exp.pushMetrics,
exp.helperOptions()...,
return exporterhelper.NewMetricsExporter(ctx, e.getSettings(), e.getConfig(),
e.pushMetrics,
helperOptions(e)...,
)
}

Expand All @@ -141,12 +142,12 @@ func createLogsExporter(
set exporter.Settings,
cfg component.Config,
) (exporter.Logs, error) {
exp, err := newExporter(cfg, set, createArrowLogsStream)
e, err := newMetadataExporter(cfg, set, createArrowLogsStream)
if err != nil {
return nil, err
}
return exporterhelper.NewLogsExporter(ctx, exp.settings, exp.config,
exp.pushLogs,
exp.helperOptions()...,
return exporterhelper.NewLogsExporter(ctx, e.getSettings(), e.getConfig(),
e.pushLogs,
helperOptions(e)...,
)
}
3 changes: 1 addition & 2 deletions exporter/otelarrowexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func TestCreateDefaultConfig(t *testing.T) {
cfg := factory.CreateDefaultConfig()
assert.NotNil(t, cfg, "failed to create default config")
assert.NoError(t, componenttest.CheckConfigStruct(cfg))
ocfg, ok := factory.CreateDefaultConfig().(*Config)
assert.True(t, ok)
ocfg := factory.CreateDefaultConfig().(*Config)
assert.Equal(t, ocfg.RetryConfig, configretry.NewDefaultBackOffConfig())
assert.Equal(t, ocfg.QueueSettings, exporterhelper.NewDefaultQueueConfig())
assert.Equal(t, ocfg.TimeoutSettings, exporterhelper.NewDefaultTimeoutConfig())
Expand Down
203 changes: 203 additions & 0 deletions exporter/otelarrowexporter/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter"

import (
"context"
"errors"
"fmt"
"runtime"
"sort"
"strings"
"sync"

arrowPkg "github.com/apache/arrow/go/v16/arrow"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/multierr"
"google.golang.org/grpc/metadata"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
)

var (
// errTooManyExporters is returned when the MetadataCardinalityLimit has been reached.
errTooManyExporters = consumererror.NewPermanent(errors.New("too many exporter metadata-value combinations"))
)

type metadataExporter struct {
config *Config
settings exporter.Settings
scf streamClientFactory
host component.Host

metadataKeys []string
exporters sync.Map
netReporter *netstats.NetworkReporter

userAgent string

// Guards the size and the storing logic to ensure no more than limit items are stored.
// If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic.
lock sync.Mutex
size int
}

var _ exp = (*metadataExporter)(nil)

func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) {
oCfg := cfg.(*Config)
netReporter, err := netstats.NewExporterNetworkReporter(set)
if err != nil {
return nil, err
}
userAgent := fmt.Sprintf("%s/%s (%s/%s)",
set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH)

if !oCfg.Arrow.Disabled {
// Ignoring an error because Validate() was called.
_ = zstd.SetEncoderConfig(oCfg.Arrow.Zstd)

userAgent += fmt.Sprintf(" ApacheArrow/%s (NumStreams/%d)", arrowPkg.PkgVersion, oCfg.Arrow.NumStreams)
}
// use lower-case, to be consistent with http/2 headers.
mks := make([]string, len(oCfg.MetadataKeys))
for i, k := range oCfg.MetadataKeys {
mks[i] = strings.ToLower(k)
}
sort.Strings(mks)
if len(mks) == 0 {
return newExporter(cfg, set, streamClientFactory, userAgent, netReporter)
}
return &metadataExporter{
config: oCfg,
settings: set,
scf: streamClientFactory,
metadataKeys: mks,
userAgent: userAgent,
netReporter: netReporter,
}, nil
}

func (e *metadataExporter) getSettings() exporter.Settings {
return e.settings
}

func (e *metadataExporter) getConfig() component.Config {
return e.config
}

func (e *metadataExporter) start(_ context.Context, host component.Host) (err error) {
e.host = host
return nil
}

func (e *metadataExporter) shutdown(ctx context.Context) error {
var err error
e.exporters.Range(func(_ any, value any) bool {
be := value.(exp)
err = multierr.Append(err, be.shutdown(ctx))
return true
})
return err
}

func (e *metadataExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
s, mdata := e.getAttrSet(ctx, e.metadataKeys)

be, err := e.getOrCreateExporter(ctx, s, mdata)
if err != nil {
return err
}
return be.pushTraces(ctx, td)
}

func (e *metadataExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error {
s, mdata := e.getAttrSet(ctx, e.metadataKeys)

be, err := e.getOrCreateExporter(ctx, s, mdata)
if err != nil {
return err
}

return be.pushMetrics(ctx, md)
}

func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
s, mdata := e.getAttrSet(ctx, e.metadataKeys)

be, err := e.getOrCreateExporter(ctx, s, mdata)
if err != nil {
return err
}

return be.pushLogs(ctx, ld)
}

func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set, md metadata.MD) (exp, error) {
e.lock.Lock()
defer e.lock.Unlock()

if e.config.MetadataCardinalityLimit != 0 && e.size >= int(e.config.MetadataCardinalityLimit) {
return nil, errTooManyExporters
}

v, ok := e.exporters.Load(s)
if ok {
return v.(exp), nil
}

newExp, err := newExporter(e.config, e.settings, e.scf, e.userAgent, e.netReporter)
if err != nil {
return nil, fmt.Errorf("failed to create exporter: %w", err)
}

var loaded bool
v, loaded = e.exporters.LoadOrStore(s, newExp)
if !loaded {
// set metadata keys for base exporter to add them to the outgoing context.
newExp.(*baseExporter).setMetadata(md)

// Start the goroutine only if we added the object to the map, otherwise is already started.
err = newExp.start(ctx, e.host)
if err != nil {
e.exporters.Delete(s)
return nil, fmt.Errorf("failed to start exporter: %w", err)
}

e.size++
}

return v.(exp), nil
}

// getAttrSet is code taken from the core collector's batchprocessor multibatch logic.
// https://github.com/open-telemetry/opentelemetry-collector/blob/v0.107.0/processor/batchprocessor/batch_processor.go#L298
func (e *metadataExporter) getAttrSet(ctx context.Context, keys []string) (attribute.Set, metadata.MD) {
// Get each metadata key value, form the corresponding
// attribute set for use as a map lookup key.
info := client.FromContext(ctx)
md := map[string][]string{}
var attrs []attribute.KeyValue
for _, k := range keys {
// Lookup the value in the incoming metadata, copy it
// into the outgoing metadata, and create a unique
// value for the attributeSet.
vs := info.Metadata.Get(k)
md[k] = vs
if len(vs) == 1 {
attrs = append(attrs, attribute.String(k, vs[0]))
} else {
attrs = append(attrs, attribute.StringSlice(k, vs))
}
}
return attribute.NewSet(attrs...), metadata.MD(md)
}
Loading

0 comments on commit 5601dfb

Please sign in to comment.