From 78d9cebb92678152674eb749af15cd7742d3ac48 Mon Sep 17 00:00:00 2001 From: Arsene Date: Sat, 13 Jan 2024 19:24:17 +0000 Subject: [PATCH] refactor: upgrade the otelconnect dependency and fix breaking change (#205) --- actors/actor_system.go | 31 +++++-- actors/api.go | 45 ++++++++-- actors/pid.go | 87 +++++++++++++++---- .../actor-cluster/dnssd/service/service.go | 7 +- go.mod | 4 +- go.sum | 7 +- 6 files changed, 143 insertions(+), 38 deletions(-) diff --git a/actors/actor_system.go b/actors/actor_system.go index 198882dc..c23d58cf 100644 --- a/actors/actor_system.go +++ b/actors/actor_system.go @@ -1164,21 +1164,34 @@ func (x *actorSystem) enableClustering(ctx context.Context) { func (x *actorSystem) enableRemoting(ctx context.Context) { // add some logging information x.logger.Info("enabling remoting...") - // create a function to handle the observability - interceptor := func(tp trace.TracerProvider, mp otelmetric.MeterProvider) connect.Interceptor { - return otelconnect.NewInterceptor( - otelconnect.WithTracerProvider(tp), - otelconnect.WithMeterProvider(mp), + + // define a variable to hold the interceptor + var interceptor *otelconnect.Interceptor + var err error + // only set the observability when metric or trace is enabled + if x.metricEnabled.Load() || x.traceEnabled.Load() { + // create an interceptor and panic + interceptor, err = otelconnect.NewInterceptor( + otelconnect.WithTracerProvider(x.telemetry.TracerProvider), + otelconnect.WithMeterProvider(x.telemetry.MeterProvider), ) + // panic when there is an error + if err != nil { + x.logger.Panic(errors.Wrap(err, "failed to initialize observability feature")) + } + } + + // create the handler option + var opts []connect.HandlerOption + // set handler options when interceptor is defined + if interceptor != nil { + opts = append(opts, connect.WithInterceptors(interceptor)) } // create a http service mux mux := http.NewServeMux() // create the resource and handler - path, handler := internalpbconnect.NewRemotingServiceHandler( - x, - connect.WithInterceptors(interceptor(x.telemetry.TracerProvider, x.telemetry.MeterProvider)), - ) + path, handler := internalpbconnect.NewRemotingServiceHandler(x, opts...) mux.Handle(path, handler) // create the address serverAddr := fmt.Sprintf("%s:%d", x.remotingHost, x.remotingPort) diff --git a/actors/api.go b/actors/api.go index 658efd69..9f8736f5 100644 --- a/actors/api.go +++ b/actors/api.go @@ -195,11 +195,18 @@ func RemoteTell(ctx context.Context, to *addresspb.Address, message proto.Messag return ErrInvalidRemoteMessage(err) } + // create an interceptor + interceptor, err := otelconnect.NewInterceptor() + // handle the error + if err != nil { + return err + } + // create an instance of remote client service remoteClient := internalpbconnect.NewRemotingServiceClient( http.Client(), http.URL(to.GetHost(), int(to.GetPort())), - connect.WithInterceptors(otelconnect.NewInterceptor()), + connect.WithInterceptors(interceptor), connect.WithGRPC(), ) // prepare the rpcRequest to send @@ -225,11 +232,18 @@ func RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message return nil, ErrInvalidRemoteMessage(err) } + // create an interceptor + interceptor, err := otelconnect.NewInterceptor() + // handle the error + if err != nil { + return nil, err + } + // create an instance of remote client service remoteClient := internalpbconnect.NewRemotingServiceClient( http.Client(), http.URL(to.GetHost(), int(to.GetPort())), - connect.WithInterceptors(otelconnect.NewInterceptor()), + connect.WithInterceptors(interceptor), connect.WithGRPC(), ) // prepare the rpcRequest to send @@ -252,11 +266,18 @@ func RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message // RemoteLookup look for an actor address on a remote node. func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error) { + // create an interceptor + interceptor, err := otelconnect.NewInterceptor() + // handle the error + if err != nil { + return nil, err + } + // create an instance of remote client service remoteClient := internalpbconnect.NewRemotingServiceClient( http.Client(), http.URL(host, port), - connect.WithInterceptors(otelconnect.NewInterceptor()), + connect.WithInterceptors(interceptor), connect.WithGRPC(), ) @@ -283,6 +304,13 @@ func RemoteLookup(ctx context.Context, host string, port int, name string) (addr // RemoteBatchTell sends bulk asynchronous messages to an actor func RemoteBatchTell(ctx context.Context, to *addresspb.Address, messages ...proto.Message) error { + // create an interceptor + interceptor, err := otelconnect.NewInterceptor() + // handle the error + if err != nil { + return err + } + // define a variable holding the remote messages var remoteMessages []*anypb.Any // iterate the list of messages and pack them @@ -301,7 +329,7 @@ func RemoteBatchTell(ctx context.Context, to *addresspb.Address, messages ...pro remoteClient := internalpbconnect.NewRemotingServiceClient( http.Client(), http.URL(to.GetHost(), int(to.GetPort())), - connect.WithInterceptors(otelconnect.NewInterceptor()), + connect.WithInterceptors(interceptor), connect.WithGRPC(), ) @@ -320,6 +348,13 @@ func RemoteBatchTell(ctx context.Context, to *addresspb.Address, messages ...pro // RemoteBatchAsk sends bulk messages to an actor with responses expected func RemoteBatchAsk(ctx context.Context, to *addresspb.Address, messages ...proto.Message) (responses []*anypb.Any, err error) { + // create an interceptor + interceptor, err := otelconnect.NewInterceptor() + // handle the error + if err != nil { + return nil, err + } + // define a variable holding the remote messages var remoteMessages []*anypb.Any // iterate the list of messages and pack them @@ -338,7 +373,7 @@ func RemoteBatchAsk(ctx context.Context, to *addresspb.Address, messages ...prot remoteClient := internalpbconnect.NewRemotingServiceClient( http.Client(), http.URL(to.GetHost(), int(to.GetPort())), - connect.WithInterceptors(otelconnect.NewInterceptor()), + connect.WithInterceptors(interceptor), connect.WithGRPC(), ) diff --git a/actors/pid.go b/actors/pid.go index 67bd3242..0539f030 100644 --- a/actors/pid.go +++ b/actors/pid.go @@ -791,12 +791,18 @@ func (p *pid) BatchAsk(ctx context.Context, to PID, messages ...proto.Message) ( // RemoteLookup look for an actor address on a remote node. If the actorSystem is nil then the lookup will be done // using the same actor system as the PID actor system func (p *pid) RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error) { + // get the gRPC client connection options + clientConnectionOptions, err := p.gRPCClientConnectionOptions() + // handle the error + if err != nil { + return nil, err + } + // create an instance of remote client service remoteClient := internalpbconnect.NewRemotingServiceClient( p.httpClient, http.URL(host, port), - connect.WithInterceptors(p.interceptor()), - connect.WithGRPC(), + clientConnectionOptions..., ) // prepare the request to send @@ -827,12 +833,18 @@ func (p *pid) RemoteTell(ctx context.Context, to *addresspb.Address, message pro return err } + // get the gRPC client connection options + clientConnectionOptions, err := p.gRPCClientConnectionOptions() + // handle the error + if err != nil { + return err + } + // create an instance of remote client service remoteClient := internalpbconnect.NewRemotingServiceClient( p.httpClient, http.URL(to.GetHost(), int(to.GetPort())), - connect.WithInterceptors(p.interceptor()), - connect.WithGRPC(), + clientConnectionOptions..., ) // construct the from address @@ -874,12 +886,18 @@ func (p *pid) RemoteAsk(ctx context.Context, to *addresspb.Address, message prot return nil, err } + // get the gRPC client connection options + clientConnectionOptions, err := p.gRPCClientConnectionOptions() + // handle the error + if err != nil { + return nil, err + } + // create an instance of remote client service remoteClient := internalpbconnect.NewRemotingServiceClient( p.httpClient, http.URL(to.GetHost(), int(to.GetPort())), - connect.WithInterceptors(p.interceptor()), - connect.WithGRPC(), + clientConnectionOptions..., ) // construct the from address @@ -939,12 +957,18 @@ func (p *pid) RemoteBatchTell(ctx context.Context, to *addresspb.Address, messag Id: p.ActorPath().ID().String(), } + // get the gRPC client connection options + clientConnectionOptions, err := p.gRPCClientConnectionOptions() + // handle the error + if err != nil { + return err + } + // create an instance of remote client service remoteClient := internalpbconnect.NewRemotingServiceClient( http.Client(), http.URL(to.GetHost(), int(to.GetPort())), - connect.WithInterceptors(otelconnect.NewInterceptor()), - connect.WithGRPC(), + clientConnectionOptions..., ) // prepare the remote batch tell request @@ -987,12 +1011,18 @@ func (p *pid) RemoteBatchAsk(ctx context.Context, to *addresspb.Address, message Id: p.ActorPath().ID().String(), } + // get the gRPC client connection options + clientConnectionOptions, err := p.gRPCClientConnectionOptions() + // handle the error + if err != nil { + return nil, err + } + // create an instance of remote client service remoteClient := internalpbconnect.NewRemotingServiceClient( http.Client(), http.URL(to.GetHost(), int(to.GetPort())), - connect.WithInterceptors(otelconnect.NewInterceptor()), - connect.WithGRPC(), + clientConnectionOptions..., ) // prepare the remote batch tell request @@ -1376,14 +1406,6 @@ func (p *pid) passivationListener() { p.logger.Infof("Actor=%s successfully passivated", p.ActorPath().String()) } -// interceptor create an interceptor based upon the telemetry provided -func (p *pid) interceptor() connect.Interceptor { - return otelconnect.NewInterceptor( - otelconnect.WithTracerProvider(p.telemetry.TracerProvider), - otelconnect.WithMeterProvider(p.telemetry.MeterProvider), - ) -} - // setBehavior is a utility function that helps set the actor behavior func (p *pid) setBehavior(behavior Behavior) { p.semaphore.Lock() @@ -1559,3 +1581,32 @@ func (p *pid) registerMetrics() error { return err } + +// gRPCClientConnectionOptions returns the gRPC client connections options +func (p *pid) gRPCClientConnectionOptions() ([]connect.ClientOption, error) { + // define a variable to hold the interceptor + var interceptor *otelconnect.Interceptor + var err error + // only set the observability when metric or trace is enabled + if p.metricEnabled.Load() || p.traceEnabled.Load() { + // create an interceptor and panic + interceptor, err = otelconnect.NewInterceptor( + otelconnect.WithTracerProvider(p.telemetry.TracerProvider), + otelconnect.WithMeterProvider(p.telemetry.MeterProvider)) + // panic when there is an error + if err != nil { + return nil, errors.Wrap(err, "failed to initialize observability feature") + } + } + + // create the handler option + clientConnectionOptions := []connect.ClientOption{ + connect.WithGRPC(), + } + // add the grpc connection + // set handler options when interceptor is defined + if interceptor != nil { + clientConnectionOptions = append(clientConnectionOptions, connect.WithInterceptors(interceptor)) + } + return clientConnectionOptions, err +} diff --git a/examples/actor-cluster/dnssd/service/service.go b/examples/actor-cluster/dnssd/service/service.go index c7074483..5f44db46 100644 --- a/examples/actor-cluster/dnssd/service/service.go +++ b/examples/actor-cluster/dnssd/service/service.go @@ -234,9 +234,14 @@ func (s *AccountService) Stop(ctx context.Context) error { func (s *AccountService) listenAndServe() { // create a http service mux mux := http.NewServeMux() + // create an interceptor + interceptor, err := otelconnect.NewInterceptor() + if err != nil { + s.logger.Panic(err) + } // create the resource and handler path, handler := samplepbconnect.NewAccountServiceHandler(s, - connect.WithInterceptors(otelconnect.NewInterceptor())) + connect.WithInterceptors(interceptor)) mux.Handle(path, handler) // create the address serverAddr := fmt.Sprintf(":%d", s.port) diff --git a/go.mod b/go.mod index 3f3a761a..5402089d 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.21 require ( connectrpc.com/connect v1.14.0 - connectrpc.com/otelconnect v0.6.0 + connectrpc.com/otelconnect v0.7.0 github.com/buraksezer/olric v0.5.4 github.com/caarlos0/env/v10 v10.0.0 github.com/cespare/xxhash/v2 v2.2.0 @@ -53,7 +53,7 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect - github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.20.0 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect diff --git a/go.sum b/go.sum index abdb0ed3..1332f7cb 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ connectrpc.com/connect v1.14.0 h1:PDS+J7uoz5Oui2VEOMcfz6Qft7opQM9hPiKvtGC01pA= connectrpc.com/connect v1.14.0/go.mod h1:uoAq5bmhhn43TwhaKdGKN/bZcGtzPW1v+ngDTn5u+8s= -connectrpc.com/otelconnect v0.6.0 h1:VJAdQL9+sgdUw9+7+J+jq8pQo/h1S7tSFv2+vDcR7bU= -connectrpc.com/otelconnect v0.6.0/go.mod h1:jdcs0uiwXQVmSMgTJ2dAaWR5VbpNd7QKNkuoH7n86RA= +connectrpc.com/otelconnect v0.7.0 h1:ZH55ZZtcJOTKWWLy3qmL4Pam4RzRWBJFOqTPyAqCXkY= +connectrpc.com/otelconnect v0.7.0/go.mod h1:Bt2ivBymHZHqxvo4HkJ0EwHuUzQN6k2l0oH+mp/8nwc= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/RoaringBitmap/roaring v1.2.1/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA= github.com/RoaringBitmap/roaring v1.5.0 h1:V0VCSiHjroItEYCM3guC8T83ehi5QMt3oM9EefTTOms= @@ -65,8 +65,9 @@ github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= +github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs=