From 281956154852a99907951e8fa6cca1afcce7a9d7 Mon Sep 17 00:00:00 2001 From: Daniel Selans Date: Tue, 26 Mar 2024 13:48:34 -0700 Subject: [PATCH] include subscription id in the relay name --- backends/gcppubsub/relay.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/backends/gcppubsub/relay.go b/backends/gcppubsub/relay.go index 7888f719..de53daed 100644 --- a/backends/gcppubsub/relay.go +++ b/backends/gcppubsub/relay.go @@ -58,10 +58,20 @@ func (g *GCPPubSub) Relay(ctx context.Context, relayOpts *opts.RelayOptions, rel if sc != nil { g.log.Debug("Processing message via streamdal SDK") + operationName := "relay" + + if relayOpts != nil && relayOpts.GcpPubsub != nil && relayOpts.GcpPubsub.GetArgs() != nil { + if relayOpts.GcpPubsub.GetArgs().SubscriptionId == "" { + operationName = "relay-unknown-subid" + } else { + operationName = "relay-" + relayOpts.GcpPubsub.GetArgs().SubscriptionId + } + } + resp := sc.Process(ctx, &sdk.ProcessRequest{ ComponentName: "gcp-pubsub", OperationType: sdk.OperationTypeConsumer, - OperationName: "relay", + OperationName: operationName, Data: msg.Data, })