-
Notifications
You must be signed in to change notification settings - Fork 2.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[processor/transform] Add ContextStatements config #15382
Changes from 24 commits
07457a8
5f8471c
2da5d8a
ffc1fee
38b7d9c
9133edf
cab0096
edfe1a4
cdf7c7e
a19bd17
14d60b1
7e044a4
8950da0
438ce15
34ff332
83acb90
2b0db77
0db5378
1bb36bc
ca2b55d
fecfb09
fd20cfb
9c5d53e
bf06646
3291593
f95c418
7964803
98feb98
8bb990d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" | ||
import ( | ||
"fmt" | ||
"strings" | ||
) | ||
|
||
type ContextID string | ||
|
||
const ( | ||
Resource ContextID = "resource" | ||
Scope ContextID = "scope" | ||
Trace ContextID = "trace" | ||
SpanEvent ContextID = "spanevent" | ||
Metric ContextID = "metric" | ||
DataPoint ContextID = "datapoint" | ||
Log ContextID = "log" | ||
) | ||
|
||
func (c *ContextID) UnmarshalText(text []byte) error { | ||
str := ContextID(strings.ToLower(string(text))) | ||
switch str { | ||
case Resource, Scope, Trace, SpanEvent, Metric, DataPoint, Log: | ||
*c = str | ||
return nil | ||
default: | ||
return fmt.Errorf("unknown context %v", str) | ||
} | ||
} | ||
|
||
type ContextStatements struct { | ||
Context ContextID `mapstructure:"context"` | ||
Statements []string `mapstructure:"statements"` | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" | ||
|
||
import ( | ||
"context" | ||
"go.opentelemetry.io/collector/consumer" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllogs" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/pdata/plog" | ||
) | ||
|
||
var _ consumer.Logs = &logStatements{} | ||
|
||
type logStatements []*ottl.Statement[ottllogs.TransformContext] | ||
|
||
func (l logStatements) Capabilities() consumer.Capabilities { | ||
return consumer.Capabilities{ | ||
MutatesData: true, | ||
} | ||
} | ||
|
||
func (l logStatements) ConsumeLogs(ctx context.Context, td plog.Logs) error { | ||
TylerHelmuth marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for i := 0; i < td.ResourceLogs().Len(); i++ { | ||
rlogs := td.ResourceLogs().At(i) | ||
for j := 0; j < rlogs.ScopeLogs().Len(); j++ { | ||
slogs := rlogs.ScopeLogs().At(j) | ||
logs := slogs.LogRecords() | ||
for k := 0; k < logs.Len(); k++ { | ||
tCtx := ottllogs.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource()) | ||
for _, statement := range l { | ||
_, _, err := statement.Execute(ctx, tCtx) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
type LogParserCollection struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need this type, or we can replace it with something like this: func ParseLogStatements(functions map[string]interface{}, settings component.TelemetrySettings, contextStatements ContextStatements) (consumer.Logs, error) {
logParser := ottllogs.NewParser(functions, settings)
switch contextStatements.Context {
case Log:
lStatements, err := logParser.ParseStatements(contextStatements.Statements)
if err != nil {
return nil, err
}
return logStatements(lStatements), nil
default:
statements, err := pc.parseCommonContextStatements(contextStatements)
if err != nil {
return nil, err
}
return statements, nil
}
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @bogdandrutu I started with this approach but added the extra type to simplify the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But if |
||
parserCollection | ||
logParser ottl.Parser[ottllogs.TransformContext] | ||
} | ||
|
||
func NewLogParserCollection(functions map[string]interface{}, settings component.TelemetrySettings) LogParserCollection { | ||
TylerHelmuth marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return LogParserCollection{ | ||
parserCollection: parserCollection{ | ||
settings: settings, | ||
resourceParser: ottlresource.NewParser(ResourceFunctions(), settings), | ||
scopeParser: ottlscope.NewParser(ScopeFunctions(), settings), | ||
}, | ||
logParser: ottllogs.NewParser(functions, settings), | ||
} | ||
} | ||
|
||
func (pc LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) { | ||
switch contextStatements.Context { | ||
case Log: | ||
lStatements, err := pc.logParser.ParseStatements(contextStatements.Statements) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return logStatements(lStatements), nil | ||
default: | ||
statements, err := pc.parseCommonContextStatements(contextStatements) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return statements, nil | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" | ||
|
||
import ( | ||
"context" | ||
"go.opentelemetry.io/collector/consumer" | ||
|
||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoints" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" | ||
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" | ||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/pdata/pcommon" | ||
"go.opentelemetry.io/collector/pdata/pmetric" | ||
) | ||
|
||
var _ consumer.Metrics = &metricStatements{} | ||
|
||
type metricStatements []*ottl.Statement[ottlmetric.TransformContext] | ||
|
||
func (m metricStatements) Capabilities() consumer.Capabilities { | ||
return consumer.Capabilities{ | ||
MutatesData: true, | ||
} | ||
} | ||
|
||
func (m metricStatements) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) error { | ||
TylerHelmuth marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for i := 0; i < td.ResourceMetrics().Len(); i++ { | ||
rmetrics := td.ResourceMetrics().At(i) | ||
for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { | ||
smetrics := rmetrics.ScopeMetrics().At(j) | ||
metrics := smetrics.Metrics() | ||
for k := 0; k < metrics.Len(); k++ { | ||
tCtx := ottlmetric.NewTransformContext(metrics.At(k), smetrics.Scope(), rmetrics.Resource()) | ||
for _, statement := range m { | ||
_, _, err := statement.Execute(ctx, tCtx) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
var _ consumer.Metrics = &dataPointStatements{} | ||
|
||
type dataPointStatements []*ottl.Statement[ottldatapoints.TransformContext] | ||
|
||
func (d dataPointStatements) Capabilities() consumer.Capabilities { | ||
return consumer.Capabilities{ | ||
MutatesData: true, | ||
} | ||
} | ||
|
||
func (d dataPointStatements) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) error { | ||
for i := 0; i < td.ResourceMetrics().Len(); i++ { | ||
rmetrics := td.ResourceMetrics().At(i) | ||
for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { | ||
smetrics := rmetrics.ScopeMetrics().At(j) | ||
metrics := smetrics.Metrics() | ||
for k := 0; k < metrics.Len(); k++ { | ||
metric := metrics.At(k) | ||
var err error | ||
switch metric.Type() { | ||
case pmetric.MetricTypeSum: | ||
err = d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) | ||
case pmetric.MetricTypeGauge: | ||
err = d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) | ||
case pmetric.MetricTypeHistogram: | ||
err = d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) | ||
case pmetric.MetricTypeExponentialHistogram: | ||
err = d.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) | ||
case pmetric.MetricTypeSummary: | ||
err = d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { | ||
for i := 0; i < dps.Len(); i++ { | ||
tCtx := ottldatapoints.NewTransformContext(dps.At(i), metric, metrics, is, resource) | ||
err := d.callFunctions(ctx, tCtx) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { | ||
for i := 0; i < dps.Len(); i++ { | ||
tCtx := ottldatapoints.NewTransformContext(dps.At(i), metric, metrics, is, resource) | ||
err := d.callFunctions(ctx, tCtx) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { | ||
for i := 0; i < dps.Len(); i++ { | ||
tCtx := ottldatapoints.NewTransformContext(dps.At(i), metric, metrics, is, resource) | ||
err := d.callFunctions(ctx, tCtx) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { | ||
for i := 0; i < dps.Len(); i++ { | ||
tCtx := ottldatapoints.NewTransformContext(dps.At(i), metric, metrics, is, resource) | ||
err := d.callFunctions(ctx, tCtx) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (d dataPointStatements) callFunctions(ctx context.Context, tCtx ottldatapoints.TransformContext) error { | ||
for _, statement := range d { | ||
_, _, err := statement.Execute(ctx, tCtx) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
type MetricParserCollection struct { | ||
parserCollection | ||
metricParser ottl.Parser[ottlmetric.TransformContext] | ||
dataPointParser ottl.Parser[ottldatapoints.TransformContext] | ||
} | ||
|
||
func NewMetricParserCollection(functions map[string]interface{}, settings component.TelemetrySettings) MetricParserCollection { | ||
return MetricParserCollection{ | ||
parserCollection: parserCollection{ | ||
settings: settings, | ||
resourceParser: ottlresource.NewParser(ResourceFunctions(), settings), | ||
scopeParser: ottlscope.NewParser(ScopeFunctions(), settings), | ||
}, | ||
metricParser: ottlmetric.NewParser(functions, settings), | ||
dataPointParser: ottldatapoints.NewParser(functions, settings), | ||
} | ||
} | ||
|
||
func (pc MetricParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Metrics, error) { | ||
switch contextStatements.Context { | ||
case Metric: | ||
mStatements, err := pc.metricParser.ParseStatements(contextStatements.Statements) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return metricStatements(mStatements), nil | ||
case DataPoint: | ||
dpStatements, err := pc.dataPointParser.ParseStatements(contextStatements.Statements) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return dataPointStatements(dpStatements), nil | ||
default: | ||
statements, err := pc.parseCommonContextStatements(contextStatements) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return statements, nil | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do you need these specialized calls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bogdandrutu I made specialized calls to represent the possibility of Resource-only functions or Scope-only functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure we need a public API for that, just add a comment to the "Functions" that can be initialized with multiple contexts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bogdandrutu This isn't a public API, it would never be moved to the OTTL. This is a way for the transform processor to enforce its definition of what functions are allowed to be used for Scope and Resource. Similar to how each signal in the transform processor has their own
functions.go
.