Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(routingprocessor): allow routing for all signals #5869

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions processor/routingprocessor/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Routing processor

Routes traces to specific exporters.
Routes logs, metrics or traces to specific exporters.

This processor will either read a header from the incoming HTTP request (gRPC or plain HTTP), or it will read a resource attribute, and direct the trace information to specific exporters based on the value read.

This processor *does not* let traces to continue through the pipeline and will emit a warning in case other processor(s) are defined after this one. Similarly, exporters defined as part of the pipeline are not authoritative: if you add an exporter to the pipeline, make sure you add it to this processor *as well*, otherwise it won't be used at all. All exporters defined as part of this processor *must also* be defined as part of the pipeline's exporters.
This processor *does not* let traces to continue through the pipeline and will emit a warning in case other processor(s) are defined after this one.
Similarly, exporters defined as part of the pipeline are not authoritative: if you add an exporter to the pipeline, make sure you add it to this processor *as well*, otherwise it won't be used at all.
All exporters defined as part of this processor *must also* be defined as part of the pipeline's exporters.
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved

Given that this processor depends on information provided by the client via HTTP headers or resource attributes, caution must be taken when processors that aggregate data like `batch` or `groupbytrace` are used as part of the pipeline.

Expand Down Expand Up @@ -39,6 +41,10 @@ exporters:
endpoint: localhost:24250
```

The full list of settings exposed for this processor are documented [here](./config.go) with detailed sample configuration [here](./testdata/config.yaml).
The full list of settings exposed for this processor are documented [here](./config.go) with detailed sample configuration files:

- [logs](./testdata/config_logs.yaml)
- [metrics](./testdata/config_metrics.yaml)
- [traces](./testdata/config_traces.yaml)

[context_docs]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/context/context.md
43 changes: 42 additions & 1 deletion processor/routingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package routingprocessor

import (
"fmt"

"go.opentelemetry.io/collector/config"
)

Expand All @@ -32,7 +34,7 @@ type Config struct {
// - "resource" - the attribute must exist in resource attributes
// The default value is "context".
// Optional.
AttributeSource string `mapstructure:"attribute_source"`
AttributeSource AttributeSource `mapstructure:"attribute_source"`
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved

// FromAttribute contains the attribute name to look up the route value. This attribute should be part of the context propagated
// down from the previous receivers and/or processors. If all the receivers and processors are propagating the entire context correctly,
Expand All @@ -47,6 +49,45 @@ type Config struct {
Table []RoutingTableItem `mapstructure:"table"`
}

// Validate checks if the processor configuration is valid.
func (c *Config) Validate() error {
// validate that every route has a value for the routing attribute and has
// at least one exporter
for _, item := range c.Table {
if len(item.Value) == 0 {
return fmt.Errorf("invalid (empty) route : %w", errEmptyRoute)
}

if len(item.Exporters) == 0 {
return fmt.Errorf("invalid route %s: %w", item.Value, errNoExporters)
}
}

// validate that there's at least one item in the table
if len(c.Table) == 0 {
return fmt.Errorf("invalid routing table: %w", errNoTableItems)
}

// we also need a "FromAttribute" value
if len(c.FromAttribute) == 0 {
return fmt.Errorf(
"invalid attribute to read the route's value from: %w",
errNoMissingFromAttribute,
)
}

return nil
}

type AttributeSource string

const (
contextAttributeSource = AttributeSource("context")
resourceAttributeSource = AttributeSource("resource")

defaultAttributeSource = contextAttributeSource
)

// RoutingTableItem specifies how data should be routed to the different exporters
type RoutingTableItem struct {
// Value represents a possible value for the field specified under FromAttribute. Required.
Expand Down
122 changes: 94 additions & 28 deletions processor/routingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,112 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/exporter/otlpexporter"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/jaegerexporter"
)

func TestLoadConfig(t *testing.T) {
factories, err := componenttest.NopFactories()
assert.NoError(t, err)

factory := NewFactory()
factories.Processors[typeStr] = factory
testcases := []struct {
configPath string
factoriesFunc func(component.Factories) component.Factories
expectedConfig *Config
}{
{
configPath: "config_traces.yaml",
factoriesFunc: func(factories component.Factories) component.Factories {
// we don't need to use them in this test, but the config has them
factories.Exporters["otlp"] = otlpexporter.NewFactory()
factories.Exporters["jaeger"] = jaegerexporter.NewFactory()
return factories
},
expectedConfig: &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
DefaultExporters: []string{"otlp"},
AttributeSource: "context",
FromAttribute: "X-Tenant",
Table: []RoutingTableItem{
{
Value: "acme",
Exporters: []string{"jaeger/acme", "otlp/acme"},
},
{
Value: "globex",
Exporters: []string{"otlp/globex"},
},
},
},
},
{
configPath: "config_metrics.yaml",
factoriesFunc: func(factories component.Factories) component.Factories {
// we don't need to use it in this test, but the config has them
factories.Exporters["logging"] = loggingexporter.NewFactory()
return factories
},
expectedConfig: &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
DefaultExporters: []string{"logging/default"},
AttributeSource: "context",
FromAttribute: "X-Custom-Metrics-Header",
Table: []RoutingTableItem{
{
Value: "acme",
Exporters: []string{"logging/acme"},
},
{
Value: "globex",
Exporters: []string{"logging/globex"},
},
},
},
},
{
configPath: "config_logs.yaml",
factoriesFunc: func(factories component.Factories) component.Factories {
// we don't need to use it in this test, but the config has them
factories.Exporters["logging"] = loggingexporter.NewFactory()
return factories
},
expectedConfig: &Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
DefaultExporters: []string{"logging/default"},
AttributeSource: "context",
FromAttribute: "X-Custom-Logs-Header",
Table: []RoutingTableItem{
{
Value: "acme",
Exporters: []string{"logging/acme"},
},
{
Value: "globex",
Exporters: []string{"logging/globex"},
},
},
},
},
}

// we don't need to use them in this test, but the config has them
factories.Exporters["otlp"] = otlpexporter.NewFactory()
factories.Exporters["jaeger"] = jaegerexporter.NewFactory()
for _, tc := range testcases {
t.Run(tc.configPath, func(t *testing.T) {
tc := tc

cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", "config.yaml"), factories)
factories, err := componenttest.NopFactories()
assert.NoError(t, err)
factories.Processors[typeStr] = NewFactory()
factories = tc.factoriesFunc(factories)

require.NoError(t, err)
require.NotNil(t, cfg)
cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", tc.configPath), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

parsed := cfg.Processors[config.NewComponentID(typeStr)]
assert.Equal(t, parsed,
&Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
DefaultExporters: []string{"otlp"},
AttributeSource: "context",
FromAttribute: "X-Tenant",
Table: []RoutingTableItem{
{
Value: "acme",
Exporters: []string{"jaeger/acme", "otlp/acme"},
},
{
Value: "globex",
Exporters: []string{"otlp/globex"},
},
},
parsed := cfg.Processors[config.NewComponentID(typeStr)]
assert.Equal(t, tc.expectedConfig, parsed)
})
}
}
77 changes: 77 additions & 0 deletions processor/routingprocessor/extract.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package routingprocessor

import (
"context"
"strings"

"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
)

// extractor is responsible for extracting configured attributes from the processed data.
// Currently it can be extract the attributes from context or resource attributes.
type extractor struct {
fromAttr string
logger *zap.Logger
}

// newExtractor creates new extractor which can extract attributes from logs,
// metrics and traces from from requested attribute source and from the provided
// attribute name.
func newExtractor(fromAttr string, logger *zap.Logger) extractor {
return extractor{
fromAttr: fromAttr,
logger: logger,
}
}

// extractAttrFromResource extract string value from the requested resource attribute.
func (e extractor) extractAttrFromResource(r pdata.Resource) string {
firstResourceAttributes := r.Attributes()
routingAttribute, found := firstResourceAttributes.Get(e.fromAttr)
if !found {
return ""
}

return routingAttribute.AsString()
}

func (e extractor) extractFromContext(ctx context.Context) string {
// right now, we only support looking up attributes from requests that have
// gone through the gRPC server in that case, it will add the HTTP headers
// as context metadata
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ""
}

// we have gRPC metadata in the context but does it have our key?
values, ok := md[strings.ToLower(e.fromAttr)]
if !ok {
return ""
}

if len(values) > 1 {
e.logger.Debug("more than one value found for the attribute, using only the first",
zap.Strings("values", values),
zap.String("attribute", e.fromAttr),
)
}

return values[0]
}
Loading