diff --git a/processor/transformprocessor/internal/common/config.go b/processor/transformprocessor/internal/common/config.go new file mode 100644 index 000000000000..75c159703e2f --- /dev/null +++ b/processor/transformprocessor/internal/common/config.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" +import ( + "fmt" + "strings" +) + +type ContextID string + +const ( + Resource ContextID = "resource" + Scope ContextID = "scope" + Trace ContextID = "trace" + SpanEvent ContextID = "spanevent" + Metric ContextID = "metric" + DataPoint ContextID = "datapoint" + Log ContextID = "log" +) + +func (c *ContextID) UnmarshalText(text []byte) error { + str := ContextID(strings.ToLower(string(text))) + switch str { + case Resource, Scope, Trace, SpanEvent, Metric, DataPoint, Log: + *c = str + return nil + default: + return fmt.Errorf("unknown context %v", str) + } +} + +type ContextStatements struct { + Context ContextID `mapstructure:"context"` + Statements []string `mapstructure:"statements"` +} diff --git a/processor/transformprocessor/internal/common/functions.go b/processor/transformprocessor/internal/common/functions.go index 7b1e370d71d6..710d52faeed0 100644 --- a/processor/transformprocessor/internal/common/functions.go +++ b/processor/transformprocessor/internal/common/functions.go @@ -15,6 +15,8 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" import ( + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/ottlfuncs" ) @@ -38,3 +40,11 @@ func Functions[K any]() map[string]interface{} { "delete_matching_keys": ottlfuncs.DeleteMatchingKeys[K], } } + +func ResourceFunctions() map[string]interface{} { + return Functions[ottlresource.TransformContext]() +} + +func ScopeFunctions() map[string]interface{} { + return Functions[ottlscope.TransformContext]() +} diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go new file mode 100644 index 000000000000..a13f6751e45c --- /dev/null +++ b/processor/transformprocessor/internal/common/logs.go @@ -0,0 +1,102 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllogs" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" +) + +var _ consumer.Logs = &logStatements{} + +type logStatements []*ottl.Statement[ottllogs.TransformContext] + +func (l logStatements) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rlogs := ld.ResourceLogs().At(i) + for j := 0; j < rlogs.ScopeLogs().Len(); j++ { + slogs := rlogs.ScopeLogs().At(j) + logs := slogs.LogRecords() + for k := 0; k < logs.Len(); k++ { + tCtx := ottllogs.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource()) + for _, statement := range l { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + } + } + return nil +} + +type LogParserCollection struct { + parserCollection + logParser ottl.Parser[ottllogs.TransformContext] +} + +type LogParserCollectionOption func(*LogParserCollection) error + +func NewLogParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) { + lpc := &LogParserCollection{ + parserCollection: parserCollection{ + settings: settings, + resourceParser: ottlresource.NewParser(ResourceFunctions(), settings), + scopeParser: ottlscope.NewParser(ScopeFunctions(), settings), + }, + logParser: ottllogs.NewParser(functions, settings), + } + + for _, op := range options { + err := op(lpc) + if err != nil { + return nil, err + } + } + + return lpc, nil +} + +func (pc LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) { + switch contextStatements.Context { + case Log: + lStatements, err := pc.logParser.ParseStatements(contextStatements.Statements) + if err != nil { + return nil, err + } + return logStatements(lStatements), nil + default: + statements, err := pc.parseCommonContextStatements(contextStatements) + if err != nil { + return nil, err + } + return statements, nil + } +} diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go new file mode 100644 index 000000000000..75b8a8141b1f --- /dev/null +++ b/processor/transformprocessor/internal/common/metrics.go @@ -0,0 +1,206 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottldatapoints" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" +) + +var _ consumer.Metrics = &metricStatements{} + +type metricStatements []*ottl.Statement[ottlmetric.TransformContext] + +func (m metricStatements) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rmetrics := md.ResourceMetrics().At(i) + for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { + smetrics := rmetrics.ScopeMetrics().At(j) + metrics := smetrics.Metrics() + for k := 0; k < metrics.Len(); k++ { + tCtx := ottlmetric.NewTransformContext(metrics.At(k), smetrics.Scope(), rmetrics.Resource()) + for _, statement := range m { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + } + } + return nil +} + +var _ consumer.Metrics = &dataPointStatements{} + +type dataPointStatements []*ottl.Statement[ottldatapoints.TransformContext] + +func (d dataPointStatements) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rmetrics := md.ResourceMetrics().At(i) + for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { + smetrics := rmetrics.ScopeMetrics().At(j) + metrics := smetrics.Metrics() + for k := 0; k < metrics.Len(); k++ { + metric := metrics.At(k) + var err error + switch metric.Type() { + case pmetric.MetricTypeSum: + err = d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + case pmetric.MetricTypeGauge: + err = d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + case pmetric.MetricTypeHistogram: + err = d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + case pmetric.MetricTypeExponentialHistogram: + err = d.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + case pmetric.MetricTypeSummary: + err = d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource()) + } + if err != nil { + return err + } + } + } + } + return nil +} + +func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + for i := 0; i < dps.Len(); i++ { + tCtx := ottldatapoints.NewTransformContext(dps.At(i), metric, metrics, is, resource) + err := d.callFunctions(ctx, tCtx) + if err != nil { + return err + } + } + return nil +} + +func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + for i := 0; i < dps.Len(); i++ { + tCtx := ottldatapoints.NewTransformContext(dps.At(i), metric, metrics, is, resource) + err := d.callFunctions(ctx, tCtx) + if err != nil { + return err + } + } + return nil +} + +func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + for i := 0; i < dps.Len(); i++ { + tCtx := ottldatapoints.NewTransformContext(dps.At(i), metric, metrics, is, resource) + err := d.callFunctions(ctx, tCtx) + if err != nil { + return err + } + } + return nil +} + +func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource) error { + for i := 0; i < dps.Len(); i++ { + tCtx := ottldatapoints.NewTransformContext(dps.At(i), metric, metrics, is, resource) + err := d.callFunctions(ctx, tCtx) + if err != nil { + return err + } + } + return nil +} + +func (d dataPointStatements) callFunctions(ctx context.Context, tCtx ottldatapoints.TransformContext) error { + for _, statement := range d { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + return nil +} + +type MetricParserCollection struct { + parserCollection + metricParser ottl.Parser[ottlmetric.TransformContext] + dataPointParser ottl.Parser[ottldatapoints.TransformContext] +} + +type MetricParserCollectionOption func(*MetricParserCollection) error + +func NewMetricParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) { + mpc := &MetricParserCollection{ + parserCollection: parserCollection{ + settings: settings, + resourceParser: ottlresource.NewParser(ResourceFunctions(), settings), + scopeParser: ottlscope.NewParser(ScopeFunctions(), settings), + }, + metricParser: ottlmetric.NewParser(functions, settings), + dataPointParser: ottldatapoints.NewParser(functions, settings), + } + + for _, op := range options { + err := op(mpc) + if err != nil { + return nil, err + } + } + + return mpc, nil +} + +func (pc MetricParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Metrics, error) { + switch contextStatements.Context { + case Metric: + mStatements, err := pc.metricParser.ParseStatements(contextStatements.Statements) + if err != nil { + return nil, err + } + return metricStatements(mStatements), nil + case DataPoint: + dpStatements, err := pc.dataPointParser.ParseStatements(contextStatements.Statements) + if err != nil { + return nil, err + } + return dataPointStatements(dpStatements), nil + default: + statements, err := pc.parseCommonContextStatements(contextStatements) + if err != nil { + return nil, err + } + return statements, nil + } +} diff --git a/processor/transformprocessor/internal/common/processor.go b/processor/transformprocessor/internal/common/processor.go new file mode 100644 index 000000000000..9d49fb45ddb4 --- /dev/null +++ b/processor/transformprocessor/internal/common/processor.go @@ -0,0 +1,180 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" + +import ( + "context" + "fmt" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" +) + +var _ consumer.Traces = &resourceStatements{} +var _ consumer.Metrics = &resourceStatements{} +var _ consumer.Logs = &resourceStatements{} +var _ baseContext = &resourceStatements{} + +type resourceStatements []*ottl.Statement[ottlresource.TransformContext] + +func (r resourceStatements) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (r resourceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rspans := td.ResourceSpans().At(i) + tCtx := ottlresource.NewTransformContext(rspans.Resource()) + for _, statement := range r { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + return nil +} + +func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rmetrics := md.ResourceMetrics().At(i) + tCtx := ottlresource.NewTransformContext(rmetrics.Resource()) + for _, statement := range r { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + return nil +} + +func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rlogs := ld.ResourceLogs().At(i) + tCtx := ottlresource.NewTransformContext(rlogs.Resource()) + for _, statement := range r { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + return nil +} + +var _ consumer.Traces = &scopeStatements{} +var _ consumer.Metrics = &scopeStatements{} +var _ consumer.Logs = &scopeStatements{} +var _ baseContext = &scopeStatements{} + +type scopeStatements []*ottl.Statement[ottlscope.TransformContext] + +func (s scopeStatements) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rspans := td.ResourceSpans().At(i) + for j := 0; j < rspans.ScopeSpans().Len(); j++ { + sspans := rspans.ScopeSpans().At(j) + tCtx := ottlscope.NewTransformContext(sspans.Scope(), rspans.Resource()) + for _, statement := range s { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + } + return nil +} + +func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rmetrics := md.ResourceMetrics().At(i) + for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { + smetrics := rmetrics.ScopeMetrics().At(j) + tCtx := ottlscope.NewTransformContext(smetrics.Scope(), rmetrics.Resource()) + for _, statement := range s { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + } + return nil +} + +func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rlogs := ld.ResourceLogs().At(i) + for j := 0; j < rlogs.ScopeLogs().Len(); j++ { + slogs := rlogs.ScopeLogs().At(j) + tCtx := ottlscope.NewTransformContext(slogs.Scope(), rlogs.Resource()) + for _, statement := range s { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + } + return nil +} + +type parserCollection struct { + settings component.TelemetrySettings + resourceParser ottl.Parser[ottlresource.TransformContext] + scopeParser ottl.Parser[ottlscope.TransformContext] +} + +type baseContext interface { + consumer.Traces + consumer.Metrics + consumer.Logs +} + +func (pc parserCollection) parseCommonContextStatements(contextStatement ContextStatements) (baseContext, error) { + switch contextStatement.Context { + case Resource: + statements, err := pc.resourceParser.ParseStatements(contextStatement.Statements) + if err != nil { + return nil, err + } + return resourceStatements(statements), nil + case Scope: + statements, err := pc.scopeParser.ParseStatements(contextStatement.Statements) + if err != nil { + return nil, err + } + return scopeStatements(statements), nil + default: + return nil, fmt.Errorf("unknown context %v", contextStatement.Context) + } +} diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go new file mode 100644 index 000000000000..aa1f5d3819e5 --- /dev/null +++ b/processor/transformprocessor/internal/common/traces.go @@ -0,0 +1,140 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlresource" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottltraces" +) + +var _ consumer.Traces = &traceStatements{} + +type traceStatements []*ottl.Statement[ottltraces.TransformContext] + +func (t traceStatements) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rspans := td.ResourceSpans().At(i) + for j := 0; j < rspans.ScopeSpans().Len(); j++ { + sspans := rspans.ScopeSpans().At(j) + spans := sspans.Spans() + for k := 0; k < spans.Len(); k++ { + tCtx := ottltraces.NewTransformContext(spans.At(k), sspans.Scope(), rspans.Resource()) + for _, statement := range t { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + } + } + return nil +} + +var _ consumer.Traces = &spanEventStatements{} + +type spanEventStatements []*ottl.Statement[ottlspanevent.TransformContext] + +func (s spanEventStatements) Capabilities() consumer.Capabilities { + return consumer.Capabilities{ + MutatesData: true, + } +} + +func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + for i := 0; i < td.ResourceSpans().Len(); i++ { + rspans := td.ResourceSpans().At(i) + for j := 0; j < rspans.ScopeSpans().Len(); j++ { + sspans := rspans.ScopeSpans().At(j) + spans := sspans.Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + spanEvents := span.Events() + for n := 0; n < spanEvents.Len(); n++ { + tCtx := ottlspanevent.NewTransformContext(spanEvents.At(k), span, sspans.Scope(), rspans.Resource()) + for _, statement := range s { + _, _, err := statement.Execute(ctx, tCtx) + if err != nil { + return err + } + } + } + } + } + } + return nil +} + +type TraceParserCollection struct { + parserCollection + traceParser ottl.Parser[ottltraces.TransformContext] + spanEventParser ottl.Parser[ottlspanevent.TransformContext] +} + +type TraceParserCollectionOption func(*TraceParserCollection) error + +func NewTraceParserCollection(functions map[string]interface{}, settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) { + tpc := &TraceParserCollection{ + parserCollection: parserCollection{ + settings: settings, + resourceParser: ottlresource.NewParser(ResourceFunctions(), settings), + scopeParser: ottlscope.NewParser(ScopeFunctions(), settings), + }, + traceParser: ottltraces.NewParser(functions, settings), + } + + for _, op := range options { + err := op(tpc) + if err != nil { + return nil, err + } + } + + return tpc, nil +} + +func (pc TraceParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Traces, error) { + switch contextStatements.Context { + case Trace: + tStatements, err := pc.traceParser.ParseStatements(contextStatements.Statements) + if err != nil { + return nil, err + } + return traceStatements(tStatements), nil + case SpanEvent: + seStatements, err := pc.spanEventParser.ParseStatements(contextStatements.Statements) + if err != nil { + return nil, err + } + return spanEventStatements(seStatements), nil + default: + return pc.parseCommonContextStatements(contextStatements) + } +}