diff --git a/.chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml b/.chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml new file mode 100644 index 000000000000..ca4d1c8218f0 --- /dev/null +++ b/.chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml @@ -0,0 +1,17 @@ +change_type: enhancement + +component: googlecloudpubsubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Turn noisy `warn` log about Pub/Sub servers into `debug`, and turn the reset count into a metric + +issues: [37571] + +subtext: | + The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps a open connection. The Pub/Sub servers + recurrently close the connection after a time period to avoid a long-running sticky connection. Before the + receiver logged `warn` log lines everytime this happened. These log lines are moved to debug so that fleets with + lots of collectors with the receiver don't span logs at warn level. To keep track of the resets, whenever a + connection reset happens a `otelcol_receiver_googlecloudpubsub_stream_restarts` metric is increased by one. + +change_logs: [user] diff --git a/receiver/googlecloudpubsubreceiver/documentation.md b/receiver/googlecloudpubsubreceiver/documentation.md new file mode 100644 index 000000000000..7cde645140cc --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/documentation.md @@ -0,0 +1,20 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# googlecloudpubsub + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### otelcol_receiver.googlecloudpubsub.stream_restarts + +Number of times the stream (re)starts due to a Pub/Sub servers connection close + +The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps a open connection. The Pub/Sub servers +recurrently close the connection after a time period to avoid a long-running sticky connection. This metric +counts the number of the resets that occurred during the lifetime of the container. + + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | diff --git a/receiver/googlecloudpubsubreceiver/factory.go b/receiver/googlecloudpubsubreceiver/factory.go index 802718a55fb4..d627c3d0efbc 100644 --- a/receiver/googlecloudpubsubreceiver/factory.go +++ b/receiver/googlecloudpubsubreceiver/factory.go @@ -41,24 +41,24 @@ func (factory *pubsubReceiverFactory) CreateDefaultConfig() component.Config { return &Config{} } -func (factory *pubsubReceiverFactory) ensureReceiver(params receiver.Settings, config component.Config) (*pubsubReceiver, error) { +func (factory *pubsubReceiverFactory) ensureReceiver(settings receiver.Settings, config component.Config) (*pubsubReceiver, error) { receiver := factory.receivers[config.(*Config)] if receiver != nil { return receiver, nil } rconfig := config.(*Config) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: params.ID, + ReceiverID: settings.ID, Transport: reportTransport, - ReceiverCreateSettings: params, + ReceiverCreateSettings: settings, }) if err != nil { return nil, err } receiver = &pubsubReceiver{ - logger: params.Logger, + settings: settings, obsrecv: obsrecv, - userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", params.BuildInfo.Version), + userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", settings.BuildInfo.Version), config: rconfig, } factory.receivers[config.(*Config)] = receiver diff --git a/receiver/googlecloudpubsubreceiver/go.mod b/receiver/googlecloudpubsubreceiver/go.mod index c4a930341d8c..58f0d85edd03 100644 --- a/receiver/googlecloudpubsubreceiver/go.mod +++ b/receiver/googlecloudpubsubreceiver/go.mod @@ -20,6 +20,10 @@ require ( go.opentelemetry.io/collector/pdata v1.25.0 go.opentelemetry.io/collector/receiver v0.119.0 go.opentelemetry.io/collector/receiver/receivertest v0.119.0 + go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/metric v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 + go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 @@ -72,11 +76,7 @@ require ( go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect - go.opentelemetry.io/otel/trace v1.34.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/oauth2 v0.25.0 // indirect diff --git a/receiver/googlecloudpubsubreceiver/internal/handler.go b/receiver/googlecloudpubsubreceiver/internal/handler.go index 58695666ed94..17b126860b0d 100644 --- a/receiver/googlecloudpubsubreceiver/internal/handler.go +++ b/receiver/googlecloudpubsubreceiver/internal/handler.go @@ -13,9 +13,14 @@ import ( "time" "cloud.google.com/go/pubsub/apiv1/pubsubpb" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" ) // Time to wait before restarting, when the stream stopped @@ -36,7 +41,8 @@ type StreamHandler struct { streamWaitGroup sync.WaitGroup // wait group for the handler handlerWaitGroup sync.WaitGroup - logger *zap.Logger + settings receiver.Settings + telemetryBuilder *metadata.TelemetryBuilder // time that acknowledge loop waits before acknowledging messages ackBatchWait time.Duration @@ -51,19 +57,21 @@ func (handler *StreamHandler) ack(ackID string) { func NewHandler( ctx context.Context, - logger *zap.Logger, + settings receiver.Settings, + telemetryBuilder *metadata.TelemetryBuilder, client SubscriberClient, clientID string, subscription string, callback func(ctx context.Context, message *pubsubpb.ReceivedMessage) error, ) (*StreamHandler, error) { handler := StreamHandler{ - logger: logger, - client: client, - clientID: clientID, - subscription: subscription, - pushMessage: callback, - ackBatchWait: 10 * time.Second, + settings: settings, + telemetryBuilder: telemetryBuilder, + client: client, + clientID: clientID, + subscription: subscription, + pushMessage: callback, + ackBatchWait: 10 * time.Second, } return &handler, handler.initStream(ctx) } @@ -85,6 +93,11 @@ func (handler *StreamHandler) initStream(ctx context.Context) error { _ = handler.stream.CloseSend() return err } + handler.telemetryBuilder.ReceiverGooglecloudpubsubStreamRestarts.Add(ctx, 1, + metric.WithAttributes( + attribute.String("otelcol.component.kind", "receiver"), + attribute.String("otelcol.component.id", handler.settings.ID.String()), + )) return nil } @@ -102,7 +115,7 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) { var loopCtx context.Context loopCtx, cancel := context.WithCancel(ctx) - handler.logger.Info("Starting Streaming Pull") + handler.settings.Logger.Debug("Starting Streaming Pull") handler.streamWaitGroup.Add(2) go handler.requestStream(loopCtx, cancel) go handler.responseStream(loopCtx, cancel) @@ -117,13 +130,13 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) { if handler.isRunning.Load() { err := handler.initStream(ctx) if err != nil { - handler.logger.Error("Failed to recovery stream.") + handler.settings.Logger.Error("Failed to recovery stream.") } } - handler.logger.Warn("End of recovery loop, restarting.") + handler.settings.Logger.Debug("End of recovery loop, restarting.") time.Sleep(streamRecoveryBackoffPeriod) } - handler.logger.Warn("Shutting down recovery loop.") + handler.settings.Logger.Warn("Shutting down recovery loop.") handler.handlerWaitGroup.Done() } @@ -157,15 +170,15 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context. for { if err := handler.acknowledgeMessages(); err != nil { if errors.Is(err, io.EOF) { - handler.logger.Warn("EOF reached") + handler.settings.Logger.Warn("EOF reached") break } - handler.logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err)) + handler.settings.Logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err)) break } select { case <-ctx.Done(): - handler.logger.Warn("requestStream <-ctx.Done()") + handler.settings.Logger.Debug("requestStream <-ctx.Done()") case <-timer.C: timer.Reset(handler.ackBatchWait) } @@ -176,7 +189,7 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context. } } cancel() - handler.logger.Warn("Request Stream loop ended.") + handler.settings.Logger.Debug("Request Stream loop ended.") _ = handler.stream.CloseSend() handler.streamWaitGroup.Done() } @@ -202,18 +215,18 @@ func (handler *StreamHandler) responseStream(ctx context.Context, cancel context case errors.Is(err, io.EOF): activeStreaming = false case !grpcStatus: - handler.logger.Warn("response stream breaking on error", + handler.settings.Logger.Warn("response stream breaking on error", zap.Error(err)) activeStreaming = false case s.Code() == codes.Unavailable: - handler.logger.Info("response stream breaking on gRPC s 'Unavailable'") + handler.settings.Logger.Debug("response stream breaking on gRPC s 'Unavailable'") activeStreaming = false case s.Code() == codes.NotFound: - handler.logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream") + handler.settings.Logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream") time.Sleep(time.Second * 60) activeStreaming = false default: - handler.logger.Warn("response stream breaking on gRPC s "+s.Message(), + handler.settings.Logger.Warn("response stream breaking on gRPC s "+s.Message(), zap.String("s", s.Message()), zap.Error(err)) activeStreaming = false @@ -221,11 +234,11 @@ func (handler *StreamHandler) responseStream(ctx context.Context, cancel context } if errors.Is(ctx.Err(), context.Canceled) { // Canceling the loop, collector is probably stopping - handler.logger.Warn("response stream ctx.Err() == context.Canceled") + handler.settings.Logger.Warn("response stream ctx.Err() == context.Canceled") break } } cancel() - handler.logger.Warn("Response Stream loop ended.") + handler.settings.Logger.Debug("Response Stream loop ended.") handler.streamWaitGroup.Done() } diff --git a/receiver/googlecloudpubsubreceiver/internal/handler_test.go b/receiver/googlecloudpubsubreceiver/internal/handler_test.go index 94b285eb35ca..bf860038ff1c 100644 --- a/receiver/googlecloudpubsubreceiver/internal/handler_test.go +++ b/receiver/googlecloudpubsubreceiver/internal/handler_test.go @@ -12,10 +12,12 @@ import ( "cloud.google.com/go/pubsub/apiv1/pubsubpb" "cloud.google.com/go/pubsub/pstest" "github.com/stretchr/testify/assert" - "go.uber.org/zap/zaptest" + "go.opentelemetry.io/collector/receiver/receivertest" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" ) func TestCancelStream(t *testing.T) { @@ -41,10 +43,13 @@ func TestCancelStream(t *testing.T) { }) assert.NoError(t, err) + settings := receivertest.NewNopSettings() + telemetryBuilder, _ := metadata.NewTelemetryBuilder(settings.TelemetrySettings) + client, err := pubsub.NewSubscriberClient(ctx, copts...) assert.NoError(t, err) - handler, err := NewHandler(ctx, zaptest.NewLogger(t), client, "client-id", "projects/my-project/subscriptions/otlp", + handler, err := NewHandler(ctx, settings, telemetryBuilder, client, "client-id", "projects/my-project/subscriptions/otlp", func(context.Context, *pubsubpb.ReceivedMessage) error { return nil }) diff --git a/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go new file mode 100644 index 000000000000..1f1ffdc1ba8c --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go @@ -0,0 +1,68 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + "sync" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" +) + +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + ReceiverGooglecloudpubsubStreamRestarts metric.Int64Counter +} + +// TelemetryBuilderOption applies changes to default builder. +type TelemetryBuilderOption interface { + apply(*TelemetryBuilder) +} + +type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) + +func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { + tbof(mb) +} + +// Shutdown unregister all registered callbacks for async instruments. +func (builder *TelemetryBuilder) Shutdown() { + builder.mu.Lock() + defer builder.mu.Unlock() + for _, reg := range builder.registrations { + reg.Unregister() + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{} + for _, op := range options { + op.apply(&builder) + } + builder.meter = Meter(settings) + var err, errs error + builder.ReceiverGooglecloudpubsubStreamRestarts, err = builder.meter.Int64Counter( + "otelcol_receiver.googlecloudpubsub.stream_restarts", + metric.WithDescription("Number of times the stream (re)starts due to a Pub/Sub servers connection close"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go new file mode 100644 index 000000000000..8e1906aa90f4 --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,74 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := componenttest.NewNopTelemetrySettings() + applied := false + _, err := NewTelemetryBuilder(set, telemetryBuilderOptionFunc(func(b *TelemetryBuilder) { + applied = true + })) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go new file mode 100644 index 000000000000..974f9ba960ab --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go @@ -0,0 +1,89 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +// Deprecated: [v0.119.0] Use componenttest.Telemetry +type Telemetry struct { + *componenttest.Telemetry +} + +// Deprecated: [v0.119.0] Use componenttest.NewTelemetry +func SetupTelemetry(opts ...componenttest.TelemetryOption) Telemetry { + return Telemetry{Telemetry: componenttest.NewTelemetry(opts...)} +} + +// Deprecated: [v0.119.0] Use metadatatest.NewSettings +func (tt *Telemetry) NewSettings() receiver.Settings { + return NewSettings(tt.Telemetry) +} + +func NewSettings(tt *componenttest.Telemetry) receiver.Settings { + set := receivertest.NewNopSettings() + set.ID = component.NewID(component.MustNewType("googlecloudpubsub")) + set.TelemetrySettings = tt.NewTelemetrySettings() + return set +} + +// Deprecated: [v0.119.0] Use metadatatest.AssertEqual* +func (tt *Telemetry) AssertMetrics(t *testing.T, expected []metricdata.Metrics, opts ...metricdatatest.Option) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.Reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := getMetricFromResource(want.Name, md) + metricdatatest.AssertEqual(t, want, got, opts...) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), lenMetrics(md)) +} + +func AssertEqualReceiverGooglecloudpubsubStreamRestarts(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_receiver.googlecloudpubsub.stream_restarts", + Description: "Number of times the stream (re)starts due to a Pub/Sub servers connection close", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_receiver.googlecloudpubsub.stream_restarts") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func getMetricFromResource(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func lenMetrics(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} diff --git a/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go new file mode 100644 index 000000000000..84e0b9500ecd --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go @@ -0,0 +1,42 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" +) + +func TestSetupTelemetry(t *testing.T) { + testTel := SetupTelemetry() + tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) + require.NoError(t, err) + defer tb.Shutdown() + tb.ReceiverGooglecloudpubsubStreamRestarts.Add(context.Background(), 1) + + testTel.AssertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_receiver.googlecloudpubsub.stream_restarts", + Description: "Number of times the stream (re)starts due to a Pub/Sub servers connection close", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {}, + }, + }, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + AssertEqualReceiverGooglecloudpubsubStreamRestarts(t, testTel.Telemetry, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + + require.NoError(t, testTel.Shutdown(context.Background())) +} diff --git a/receiver/googlecloudpubsubreceiver/metadata.yaml b/receiver/googlecloudpubsubreceiver/metadata.yaml index 28d8f8e03fbd..f5fbc6ac1c87 100644 --- a/receiver/googlecloudpubsubreceiver/metadata.yaml +++ b/receiver/googlecloudpubsubreceiver/metadata.yaml @@ -8,6 +8,20 @@ status: codeowners: active: [alexvanboxel] +telemetry: + metrics: + receiver.googlecloudpubsub.stream_restarts: + enabled: true + description: Number of times the stream (re)starts due to a Pub/Sub servers connection close + unit: "1" + sum: + value_type: int + monotonic: true + extended_documentation: | + The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps a open connection. The Pub/Sub servers + recurrently close the connection after a time period to avoid a long-running sticky connection. This metric + counts the number of the resets that occurred during the lifetime of the container. + tests: config: project: my-project diff --git a/receiver/googlecloudpubsubreceiver/receiver.go b/receiver/googlecloudpubsubreceiver/receiver.go index 9fb36f6b1d6d..4c444f2d8f97 100644 --- a/receiver/googlecloudpubsubreceiver/receiver.go +++ b/receiver/googlecloudpubsubreceiver/receiver.go @@ -21,16 +21,17 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" ) // https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#streamingpullrequest type pubsubReceiver struct { - logger *zap.Logger + settings receiver.Settings obsrecv *receiverhelper.ObsReport tracesConsumer consumer.Traces metricsConsumer consumer.Metrics @@ -43,6 +44,7 @@ type pubsubReceiver struct { logsUnmarshaler plog.Unmarshaler handler *internal.StreamHandler startOnce sync.Once + telemetryBuilder *metadata.TelemetryBuilder } type buildInEncoding int @@ -118,6 +120,11 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, host component.Host) return } receiver.client = client + receiver.telemetryBuilder, err = metadata.NewTelemetryBuilder(receiver.settings.TelemetrySettings) + if err != nil { + startErr = fmt.Errorf("failed to create telemetry builder: %w", err) + return + } err = createHandlerFn(ctx) if err != nil { @@ -194,9 +201,9 @@ func (receiver *pubsubReceiver) setMarshallerFromEncodingID(encodingID buildInEn func (receiver *pubsubReceiver) Shutdown(_ context.Context) error { if receiver.handler != nil { - receiver.logger.Info("Stopping Google Pubsub receiver") + receiver.settings.Logger.Info("Stopping Google Pubsub receiver") receiver.handler.CancelNow() - receiver.logger.Info("Stopped Google Pubsub receiver") + receiver.settings.Logger.Info("Stopped Google Pubsub receiver") receiver.handler = nil } if receiver.client == nil { @@ -370,7 +377,8 @@ func (receiver *pubsubReceiver) createMultiplexingReceiverHandler(ctx context.Co var err error receiver.handler, err = internal.NewHandler( ctx, - receiver.logger, + receiver.settings, + receiver.telemetryBuilder, receiver.client, receiver.config.ClientID, receiver.config.Subscription, @@ -432,7 +440,8 @@ func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error receiver.handler, err = internal.NewHandler( ctx, - receiver.logger, + receiver.settings, + receiver.telemetryBuilder, receiver.client, receiver.config.ClientID, receiver.config.Subscription, diff --git a/receiver/googlecloudpubsubreceiver/receiver_test.go b/receiver/googlecloudpubsubreceiver/receiver_test.go index 72eca61b1315..9d32fa157cc4 100644 --- a/receiver/googlecloudpubsubreceiver/receiver_test.go +++ b/receiver/googlecloudpubsubreceiver/receiver_test.go @@ -18,8 +18,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/testdata" @@ -27,9 +25,9 @@ import ( func createBaseReceiver() (*pstest.Server, *pubsubReceiver) { srv := pstest.NewServer() - core, _ := observer.New(zap.WarnLevel) + settings := receivertest.NewNopSettings() return srv, &pubsubReceiver{ - logger: zap.New(core), + settings: settings, userAgent: "test-user-agent", config: &Config{ @@ -100,8 +98,7 @@ func TestReceiver(t *testing.T) { }) assert.NoError(t, err) - core, _ := observer.New(zap.WarnLevel) - params := receivertest.NewNopSettings() + settings := receivertest.NewNopSettings() traceSink := new(consumertest.TracesSink) metricSink := new(consumertest.MetricsSink) logSink := new(consumertest.LogsSink) @@ -110,12 +107,12 @@ func TestReceiver(t *testing.T) { ReceiverID: component.NewID(metadata.Type), Transport: reportTransport, LongLivedCtx: false, - ReceiverCreateSettings: params, + ReceiverCreateSettings: settings, }) require.NoError(t, err) receiver := &pubsubReceiver{ - logger: zap.New(core), + settings: settings, obsrecv: obsrecv, userAgent: "test-user-agent",