diff --git a/go.mod b/go.mod index b909baae1f..a1469c9fe3 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,11 @@ require ( github.com/confluentinc/confluent-kafka-go v1.9.2 github.com/google/uuid v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0 + github.com/hashicorp/go-hclog v0.14.1 github.com/hashicorp/go-plugin v1.4.10 github.com/jpillora/backoff v1.0.0 github.com/linkedin/goavro/v2 v2.12.0 + github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.15.0 github.com/riferrei/srclient v0.5.4 @@ -36,7 +38,6 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect - github.com/hashicorp/go-hclog v0.14.1 // indirect github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.16 // indirect @@ -63,5 +64,10 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -// until merged upstream: https://github.com/hashicorp/go-plugin/pull/257 -replace github.com/hashicorp/go-plugin => github.com/jmank88/go-plugin v0.0.0-20230604120638-7bb12ec27e75 +replace ( + // until merged upstream: https://github.com/hashicorp/go-plugin/pull/257 + github.com/hashicorp/go-plugin => github.com/jmank88/go-plugin v0.0.0-20230604120638-7bb12ec27e75 + + // until merged upstream: https://github.com/mwitkow/grpc-proxy/pull/69 + github.com/mwitkow/grpc-proxy => github.com/jmank88/grpc-proxy v0.0.0-20230731114312-86ed94c93231 +) diff --git a/go.sum b/go.sum index 3afb49a668..b5010b6815 100644 --- a/go.sum +++ b/go.sum @@ -113,6 +113,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= @@ -176,6 +177,8 @@ github.com/jhump/protoreflect v1.12.0 h1:1NQ4FpWMgn3by/n1X0fbeKEUxP1wBt7+Oitpv01 github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI= github.com/jmank88/go-plugin v0.0.0-20230604120638-7bb12ec27e75 h1:KYTOmcwuezD27O7vNF15lj8H7imCBMXCq1RzCdj4e3A= github.com/jmank88/go-plugin v0.0.0-20230604120638-7bb12ec27e75/go.mod h1:6/1TEzT0eQznvI/gV2CM29DLSkAK/e58mUWKVsPaph0= +github.com/jmank88/grpc-proxy v0.0.0-20230731114312-86ed94c93231 h1:CAa5/6QLk2AAgTlqMPfPRkf0ebjtPYoJFhTqe7zJaXs= +github.com/jmank88/grpc-proxy v0.0.0-20230731114312-86ed94c93231/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -269,6 +272,7 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= @@ -324,6 +328,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs= golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= @@ -360,6 +365,8 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= +golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= @@ -414,7 +421,10 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -476,6 +486,7 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -534,6 +545,7 @@ google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4= google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= @@ -551,6 +563,7 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= @@ -598,6 +611,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= +honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= diff --git a/pkg/loop/README.md b/pkg/loop/README.md index 450d3235a5..342fd53738 100644 --- a/pkg/loop/README.md +++ b/pkg/loop/README.md @@ -45,3 +45,40 @@ GRPC client & server implementations. ### `package pb` Protocol buffer definitions & generated code. + +## Communication + +GRPC client/server pairs are used to communicated between the host and each plugin. +Plugins cannot communicate directly with one another, but the host can proxy a connection between them. + +Here are the main components for the case of Median: +```mermaid +sequenceDiagram + autonumber + participant relayer as Relayer (plugin) + participant core as Chainlink (host) + participant median as Median (plugin) + + Note over core: KeystoreServer + core->>+relayer: NewRelayer + Note over relayer: KeystoreClient + Note over relayer: RelayerServer + relayer->>-core: Relayer ID + Note over core: RelayerClient + + core->>+relayer: NewMedianProvider + Note over relayer: MedianProviderServer + relayer->>-core: MedianProvider ID + Note over core: GRPC Proxy + core->>+median: NewMedianFactory + Note over median: MedianProviderClient + Note over median: MedianFactoryServer + median->>-core: MedianFactory ID + Note over core: MedianFactoryClient + + + core->>+median: NewReportingPlugin + Note over median: ReportingPluginServer + median->>-core: ReportingPlugin ID + Note over core: ReportingPluginClient +``` \ No newline at end of file diff --git a/pkg/loop/internal/broker.go b/pkg/loop/internal/broker.go index 0fd942d99e..e3ea37d92e 100644 --- a/pkg/loop/internal/broker.go +++ b/pkg/loop/internal/broker.go @@ -90,7 +90,18 @@ func (b *brokerExt) dial(id uint32) (conn *grpc.ClientConn, err error) { return b.broker.DialWithOptions(id, b.DialOpts...) } -func (b *brokerExt) serve(name string, register func(*grpc.Server), deps ...resource) (uint32, resource, error) { +func (b *brokerExt) serveNew(name string, register func(*grpc.Server), deps ...resource) (uint32, resource, error) { + var server *grpc.Server + if b.NewServer == nil { + server = grpc.NewServer() + } else { + server = b.NewServer(nil) + } + register(server) + return b.serve(name, server, deps...) +} + +func (b *brokerExt) serve(name string, server *grpc.Server, deps ...resource) (uint32, resource, error) { id := b.broker.NextId() b.Logger.Debugf("Serving %s on connection %d", name, id) lis, err := b.broker.Accept(id) @@ -99,13 +110,6 @@ func (b *brokerExt) serve(name string, register func(*grpc.Server), deps ...reso return 0, resource{}, ErrConnAccept{Name: name, ID: id, Err: err} } - var server *grpc.Server - if b.NewServer == nil { - server = grpc.NewServer() - } else { - server = b.NewServer(nil) - } - register(server) go func() { defer b.closeAll(deps...) if err := server.Serve(lis); err != nil { diff --git a/pkg/loop/internal/median.go b/pkg/loop/internal/median.go index 0a7779dbdd..3bc49360c5 100644 --- a/pkg/loop/internal/median.go +++ b/pkg/loop/internal/median.go @@ -7,6 +7,7 @@ import ( "math/big" "time" + "github.com/mwitkow/grpc-proxy/proxy" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" @@ -42,9 +43,15 @@ func NewPluginMedianClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.Cli return &PluginMedianClient{pluginClient: pc, median: pb.NewPluginMedianClient(pc), serviceClient: newServiceClient(pc.brokerExt, pc)} } +// TODO move; rename +type HasClientConn interface { + // ClientConn returns the underlying connection to support proxying. + ClientConn() grpc.ClientConnInterface +} + func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog ErrorLog) (ReportingPluginFactory, error) { cc := m.newClientConn("MedianPluginFactory", func(ctx context.Context) (id uint32, deps resources, err error) { - dataSourceID, dsRes, err := m.serve("DataSource", func(s *grpc.Server) { + dataSourceID, dsRes, err := m.serveNew("DataSource", func(s *grpc.Server) { pb.RegisterDataSourceServer(s, &dataSourceServer{impl: dataSource}) }) if err != nil { @@ -52,7 +59,7 @@ func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider type } deps.Add(dsRes) - juelsPerFeeCoinDataSourceID, juelsPerFeeCoinDataSourceRes, err := m.serve("JuelsPerFeeCoinDataSource", func(s *grpc.Server) { + juelsPerFeeCoinDataSourceID, juelsPerFeeCoinDataSourceRes, err := m.serveNew("JuelsPerFeeCoinDataSource", func(s *grpc.Server) { pb.RegisterDataSourceServer(s, &dataSourceServer{impl: juelsPerFeeCoin}) }) if err != nil { @@ -60,21 +67,29 @@ func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider type } deps.Add(juelsPerFeeCoinDataSourceRes) - providerID, providerRes, err := m.serve("MedianProvider", func(s *grpc.Server) { - pb.RegisterServiceServer(s, &serviceServer{srv: provider}) - pb.RegisterOffchainConfigDigesterServer(s, &offchainConfigDigesterServer{impl: provider.OffchainConfigDigester()}) - pb.RegisterContractConfigTrackerServer(s, &contractConfigTrackerServer{impl: provider.ContractConfigTracker()}) - pb.RegisterContractTransmitterServer(s, &contractTransmitterServer{impl: provider.ContractTransmitter()}) - pb.RegisterReportCodecServer(s, &reportCodecServer{impl: provider.ReportCodec()}) - pb.RegisterMedianContractServer(s, &medianContractServer{impl: provider.MedianContract()}) - pb.RegisterOnchainConfigCodecServer(s, &onchainConfigCodecServer{impl: provider.OnchainConfigCodec()}) - }) + var ( + providerID uint32 + providerRes resource + ) + if grpcProvider, ok := provider.(HasClientConn); ok { //TODO ensure both paths are tested + providerID, providerRes, err = m.serve("MedianProvider", proxy.NewProxy(grpcProvider.ClientConn())) + } else { + providerID, providerRes, err = m.serveNew("MedianProvider", func(s *grpc.Server) { + pb.RegisterServiceServer(s, &serviceServer{srv: provider}) + pb.RegisterOffchainConfigDigesterServer(s, &offchainConfigDigesterServer{impl: provider.OffchainConfigDigester()}) + pb.RegisterContractConfigTrackerServer(s, &contractConfigTrackerServer{impl: provider.ContractConfigTracker()}) + pb.RegisterContractTransmitterServer(s, &contractTransmitterServer{impl: provider.ContractTransmitter()}) + pb.RegisterReportCodecServer(s, &reportCodecServer{impl: provider.ReportCodec()}) + pb.RegisterMedianContractServer(s, &medianContractServer{impl: provider.MedianContract()}) + pb.RegisterOnchainConfigCodecServer(s, &onchainConfigCodecServer{impl: provider.OnchainConfigCodec()}) + }) + } if err != nil { return 0, nil, err } deps.Add(providerRes) - errorLogID, errorLogRes, err := m.serve("ErrorLog", func(s *grpc.Server) { + errorLogID, errorLogRes, err := m.serveNew("ErrorLog", func(s *grpc.Server) { pb.RegisterErrorLogServer(s, &errorLogServer{impl: errorLog}) }) if err != nil { @@ -152,7 +167,7 @@ func (m *pluginMedianServer) NewMedianFactory(ctx context.Context, request *pb.N return nil, err } - id, _, err := m.serve("ReportingPluginProvider", func(s *grpc.Server) { + id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) { pb.RegisterServiceServer(s, &serviceServer{srv: factory}) pb.RegisterReportingPluginFactoryServer(s, newReportingPluginFactoryServer(factory, m.brokerExt)) }, dsRes, juelsRes, providerRes, errorLogRes) @@ -163,6 +178,11 @@ func (m *pluginMedianServer) NewMedianFactory(ctx context.Context, request *pb.N return &pb.NewMedianFactoryReply{ReportingPluginFactoryID: id}, nil } +var ( + _ types.MedianProvider = (*medianProviderClient)(nil) + _ HasClientConn = (*medianProviderClient)(nil) +) + type medianProviderClient struct { *configProviderClient contractTransmitter libocr.ContractTransmitter @@ -171,6 +191,8 @@ type medianProviderClient struct { onchainConfigCodec median.OnchainConfigCodec } +func (m *medianProviderClient) ClientConn() grpc.ClientConnInterface { return m.cc } + func newMedianProviderClient(b *brokerExt, cc grpc.ClientConnInterface) *medianProviderClient { m := &medianProviderClient{configProviderClient: newConfigProviderClient(b.named("MedianProviderClient"), cc)} m.contractTransmitter = &contractTransmitterClient{b, pb.NewContractTransmitterClient(m.cc)} diff --git a/pkg/loop/internal/relayer.go b/pkg/loop/internal/relayer.go index 988d484f21..13ec9b72f2 100644 --- a/pkg/loop/internal/relayer.go +++ b/pkg/loop/internal/relayer.go @@ -43,7 +43,7 @@ func NewPluginRelayerClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.Cl func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore Keystore) (Relayer, error) { cc := p.newClientConn("Relayer", func(ctx context.Context) (id uint32, deps resources, err error) { var ksRes resource - id, ksRes, err = p.serve("Keystore", func(s *grpc.Server) { + id, ksRes, err = p.serveNew("Keystore", func(s *grpc.Server) { pb.RegisterKeystoreServer(s, &keystoreServer{impl: keystore}) }) if err != nil { @@ -100,7 +100,7 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel const name = "Relayer" rRes := resource{r, name} - id, _, err := p.serve(name, func(s *grpc.Server) { + id, _, err := p.serveNew(name, func(s *grpc.Server) { pb.RegisterServiceServer(s, &serviceServer{srv: r}) pb.RegisterRelayerServer(s, newChainRelayerServer(r, p.brokerExt)) }, rRes, ksRes) @@ -342,7 +342,7 @@ func (r *relayerServer) NewConfigProvider(ctx context.Context, request *pb.NewCo } const name = "ConfigProvider" - id, _, err := r.serve(name, func(s *grpc.Server) { + id, _, err := r.serveNew(name, func(s *grpc.Server) { pb.RegisterServiceServer(s, &serviceServer{srv: cp}) pb.RegisterOffchainConfigDigesterServer(s, &offchainConfigDigesterServer{impl: cp.OffchainConfigDigester()}) pb.RegisterContractConfigTrackerServer(s, &contractConfigTrackerServer{impl: cp.ContractConfigTracker()}) @@ -379,7 +379,7 @@ func (r *relayerServer) NewMedianProvider(ctx context.Context, request *pb.NewMe const name = "MedianProvider" providerRes := resource{name: name, Closer: provider} - id, _, err := r.serve(name, func(s *grpc.Server) { + id, _, err := r.serveNew(name, func(s *grpc.Server) { pb.RegisterServiceServer(s, &serviceServer{srv: provider}) pb.RegisterOffchainConfigDigesterServer(s, &offchainConfigDigesterServer{impl: provider.OffchainConfigDigester()}) pb.RegisterContractConfigTrackerServer(s, &contractConfigTrackerServer{impl: provider.ContractConfigTracker()}) diff --git a/pkg/loop/internal/reporting.go b/pkg/loop/internal/reporting.go index c75725e198..69c287bad5 100644 --- a/pkg/loop/internal/reporting.go +++ b/pkg/loop/internal/reporting.go @@ -106,7 +106,7 @@ func (r *reportingPluginFactoryServer) NewReportingPlugin(ctx context.Context, r } const name = "ReportingPlugin" - id, _, err := r.serve(name, func(s *grpc.Server) { + id, _, err := r.serveNew(name, func(s *grpc.Server) { pb.RegisterReportingPluginServer(s, &reportingPluginServer{impl: rp}) }, resource{rp, name}) if err != nil {