diff --git a/internal/testcomponents/example_receiver_test.go b/internal/testcomponents/example_receiver_test.go index 32f415cc0585..144a16032f48 100644 --- a/internal/testcomponents/example_receiver_test.go +++ b/internal/testcomponents/example_receiver_test.go @@ -23,7 +23,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" ) -func TestExampleReceiver(t *testing.T) { +func TestExampleReceiverProducer(t *testing.T) { rcv := &ExampleReceiver{} host := componenttest.NewNopHost() assert.False(t, rcv.Started) diff --git a/service/host.go b/service/host.go index c960cd2fa37b..c9d823f89d8e 100644 --- a/service/host.go +++ b/service/host.go @@ -28,9 +28,7 @@ type serviceHost struct { factories component.Factories buildInfo component.BuildInfo - builtExporters *builder.BuiltExporters - builtReceivers builder.Receivers - builtPipelines builder.BuiltPipelines + builtPipelines *builder.BuiltPipelines builtExtensions *extensions.BuiltExtensions } @@ -56,9 +54,9 @@ func (host *serviceHost) GetFactory(kind component.Kind, componentType config.Ty } func (host *serviceHost) GetExtensions() map[config.ComponentID]component.Extension { - return host.builtExtensions.ToMap() + return host.builtExtensions.GetExtensions() } func (host *serviceHost) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter { - return host.builtExporters.ToMapByDataType() + return host.builtPipelines.GetExporters() } diff --git a/service/internal/builder/capabilities.go b/service/internal/builder/capabilities.go new file mode 100644 index 000000000000..3fe0462ffbac --- /dev/null +++ b/service/internal/builder/capabilities.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "go.opentelemetry.io/collector/consumer" +) + +func wrapLogs(logs consumer.Logs, cap consumer.Capabilities) consumer.Logs { + return capLogs{Logs: logs, cap: cap} +} + +type capLogs struct { + consumer.Logs + cap consumer.Capabilities +} + +func (mts capLogs) Capabilities() consumer.Capabilities { + return mts.cap +} + +func wrapMetrics(metrics consumer.Metrics, cap consumer.Capabilities) consumer.Metrics { + return capMetrics{Metrics: metrics, cap: cap} +} + +type capMetrics struct { + consumer.Metrics + cap consumer.Capabilities +} + +func (mts capMetrics) Capabilities() consumer.Capabilities { + return mts.cap +} + +func wrapTraces(traces consumer.Traces, cap consumer.Capabilities) consumer.Traces { + return capTraces{Traces: traces, cap: cap} +} + +type capTraces struct { + consumer.Traces + cap consumer.Capabilities +} + +func (mts capTraces) Capabilities() consumer.Capabilities { + return mts.cap +} diff --git a/service/internal/builder/capabilities_test.go b/service/internal/builder/capabilities_test.go new file mode 100644 index 000000000000..9ce53faef499 --- /dev/null +++ b/service/internal/builder/capabilities_test.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/internal/testdata" +) + +func TestWrapLogs(t *testing.T) { + sink := &consumertest.LogsSink{} + require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities()) + + wrap := wrapLogs(sink, consumer.Capabilities{MutatesData: true}) + assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities()) + + assert.NoError(t, wrap.ConsumeLogs(context.Background(), testdata.GenerateLogsOneLogRecord())) + assert.Len(t, sink.AllLogs(), 1) + assert.Equal(t, testdata.GenerateLogsOneLogRecord(), sink.AllLogs()[0]) +} + +func TestWrapMetrics(t *testing.T) { + sink := &consumertest.MetricsSink{} + require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities()) + + wrap := wrapMetrics(sink, consumer.Capabilities{MutatesData: true}) + assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities()) + + assert.NoError(t, wrap.ConsumeMetrics(context.Background(), testdata.GenerateMetricsOneMetric())) + assert.Len(t, sink.AllMetrics(), 1) + assert.Equal(t, testdata.GenerateMetricsOneMetric(), sink.AllMetrics()[0]) +} + +func TestWrapTraces(t *testing.T) { + sink := &consumertest.TracesSink{} + require.Equal(t, consumer.Capabilities{MutatesData: false}, sink.Capabilities()) + + wrap := wrapTraces(sink, consumer.Capabilities{MutatesData: 
true}) + assert.Equal(t, consumer.Capabilities{MutatesData: true}, wrap.Capabilities()) + + assert.NoError(t, wrap.ConsumeTraces(context.Background(), testdata.GenerateTracesOneSpan())) + assert.Len(t, sink.AllTraces(), 1) + assert.Equal(t, testdata.GenerateTracesOneSpan(), sink.AllTraces()[0]) +} diff --git a/service/internal/builder/exporters_builder.go b/service/internal/builder/exporters_builder.go deleted file mode 100644 index de0e6ebb46a3..000000000000 --- a/service/internal/builder/exporters_builder.go +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package builder // import "go.opentelemetry.io/collector/service/internal/builder" - -import ( - "context" - "errors" - "fmt" - - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/service/internal/components" -) - -// BuiltExporters is a map of exporters created from exporter configs. -type BuiltExporters struct { - settings component.TelemetrySettings - exporters map[config.DataType]map[config.ComponentID]component.Exporter -} - -// StartAll starts all exporters. 
-func (exps BuiltExporters) StartAll(ctx context.Context, host component.Host) error { - for dt, expByID := range exps.exporters { - for expID, exp := range expByID { - expLogger := exporterLogger(exps.settings.Logger, expID, dt) - expLogger.Info("Exporter is starting...") - if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil { - return err - } - expLogger.Info("Exporter started.") - } - } - return nil -} - -// ShutdownAll stops all exporters. -func (exps BuiltExporters) ShutdownAll(ctx context.Context) error { - var errs error - for _, expByID := range exps.exporters { - for _, exp := range expByID { - errs = multierr.Append(errs, exp.Shutdown(ctx)) - } - } - return errs -} - -func (exps BuiltExporters) ToMapByDataType() map[config.DataType]map[config.ComponentID]component.Exporter { - exportersMap := make(map[config.DataType]map[config.ComponentID]component.Exporter) - - exportersMap[config.TracesDataType] = make(map[config.ComponentID]component.Exporter, len(exps.exporters[config.TracesDataType])) - exportersMap[config.MetricsDataType] = make(map[config.ComponentID]component.Exporter, len(exps.exporters[config.MetricsDataType])) - exportersMap[config.LogsDataType] = make(map[config.ComponentID]component.Exporter, len(exps.exporters[config.LogsDataType])) - - for dt, expByID := range exps.exporters { - for expID, exp := range expByID { - exportersMap[dt][expID] = exp - } - } - - return exportersMap -} - -// BuildExporters builds Exporters from config. -func BuildExporters( - ctx context.Context, - settings component.TelemetrySettings, - buildInfo component.BuildInfo, - cfg *config.Config, - factories map[config.Type]component.ExporterFactory, -) (*BuiltExporters, error) { - exps := &BuiltExporters{ - settings: settings, - exporters: make(map[config.DataType]map[config.ComponentID]component.Exporter), - } - - // Go over all pipelines. The data type of the pipeline defines what data type - // each exporter is expected to receive. 
- - // Iterate over pipelines. - for pipelineID, pipeline := range cfg.Service.Pipelines { - dt := pipelineID.Type() - if _, ok := exps.exporters[dt]; !ok { - exps.exporters[dt] = make(map[config.ComponentID]component.Exporter) - } - expByID := exps.exporters[dt] - - // Iterate over all exporters for this pipeline. - for _, expID := range pipeline.Exporters { - // If already created an exporter for this [DataType, ComponentID] nothing to do, will reuse this instance. - if _, ok := expByID[expID]; ok { - continue - } - - set := component.ExporterCreateSettings{ - TelemetrySettings: settings, - BuildInfo: buildInfo, - } - set.TelemetrySettings.Logger = exporterLogger(settings.Logger, expID, dt) - - expCfg, existsCfg := cfg.Exporters[expID] - if !existsCfg { - return nil, fmt.Errorf("exporter %q is not configured", expID) - } - - factory, existsFactory := factories[expID.Type()] - if !existsFactory { - return nil, fmt.Errorf("exporter factory not found for type: %s", expID.Type()) - } - - exp, err := buildExporter(ctx, factory, set, expCfg, pipelineID) - if err != nil { - return nil, err - } - - expByID[expID] = exp - } - } - return exps, nil -} - -func buildExporter( - ctx context.Context, - factory component.ExporterFactory, - set component.ExporterCreateSettings, - cfg config.Exporter, - pipelineID config.ComponentID, -) (component.Exporter, error) { - var err error - var exporter component.Exporter - switch pipelineID.Type() { - case config.TracesDataType: - exporter, err = factory.CreateTracesExporter(ctx, set, cfg) - - case config.MetricsDataType: - exporter, err = factory.CreateMetricsExporter(ctx, set, cfg) - - case config.LogsDataType: - exporter, err = factory.CreateLogsExporter(ctx, set, cfg) - - default: - // Could not create because this exporter does not support this data type. 
- return nil, exporterTypeMismatchErr(cfg, pipelineID) - } - - if err != nil { - if errors.Is(err, component.ErrDataTypeIsNotSupported) { - // Could not create because this exporter does not support this data type. - return nil, exporterTypeMismatchErr(cfg, pipelineID) - } - return nil, fmt.Errorf("error creating %v exporter: %w", cfg.ID(), err) - } - - set.Logger.Info("Exporter was built.") - - return exporter, nil -} - -func exporterTypeMismatchErr( - config config.Exporter, - pipelineID config.ComponentID, -) error { - return fmt.Errorf( - "pipeline %q of data type %q has an exporter %v, which does not support that data type", - pipelineID, pipelineID.Type(), config.ID(), - ) -} - -func exporterLogger(logger *zap.Logger, id config.ComponentID, dt config.DataType) *zap.Logger { - return logger.With( - zap.String(components.ZapKindKey, components.ZapKindExporter), - zap.String(components.ZapDataTypeKey, string(dt)), - zap.String(components.ZapNameKey, id.String())) -} diff --git a/service/internal/builder/exporters_builder_test.go b/service/internal/builder/exporters_builder_test.go deleted file mode 100644 index ad9d036970ce..000000000000 --- a/service/internal/builder/exporters_builder_test.go +++ /dev/null @@ -1,145 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package builder - -import ( - "context" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/internal/testcomponents" - "go.opentelemetry.io/collector/service/servicetest" -) - -func TestBuildExporters(t *testing.T) { - factories, err := componenttest.NopFactories() - assert.NoError(t, err) - - cfg := &config.Config{ - Exporters: map[config.ComponentID]config.Exporter{ - config.NewComponentID("nop"): factories.Exporters["nop"].CreateDefaultConfig(), - }, - - Service: config.Service{ - Pipelines: map[config.ComponentID]*config.Pipeline{ - config.NewComponentID("traces"): { - Exporters: []config.ComponentID{config.NewComponentID("nop")}, - }, - config.NewComponentID("metrics"): { - Exporters: []config.ComponentID{config.NewComponentID("nop")}, - }, - config.NewComponentID("logs"): { - Exporters: []config.ComponentID{config.NewComponentID("nop")}, - }, - }, - }, - } - - exporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - - exps := exporters.ToMapByDataType() - require.Len(t, exps, 3) - assert.NotNil(t, exps[config.TracesDataType][config.NewComponentID("nop")]) - assert.NotNil(t, exps[config.MetricsDataType][config.NewComponentID("nop")]) - assert.NotNil(t, exps[config.LogsDataType][config.NewComponentID("nop")]) - - // Ensure it can be started. - assert.NoError(t, exporters.StartAll(context.Background(), componenttest.NewNopHost())) - - // Ensure it can be stopped. - assert.NoError(t, exporters.ShutdownAll(context.Background())) - - // Remove the pipeline so that the exporter is not attached to any pipeline. 
- // This should result in creating an exporter that has none of consumption - // functions set. - cfg.Service.Pipelines = map[config.ComponentID]*config.Pipeline{} - exporters, err = BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - - exps = exporters.ToMapByDataType() - require.Len(t, exps, 3) - assert.Len(t, exps[config.TracesDataType], 0) - assert.Len(t, exps[config.MetricsDataType], 0) - assert.Len(t, exps[config.LogsDataType], 0) -} - -func TestBuildExportersStartStopAll(t *testing.T) { - traceExporter := &testcomponents.ExampleExporter{} - metricExporter := &testcomponents.ExampleExporter{} - logsExporter := &testcomponents.ExampleExporter{} - exps := &BuiltExporters{ - settings: componenttest.NewNopTelemetrySettings(), - exporters: map[config.DataType]map[config.ComponentID]component.Exporter{ - config.TracesDataType: { - config.NewComponentID("example"): traceExporter, - }, - config.MetricsDataType: { - config.NewComponentID("example"): metricExporter, - }, - config.LogsDataType: { - config.NewComponentID("example"): logsExporter, - }, - }, - } - assert.False(t, traceExporter.Started) - assert.False(t, metricExporter.Started) - assert.False(t, logsExporter.Started) - - assert.NoError(t, exps.StartAll(context.Background(), componenttest.NewNopHost())) - assert.True(t, traceExporter.Started) - assert.True(t, metricExporter.Started) - assert.True(t, logsExporter.Started) - - assert.NoError(t, exps.ShutdownAll(context.Background())) - assert.True(t, traceExporter.Stopped) - assert.True(t, metricExporter.Stopped) - assert.True(t, logsExporter.Stopped) -} - -func TestBuildExportersNotSupportedDataType(t *testing.T) { - factories := createTestFactories() - - tests := []struct { - configFile string - }{ - { - configFile: "not_supported_exporter_logs.yaml", - }, - { - configFile: "not_supported_exporter_metrics.yaml", - }, - { - configFile: 
"not_supported_exporter_traces.yaml", - }, - } - - for _, test := range tests { - t.Run(test.configFile, func(t *testing.T) { - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.configFile), factories) - require.Nil(t, err) - - _, err = BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.Error(t, err) - }) - } -} diff --git a/service/internal/builder/factories_test.go b/service/internal/builder/factories_test.go deleted file mode 100644 index 988997d838e3..000000000000 --- a/service/internal/builder/factories_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package builder - -import ( - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/internal/testcomponents" -) - -func createTestFactories() component.Factories { - exampleReceiverFactory := testcomponents.ExampleReceiverFactory - exampleProcessorFactory := testcomponents.ExampleProcessorFactory - exampleExporterFactory := testcomponents.ExampleExporterFactory - badReceiverFactory := newBadReceiverFactory() - badProcessorFactory := newBadProcessorFactory() - badExporterFactory := newBadExporterFactory() - - factories := component.Factories{ - Receivers: map[config.Type]component.ReceiverFactory{ - exampleReceiverFactory.Type(): exampleReceiverFactory, - badReceiverFactory.Type(): badReceiverFactory, - }, - Processors: map[config.Type]component.ProcessorFactory{ - exampleProcessorFactory.Type(): exampleProcessorFactory, - badProcessorFactory.Type(): badProcessorFactory, - }, - Exporters: map[config.Type]component.ExporterFactory{ - exampleExporterFactory.Type(): exampleExporterFactory, - badExporterFactory.Type(): badExporterFactory, - }, - } - - return factories -} - -func newBadReceiverFactory() component.ReceiverFactory { - return component.NewReceiverFactory("bf", func() config.Receiver { - return &struct { - config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - }{ - ReceiverSettings: config.NewReceiverSettings(config.NewComponentID("bf")), - } - }) -} - -func newBadProcessorFactory() component.ProcessorFactory { - return component.NewProcessorFactory("bf", func() config.Processor { - return &struct { - config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - }{ - ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("bf")), - } - }) -} - -func newBadExporterFactory() component.ExporterFactory { - return component.NewExporterFactory("bf", func() config.Exporter 
{ - return &struct { - config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct - }{ - ExporterSettings: config.NewExporterSettings(config.NewComponentID("bf")), - } - }) -} diff --git a/service/internal/builder/pipelines.go b/service/internal/builder/pipelines.go new file mode 100644 index 000000000000..78335fa35cbb --- /dev/null +++ b/service/internal/builder/pipelines.go @@ -0,0 +1,550 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder // import "go.opentelemetry.io/collector/service/internal/builder" + +import ( + "context" + "errors" + "fmt" + "net/http" + "sort" + + "go.uber.org/multierr" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/fanoutconsumer" + "go.opentelemetry.io/collector/service/internal/zpages" +) + +const ( + zPipelineName = "zpipelinename" + zComponentName = "zcomponentname" + zComponentKind = "zcomponentkind" +) + +// baseConsumer redeclared here since not public in consumer package. May consider to make that public. 
+type baseConsumer interface { + Capabilities() consumer.Capabilities +} + +type builtComponent struct { + id config.ComponentID + comp component.Component +} + +type builtPipeline struct { + lastConsumer baseConsumer + + receivers []builtComponent + processors []builtComponent + exporters []builtComponent +} + +// BuiltPipelines is a map of all pipelines created from pipeline configs. +type BuiltPipelines struct { + telemetry component.TelemetrySettings + + allReceivers map[config.DataType]map[config.ComponentID]component.Receiver + allExporters map[config.DataType]map[config.ComponentID]component.Exporter + + pipelines map[config.ComponentID]*builtPipeline +} + +// StartAll starts all pipelines. +// +// Start with exporters, processors (in reverse configured order), then receivers. +// This is important so that components that are earlier in the pipeline and reference components that are +// later in the pipeline do not start sending data to later pipelines which are not yet started. +func (bps *BuiltPipelines) StartAll(ctx context.Context, host component.Host) error { + bps.telemetry.Logger.Info("Starting exporters...") + for dt, expByID := range bps.allExporters { + for expID, exp := range expByID { + expLogger := exporterLogger(bps.telemetry.Logger, expID, dt) + expLogger.Info("Exporter is starting...") + if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil { + return err + } + expLogger.Info("Exporter started.") + } + } + + bps.telemetry.Logger.Info("Starting processors...") + for pipelineID, bp := range bps.pipelines { + for i := len(bp.processors) - 1; i >= 0; i-- { + procLogger := processorLogger(bps.telemetry.Logger, bp.processors[i].id, pipelineID) + procLogger.Info("Processor is starting...") + if err := bp.processors[i].comp.Start(ctx, components.NewHostWrapper(host, procLogger)); err != nil { + return err + } + procLogger.Info("Processor started.") + } + } + + bps.telemetry.Logger.Info("Starting receivers...") + for dt, 
recvByID := range bps.allReceivers { + for recvID, recv := range recvByID { + recvLogger := receiverLogger(bps.telemetry.Logger, recvID, dt) + recvLogger.Info("Receiver is starting...") + if err := recv.Start(ctx, components.NewHostWrapper(host, recvLogger)); err != nil { + return err + } + recvLogger.Info("Receiver started.") + } + } + return nil +} + +// ShutdownAll stops all pipelines. +// +// Shutdown order is the reverse of starting: receivers, processors, then exporters. +// This gives senders a chance to send all their data to a not "shutdown" component. +func (bps *BuiltPipelines) ShutdownAll(ctx context.Context) error { + var errs error + bps.telemetry.Logger.Info("Stopping receivers...") + for _, recvByID := range bps.allReceivers { + for _, recv := range recvByID { + errs = multierr.Append(errs, recv.Shutdown(ctx)) + } + } + + bps.telemetry.Logger.Info("Stopping processors...") + for _, bp := range bps.pipelines { + for _, p := range bp.processors { + errs = multierr.Append(errs, p.comp.Shutdown(ctx)) + } + } + + bps.telemetry.Logger.Info("Stopping exporters...") + for _, expByID := range bps.allExporters { + for _, exp := range expByID { + errs = multierr.Append(errs, exp.Shutdown(ctx)) + } + } + + return errs +} + +func (bps *BuiltPipelines) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter { + exportersMap := make(map[config.DataType]map[config.ComponentID]component.Exporter) + + exportersMap[config.TracesDataType] = make(map[config.ComponentID]component.Exporter, len(bps.allExporters[config.TracesDataType])) + exportersMap[config.MetricsDataType] = make(map[config.ComponentID]component.Exporter, len(bps.allExporters[config.MetricsDataType])) + exportersMap[config.LogsDataType] = make(map[config.ComponentID]component.Exporter, len(bps.allExporters[config.LogsDataType])) + + for dt, expByID := range bps.allExporters { + for expID, exp := range expByID { + exportersMap[dt][expID] = exp + } + } + + return exportersMap +} + +// 
Build builds all pipelines from config. +func Build(ctx context.Context, settings component.TelemetrySettings, buildInfo component.BuildInfo, cfg *config.Config, factories component.Factories) (*BuiltPipelines, error) { + exps := &BuiltPipelines{ + telemetry: settings, + allReceivers: make(map[config.DataType]map[config.ComponentID]component.Receiver), + allExporters: make(map[config.DataType]map[config.ComponentID]component.Exporter), + pipelines: make(map[config.ComponentID]*builtPipeline, len(cfg.Service.Pipelines)), + } + + receiversConsumers := make(map[config.DataType]map[config.ComponentID][]baseConsumer) + + // Iterate over all pipelines, and create exporters, then processors. + // Receivers cannot be created since we need to know all consumers, a.k.a. we need all pipelines build up to the + // first processor. + for pipelineID, pipeline := range cfg.Service.Pipelines { + // The data type of the pipeline defines what data type each exporter is expected to receive. + if _, ok := exps.allExporters[pipelineID.Type()]; !ok { + exps.allExporters[pipelineID.Type()] = make(map[config.ComponentID]component.Exporter) + } + expByID := exps.allExporters[pipelineID.Type()] + + bp := &builtPipeline{ + receivers: make([]builtComponent, len(pipeline.Receivers)), + processors: make([]builtComponent, len(pipeline.Processors)), + exporters: make([]builtComponent, len(pipeline.Exporters)), + } + exps.pipelines[pipelineID] = bp + + // Iterate over all Exporters for this pipeline. + for i, expID := range pipeline.Exporters { + // If already created an exporter for this [DataType, ComponentID] nothing to do, will reuse this instance. 
+ if exp, ok := expByID[expID]; ok { + bp.exporters[i] = builtComponent{id: expID, comp: exp} + continue + } + + exp, err := buildExporter(ctx, settings, buildInfo, cfg.Exporters, factories.Exporters, expID, pipelineID) + if err != nil { + return nil, err + } + + bp.exporters[i] = builtComponent{id: expID, comp: exp} + expByID[expID] = exp + } + + // Build a fan out consumer to all exporters. + switch pipelineID.Type() { + case config.TracesDataType: + bp.lastConsumer = buildFanOutExportersTracesConsumer(bp.exporters) + case config.MetricsDataType: + bp.lastConsumer = buildFanOutExportersMetricsConsumer(bp.exporters) + case config.LogsDataType: + bp.lastConsumer = buildFanOutExportersLogsConsumer(bp.exporters) + default: + return nil, fmt.Errorf("create fan-out exporter in pipeline %q, data type %q is not supported", pipelineID, pipelineID.Type()) + } + + mutatesConsumedData := bp.lastConsumer.Capabilities().MutatesData + // Build the processors backwards, starting from the last one. + // The last processor points to fan out consumer to all Exporters, then the processor itself becomes a + // consumer for the one that precedes it in the pipeline and so on. + for i := len(pipeline.Processors) - 1; i >= 0; i-- { + procID := pipeline.Processors[i] + + proc, err := buildProcessor(ctx, settings, buildInfo, cfg.Processors, factories.Processors, procID, pipelineID, bp.lastConsumer) + if err != nil { + return nil, err + } + + bp.processors[i] = builtComponent{id: procID, comp: proc} + bp.lastConsumer = proc.(baseConsumer) + mutatesConsumedData = mutatesConsumedData || bp.lastConsumer.Capabilities().MutatesData + } + + // Some consumers may not correctly implement the Capabilities, and ignore the next consumer when calculated the Capabilities. + // Because of this wrap the first consumer if any consumers in the pipeline mutate the data and the first says that it doesn't. 
+ switch pipelineID.Type() { + case config.TracesDataType: + bp.lastConsumer = capTraces{Traces: bp.lastConsumer.(consumer.Traces), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}} + case config.MetricsDataType: + bp.lastConsumer = capMetrics{Metrics: bp.lastConsumer.(consumer.Metrics), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}} + case config.LogsDataType: + bp.lastConsumer = capLogs{Logs: bp.lastConsumer.(consumer.Logs), cap: consumer.Capabilities{MutatesData: mutatesConsumedData}} + default: + return nil, fmt.Errorf("create cap consumer in pipeline %q, data type %q is not supported", pipelineID, pipelineID.Type()) + } + + // The data type of the pipeline defines what data type each exporter is expected to receive. + if _, ok := receiversConsumers[pipelineID.Type()]; !ok { + receiversConsumers[pipelineID.Type()] = make(map[config.ComponentID][]baseConsumer) + } + recvConsByID := receiversConsumers[pipelineID.Type()] + // Iterate over all Receivers for this pipeline and just append the lastConsumer as a consumer for the receiver. + for _, recvID := range pipeline.Receivers { + recvConsByID[recvID] = append(recvConsByID[recvID], bp.lastConsumer) + } + } + + // Now that we built the `receiversConsumers` map, we can build the receivers as well. + for pipelineID, pipeline := range cfg.Service.Pipelines { + // The data type of the pipeline defines what data type each exporter is expected to receive. + if _, ok := exps.allReceivers[pipelineID.Type()]; !ok { + exps.allReceivers[pipelineID.Type()] = make(map[config.ComponentID]component.Receiver) + } + recvByID := exps.allReceivers[pipelineID.Type()] + bp := exps.pipelines[pipelineID] + + // Iterate over all Receivers for this pipeline. + for i, recvID := range pipeline.Receivers { + // If already created a receiver for this [DataType, ComponentID] nothing to do. 
+ if exp, ok := recvByID[recvID]; ok { + bp.receivers[i] = builtComponent{id: recvID, comp: exp} + continue + } + + recv, err := buildReceiver(ctx, settings, buildInfo, cfg.Receivers, factories.Receivers, recvID, pipelineID, receiversConsumers[pipelineID.Type()][recvID]) + if err != nil { + return nil, err + } + + bp.receivers[i] = builtComponent{id: recvID, comp: recv} + recvByID[recvID] = recv + } + } + return exps, nil +} + +func buildExporter( + ctx context.Context, + settings component.TelemetrySettings, + buildInfo component.BuildInfo, + cfgs map[config.ComponentID]config.Exporter, + factories map[config.Type]component.ExporterFactory, + id config.ComponentID, + pipelineID config.ComponentID, +) (component.Exporter, error) { + cfg, existsCfg := cfgs[id] + if !existsCfg { + return nil, fmt.Errorf("exporter %q is not configured", id) + } + + factory, existsFactory := factories[id.Type()] + if !existsFactory { + return nil, fmt.Errorf("exporter factory not available for type: %q", id) + } + + set := component.ExporterCreateSettings{ + TelemetrySettings: settings, + BuildInfo: buildInfo, + } + set.TelemetrySettings.Logger = exporterLogger(settings.Logger, id, pipelineID.Type()) + + exp, err := createExporter(ctx, set, cfg, id, pipelineID, factory) + if err != nil { + if errors.Is(err, component.ErrDataTypeIsNotSupported) { + // Could not create because this exporter does not support this data type. 
+ return nil, fmt.Errorf("pipeline %q has an exporter %q, which does not support that data type", pipelineID, id) + } + return nil, fmt.Errorf("error creating %v exporter: %w", id, err) + } + + return exp, nil +} + +func createExporter(ctx context.Context, set component.ExporterCreateSettings, cfg config.Exporter, id config.ComponentID, pipelineID config.ComponentID, factory component.ExporterFactory) (component.Exporter, error) { + switch pipelineID.Type() { + case config.TracesDataType: + return factory.CreateTracesExporter(ctx, set, cfg) + + case config.MetricsDataType: + return factory.CreateMetricsExporter(ctx, set, cfg) + + case config.LogsDataType: + return factory.CreateLogsExporter(ctx, set, cfg) + } + return nil, fmt.Errorf("error creating exporter %q in pipeline %q, data type %s is not supported", id, pipelineID, pipelineID.Type()) +} + +func buildFanOutExportersTracesConsumer(exporters []builtComponent) consumer.Traces { + consumers := make([]consumer.Traces, 0, len(exporters)) + for _, exp := range exporters { + consumers = append(consumers, exp.comp.(consumer.Traces)) + } + // Create a junction point that fans out to all allExporters. + return fanoutconsumer.NewTraces(consumers) +} + +func buildFanOutExportersMetricsConsumer(exporters []builtComponent) consumer.Metrics { + consumers := make([]consumer.Metrics, 0, len(exporters)) + for _, exp := range exporters { + consumers = append(consumers, exp.comp.(consumer.Metrics)) + } + // Create a junction point that fans out to all allExporters. + return fanoutconsumer.NewMetrics(consumers) +} + +func buildFanOutExportersLogsConsumer(exporters []builtComponent) consumer.Logs { + consumers := make([]consumer.Logs, 0, len(exporters)) + for _, exp := range exporters { + consumers = append(consumers, exp.comp.(consumer.Logs)) + } + // Create a junction point that fans out to all allExporters. 
+ return fanoutconsumer.NewLogs(consumers) +} + +func exporterLogger(logger *zap.Logger, id config.ComponentID, dt config.DataType) *zap.Logger { + return logger.With( + zap.String(components.ZapKindKey, components.ZapKindExporter), + zap.String(components.ZapDataTypeKey, string(dt)), + zap.String(components.ZapNameKey, id.String())) +} + +func buildProcessor(ctx context.Context, + settings component.TelemetrySettings, + buildInfo component.BuildInfo, + cfgs map[config.ComponentID]config.Processor, + factories map[config.Type]component.ProcessorFactory, + id config.ComponentID, + pipelineID config.ComponentID, + next baseConsumer, +) (component.Processor, error) { + procCfg, existsCfg := cfgs[id] + if !existsCfg { + return nil, fmt.Errorf("processor %q is not configured", id) + } + + factory, existsFactory := factories[id.Type()] + if !existsFactory { + return nil, fmt.Errorf("processor factory not available for type: %q", id) + } + + set := component.ProcessorCreateSettings{ + TelemetrySettings: settings, + BuildInfo: buildInfo, + } + set.TelemetrySettings.Logger = processorLogger(settings.Logger, id, pipelineID) + + proc, err := createProcessor(ctx, set, procCfg, id, pipelineID, next, factory) + if err != nil { + // Could not create because this processor does not support this data type. 
+		if errors.Is(err, component.ErrDataTypeIsNotSupported) {
+			return nil, fmt.Errorf("pipeline %q has a processor %q, which does not support that data type", pipelineID, id)
+		}
+		return nil, fmt.Errorf("error creating processor %q in pipeline %q: %w", id, pipelineID, err)
+	}
+	return proc, nil
+}
+
+func createProcessor(ctx context.Context, set component.ProcessorCreateSettings, cfg config.Processor, id config.ComponentID, pipelineID config.ComponentID, next baseConsumer, factory component.ProcessorFactory) (component.Processor, error) {
+	switch pipelineID.Type() {
+	case config.TracesDataType:
+		return factory.CreateTracesProcessor(ctx, set, cfg, next.(consumer.Traces))
+
+	case config.MetricsDataType:
+		return factory.CreateMetricsProcessor(ctx, set, cfg, next.(consumer.Metrics))
+
+	case config.LogsDataType:
+		return factory.CreateLogsProcessor(ctx, set, cfg, next.(consumer.Logs))
+	}
+	return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type())
+}
+
+func processorLogger(logger *zap.Logger, procID config.ComponentID, pipelineID config.ComponentID) *zap.Logger {
+	return logger.With(
+		zap.String(components.ZapKindKey, components.ZapKindProcessor),
+		zap.String(components.ZapNameKey, procID.String()),
+		zap.String(components.ZapKindPipeline, pipelineID.String()))
+}
+
+func buildReceiver(ctx context.Context,
+	settings component.TelemetrySettings,
+	buildInfo component.BuildInfo,
+	cfgs map[config.ComponentID]config.Receiver,
+	factories map[config.Type]component.ReceiverFactory,
+	id config.ComponentID,
+	pipelineID config.ComponentID,
+	nexts []baseConsumer,
+) (component.Receiver, error) {
+	cfg, existsCfg := cfgs[id]
+	if !existsCfg {
+		return nil, fmt.Errorf("receiver %q is not configured", id)
+	}
+
+	factory, existsFactory := factories[id.Type()]
+	if !existsFactory {
+		return nil, fmt.Errorf("receiver factory not available for type: %q", id)
+	}
+
+	set := 
component.ReceiverCreateSettings{ + TelemetrySettings: settings, + BuildInfo: buildInfo, + } + set.TelemetrySettings.Logger = receiverLogger(settings.Logger, id, pipelineID.Type()) + + recv, err := createReceiver(ctx, set, cfg, id, pipelineID, nexts, factory) + if err != nil { + // Could not create because this receiver does not support this data type. + if errors.Is(err, component.ErrDataTypeIsNotSupported) { + return nil, fmt.Errorf("pipeline %q has a receiver %q, which does not support that data type", pipelineID, id) + } + return nil, fmt.Errorf("cannot create receiver %v: %w", id, err) + } + + return recv, nil +} + +func createReceiver(ctx context.Context, set component.ReceiverCreateSettings, cfg config.Receiver, id config.ComponentID, pipelineID config.ComponentID, nexts []baseConsumer, factory component.ReceiverFactory) (component.Receiver, error) { + switch pipelineID.Type() { + case config.TracesDataType: + var consumers []consumer.Traces + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Traces)) + } + return factory.CreateTracesReceiver(ctx, set, cfg, fanoutconsumer.NewTraces(consumers)) + case config.MetricsDataType: + var consumers []consumer.Metrics + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Metrics)) + } + return factory.CreateMetricsReceiver(ctx, set, cfg, fanoutconsumer.NewMetrics(consumers)) + case config.LogsDataType: + var consumers []consumer.Logs + for _, next := range nexts { + consumers = append(consumers, next.(consumer.Logs)) + } + return factory.CreateLogsReceiver(ctx, set, cfg, fanoutconsumer.NewLogs(consumers)) + } + return nil, fmt.Errorf("error creating receiver %q in pipeline %q, data type %q is not supported", id, pipelineID, pipelineID.Type()) +} + +func receiverLogger(logger *zap.Logger, id config.ComponentID, dt config.DataType) *zap.Logger { + return logger.With( + zap.String(components.ZapKindKey, components.ZapKindReceiver), + zap.String(components.ZapNameKey, 
id.String()), + zap.String(components.ZapKindPipeline, string(dt))) +} + +func (bps *BuiltPipelines) ZPagesRequest(w http.ResponseWriter, r *http.Request) { + qValues := r.URL.Query() + pipelineName := qValues.Get(zPipelineName) + componentName := qValues.Get(zComponentName) + componentKind := qValues.Get(zComponentKind) + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Pipelines"}) + zpages.WriteHTMLPipelinesSummaryTable(w, bps.getPipelinesSummaryTableData()) + if pipelineName != "" && componentName != "" && componentKind != "" { + fullName := componentName + if componentKind == "processor" { + fullName = pipelineName + "/" + componentName + } + zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ + Name: componentKind + ": " + fullName, + }) + // TODO: Add config + status info. + } + zpages.WriteHTMLPageFooter(w) +} + +func (bps *BuiltPipelines) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData { + sumData := zpages.SummaryPipelinesTableData{} + sumData.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(bps.pipelines)) + for c, p := range bps.pipelines { + // TODO: Change the template to use ID. 
+ var recvs []string + for _, bRecv := range p.receivers { + recvs = append(recvs, bRecv.id.String()) + } + var procs []string + for _, bProc := range p.processors { + procs = append(procs, bProc.id.String()) + } + var exps []string + for _, bExp := range p.exporters { + exps = append(exps, bExp.id.String()) + } + row := zpages.SummaryPipelinesTableRowData{ + FullName: c.String(), + InputType: string(c.Type()), + MutatesData: p.lastConsumer.Capabilities().MutatesData, + Receivers: recvs, + Processors: procs, + Exporters: exps, + } + sumData.Rows = append(sumData.Rows, row) + } + + sort.Slice(sumData.Rows, func(i, j int) bool { + return sumData.Rows[i].FullName < sumData.Rows[j].FullName + }) + return sumData +} diff --git a/service/internal/builder/pipelines_builder.go b/service/internal/builder/pipelines_builder.go deleted file mode 100644 index 3b006c6c3d9a..000000000000 --- a/service/internal/builder/pipelines_builder.go +++ /dev/null @@ -1,310 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package builder // import "go.opentelemetry.io/collector/service/internal/builder" - -import ( - "context" - "fmt" - - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/fanoutconsumer" -) - -// builtPipeline is a pipeline that is built based on a config. -// It can have a trace and/or a metrics consumer (the consumer is either the first -// processor in the pipeline or the exporter if pipeline has no processors). -type builtPipeline struct { - logger *zap.Logger - firstTC consumer.Traces - firstMC consumer.Metrics - firstLC consumer.Logs - - // Config is the configuration of this Pipeline. - Config *config.Pipeline - // MutatesData is set to true if any processors in the pipeline - // can mutate the TraceData or MetricsData input argument. - MutatesData bool - - processors []component.Processor -} - -// BuiltPipelines is a map of build pipelines created from pipeline configs. -type BuiltPipelines map[config.ComponentID]*builtPipeline - -func (bps BuiltPipelines) StartProcessors(ctx context.Context, host component.Host) error { - for _, bp := range bps { - bp.logger.Info("Pipeline is starting...") - hostWrapper := components.NewHostWrapper(host, bp.logger) - // Start in reverse order, starting from the back of processors pipeline. - // This is important so that processors that are earlier in the pipeline and - // reference processors that are later in the pipeline do not start sending - // data to later pipelines which are not yet started. 
- for i := len(bp.processors) - 1; i >= 0; i-- { - if err := bp.processors[i].Start(ctx, hostWrapper); err != nil { - return err - } - } - bp.logger.Info("Pipeline is started.") - } - return nil -} - -func (bps BuiltPipelines) ShutdownProcessors(ctx context.Context) error { - var errs error - for _, bp := range bps { - bp.logger.Info("Pipeline is shutting down...") - for _, p := range bp.processors { - errs = multierr.Append(errs, p.Shutdown(ctx)) - } - bp.logger.Info("Pipeline is shutdown.") - } - - return errs -} - -// pipelinesBuilder builds Pipelines from config. -type pipelinesBuilder struct { - settings component.TelemetrySettings - buildInfo component.BuildInfo - config *config.Config - exporters *BuiltExporters - factories map[config.Type]component.ProcessorFactory -} - -// BuildPipelines builds pipeline processors from config. Requires exporters to be already -// built via BuildExporters. -func BuildPipelines( - settings component.TelemetrySettings, - buildInfo component.BuildInfo, - config *config.Config, - exporters *BuiltExporters, - factories map[config.Type]component.ProcessorFactory, -) (BuiltPipelines, error) { - pb := &pipelinesBuilder{settings, buildInfo, config, exporters, factories} - - pipelineProcessors := make(BuiltPipelines) - for pipelineID, pipeline := range pb.config.Service.Pipelines { - bp, err := pb.buildPipeline(context.Background(), pipelineID, pipeline) - if err != nil { - return nil, err - } - pipelineProcessors[pipelineID] = bp - } - - return pipelineProcessors, nil -} - -// Builds a pipeline of processors. Returns the first processor in the pipeline. -// The last processor in the pipeline will be plugged to fan out the data into exporters -// that are configured for this pipeline. -func (pb *pipelinesBuilder) buildPipeline(ctx context.Context, pipelineID config.ComponentID, pipelineCfg *config.Pipeline) (*builtPipeline, error) { - - // BuildProcessors the pipeline backwards. 
- - // First create a consumer junction point that fans out the data to all exporters. - var tc consumer.Traces - var mc consumer.Metrics - var lc consumer.Logs - - // Take into consideration the Capabilities for the exporter as well. - mutatesConsumedData := false - switch pipelineID.Type() { - case config.TracesDataType: - tc = pb.buildFanoutExportersTracesConsumer(pipelineCfg.Exporters) - mutatesConsumedData = tc.Capabilities().MutatesData - case config.MetricsDataType: - mc = pb.buildFanoutExportersMetricsConsumer(pipelineCfg.Exporters) - mutatesConsumedData = mc.Capabilities().MutatesData - case config.LogsDataType: - lc = pb.buildFanoutExportersLogsConsumer(pipelineCfg.Exporters) - mutatesConsumedData = lc.Capabilities().MutatesData - } - - processors := make([]component.Processor, len(pipelineCfg.Processors)) - - // Now build the processors backwards, starting from the last one. - // The last processor points to consumer which fans out to exporters, then - // the processor itself becomes a consumer for the one that precedes it in - // in the pipeline and so on. - for i := len(pipelineCfg.Processors) - 1; i >= 0; i-- { - procID := pipelineCfg.Processors[i] - - procCfg, existsCfg := pb.config.Processors[procID] - if !existsCfg { - return nil, fmt.Errorf("processor %q is not configured", procID) - } - - factory, existsFactory := pb.factories[procID.Type()] - if !existsFactory { - return nil, fmt.Errorf("processor factory for type %q is not configured", procID.Type()) - } - - // This processor must point to the next consumer and then - // it becomes the next for the previous one (previous in the pipeline, - // which we will build in the next loop iteration). 
- var err error - set := component.ProcessorCreateSettings{ - TelemetrySettings: component.TelemetrySettings{ - Logger: pb.settings.Logger.With( - zap.String(components.ZapKindKey, components.ZapKindProcessor), - zap.String(components.ZapNameKey, procID.String()), - zap.String(components.ZapKindPipeline, pipelineID.String())), - TracerProvider: pb.settings.TracerProvider, - MeterProvider: pb.settings.MeterProvider, - MetricsLevel: pb.config.Telemetry.Metrics.Level, - }, - BuildInfo: pb.buildInfo, - } - - switch pipelineID.Type() { - case config.TracesDataType: - var proc component.TracesProcessor - if proc, err = factory.CreateTracesProcessor(ctx, set, procCfg, tc); err != nil { - return nil, fmt.Errorf("error creating processor %q in pipeline %q: %w", procID, pipelineID, err) - } - // Check if the factory really created the processor. - if proc == nil { - return nil, fmt.Errorf("factory for %v produced a nil processor", procID) - } - mutatesConsumedData = mutatesConsumedData || proc.Capabilities().MutatesData - processors[i] = proc - tc = proc - case config.MetricsDataType: - var proc component.MetricsProcessor - if proc, err = factory.CreateMetricsProcessor(ctx, set, procCfg, mc); err != nil { - return nil, fmt.Errorf("error creating processor %q in pipeline %q: %w", procID, pipelineID, err) - } - // Check if the factory really created the processor. - if proc == nil { - return nil, fmt.Errorf("factory for %v produced a nil processor", procID) - } - mutatesConsumedData = mutatesConsumedData || proc.Capabilities().MutatesData - processors[i] = proc - mc = proc - - case config.LogsDataType: - var proc component.LogsProcessor - if proc, err = factory.CreateLogsProcessor(ctx, set, procCfg, lc); err != nil { - return nil, fmt.Errorf("error creating processor %q in pipeline %q: %w", procID, pipelineID, err) - } - // Check if the factory really created the processor. 
- if proc == nil { - return nil, fmt.Errorf("factory for %v produced a nil processor", procID) - } - mutatesConsumedData = mutatesConsumedData || proc.Capabilities().MutatesData - processors[i] = proc - lc = proc - - default: - return nil, fmt.Errorf("error creating processor %q in pipeline %q, data type %s is not supported", - procID, pipelineID, pipelineID.Type()) - } - } - - pipelineLogger := pb.settings.Logger.With(zap.String(components.ZapKindKey, components.ZapKindPipeline), - zap.String(components.ZapNameKey, pipelineID.String())) - pipelineLogger.Info("Pipeline was built.") - - // Some consumers may not correctly implement the Capabilities, - // and ignore the next consumer when calculated the Capabilities. - // Because of this wrap the first consumer if any consumers in the pipeline - // mutate the data and the first says that it doesn't. - if tc != nil { - tc = capabilitiesTraces{Traces: tc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}} - } - if mc != nil { - mc = capabilitiesMetrics{Metrics: mc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}} - } - if lc != nil { - lc = capabilitiesLogs{Logs: lc, capabilities: consumer.Capabilities{MutatesData: mutatesConsumedData}} - } - bp := &builtPipeline{ - logger: pipelineLogger, - firstTC: tc, - firstMC: mc, - firstLC: lc, - Config: pipelineCfg, - MutatesData: mutatesConsumedData, - processors: processors, - } - - return bp, nil -} - -func (pb *pipelinesBuilder) buildFanoutExportersTracesConsumer(exporterIDs []config.ComponentID) consumer.Traces { - tracesExporters := pb.exporters.exporters[config.TracesDataType] - var exporters []consumer.Traces - for _, expID := range exporterIDs { - exporters = append(exporters, tracesExporters[expID].(consumer.Traces)) - } - - // Create a junction point that fans out to all exporters. 
- return fanoutconsumer.NewTraces(exporters) -} - -func (pb *pipelinesBuilder) buildFanoutExportersMetricsConsumer(exporterIDs []config.ComponentID) consumer.Metrics { - metricsExporters := pb.exporters.exporters[config.MetricsDataType] - var exporters []consumer.Metrics - for _, expID := range exporterIDs { - exporters = append(exporters, metricsExporters[expID].(consumer.Metrics)) - } - - // Create a junction point that fans out to all exporters. - return fanoutconsumer.NewMetrics(exporters) -} - -func (pb *pipelinesBuilder) buildFanoutExportersLogsConsumer(exporterIDs []config.ComponentID) consumer.Logs { - logsExporters := pb.exporters.exporters[config.LogsDataType] - var exporters []consumer.Logs - for _, expID := range exporterIDs { - exporters = append(exporters, logsExporters[expID].(consumer.Logs)) - } - - // Create a junction point that fans out to all exporters. - return fanoutconsumer.NewLogs(exporters) -} - -type capabilitiesLogs struct { - consumer.Logs - capabilities consumer.Capabilities -} - -func (mts capabilitiesLogs) Capabilities() consumer.Capabilities { - return mts.capabilities -} - -type capabilitiesMetrics struct { - consumer.Metrics - capabilities consumer.Capabilities -} - -func (mts capabilitiesMetrics) Capabilities() consumer.Capabilities { - return mts.capabilities -} - -type capabilitiesTraces struct { - consumer.Traces - capabilities consumer.Capabilities -} - -func (mts capabilitiesTraces) Capabilities() consumer.Capabilities { - return mts.capabilities -} diff --git a/service/internal/builder/pipelines_builder_test.go b/service/internal/builder/pipelines_builder_test.go deleted file mode 100644 index f270f04dc5db..000000000000 --- a/service/internal/builder/pipelines_builder_test.go +++ /dev/null @@ -1,263 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package builder - -import ( - "context" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/internal/testcomponents" - "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/service/servicetest" -) - -func TestBuildPipelines(t *testing.T) { - tests := []struct { - name string - pipelineID config.ComponentID - exporterNames []config.ComponentID - }{ - { - name: "one-exporter", - pipelineID: config.NewComponentID("traces"), - exporterNames: []config.ComponentID{config.NewComponentID("exampleexporter")}, - }, - { - name: "multi-exporter", - pipelineID: config.NewComponentIDWithName("traces", "2"), - exporterNames: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "2")}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testPipeline(t, test.pipelineID, test.exporterNames) - }) - } -} - -func createExampleConfig(dataType config.DataType) *config.Config { - exampleReceiverFactory := testcomponents.ExampleReceiverFactory - exampleProcessorFactory := testcomponents.ExampleProcessorFactory - exampleExporterFactory := testcomponents.ExampleExporterFactory - - cfg := &config.Config{ - Receivers: map[config.ComponentID]config.Receiver{ - 
config.NewComponentID(exampleReceiverFactory.Type()): exampleReceiverFactory.CreateDefaultConfig(), - }, - Processors: map[config.ComponentID]config.Processor{ - config.NewComponentID(exampleProcessorFactory.Type()): exampleProcessorFactory.CreateDefaultConfig(), - }, - Exporters: map[config.ComponentID]config.Exporter{ - config.NewComponentID(exampleExporterFactory.Type()): exampleExporterFactory.CreateDefaultConfig(), - }, - Service: config.Service{ - Pipelines: map[config.ComponentID]*config.Pipeline{ - config.NewComponentID(dataType): { - Receivers: []config.ComponentID{config.NewComponentID(exampleReceiverFactory.Type())}, - Processors: []config.ComponentID{config.NewComponentID(exampleProcessorFactory.Type())}, - Exporters: []config.ComponentID{config.NewComponentID(exampleExporterFactory.Type())}, - }, - }, - }, - } - return cfg -} - -func TestBuildPipelines_BuildVarious(t *testing.T) { - - factories := createTestFactories() - - tests := []struct { - dataType config.DataType - shouldFail bool - }{ - { - dataType: config.LogsDataType, - shouldFail: false, - }, - { - dataType: "nosuchdatatype", - shouldFail: true, - }, - } - - for _, test := range tests { - t.Run(string(test.dataType), func(t *testing.T) { - cfg := createExampleConfig(test.dataType) - - // BuildProcessors the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - if test.shouldFail { - assert.Error(t, err) - return - } - - require.NoError(t, err) - require.Len(t, allExporters.ToMapByDataType()[config.LogsDataType], 1) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - - assert.NoError(t, err) - require.NotNil(t, pipelineProcessors) - - err = pipelineProcessors.StartProcessors(context.Background(), componenttest.NewNopHost()) - assert.NoError(t, err) - - processor := 
pipelineProcessors[config.NewComponentID(test.dataType)] - - // Ensure pipeline has its fields correctly populated. - require.NotNil(t, processor) - assert.Nil(t, processor.firstTC) - assert.Nil(t, processor.firstMC) - assert.NotNil(t, processor.firstLC) - - // Compose the list of created exporters. - exporterIDs := []config.ComponentID{config.NewComponentID("exampleexporter")} - var exporters []component.Exporter - for _, expID := range exporterIDs { - // Ensure exporter is created. - exp := allExporters.exporters[test.dataType][expID] - require.NotNil(t, exp) - exporters = append(exporters, exp) - } - - // Send Logs via processor and verify that all exporters of the pipeline receive it. - - // First check that there are no logs in the exporters yet. - var exporterConsumers []*testcomponents.ExampleExporter - for _, exporter := range exporters { - expConsumer := exporter.(*testcomponents.ExampleExporter) - exporterConsumers = append(exporterConsumers, expConsumer) - require.Equal(t, len(expConsumer.Logs), 0) - } - - // Send one custom data. - log := plog.Logs{} - require.NoError(t, processor.firstLC.ConsumeLogs(context.Background(), log)) - - // Now verify received data. - for _, expConsumer := range exporterConsumers { - // Check that the trace is received by exporter. - require.Equal(t, 1, len(expConsumer.Logs)) - - // Verify that span is successfully delivered. 
- assert.EqualValues(t, log, expConsumer.Logs[0]) - } - - err = pipelineProcessors.ShutdownProcessors(context.Background()) - assert.NoError(t, err) - }) - } -} - -func testPipeline(t *testing.T, pipelineID config.ComponentID, exporterIDs []config.ComponentID) { - factories, err := testcomponents.ExampleComponents() - assert.NoError(t, err) - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "pipelines_builder.yaml"), factories) - // Unmarshal the config - require.Nil(t, err) - - // BuildProcessors the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - - assert.NoError(t, err) - require.NotNil(t, pipelineProcessors) - - assert.NoError(t, pipelineProcessors.StartProcessors(context.Background(), componenttest.NewNopHost())) - - processor := pipelineProcessors[pipelineID] - - // Ensure pipeline has its fields correctly populated. - require.NotNil(t, processor) - assert.NotNil(t, processor.firstTC) - assert.Nil(t, processor.firstMC) - - // Compose the list of created exporters. - var exporters []component.Exporter - for _, expID := range exporterIDs { - // Ensure exporter is created. - exp := allExporters.exporters[config.TracesDataType][expID] - require.NotNil(t, exp) - exporters = append(exporters, exp) - } - - // Send TraceData via processor and verify that all exporters of the pipeline receive it. - - // First check that there are no traces in the exporters yet. 
- var exporterConsumers []*testcomponents.ExampleExporter - for _, exporter := range exporters { - expConsumer := exporter.(*testcomponents.ExampleExporter) - exporterConsumers = append(exporterConsumers, expConsumer) - require.Equal(t, len(expConsumer.Traces), 0) - } - - td := testdata.GenerateTracesOneSpan() - require.NoError(t, processor.firstTC.ConsumeTraces(context.Background(), td)) - - // Now verify received data. - for _, expConsumer := range exporterConsumers { - // Check that the trace is received by exporter. - require.Equal(t, 1, len(expConsumer.Traces)) - - // Verify that span is successfully delivered. - assert.EqualValues(t, td, expConsumer.Traces[0]) - } - - err = pipelineProcessors.ShutdownProcessors(context.Background()) - assert.NoError(t, err) -} - -func TestBuildPipelines_NotSupportedDataType(t *testing.T) { - factories := createTestFactories() - - tests := []struct { - configFile string - }{ - { - configFile: "not_supported_processor_logs.yaml", - }, - { - configFile: "not_supported_processor_metrics.yaml", - }, - { - configFile: "not_supported_processor_traces.yaml", - }, - } - - for _, test := range tests { - t.Run(test.configFile, func(t *testing.T) { - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.configFile), factories) - require.Nil(t, err) - - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.Error(t, err) - assert.Zero(t, len(pipelineProcessors)) - }) - } -} diff --git a/service/internal/builder/pipelines_test.go b/service/internal/builder/pipelines_test.go new file mode 100644 index 000000000000..d43535855cab --- /dev/null +++ b/service/internal/builder/pipelines_test.go @@ -0,0 +1,289 @@ +// Copyright 
The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/internal/testcomponents" + "go.opentelemetry.io/collector/internal/testdata" + "go.opentelemetry.io/collector/service/servicetest" +) + +func TestBuild(t *testing.T) { + tests := []struct { + name string + receiverIDs []config.ComponentID + processorIDs []config.ComponentID + exporterIDs []config.ComponentID + expectedRequests int + }{ + { + name: "pipelines_simple.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 1, + }, + { + name: "pipelines_simple_multi_proc.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor"), config.NewComponentID("exampleprocessor")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 1, + }, + { + name: "pipelines_simple_no_proc.yaml", + receiverIDs: 
[]config.ComponentID{config.NewComponentID("examplereceiver")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 1, + }, + { + name: "pipelines_multi.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver"), config.NewComponentIDWithName("examplereceiver", "1")}, + processorIDs: []config.ComponentID{config.NewComponentID("exampleprocessor"), config.NewComponentIDWithName("exampleprocessor", "1")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "1")}, + expectedRequests: 2, + }, + { + name: "pipelines_multi_no_proc.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver"), config.NewComponentIDWithName("examplereceiver", "1")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "1")}, + expectedRequests: 2, + }, + { + name: "pipelines_exporter_multi_pipeline.yaml", + receiverIDs: []config.ComponentID{config.NewComponentID("examplereceiver")}, + exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, + expectedRequests: 2, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + factories, err := testcomponents.ExampleComponents() + assert.NoError(t, err) + + cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.name), factories) + require.NoError(t, err) + + // Build the pipeline + pipelines, err := Build(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories) + assert.NoError(t, err) + + assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + + // Verify exporters created, started and empty. 
+ for _, expID := range test.exporterIDs { + traceExporter := pipelines.GetExporters()[config.TracesDataType][expID].(*testcomponents.ExampleExporter) + assert.True(t, traceExporter.Started) + assert.Equal(t, len(traceExporter.Traces), 0) + + // Validate metrics. + metricsExporter := pipelines.GetExporters()[config.MetricsDataType][expID].(*testcomponents.ExampleExporter) + assert.True(t, metricsExporter.Started) + assert.Zero(t, len(metricsExporter.Traces)) + + // Validate logs. + logsExporter := pipelines.GetExporters()[config.LogsDataType][expID].(*testcomponents.ExampleExporter) + assert.True(t, logsExporter.Started) + assert.Zero(t, len(logsExporter.Traces)) + } + + // Verify processors created in the given order and started. + for i, procID := range test.processorIDs { + traceProcessor := pipelines.pipelines[config.NewComponentID(config.TracesDataType)].processors[i] + assert.Equal(t, procID, traceProcessor.id) + assert.True(t, traceProcessor.comp.(*testcomponents.ExampleProcessor).Started) + + // Validate metrics. + metricsProcessor := pipelines.pipelines[config.NewComponentID(config.MetricsDataType)].processors[i] + assert.Equal(t, procID, metricsProcessor.id) + assert.True(t, metricsProcessor.comp.(*testcomponents.ExampleProcessor).Started) + + // Validate logs. + logsProcessor := pipelines.pipelines[config.NewComponentID(config.LogsDataType)].processors[i] + assert.Equal(t, procID, logsProcessor.id) + assert.True(t, logsProcessor.comp.(*testcomponents.ExampleProcessor).Started) + } + + // Verify receivers created, started and send data to confirm pipelines correctly connected. + for _, recvID := range test.receiverIDs { + traceReceiver := pipelines.allReceivers[config.TracesDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, traceReceiver.Started) + // Send traces. 
+ assert.NoError(t, traceReceiver.ConsumeTraces(context.Background(), testdata.GenerateTracesOneSpan())) + + metricsReceiver := pipelines.allReceivers[config.MetricsDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, metricsReceiver.Started) + // Send metrics. + assert.NoError(t, metricsReceiver.ConsumeMetrics(context.Background(), testdata.GenerateMetricsOneMetric())) + + logsReceiver := pipelines.allReceivers[config.LogsDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, logsReceiver.Started) + // Send logs. + assert.NoError(t, logsReceiver.ConsumeLogs(context.Background(), testdata.GenerateLogsOneLogRecord())) + } + + assert.NoError(t, pipelines.ShutdownAll(context.Background())) + + // Verify receivers shutdown. + for _, recvID := range test.receiverIDs { + traceReceiver := pipelines.allReceivers[config.TracesDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, traceReceiver.Stopped) + + metricsReceiver := pipelines.allReceivers[config.MetricsDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, metricsReceiver.Stopped) + + logsReceiver := pipelines.allReceivers[config.LogsDataType][recvID].(*testcomponents.ExampleReceiver) + assert.True(t, logsReceiver.Stopped) + } + + // Verify processors shutdown. + for i := range test.processorIDs { + traceProcessor := pipelines.pipelines[config.NewComponentID(config.TracesDataType)].processors[i] + assert.True(t, traceProcessor.comp.(*testcomponents.ExampleProcessor).Stopped) + + // Validate metrics. + metricsProcessor := pipelines.pipelines[config.NewComponentID(config.MetricsDataType)].processors[i] + assert.True(t, metricsProcessor.comp.(*testcomponents.ExampleProcessor).Stopped) + + // Validate logs. + logsProcessor := pipelines.pipelines[config.NewComponentID(config.LogsDataType)].processors[i] + assert.True(t, logsProcessor.comp.(*testcomponents.ExampleProcessor).Stopped) + } + + // Now verify that exporters received data, and are shutdown. 
+ for _, expID := range test.exporterIDs { + // Validate traces. + traceExporter := pipelines.GetExporters()[config.TracesDataType][expID].(*testcomponents.ExampleExporter) + require.Len(t, traceExporter.Traces, test.expectedRequests) + assert.EqualValues(t, testdata.GenerateTracesOneSpan(), traceExporter.Traces[0]) + assert.True(t, traceExporter.Stopped) + + // Validate metrics. + metricsExporter := pipelines.GetExporters()[config.MetricsDataType][expID].(*testcomponents.ExampleExporter) + require.Len(t, metricsExporter.Metrics, test.expectedRequests) + assert.EqualValues(t, testdata.GenerateMetricsOneMetric(), metricsExporter.Metrics[0]) + assert.True(t, metricsExporter.Stopped) + + // Validate logs. + logsExporter := pipelines.GetExporters()[config.LogsDataType][expID].(*testcomponents.ExampleExporter) + require.Len(t, logsExporter.Logs, test.expectedRequests) + assert.EqualValues(t, testdata.GenerateLogsOneLogRecord(), logsExporter.Logs[0]) + assert.True(t, logsExporter.Stopped) + } + }) + } +} + +func TestBuildExportersNotSupportedDataType(t *testing.T) { + nopReceiverFactory := componenttest.NewNopReceiverFactory() + nopProcessorFactory := componenttest.NewNopProcessorFactory() + nopExporterFactory := componenttest.NewNopExporterFactory() + badReceiverFactory := newBadReceiverFactory() + badProcessorFactory := newBadProcessorFactory() + badExporterFactory := newBadExporterFactory() + + tests := []struct { + configFile string + }{ + {configFile: "not_supported_exporter_logs.yaml"}, + {configFile: "not_supported_exporter_metrics.yaml"}, + {configFile: "not_supported_exporter_traces.yaml"}, + {configFile: "not_supported_processor_logs.yaml"}, + {configFile: "not_supported_processor_metrics.yaml"}, + {configFile: "not_supported_processor_traces.yaml"}, + {configFile: "not_supported_receiver_logs.yaml"}, + {configFile: "not_supported_receiver_metrics.yaml"}, + {configFile: "not_supported_receiver_traces.yaml"}, + {configFile: "unknown_exporter_config.yaml"}, + 
{configFile: "unknown_exporter_factory.yaml"}, + {configFile: "unknown_processor_config.yaml"}, + {configFile: "unknown_processor_factory.yaml"}, + {configFile: "unknown_receiver_config.yaml"}, + {configFile: "unknown_receiver_factory.yaml"}, + } + + for _, test := range tests { + t.Run(test.configFile, func(t *testing.T) { + factories := component.Factories{ + Receivers: map[config.Type]component.ReceiverFactory{ + nopReceiverFactory.Type(): nopReceiverFactory, + "unknown": nopReceiverFactory, + badReceiverFactory.Type(): badReceiverFactory, + }, + Processors: map[config.Type]component.ProcessorFactory{ + nopProcessorFactory.Type(): nopProcessorFactory, + "unknown": nopProcessorFactory, + badProcessorFactory.Type(): badProcessorFactory, + }, + Exporters: map[config.Type]component.ExporterFactory{ + nopExporterFactory.Type(): nopExporterFactory, + "unknown": nopExporterFactory, + badExporterFactory.Type(): badExporterFactory, + }, + } + + cfg, err := servicetest.LoadConfig(filepath.Join("testdata", test.configFile), factories) + require.NoError(t, err) + + delete(factories.Exporters, "unknown") + delete(factories.Processors, "unknown") + delete(factories.Receivers, "unknown") + + _, err = Build(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories) + assert.Error(t, err) + }) + } +} + +func newBadReceiverFactory() component.ReceiverFactory { + return component.NewReceiverFactory("bf", func() config.Receiver { + return &struct { + config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + ReceiverSettings: config.NewReceiverSettings(config.NewComponentID("bf")), + } + }) +} + +func newBadProcessorFactory() component.ProcessorFactory { + return component.NewProcessorFactory("bf", func() config.Processor { + return &struct { + config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + 
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID("bf")), + } + }) +} + +func newBadExporterFactory() component.ExporterFactory { + return component.NewExporterFactory("bf", func() config.Exporter { + return &struct { + config.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + ExporterSettings: config.NewExporterSettings(config.NewComponentID("bf")), + } + }) +} diff --git a/service/internal/builder/receivers_builder.go b/service/internal/builder/receivers_builder.go deleted file mode 100644 index f73d2be3e488..000000000000 --- a/service/internal/builder/receivers_builder.go +++ /dev/null @@ -1,293 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package builder // import "go.opentelemetry.io/collector/service/internal/builder" - -import ( - "context" - "errors" - "fmt" - - "go.uber.org/multierr" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/service/internal/components" - "go.opentelemetry.io/collector/service/internal/fanoutconsumer" -) - -var errUnusedReceiver = errors.New("receiver defined but not used by any pipeline") - -// builtReceiver is a receiver that is built based on a config. It can have -// a trace and/or a metrics component. 
-type builtReceiver struct { - logger *zap.Logger - receiver component.Receiver -} - -// Start starts the receiver. -func (rcv *builtReceiver) Start(ctx context.Context, host component.Host) error { - return rcv.receiver.Start(ctx, components.NewHostWrapper(host, rcv.logger)) -} - -// Shutdown stops the receiver. -func (rcv *builtReceiver) Shutdown(ctx context.Context) error { - return rcv.receiver.Shutdown(ctx) -} - -// Receivers is a map of receivers created from receiver configs. -type Receivers map[config.ComponentID]*builtReceiver - -// ShutdownAll stops all receivers. -func (rcvs Receivers) ShutdownAll(ctx context.Context) error { - var err error - for _, rcv := range rcvs { - err = multierr.Append(err, rcv.Shutdown(ctx)) - } - - return err -} - -// StartAll starts all receivers. -func (rcvs Receivers) StartAll(ctx context.Context, host component.Host) error { - for _, rcv := range rcvs { - rcv.logger.Info("Receiver is starting...") - - if err := rcv.Start(ctx, host); err != nil { - return err - } - rcv.logger.Info("Receiver started.") - } - return nil -} - -// receiversBuilder builds receivers from config. -type receiversBuilder struct { - config *config.Config - builtPipelines BuiltPipelines - factories map[config.Type]component.ReceiverFactory -} - -// BuildReceivers builds Receivers from config. 
-func BuildReceivers( - settings component.TelemetrySettings, - buildInfo component.BuildInfo, - cfg *config.Config, - builtPipelines BuiltPipelines, - factories map[config.Type]component.ReceiverFactory, -) (Receivers, error) { - rb := &receiversBuilder{cfg, builtPipelines, factories} - - receivers := make(Receivers) - for recvID, recvCfg := range cfg.Receivers { - set := component.ReceiverCreateSettings{ - TelemetrySettings: component.TelemetrySettings{ - Logger: settings.Logger.With( - zap.String(components.ZapKindKey, components.ZapKindReceiver), - zap.String(components.ZapNameKey, recvID.String())), - TracerProvider: settings.TracerProvider, - MeterProvider: settings.MeterProvider, - MetricsLevel: cfg.Telemetry.Metrics.Level, - }, - BuildInfo: buildInfo, - } - - rcv, err := rb.buildReceiver(context.Background(), set, recvID, recvCfg) - if err != nil { - if errors.Is(err, errUnusedReceiver) { - set.Logger.Info("Ignoring receiver as it is not used by any pipeline") - continue - } - return nil, err - } - receivers[recvID] = rcv - } - - return receivers, nil -} - -// hasReceiver returns true if the pipeline is attached to specified receiver. -func hasReceiver(pipeline *config.Pipeline, receiverID config.ComponentID) bool { - for _, id := range pipeline.Receivers { - if id == receiverID { - return true - } - } - return false -} - -type attachedPipelines map[config.DataType][]*builtPipeline - -func (rb *receiversBuilder) findPipelinesToAttach(receiverID config.ComponentID) (attachedPipelines, error) { - // A receiver may be attached to multiple pipelines. Pipelines may consume different - // data types. We need to compile the list of pipelines of each type that must be - // attached to this receiver according to configuration. - - pipelinesToAttach := make(attachedPipelines) - - // Iterate over all pipelines. - for pipelineID, pipelineCfg := range rb.config.Service.Pipelines { - // Get the first processor of the pipeline. 
- pipelineProcessor := rb.builtPipelines[pipelineID] - if pipelineProcessor == nil { - return nil, fmt.Errorf("cannot find pipeline %q", pipelineID) - } - - // Is this receiver attached to the pipeline? - if hasReceiver(pipelineCfg, receiverID) { - if _, exists := pipelinesToAttach[pipelineID.Type()]; !exists { - pipelinesToAttach[pipelineID.Type()] = make([]*builtPipeline, 0) - } - - // Yes, add it to the list of pipelines of corresponding data type. - pipelinesToAttach[pipelineID.Type()] = append(pipelinesToAttach[pipelineID.Type()], pipelineProcessor) - } - } - - return pipelinesToAttach, nil -} - -func attachReceiverToPipelines( - ctx context.Context, - set component.ReceiverCreateSettings, - factory component.ReceiverFactory, - dataType config.DataType, - id config.ComponentID, - cfg config.Receiver, - rcv *builtReceiver, - builtPipelines []*builtPipeline, -) error { - // There are pipelines of the specified data type that must be attached to - // the receiver. Create the receiver of corresponding data type and make - // sure its output is fanned out to all attached pipelines. 
- var err error - var createdReceiver component.Receiver - - switch dataType { - case config.TracesDataType: - junction := buildFanoutTraceConsumer(builtPipelines) - createdReceiver, err = factory.CreateTracesReceiver(ctx, set, cfg, junction) - - case config.MetricsDataType: - junction := buildFanoutMetricConsumer(builtPipelines) - createdReceiver, err = factory.CreateMetricsReceiver(ctx, set, cfg, junction) - - case config.LogsDataType: - junction := buildFanoutLogConsumer(builtPipelines) - createdReceiver, err = factory.CreateLogsReceiver(ctx, set, cfg, junction) - - default: - err = component.ErrDataTypeIsNotSupported - } - - if err != nil { - if errors.Is(err, component.ErrDataTypeIsNotSupported) { - return fmt.Errorf( - "receiver %v does not support %s but it was used in a %s pipeline", - id, dataType, dataType) - } - return fmt.Errorf("cannot create receiver %v: %w", id, err) - } - - // Check if the factory really created the receiver. - if createdReceiver == nil { - return fmt.Errorf("factory for %v produced a nil receiver", id) - } - - if rcv.receiver != nil { - // The receiver was previously created for this config. This can happen if the - // same receiver type supports more than one data type. In that case we expect - // that CreateTracesReceiver and CreateMetricsReceiver return the same value. - if rcv.receiver != createdReceiver { - return fmt.Errorf( - "factory for %q is implemented incorrectly: "+ - "CreateTracesReceiver, CreateMetricsReceiver and CreateLogsReceiver must return "+ - "the same receiver pointer when creating receivers of different data types", - id, - ) - } - } - rcv.receiver = createdReceiver - - set.Logger.Info("Receiver was built.", zap.String("datatype", string(dataType))) - - return nil -} - -func (rb *receiversBuilder) buildReceiver(ctx context.Context, set component.ReceiverCreateSettings, id config.ComponentID, cfg config.Receiver) (*builtReceiver, error) { - - // First find pipelines that must be attached to this receiver. 
- pipelinesToAttach, err := rb.findPipelinesToAttach(id) - if err != nil { - return nil, err - } - - // Prepare to build the receiver. - factory := rb.factories[id.Type()] - if factory == nil { - return nil, fmt.Errorf("receiver factory not found for: %v", cfg.ID()) - } - rcv := &builtReceiver{ - logger: set.Logger, - } - - // Now we have list of pipelines broken down by data type. Iterate for each data type. - for dataType, pipelines := range pipelinesToAttach { - if len(pipelines) == 0 { - // No pipelines of this data type are attached to this receiver. - continue - } - - // Attach the corresponding part of the receiver to all pipelines that require - // this data type. - if err = attachReceiverToPipelines(ctx, set, factory, dataType, id, cfg, rcv, pipelines); err != nil { - return nil, err - } - } - - if rcv.receiver == nil { - return nil, errUnusedReceiver - } - - return rcv, nil -} - -func buildFanoutTraceConsumer(pipelines []*builtPipeline) consumer.Traces { - var pipelineConsumers []consumer.Traces - for _, pipeline := range pipelines { - pipelineConsumers = append(pipelineConsumers, pipeline.firstTC) - } - // Create a junction point that fans out to all pipelines. - return fanoutconsumer.NewTraces(pipelineConsumers) -} - -func buildFanoutMetricConsumer(pipelines []*builtPipeline) consumer.Metrics { - var pipelineConsumers []consumer.Metrics - for _, pipeline := range pipelines { - pipelineConsumers = append(pipelineConsumers, pipeline.firstMC) - } - // Create a junction point that fans out to all pipelines. - return fanoutconsumer.NewMetrics(pipelineConsumers) -} - -func buildFanoutLogConsumer(pipelines []*builtPipeline) consumer.Logs { - var pipelineConsumers []consumer.Logs - for _, pipeline := range pipelines { - pipelineConsumers = append(pipelineConsumers, pipeline.firstLC) - } - // Create a junction point that fans out to all pipelines. 
- return fanoutconsumer.NewLogs(pipelineConsumers) -} diff --git a/service/internal/builder/receivers_builder_test.go b/service/internal/builder/receivers_builder_test.go deleted file mode 100644 index 157d56c04057..000000000000 --- a/service/internal/builder/receivers_builder_test.go +++ /dev/null @@ -1,321 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package builder - -import ( - "context" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config" - "go.opentelemetry.io/collector/internal/testcomponents" - "go.opentelemetry.io/collector/internal/testdata" - "go.opentelemetry.io/collector/pdata/plog" - "go.opentelemetry.io/collector/service/servicetest" -) - -type testCase struct { - name string - receiverID config.ComponentID - exporterIDs []config.ComponentID - spanDuplicationByExporter map[config.ComponentID]int - hasTraces bool - hasMetrics bool -} - -func TestBuildReceivers(t *testing.T) { - tests := []testCase{ - { - name: "one-exporter", - receiverID: config.NewComponentID("examplereceiver"), - exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter")}, - hasTraces: true, - hasMetrics: true, - }, - { - name: "multi-exporter", - receiverID: 
config.NewComponentIDWithName("examplereceiver", "2"), - exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "2")}, - hasTraces: true, - }, - { - name: "multi-metrics-receiver", - receiverID: config.NewComponentIDWithName("examplereceiver", "3"), - exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "2")}, - hasTraces: false, - hasMetrics: true, - }, - { - name: "multi-receiver-multi-exporter", - receiverID: config.NewComponentIDWithName("examplereceiver", "multi"), - exporterIDs: []config.ComponentID{config.NewComponentID("exampleexporter"), config.NewComponentIDWithName("exampleexporter", "2")}, - - // Check pipelines_builder.yaml to understand this case. - // We have 2 pipelines, one exporting to one exporter, the other - // exporting to both exporters, so we expect a duplication on - // one of the exporters, but not on the other. - spanDuplicationByExporter: map[config.ComponentID]int{ - config.NewComponentID("exampleexporter"): 2, config.NewComponentIDWithName("exampleexporter", "2"): 1, - }, - hasTraces: true, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testReceivers(t, test) - }) - } -} - -func testReceivers(t *testing.T, test testCase) { - factories, err := testcomponents.ExampleComponents() - assert.NoError(t, err) - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "pipelines_builder.yaml"), factories) - require.NoError(t, err) - - // Build the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.NoError(t, err) - receivers, err := 
BuildReceivers(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, pipelineProcessors, factories.Receivers) - - assert.NoError(t, err) - require.NotNil(t, receivers) - - receiver := receivers[test.receiverID] - - // Ensure receiver has its fields correctly populated. - require.NotNil(t, receiver) - - assert.NotNil(t, receiver.receiver) - - // Compose the list of created exporters. - var exporters []component.Exporter - for _, expID := range test.exporterIDs { - // Ensure exporter is created. - exp := allExporters.exporters[config.TracesDataType][expID] - require.NotNil(t, exp) - exporters = append(exporters, exp) - } - - // Send TraceData via receiver and verify that all exporters of the pipeline receive it. - - // First check that there are no traces in the exporters yet. - for _, exporter := range exporters { - consumer := exporter.(*testcomponents.ExampleExporter) - require.Equal(t, len(consumer.Traces), 0) - require.Equal(t, len(consumer.Metrics), 0) - } - - td := testdata.GenerateTracesOneSpan() - if test.hasTraces { - traceProducer := receiver.receiver.(*testcomponents.ExampleReceiver) - assert.NoError(t, traceProducer.ConsumeTraces(context.Background(), td)) - } - - md := testdata.GenerateMetricsOneMetric() - if test.hasMetrics { - metricsProducer := receiver.receiver.(*testcomponents.ExampleReceiver) - assert.NoError(t, metricsProducer.ConsumeMetrics(context.Background(), md)) - } - - // Now verify received data. - for _, expID := range test.exporterIDs { - // Validate traces. 
- if test.hasTraces { - var spanDuplicationCount int - if test.spanDuplicationByExporter != nil { - spanDuplicationCount = test.spanDuplicationByExporter[expID] - } else { - spanDuplicationCount = 1 - } - - traceConsumer := allExporters.exporters[config.TracesDataType][expID].(*testcomponents.ExampleExporter) - require.Equal(t, spanDuplicationCount, len(traceConsumer.Traces)) - - for i := 0; i < spanDuplicationCount; i++ { - assert.EqualValues(t, td, traceConsumer.Traces[i]) - } - } - - // Validate metrics. - if test.hasMetrics { - metricsConsumer := allExporters.exporters[config.MetricsDataType][expID].(*testcomponents.ExampleExporter) - require.Equal(t, 1, len(metricsConsumer.Metrics)) - assert.EqualValues(t, md, metricsConsumer.Metrics[0]) - } - } -} - -func TestBuildReceiversBuildCustom(t *testing.T) { - factories := createTestFactories() - - tests := []struct { - dataType config.DataType - shouldFail bool - }{ - { - dataType: config.LogsDataType, - shouldFail: false, - }, - { - dataType: "nosuchdatatype", - shouldFail: true, - }, - } - - for _, test := range tests { - t.Run(string(test.dataType), func(t *testing.T) { - cfg := createExampleConfig(test.dataType) - - // Build the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - if test.shouldFail { - assert.Error(t, err) - return - } - - assert.NoError(t, err) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.NoError(t, err) - receivers, err := BuildReceivers(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, pipelineProcessors, factories.Receivers) - - assert.NoError(t, err) - require.NotNil(t, receivers) - - receiver := receivers[config.NewComponentID("examplereceiver")] - - // Ensure receiver has its fields correctly populated. 
- require.NotNil(t, receiver) - - assert.NotNil(t, receiver.receiver) - - // Compose the list of created exporters. - exporterIDs := []config.ComponentID{config.NewComponentID("exampleexporter")} - var exporters []component.Exporter - for _, expID := range exporterIDs { - // Ensure exporter is created. - exp := allExporters.exporters[config.LogsDataType][expID] - require.NotNil(t, exp) - exporters = append(exporters, exp) - } - - // Send Data via receiver and verify that all exporters of the pipeline receive it. - - // First check that there are no traces in the exporters yet. - for _, exporter := range exporters { - consumer := exporter.(*testcomponents.ExampleExporter) - require.Equal(t, len(consumer.Logs), 0) - } - - // Send one data. - log := plog.Logs{} - producer := receiver.receiver.(*testcomponents.ExampleReceiver) - require.NoError(t, producer.ConsumeLogs(context.Background(), log)) - - // Now verify received data. - for _, exporter := range exporters { - // Validate exported data. 
- consumer := exporter.(*testcomponents.ExampleExporter) - require.Equal(t, 1, len(consumer.Logs)) - assert.EqualValues(t, log, consumer.Logs[0]) - } - }) - } -} - -func TestBuildReceivers_StartAll(t *testing.T) { - receivers := make(Receivers) - receiver := &testcomponents.ExampleReceiver{} - - receivers[config.NewComponentID("example")] = &builtReceiver{ - logger: zap.NewNop(), - receiver: receiver, - } - - assert.False(t, receiver.Started) - assert.NoError(t, receivers.StartAll(context.Background(), componenttest.NewNopHost())) - assert.True(t, receiver.Started) - - assert.False(t, receiver.Stopped) - assert.NoError(t, receivers.ShutdownAll(context.Background())) - assert.True(t, receiver.Stopped) -} - -func TestBuildReceivers_Unused(t *testing.T) { - factories, err := testcomponents.ExampleComponents() - assert.NoError(t, err) - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", "unused_receiver.yaml"), factories) - assert.NoError(t, err) - - // Build the pipeline - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.NoError(t, err) - receivers, err := BuildReceivers(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, pipelineProcessors, factories.Receivers) - assert.NoError(t, err) - assert.NotNil(t, receivers) - - assert.NoError(t, receivers.StartAll(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, receivers.ShutdownAll(context.Background())) -} - -func TestBuildReceivers_NotSupportedDataType(t *testing.T) { - factories := createTestFactories() - - tests := []struct { - configFile string - }{ - { - configFile: "not_supported_receiver_logs.yaml", - }, - { - configFile: 
"not_supported_receiver_metrics.yaml", - }, - { - configFile: "not_supported_receiver_traces.yaml", - }, - } - - for _, test := range tests { - t.Run(test.configFile, func(t *testing.T) { - - cfg, err := servicetest.LoadConfigAndValidate(filepath.Join("testdata", test.configFile), factories) - assert.NoError(t, err) - require.NotNil(t, cfg) - - allExporters, err := BuildExporters(context.Background(), componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, factories.Exporters) - assert.NoError(t, err) - - pipelineProcessors, err := BuildPipelines(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, allExporters, factories.Processors) - assert.NoError(t, err) - - receivers, err := BuildReceivers(componenttest.NewNopTelemetrySettings(), component.NewDefaultBuildInfo(), cfg, pipelineProcessors, factories.Receivers) - assert.Error(t, err) - assert.Zero(t, len(receivers)) - }) - } -} diff --git a/service/internal/builder/testdata/not_supported_exporter_logs.yaml b/service/internal/builder/testdata/not_supported_exporter_logs.yaml index 5dcb27600fe9..2907aabeb5f8 100644 --- a/service/internal/builder/testdata/not_supported_exporter_logs.yaml +++ b/service/internal/builder/testdata/not_supported_exporter_logs.yaml @@ -1,10 +1,10 @@ receivers: - examplereceiver: + nop: exporters: bf: service: pipelines: logs: - receivers: [examplereceiver] + receivers: [nop] exporters: [bf] diff --git a/service/internal/builder/testdata/not_supported_exporter_metrics.yaml b/service/internal/builder/testdata/not_supported_exporter_metrics.yaml index c46e8b5c40d7..962aad9ef812 100644 --- a/service/internal/builder/testdata/not_supported_exporter_metrics.yaml +++ b/service/internal/builder/testdata/not_supported_exporter_metrics.yaml @@ -1,10 +1,10 @@ receivers: - examplereceiver: + nop: exporters: bf: service: pipelines: metrics: - receivers: [examplereceiver] + receivers: [nop] exporters: [bf] diff --git 
a/service/internal/builder/testdata/not_supported_exporter_traces.yaml b/service/internal/builder/testdata/not_supported_exporter_traces.yaml index b1b4283e8ee3..88aaba0a2143 100644 --- a/service/internal/builder/testdata/not_supported_exporter_traces.yaml +++ b/service/internal/builder/testdata/not_supported_exporter_traces.yaml @@ -1,10 +1,10 @@ receivers: - examplereceiver: + nop: exporters: bf: service: pipelines: traces: - receivers: [examplereceiver] + receivers: [nop] exporters: [bf] diff --git a/service/internal/builder/testdata/not_supported_processor_logs.yaml b/service/internal/builder/testdata/not_supported_processor_logs.yaml index bf8ce83017ec..49636decb7bf 100644 --- a/service/internal/builder/testdata/not_supported_processor_logs.yaml +++ b/service/internal/builder/testdata/not_supported_processor_logs.yaml @@ -1,13 +1,13 @@ receivers: - examplereceiver: + nop: processors: bf: exporters: - exampleexporter: + nop: service: pipelines: logs: - receivers: [examplereceiver] + receivers: [nop] processors: [bf] - exporters: [exampleexporter] + exporters: [nop] diff --git a/service/internal/builder/testdata/not_supported_processor_metrics.yaml b/service/internal/builder/testdata/not_supported_processor_metrics.yaml index 38d0cd79a142..bb62de57e18c 100644 --- a/service/internal/builder/testdata/not_supported_processor_metrics.yaml +++ b/service/internal/builder/testdata/not_supported_processor_metrics.yaml @@ -1,13 +1,13 @@ receivers: - examplereceiver: + nop: processors: bf: exporters: - exampleexporter: + nop: service: pipelines: metrics: - receivers: [examplereceiver] + receivers: [nop] processors: [bf] - exporters: [exampleexporter] + exporters: [nop] diff --git a/service/internal/builder/testdata/not_supported_processor_traces.yaml b/service/internal/builder/testdata/not_supported_processor_traces.yaml index d9c931a2d7bd..79baed24a29b 100644 --- a/service/internal/builder/testdata/not_supported_processor_traces.yaml +++ 
b/service/internal/builder/testdata/not_supported_processor_traces.yaml @@ -1,13 +1,13 @@ receivers: - examplereceiver: + nop: processors: bf: exporters: - exampleexporter: + nop: service: pipelines: traces: - receivers: [examplereceiver] + receivers: [nop] processors: [bf] - exporters: [exampleexporter] + exporters: [nop] diff --git a/service/internal/builder/testdata/not_supported_receiver_logs.yaml b/service/internal/builder/testdata/not_supported_receiver_logs.yaml index e4ac06f49b88..f59d862f0aea 100644 --- a/service/internal/builder/testdata/not_supported_receiver_logs.yaml +++ b/service/internal/builder/testdata/not_supported_receiver_logs.yaml @@ -1,10 +1,10 @@ receivers: bf: # this is the bad receiver factory exporters: - exampleexporter: + nop: service: pipelines: logs: receivers: [bf] - exporters: [exampleexporter] + exporters: [nop] diff --git a/service/internal/builder/testdata/not_supported_receiver_metrics.yaml b/service/internal/builder/testdata/not_supported_receiver_metrics.yaml index e73eb326c001..20edc1d388c5 100644 --- a/service/internal/builder/testdata/not_supported_receiver_metrics.yaml +++ b/service/internal/builder/testdata/not_supported_receiver_metrics.yaml @@ -1,10 +1,10 @@ receivers: bf: # this is the bad receiver factory exporters: - exampleexporter: + nop: service: pipelines: metrics: receivers: [bf] - exporters: [exampleexporter] + exporters: [nop] diff --git a/service/internal/builder/testdata/not_supported_receiver_traces.yaml b/service/internal/builder/testdata/not_supported_receiver_traces.yaml index 1359d79578cd..ec29c48b204a 100644 --- a/service/internal/builder/testdata/not_supported_receiver_traces.yaml +++ b/service/internal/builder/testdata/not_supported_receiver_traces.yaml @@ -1,10 +1,10 @@ receivers: bf: # this is the bad receiver factory exporters: - exampleexporter: + nop: service: pipelines: traces: receivers: [bf] - exporters: [exampleexporter] + exporters: [nop] diff --git 
a/service/internal/builder/testdata/pipelines_builder.yaml b/service/internal/builder/testdata/pipelines_builder.yaml deleted file mode 100644 index ef965adb5fac..000000000000 --- a/service/internal/builder/testdata/pipelines_builder.yaml +++ /dev/null @@ -1,40 +0,0 @@ -receivers: - examplereceiver: - examplereceiver/2: - examplereceiver/3: - examplereceiver/multi: - -processors: - exampleprocessor: - -exporters: - exampleexporter: - exampleexporter/2: - -service: - pipelines: - traces: - receivers: [examplereceiver, examplereceiver/multi] - processors: [exampleprocessor] - exporters: [exampleexporter] - - traces/2: - receivers: [examplereceiver/2, examplereceiver/multi] - processors: [exampleprocessor] - exporters: [exampleexporter, exampleexporter/2] - - metrics: - receivers: [examplereceiver] - exporters: [exampleexporter] - - metrics/2: - receivers: [examplereceiver/3] - exporters: [exampleexporter] - - metrics/3: - receivers: [examplereceiver/3] - exporters: [exampleexporter/2] - - logs: - receivers: [examplereceiver/3] - exporters: [exampleexporter/2] diff --git a/service/internal/builder/testdata/pipelines_exporter_multi_pipeline.yaml b/service/internal/builder/testdata/pipelines_exporter_multi_pipeline.yaml new file mode 100644 index 000000000000..f0932b14d202 --- /dev/null +++ b/service/internal/builder/testdata/pipelines_exporter_multi_pipeline.yaml @@ -0,0 +1,37 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [ examplereceiver ] + processors: [ exampleprocessor ] + exporters: [ exampleexporter ] + + traces/1: + receivers: [ examplereceiver ] + exporters: [ exampleexporter ] + + metrics: + receivers: [ examplereceiver ] + processors: [ exampleprocessor ] + exporters: [ exampleexporter ] + + metrics/1: + receivers: [ examplereceiver ] + exporters: [ exampleexporter ] + + logs: + receivers: [ examplereceiver ] + processors: [ exampleprocessor ] + exporters: 
[ exampleexporter ] + + logs/1: + receivers: [ examplereceiver ] + exporters: [ exampleexporter ] diff --git a/service/internal/builder/testdata/pipelines_multi.yaml b/service/internal/builder/testdata/pipelines_multi.yaml new file mode 100644 index 000000000000..8f08e07dc757 --- /dev/null +++ b/service/internal/builder/testdata/pipelines_multi.yaml @@ -0,0 +1,28 @@ +receivers: + examplereceiver: + examplereceiver/1: + +processors: + exampleprocessor: + exampleprocessor/1: + +exporters: + exampleexporter: + exampleexporter/1: + +service: + pipelines: + traces: + receivers: [ examplereceiver, examplereceiver/1 ] + processors: [ exampleprocessor, exampleprocessor/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] + + metrics: + receivers: [ examplereceiver, examplereceiver/1 ] + processors: [ exampleprocessor, exampleprocessor/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] + + logs: + receivers: [ examplereceiver, examplereceiver/1 ] + processors: [ exampleprocessor, exampleprocessor/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] diff --git a/service/internal/builder/testdata/pipelines_multi_no_proc.yaml b/service/internal/builder/testdata/pipelines_multi_no_proc.yaml new file mode 100644 index 000000000000..5746e41fe5f4 --- /dev/null +++ b/service/internal/builder/testdata/pipelines_multi_no_proc.yaml @@ -0,0 +1,21 @@ +receivers: + examplereceiver: + examplereceiver/1: + +exporters: + exampleexporter: + exampleexporter/1: + +service: + pipelines: + traces: + receivers: [ examplereceiver, examplereceiver/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] + + metrics: + receivers: [ examplereceiver, examplereceiver/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] + + logs: + receivers: [ examplereceiver, examplereceiver/1 ] + exporters: [ exampleexporter, exampleexporter/1 ] diff --git a/service/internal/builder/testdata/pipelines_simple.yaml b/service/internal/builder/testdata/pipelines_simple.yaml new file mode 100644 index 
000000000000..c18ca604ab5b --- /dev/null +++ b/service/internal/builder/testdata/pipelines_simple.yaml @@ -0,0 +1,25 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [exampleexporter] + + metrics: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [exampleexporter] + + logs: + receivers: [examplereceiver] + processors: [exampleprocessor] + exporters: [exampleexporter] diff --git a/service/internal/builder/testdata/pipelines_simple_multi_proc.yaml b/service/internal/builder/testdata/pipelines_simple_multi_proc.yaml new file mode 100644 index 000000000000..bb51c870843b --- /dev/null +++ b/service/internal/builder/testdata/pipelines_simple_multi_proc.yaml @@ -0,0 +1,25 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + processors: [exampleprocessor, exampleprocessor] + exporters: [exampleexporter] + + metrics: + receivers: [examplereceiver] + processors: [exampleprocessor, exampleprocessor] + exporters: [exampleexporter] + + logs: + receivers: [examplereceiver] + processors: [exampleprocessor, exampleprocessor] + exporters: [exampleexporter] diff --git a/service/internal/builder/testdata/pipelines_simple_no_proc.yaml b/service/internal/builder/testdata/pipelines_simple_no_proc.yaml new file mode 100644 index 000000000000..351687e69c59 --- /dev/null +++ b/service/internal/builder/testdata/pipelines_simple_no_proc.yaml @@ -0,0 +1,22 @@ +receivers: + examplereceiver: + +processors: + exampleprocessor: + +exporters: + exampleexporter: + +service: + pipelines: + traces: + receivers: [examplereceiver] + exporters: [exampleexporter] + + metrics: + receivers: [examplereceiver] + exporters: [exampleexporter] + + logs: + receivers: [examplereceiver] + exporters: 
[exampleexporter] diff --git a/service/internal/builder/testdata/unknown_exporter_config.yaml b/service/internal/builder/testdata/unknown_exporter_config.yaml new file mode 100644 index 000000000000..a205cbfa0169 --- /dev/null +++ b/service/internal/builder/testdata/unknown_exporter_config.yaml @@ -0,0 +1,10 @@ +receivers: + nop: +exporters: + nop: + +service: + pipelines: + logs: + receivers: [nop] + exporters: [nop/1] \ No newline at end of file diff --git a/service/internal/builder/testdata/unknown_exporter_factory.yaml b/service/internal/builder/testdata/unknown_exporter_factory.yaml new file mode 100644 index 000000000000..51e20dba8006 --- /dev/null +++ b/service/internal/builder/testdata/unknown_exporter_factory.yaml @@ -0,0 +1,10 @@ +receivers: + nop: +exporters: + unknown: + +service: + pipelines: + logs: + receivers: [nop] + exporters: [unknown] diff --git a/service/internal/builder/testdata/unknown_processor_config.yaml b/service/internal/builder/testdata/unknown_processor_config.yaml new file mode 100644 index 000000000000..f7df08fb164c --- /dev/null +++ b/service/internal/builder/testdata/unknown_processor_config.yaml @@ -0,0 +1,13 @@ +receivers: + nop: +processors: + nop: +exporters: + nop: + +service: + pipelines: + metrics: + receivers: [nop] + processors: [nop/1] + exporters: [nop] \ No newline at end of file diff --git a/service/internal/builder/testdata/unknown_processor_factory.yaml b/service/internal/builder/testdata/unknown_processor_factory.yaml new file mode 100644 index 000000000000..315b40c00207 --- /dev/null +++ b/service/internal/builder/testdata/unknown_processor_factory.yaml @@ -0,0 +1,13 @@ +receivers: + nop: +processors: + unknown: +exporters: + nop: + +service: + pipelines: + metrics: + receivers: [nop] + processors: [unknown] + exporters: [nop] \ No newline at end of file diff --git a/service/internal/builder/testdata/unknown_receiver_config.yaml b/service/internal/builder/testdata/unknown_receiver_config.yaml new file mode 100644 
index 000000000000..ad1558070ed7 --- /dev/null +++ b/service/internal/builder/testdata/unknown_receiver_config.yaml @@ -0,0 +1,10 @@ +receivers: + nop: +exporters: + nop: + +service: + pipelines: + traces: + receivers: [nop/1] + exporters: [nop] diff --git a/service/internal/builder/testdata/unknown_receiver_factory.yaml b/service/internal/builder/testdata/unknown_receiver_factory.yaml new file mode 100644 index 000000000000..8e0eb5952dde --- /dev/null +++ b/service/internal/builder/testdata/unknown_receiver_factory.yaml @@ -0,0 +1,10 @@ +receivers: + unknown: +exporters: + nop: + +service: + pipelines: + traces: + receivers: [unknown] + exporters: [nop] diff --git a/service/internal/builder/testdata/unused_receiver.yaml b/service/internal/builder/testdata/unused_receiver.yaml deleted file mode 100644 index f8dc448ac284..000000000000 --- a/service/internal/builder/testdata/unused_receiver.yaml +++ /dev/null @@ -1,12 +0,0 @@ -receivers: - examplereceiver: - examplereceiver/2: -processors: -exporters: - exampleexporter: - -service: - pipelines: - traces: - receivers: [examplereceiver] - exporters: [exampleexporter] \ No newline at end of file diff --git a/service/internal/extensions/extensions.go b/service/internal/extensions/extensions.go index 176d241d60a8..9459c9b72d02 100644 --- a/service/internal/extensions/extensions.go +++ b/service/internal/extensions/extensions.go @@ -17,6 +17,8 @@ package extensions // import "go.opentelemetry.io/collector/service/internal/ext import ( "context" "fmt" + "net/http" + "sort" "go.uber.org/multierr" "go.uber.org/zap" @@ -24,18 +26,22 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/zpages" ) +const zExtensionName = "zextensionname" + // BuiltExtensions is a map of extensions created from extension configs. 
type BuiltExtensions struct { - settings component.TelemetrySettings - extMap map[config.ComponentID]component.Extension + telemetry component.TelemetrySettings + extMap map[config.ComponentID]component.Extension } // StartAll starts all extensions. -func (exts BuiltExtensions) StartAll(ctx context.Context, host component.Host) error { - for extID, ext := range exts.extMap { - extLogger := extensionLogger(exts.settings.Logger, extID) +func (bes *BuiltExtensions) StartAll(ctx context.Context, host component.Host) error { + bes.telemetry.Logger.Info("Starting extensions...") + for extID, ext := range bes.extMap { + extLogger := extensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { return err @@ -46,20 +52,21 @@ func (exts BuiltExtensions) StartAll(ctx context.Context, host component.Host) e } // ShutdownAll stops all extensions. -func (exts BuiltExtensions) ShutdownAll(ctx context.Context) error { +func (bes *BuiltExtensions) ShutdownAll(ctx context.Context) error { + bes.telemetry.Logger.Info("Stopping extensions...") var errs error - for _, ext := range exts.extMap { + for _, ext := range bes.extMap { errs = multierr.Append(errs, ext.Shutdown(ctx)) } return errs } -func (exts BuiltExtensions) NotifyPipelineReady() error { - for extID, ext := range exts.extMap { +func (bes *BuiltExtensions) NotifyPipelineReady() error { + for extID, ext := range bes.extMap { if pw, ok := ext.(component.PipelineWatcher); ok { if err := pw.Ready(); err != nil { - extensionLogger(exts.settings.Logger, extID).Error("Error notifying extension that the pipeline was started.") + extensionLogger(bes.telemetry.Logger, extID).Error("Error notifying extension that the pipeline was started.") return err } } @@ -67,10 +74,10 @@ func (exts BuiltExtensions) NotifyPipelineReady() error { return nil } -func (exts BuiltExtensions) NotifyPipelineNotReady() error { +func (bes 
*BuiltExtensions) NotifyPipelineNotReady() error { // Notify extensions in reverse order. var errs error - for _, ext := range exts.extMap { + for _, ext := range bes.extMap { if pw, ok := ext.(component.PipelineWatcher); ok { errs = multierr.Append(errs, pw.NotReady()) } @@ -78,14 +85,40 @@ func (exts BuiltExtensions) NotifyPipelineNotReady() error { return errs } -func (exts BuiltExtensions) ToMap() map[config.ComponentID]component.Extension { - result := make(map[config.ComponentID]component.Extension, len(exts.extMap)) - for extID, v := range exts.extMap { +func (bes *BuiltExtensions) GetExtensions() map[config.ComponentID]component.Extension { + result := make(map[config.ComponentID]component.Extension, len(bes.extMap)) + for extID, v := range bes.extMap { result[extID] = v } return result } +func (bes *BuiltExtensions) ZPagesRequest(w http.ResponseWriter, r *http.Request) { + extensionName := r.URL.Query().Get(zExtensionName) + + w.Header().Set("Content-Type", "text/html; charset=utf-8") + zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Extensions"}) + data := zpages.SummaryExtensionsTableData{} + + data.Rows = make([]zpages.SummaryExtensionsTableRowData, 0, len(bes.extMap)) + for id := range bes.extMap { + row := zpages.SummaryExtensionsTableRowData{FullName: id.String()} + data.Rows = append(data.Rows, row) + } + + sort.Slice(data.Rows, func(i, j int) bool { + return data.Rows[i].FullName < data.Rows[j].FullName + }) + zpages.WriteHTMLExtensionsSummaryTable(w, data) + if extensionName != "" { + zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ + Name: extensionName, + }) + // TODO: Add config + status info. + } + zpages.WriteHTMLPageFooter(w) +} + // Build builds BuiltExtensions from config. 
func Build( ctx context.Context, @@ -96,8 +129,8 @@ func Build( factories map[config.Type]component.ExtensionFactory, ) (*BuiltExtensions, error) { exts := &BuiltExtensions{ - settings: settings, - extMap: make(map[config.ComponentID]component.Extension), + telemetry: settings, + extMap: make(map[config.ComponentID]component.Extension), } for _, extID := range serviceExtensions { extCfg, existsCfg := extensionsConfigs[extID] diff --git a/service/internal/extensions/extensions_test.go b/service/internal/extensions/extensions_test.go index 9565948d3246..38fc674f5e04 100644 --- a/service/internal/extensions/extensions_test.go +++ b/service/internal/extensions/extensions_test.go @@ -27,17 +27,10 @@ import ( "go.opentelemetry.io/collector/config" ) -func TestServiceSetupExtensions(t *testing.T) { - errExtensionFactory := component.NewExtensionFactory( - "err", - func() config.Extension { - cfg := config.NewExtensionSettings(config.NewComponentID("err")) - return &cfg - }, - func(ctx context.Context, set component.ExtensionCreateSettings, extension config.Extension) (component.Extension, error) { - return nil, errors.New("cannot create \"err\" extension type") - }, - ) +func TestBuildExtensions(t *testing.T) { + nopExtensionFactory := componenttest.NewNopExtensionFactory() + nopExtensionConfig := nopExtensionFactory.CreateDefaultConfig() + errExtensionFactory := newCreateErrorExtensionFactory() errExtensionConfig := errExtensionFactory.CreateDefaultConfig() badExtensionFactory := newBadExtensionFactory() badExtensionCfg := badExtensionFactory.CreateDefaultConfig() @@ -59,12 +52,12 @@ func TestServiceSetupExtensions(t *testing.T) { { name: "missing_extension_factory", extensionsConfigs: map[config.ComponentID]config.Extension{ - config.NewComponentID(errExtensionFactory.Type()): errExtensionConfig, + config.NewComponentID("unknown"): nopExtensionConfig, }, serviceExtensions: []config.ComponentID{ - config.NewComponentID(errExtensionFactory.Type()), + 
config.NewComponentID("unknown"), }, - wantErrMsg: "extension factory for type \"err\" is not configured", + wantErrMsg: "extension factory for type \"unknown\" is not configured", }, { name: "error_on_create_extension", @@ -122,3 +115,19 @@ func newBadExtensionFactory() component.ExtensionFactory { }, ) } + +func newCreateErrorExtensionFactory() component.ExtensionFactory { + return component.NewExtensionFactory( + "err", + func() config.Extension { + return &struct { + config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + }{ + ExtensionSettings: config.NewExtensionSettings(config.NewComponentID("err")), + } + }, + func(ctx context.Context, set component.ExtensionCreateSettings, extension config.Extension) (component.Extension, error) { + return nil, errors.New("cannot create \"err\" extension type") + }, + ) +} diff --git a/service/service.go b/service/service.go index 6da5cefd3437..70502d01a3a8 100644 --- a/service/service.go +++ b/service/service.go @@ -68,48 +68,22 @@ func newService(set *settings) (*service, error) { return nil, fmt.Errorf("cannot build extensions: %w", err) } - // Pipeline is built backwards, starting from exporters, so that we create objects - // which are referenced before objects which reference them. - - // First create exporters. - if srv.host.builtExporters, err = builder.BuildExporters(context.Background(), srv.telemetry, srv.buildInfo, srv.config, srv.host.factories.Exporters); err != nil { - return nil, fmt.Errorf("cannot build exporters: %w", err) - } - - // Create pipelines and their processors and plug exporters to the end of the pipelines. 
- if srv.host.builtPipelines, err = builder.BuildPipelines(srv.telemetry, srv.buildInfo, srv.config, srv.host.builtExporters, srv.host.factories.Processors); err != nil { + if srv.host.builtPipelines, err = builder.Build(context.Background(), srv.telemetry, srv.buildInfo, srv.config, srv.host.factories); err != nil { return nil, fmt.Errorf("cannot build pipelines: %w", err) } - // Create receivers and plug them into the start of the pipelines. - if srv.host.builtReceivers, err = builder.BuildReceivers(srv.telemetry, srv.buildInfo, srv.config, srv.host.builtPipelines, srv.host.factories.Receivers); err != nil { - return nil, fmt.Errorf("cannot build receivers: %w", err) - } - return srv, nil } func (srv *service) Start(ctx context.Context) error { - srv.telemetry.Logger.Info("Starting extensions...") if err := srv.host.builtExtensions.StartAll(ctx, srv.host); err != nil { return fmt.Errorf("failed to start extensions: %w", err) } - srv.telemetry.Logger.Info("Starting exporters...") - if err := srv.host.builtExporters.StartAll(ctx, srv.host); err != nil { + if err := srv.host.builtPipelines.StartAll(ctx, srv.host); err != nil { return fmt.Errorf("cannot start exporters: %w", err) } - srv.telemetry.Logger.Info("Starting processors...") - if err := srv.host.builtPipelines.StartProcessors(ctx, srv.host); err != nil { - return fmt.Errorf("cannot start processors: %w", err) - } - - srv.telemetry.Logger.Info("Starting receivers...") - if err := srv.host.builtReceivers.StartAll(ctx, srv.host); err != nil { - return fmt.Errorf("cannot start receivers: %w", err) - } - return srv.host.builtExtensions.NotifyPipelineReady() } @@ -121,26 +95,11 @@ func (srv *service) Shutdown(ctx context.Context) error { errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err)) } - // Pipeline shutdown order is the reverse of building/starting: first receivers, then flushing pipelines - // giving senders a chance to send all their data. 
This may take time, the allowed - // time should be part of configuration. - - srv.telemetry.Logger.Info("Stopping receivers...") - if err := srv.host.builtReceivers.ShutdownAll(ctx); err != nil { - errs = multierr.Append(errs, fmt.Errorf("failed to shutdown receivers: %w", err)) - } - - srv.telemetry.Logger.Info("Stopping processors...") - if err := srv.host.builtPipelines.ShutdownProcessors(ctx); err != nil { - errs = multierr.Append(errs, fmt.Errorf("failed to shutdown processors: %w", err)) - } - - srv.telemetry.Logger.Info("Stopping exporters...") - if err := srv.host.builtExporters.ShutdownAll(ctx); err != nil { + // Pipeline shutdown order is the reverse of building/starting. + if err := srv.host.builtPipelines.ShutdownAll(ctx); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown exporters: %w", err)) } - srv.telemetry.Logger.Info("Stopping extensions...") if err := srv.host.builtExtensions.ShutdownAll(ctx); err != nil { errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err)) } diff --git a/service/zpages.go b/service/zpages.go index 5421b468703e..c3eac389453b 100644 --- a/service/zpages.go +++ b/service/zpages.go @@ -17,7 +17,6 @@ package service // import "go.opentelemetry.io/collector/service" import ( "net/http" "path" - "sort" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/featuregate" @@ -30,23 +29,16 @@ const ( pipelinezPath = "pipelinez" extensionzPath = "extensionz" featurezPath = "featurez" - - zPipelineName = "zpipelinename" - zComponentName = "zcomponentname" - zComponentKind = "zcomponentkind" - zExtensionName = "zextensionname" ) func (host *serviceHost) RegisterZPages(mux *http.ServeMux, pathPrefix string) { - mux.HandleFunc(path.Join(pathPrefix, servicezPath), host.handleServicezRequest) - mux.HandleFunc(path.Join(pathPrefix, pipelinezPath), host.handlePipelinezRequest) + mux.HandleFunc(path.Join(pathPrefix, servicezPath), host.zPagesRequest) + 
mux.HandleFunc(path.Join(pathPrefix, pipelinezPath), host.builtPipelines.ZPagesRequest) + mux.HandleFunc(path.Join(pathPrefix, extensionzPath), host.builtExtensions.ZPagesRequest) mux.HandleFunc(path.Join(pathPrefix, featurezPath), handleFeaturezRequest) - mux.HandleFunc(path.Join(pathPrefix, extensionzPath), func(w http.ResponseWriter, r *http.Request) { - handleExtensionzRequest(host, w, r) - }) } -func (host *serviceHost) handleServicezRequest(w http.ResponseWriter, r *http.Request) { +func (host *serviceHost) zPagesRequest(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Service " + host.buildInfo.Command}) zpages.WriteHTMLPropertiesTable(w, zpages.PropertiesTableData{Name: "Build Info", Properties: getBuildInfoProperties(host.buildInfo)}) @@ -69,94 +61,6 @@ func (host *serviceHost) handleServicezRequest(w http.ResponseWriter, r *http.Re zpages.WriteHTMLPageFooter(w) } -func (host *serviceHost) handlePipelinezRequest(w http.ResponseWriter, r *http.Request) { - qValues := r.URL.Query() - pipelineName := qValues.Get(zPipelineName) - componentName := qValues.Get(zComponentName) - componentKind := qValues.Get(zComponentKind) - - w.Header().Set("Content-Type", "text/html; charset=utf-8") - zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Pipelines"}) - zpages.WriteHTMLPipelinesSummaryTable(w, host.getPipelinesSummaryTableData()) - if pipelineName != "" && componentName != "" && componentKind != "" { - fullName := componentName - if componentKind == "processor" { - fullName = pipelineName + "/" + componentName - } - zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ - Name: componentKind + ": " + fullName, - }) - // TODO: Add config + status info. 
- } - zpages.WriteHTMLPageFooter(w) -} - -func (host *serviceHost) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData { - data := zpages.SummaryPipelinesTableData{} - - data.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(host.builtPipelines)) - for c, p := range host.builtPipelines { - // TODO: Change the template to use ID. - var recvs []string - for _, recvID := range p.Config.Receivers { - recvs = append(recvs, recvID.String()) - } - var procs []string - for _, procID := range p.Config.Processors { - procs = append(procs, procID.String()) - } - var exps []string - for _, expID := range p.Config.Exporters { - exps = append(exps, expID.String()) - } - row := zpages.SummaryPipelinesTableRowData{ - FullName: c.String(), - InputType: string(c.Type()), - MutatesData: p.MutatesData, - Receivers: recvs, - Processors: procs, - Exporters: exps, - } - data.Rows = append(data.Rows, row) - } - - sort.Slice(data.Rows, func(i, j int) bool { - return data.Rows[i].FullName < data.Rows[j].FullName - }) - return data -} - -func handleExtensionzRequest(host component.Host, w http.ResponseWriter, r *http.Request) { - extensionName := r.URL.Query().Get(zExtensionName) - - w.Header().Set("Content-Type", "text/html; charset=utf-8") - zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Extensions"}) - zpages.WriteHTMLExtensionsSummaryTable(w, getExtensionsSummaryTableData(host)) - if extensionName != "" { - zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{ - Name: extensionName, - }) - // TODO: Add config + status info. 
- } - zpages.WriteHTMLPageFooter(w) -} - -func getExtensionsSummaryTableData(host component.Host) zpages.SummaryExtensionsTableData { - data := zpages.SummaryExtensionsTableData{} - - extensions := host.GetExtensions() - data.Rows = make([]zpages.SummaryExtensionsTableRowData, 0, len(extensions)) - for c := range extensions { - row := zpages.SummaryExtensionsTableRowData{FullName: c.String()} - data.Rows = append(data.Rows, row) - } - - sort.Slice(data.Rows, func(i, j int) bool { - return data.Rows[i].FullName < data.Rows[j].FullName - }) - return data -} - func handleFeaturezRequest(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Feature Gates"})