
[receiver/googlecloudpubsub] Turn noisy warn in reset metric (#37571)
alexvanboxel committed Feb 5, 2025
1 parent f399f0f commit 3e893b0
Showing 13 changed files with 395 additions and 47 deletions.
17 changes: 17 additions & 0 deletions .chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml
@@ -0,0 +1,17 @@
change_type: enhancement

component: googlecloudpubsubreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Turn noisy `warn` log about Pub/Sub servers into `debug`, and turn the reset count into a metric

issues: [37571]

subtext: |
  The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps an open connection. The Pub/Sub servers
  recurrently close the connection after a while to avoid long-running sticky connections. Previously, the receiver
  logged `warn` lines every time this happened. These log lines are now emitted at `debug` level, so fleets running
  many collectors with this receiver no longer spam logs at warn level. To keep track of the resets, the
  `otelcol_receiver_googlecloudpubsub_stream_restarts` metric is increased by one whenever a connection reset happens.
change_logs: [user]
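
The handler.go diff further down wires this counter through the component's mdatagen-generated `TelemetryBuilder`. In plain OpenTelemetry Go terms, the instrument it bumps on every reconnect behaves roughly like the self-contained sketch below; the meter name, metric name, and `otelcol.component.*` attribute values are illustrative (adapted from the diff), not the generated code itself:

```go
package main

import (
	"context"
	"log"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	ctx := context.Background()

	// Stand-in for the meter provider the collector passes in via TelemetrySettings.
	provider := sdkmetric.NewMeterProvider()
	defer func() { _ = provider.Shutdown(ctx) }()
	meter := provider.Meter("otelcol/googlecloudpubsubreceiver")

	// Rough equivalent of the generated ReceiverGooglecloudpubsubStreamRestarts
	// instrument: a monotonic Int64 counter with unit "1".
	restarts, err := meter.Int64Counter(
		"otelcol_receiver_googlecloudpubsub_stream_restarts",
		metric.WithDescription("Number of times the stream (re)starts due to the Pub/Sub servers closing the connection"),
		metric.WithUnit("1"),
	)
	if err != nil {
		log.Fatal(err)
	}

	// What the handler does at the end of each successful initStream: bump the
	// counter by one, tagged with the component kind and ID.
	restarts.Add(ctx, 1, metric.WithAttributes(
		attribute.String("otelcol.component.kind", "receiver"),
		attribute.String("otelcol.component.id", "googlecloudpubsub"),
	))
}
```

The generated builder performs the equivalent of the `Int64Counter` setup once at construction time, so the handler only has to call `Add` when the stream is (re)initialized.
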
20 changes: 20 additions & 0 deletions receiver/googlecloudpubsubreceiver/documentation.md
@@ -0,0 +1,20 @@
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)

# googlecloudpubsub

## Internal Telemetry

The following telemetry is emitted by this component.

### otelcol_receiver.googlecloudpubsub.stream_restarts

Number of times the stream (re)starts due to the Pub/Sub servers closing the connection

The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps an open connection. The Pub/Sub servers
recurrently close the connection after a while to avoid long-running sticky connections. This metric counts the
number of resets that occurred during the lifetime of the container.


| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |
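
Because the metric is a monotonic integer sum, it can be read back in-memory with the OTel SDK's `ManualReader`. The receiver's own tests construct the generated `TelemetryBuilder` from `receivertest.NewNopSettings()` (see the handler_test.go diff later in this commit); the sketch below is a separate, generic illustration of recording and inspecting such a counter, with illustrative meter and metric names:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	// In-memory reader so the recorded data points can be inspected directly.
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	meter := provider.Meter("otelcol/googlecloudpubsubreceiver")

	restarts, err := meter.Int64Counter(
		"otelcol_receiver_googlecloudpubsub_stream_restarts",
		metric.WithUnit("1"),
	)
	if err != nil {
		log.Fatal(err)
	}
	restarts.Add(ctx, 1) // one simulated stream restart

	// Collect and print the monotonic sum, matching the table above
	// (Sum, Int, monotonic, unit "1").
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		log.Fatal(err)
	}
	for _, sm := range rm.ScopeMetrics {
		for _, m := range sm.Metrics {
			if sum, ok := m.Data.(metricdata.Sum[int64]); ok {
				fmt.Printf("%s monotonic=%t value=%d\n", m.Name, sum.IsMonotonic, sum.DataPoints[0].Value)
			}
		}
	}
}
```
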
10 changes: 5 additions & 5 deletions receiver/googlecloudpubsubreceiver/factory.go
@@ -41,24 +41,24 @@ func (factory *pubsubReceiverFactory) CreateDefaultConfig() component.Config {
return &Config{}
}

func (factory *pubsubReceiverFactory) ensureReceiver(params receiver.Settings, config component.Config) (*pubsubReceiver, error) {
func (factory *pubsubReceiverFactory) ensureReceiver(settings receiver.Settings, config component.Config) (*pubsubReceiver, error) {
receiver := factory.receivers[config.(*Config)]
if receiver != nil {
return receiver, nil
}
rconfig := config.(*Config)
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: params.ID,
ReceiverID: settings.ID,
Transport: reportTransport,
ReceiverCreateSettings: params,
ReceiverCreateSettings: settings,
})
if err != nil {
return nil, err
}
receiver = &pubsubReceiver{
logger: params.Logger,
settings: settings,
obsrecv: obsrecv,
userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", params.BuildInfo.Version),
userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", settings.BuildInfo.Version),
config: rconfig,
}
factory.receivers[config.(*Config)] = receiver
8 changes: 4 additions & 4 deletions receiver/googlecloudpubsubreceiver/go.mod
@@ -20,6 +20,10 @@ require (
go.opentelemetry.io/collector/pdata v1.25.0
go.opentelemetry.io/collector/receiver v0.119.0
go.opentelemetry.io/collector/receiver/receivertest v0.119.0
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/metric v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.uber.org/goleak v1.3.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
@@ -72,11 +76,7 @@ require (
go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/oauth2 v0.25.0 // indirect
57 changes: 35 additions & 22 deletions receiver/googlecloudpubsubreceiver/internal/handler.go
@@ -13,9 +13,14 @@ import (
"time"

"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata"
)

// Time to wait before restarting, when the stream stopped
@@ -36,7 +41,8 @@ type StreamHandler struct {
streamWaitGroup sync.WaitGroup
// wait group for the handler
handlerWaitGroup sync.WaitGroup
logger *zap.Logger
settings receiver.Settings
telemetryBuilder *metadata.TelemetryBuilder
// time that acknowledge loop waits before acknowledging messages
ackBatchWait time.Duration

@@ -51,19 +57,21 @@ func (handler *StreamHandler) ack(ackID string) {

func NewHandler(
ctx context.Context,
logger *zap.Logger,
settings receiver.Settings,
telemetryBuilder *metadata.TelemetryBuilder,
client SubscriberClient,
clientID string,
subscription string,
callback func(ctx context.Context, message *pubsubpb.ReceivedMessage) error,
) (*StreamHandler, error) {
handler := StreamHandler{
logger: logger,
client: client,
clientID: clientID,
subscription: subscription,
pushMessage: callback,
ackBatchWait: 10 * time.Second,
settings: settings,
telemetryBuilder: telemetryBuilder,
client: client,
clientID: clientID,
subscription: subscription,
pushMessage: callback,
ackBatchWait: 10 * time.Second,
}
return &handler, handler.initStream(ctx)
}
@@ -85,6 +93,11 @@ func (handler *StreamHandler) initStream(ctx context.Context) error {
_ = handler.stream.CloseSend()
return err
}
handler.telemetryBuilder.ReceiverGooglecloudpubsubStreamRestarts.Add(ctx, 1,
metric.WithAttributes(
attribute.String("otelcol.component.kind", "receiver"),
attribute.String("otelcol.component.id", handler.settings.ID.String()),
))
return nil
}

@@ -102,7 +115,7 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) {
var loopCtx context.Context
loopCtx, cancel := context.WithCancel(ctx)

handler.logger.Info("Starting Streaming Pull")
handler.settings.Logger.Debug("Starting Streaming Pull")
handler.streamWaitGroup.Add(2)
go handler.requestStream(loopCtx, cancel)
go handler.responseStream(loopCtx, cancel)
Expand All @@ -117,13 +130,13 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) {
if handler.isRunning.Load() {
err := handler.initStream(ctx)
if err != nil {
handler.logger.Error("Failed to recovery stream.")
handler.settings.Logger.Error("Failed to recovery stream.")
}
}
handler.logger.Warn("End of recovery loop, restarting.")
handler.settings.Logger.Debug("End of recovery loop, restarting.")
time.Sleep(streamRecoveryBackoffPeriod)
}
handler.logger.Warn("Shutting down recovery loop.")
handler.settings.Logger.Warn("Shutting down recovery loop.")
handler.handlerWaitGroup.Done()
}

@@ -157,15 +170,15 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context.
for {
if err := handler.acknowledgeMessages(); err != nil {
if errors.Is(err, io.EOF) {
handler.logger.Warn("EOF reached")
handler.settings.Logger.Warn("EOF reached")
break
}
handler.logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err))
handler.settings.Logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err))
break
}
select {
case <-ctx.Done():
handler.logger.Warn("requestStream <-ctx.Done()")
handler.settings.Logger.Debug("requestStream <-ctx.Done()")
case <-timer.C:
timer.Reset(handler.ackBatchWait)
}
@@ -176,7 +189,7 @@
}
}
cancel()
handler.logger.Warn("Request Stream loop ended.")
handler.settings.Logger.Debug("Request Stream loop ended.")
_ = handler.stream.CloseSend()
handler.streamWaitGroup.Done()
}
@@ -202,30 +215,30 @@ func (handler *StreamHandler) responseStream(ctx context.Context, cancel context
case errors.Is(err, io.EOF):
activeStreaming = false
case !grpcStatus:
handler.logger.Warn("response stream breaking on error",
handler.settings.Logger.Warn("response stream breaking on error",
zap.Error(err))
activeStreaming = false
case s.Code() == codes.Unavailable:
handler.logger.Info("response stream breaking on gRPC s 'Unavailable'")
handler.settings.Logger.Debug("response stream breaking on gRPC s 'Unavailable'")
activeStreaming = false
case s.Code() == codes.NotFound:
handler.logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream")
handler.settings.Logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream")
time.Sleep(time.Second * 60)
activeStreaming = false
default:
handler.logger.Warn("response stream breaking on gRPC s "+s.Message(),
handler.settings.Logger.Warn("response stream breaking on gRPC s "+s.Message(),
zap.String("s", s.Message()),
zap.Error(err))
activeStreaming = false
}
}
if errors.Is(ctx.Err(), context.Canceled) {
// Canceling the loop, collector is probably stopping
handler.logger.Warn("response stream ctx.Err() == context.Canceled")
handler.settings.Logger.Warn("response stream ctx.Err() == context.Canceled")
break
}
}
cancel()
handler.logger.Warn("Response Stream loop ended.")
handler.settings.Logger.Debug("Response Stream loop ended.")
handler.streamWaitGroup.Done()
}
9 changes: 7 additions & 2 deletions receiver/googlecloudpubsubreceiver/internal/handler_test.go
@@ -12,10 +12,12 @@ import (
"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"cloud.google.com/go/pubsub/pstest"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
"go.opentelemetry.io/collector/receiver/receivertest"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata"
)

func TestCancelStream(t *testing.T) {
@@ -41,10 +43,13 @@ func TestCancelStream(t *testing.T) {
})
assert.NoError(t, err)

settings := receivertest.NewNopSettings()
telemetryBuilder, _ := metadata.NewTelemetryBuilder(settings.TelemetrySettings)

client, err := pubsub.NewSubscriberClient(ctx, copts...)
assert.NoError(t, err)

handler, err := NewHandler(ctx, zaptest.NewLogger(t), client, "client-id", "projects/my-project/subscriptions/otlp",
handler, err := NewHandler(ctx, settings, telemetryBuilder, client, "client-id", "projects/my-project/subscriptions/otlp",
func(context.Context, *pubsubpb.ReceivedMessage) error {
return nil
})

Some generated files are not rendered by default.
