Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Jul 31, 2023
1 parent 61eb2c0 commit e9e3288
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 29 deletions.
12 changes: 9 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ require (
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.0-rc.0
github.com/hashicorp/go-hclog v0.14.1
github.com/hashicorp/go-plugin v1.4.10
github.com/jpillora/backoff v1.0.0
github.com/linkedin/goavro/v2 v2.12.0
github.com/mwitkow/grpc-proxy v0.0.0-20230212185441-f345521cb9c9
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.15.0
github.com/riferrei/srclient v0.5.4
Expand Down Expand Up @@ -36,7 +38,6 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.3 // indirect
github.com/hashicorp/go-hclog v0.14.1 // indirect
github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
Expand All @@ -63,5 +64,10 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
)

// until merged upstream: https://github.com/hashicorp/go-plugin/pull/257
replace github.com/hashicorp/go-plugin => github.com/jmank88/go-plugin v0.0.0-20230604120638-7bb12ec27e75
replace (
// until merged upstream: https://github.com/hashicorp/go-plugin/pull/257
github.com/hashicorp/go-plugin => github.com/jmank88/go-plugin v0.0.0-20230604120638-7bb12ec27e75

// until merged upstream: https://github.com/mwitkow/grpc-proxy/pull/69
github.com/mwitkow/grpc-proxy => github.com/jmank88/grpc-proxy v0.0.0-20230731114312-86ed94c93231
)
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
Expand Down Expand Up @@ -176,6 +177,8 @@ github.com/jhump/protoreflect v1.12.0 h1:1NQ4FpWMgn3by/n1X0fbeKEUxP1wBt7+Oitpv01
github.com/jhump/protoreflect v1.12.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
github.com/jmank88/go-plugin v0.0.0-20230604120638-7bb12ec27e75 h1:KYTOmcwuezD27O7vNF15lj8H7imCBMXCq1RzCdj4e3A=
github.com/jmank88/go-plugin v0.0.0-20230604120638-7bb12ec27e75/go.mod h1:6/1TEzT0eQznvI/gV2CM29DLSkAK/e58mUWKVsPaph0=
github.com/jmank88/grpc-proxy v0.0.0-20230731114312-86ed94c93231 h1:CAa5/6QLk2AAgTlqMPfPRkf0ebjtPYoJFhTqe7zJaXs=
github.com/jmank88/grpc-proxy v0.0.0-20230731114312-86ed94c93231/go.mod h1:MvMXoufZAtqExNexqi4cjrNYE9MefKddKylxjS+//n0=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
Expand Down Expand Up @@ -269,6 +272,7 @@ github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXl
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down Expand Up @@ -324,6 +328,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
Expand Down Expand Up @@ -360,6 +365,8 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
Expand Down Expand Up @@ -414,7 +421,10 @@ golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -476,6 +486,7 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -534,6 +545,7 @@ google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A=
google.golang.org/genproto v0.0.0-20220503193339-ba3ae3f07e29/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA=
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s=
Expand All @@ -551,6 +563,7 @@ google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
Expand Down Expand Up @@ -598,6 +611,7 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
37 changes: 37 additions & 0 deletions pkg/loop/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,40 @@ GRPC client & server implementations.
### `package pb`

Protocol buffer definitions & generated code.

## Communication

GRPC client/server pairs are used to communicated between the host and each plugin.
Plugins cannot communicate directly with one another, but the host can proxy a connection between them.

Here are the main components for the case of Median:
```mermaid
sequenceDiagram
autonumber
participant relayer as Relayer (plugin)
participant core as Chainlink (host)
participant median as Median (plugin)
Note over core: KeystoreServer
core->>+relayer: NewRelayer
Note over relayer: KeystoreClient
Note over relayer: RelayerServer
relayer->>-core: Relayer ID
Note over core: RelayerClient
core->>+relayer: NewMedianProvider
Note over relayer: MedianProviderServer
relayer->>-core: MedianProvider ID
Note over core: GRPC Proxy
core->>+median: NewMedianFactory
Note over median: MedianProviderClient
Note over median: MedianFactoryServer
median->>-core: MedianFactory ID
Note over core: MedianFactoryClient
core->>+median: NewReportingPlugin
Note over median: ReportingPluginServer
median->>-core: ReportingPlugin ID
Note over core: ReportingPluginClient
```
20 changes: 12 additions & 8 deletions pkg/loop/internal/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,18 @@ func (b *brokerExt) dial(id uint32) (conn *grpc.ClientConn, err error) {
return b.broker.DialWithOptions(id, b.DialOpts...)
}

func (b *brokerExt) serve(name string, register func(*grpc.Server), deps ...resource) (uint32, resource, error) {
func (b *brokerExt) serveNew(name string, register func(*grpc.Server), deps ...resource) (uint32, resource, error) {
var server *grpc.Server
if b.NewServer == nil {
server = grpc.NewServer()
} else {
server = b.NewServer(nil)
}
register(server)
return b.serve(name, server, deps...)
}

func (b *brokerExt) serve(name string, server *grpc.Server, deps ...resource) (uint32, resource, error) {
id := b.broker.NextId()
b.Logger.Debugf("Serving %s on connection %d", name, id)
lis, err := b.broker.Accept(id)
Expand All @@ -99,13 +110,6 @@ func (b *brokerExt) serve(name string, register func(*grpc.Server), deps ...reso
return 0, resource{}, ErrConnAccept{Name: name, ID: id, Err: err}
}

var server *grpc.Server
if b.NewServer == nil {
server = grpc.NewServer()
} else {
server = b.NewServer(nil)
}
register(server)
go func() {
defer b.closeAll(deps...)
if err := server.Serve(lis); err != nil {
Expand Down
48 changes: 35 additions & 13 deletions pkg/loop/internal/median.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"time"

"github.com/mwitkow/grpc-proxy/proxy"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"

Expand Down Expand Up @@ -42,39 +43,53 @@ func NewPluginMedianClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.Cli
return &PluginMedianClient{pluginClient: pc, median: pb.NewPluginMedianClient(pc), serviceClient: newServiceClient(pc.brokerExt, pc)}
}

// TODO move; rename
type HasClientConn interface {
// ClientConn returns the underlying connection to support proxying.
ClientConn() grpc.ClientConnInterface
}

func (m *PluginMedianClient) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog ErrorLog) (ReportingPluginFactory, error) {
cc := m.newClientConn("MedianPluginFactory", func(ctx context.Context) (id uint32, deps resources, err error) {
dataSourceID, dsRes, err := m.serve("DataSource", func(s *grpc.Server) {
dataSourceID, dsRes, err := m.serveNew("DataSource", func(s *grpc.Server) {
pb.RegisterDataSourceServer(s, &dataSourceServer{impl: dataSource})
})
if err != nil {
return 0, nil, err
}
deps.Add(dsRes)

juelsPerFeeCoinDataSourceID, juelsPerFeeCoinDataSourceRes, err := m.serve("JuelsPerFeeCoinDataSource", func(s *grpc.Server) {
juelsPerFeeCoinDataSourceID, juelsPerFeeCoinDataSourceRes, err := m.serveNew("JuelsPerFeeCoinDataSource", func(s *grpc.Server) {
pb.RegisterDataSourceServer(s, &dataSourceServer{impl: juelsPerFeeCoin})
})
if err != nil {
return 0, nil, err
}
deps.Add(juelsPerFeeCoinDataSourceRes)

providerID, providerRes, err := m.serve("MedianProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: provider})
pb.RegisterOffchainConfigDigesterServer(s, &offchainConfigDigesterServer{impl: provider.OffchainConfigDigester()})
pb.RegisterContractConfigTrackerServer(s, &contractConfigTrackerServer{impl: provider.ContractConfigTracker()})
pb.RegisterContractTransmitterServer(s, &contractTransmitterServer{impl: provider.ContractTransmitter()})
pb.RegisterReportCodecServer(s, &reportCodecServer{impl: provider.ReportCodec()})
pb.RegisterMedianContractServer(s, &medianContractServer{impl: provider.MedianContract()})
pb.RegisterOnchainConfigCodecServer(s, &onchainConfigCodecServer{impl: provider.OnchainConfigCodec()})
})
var (
providerID uint32
providerRes resource
)
if grpcProvider, ok := provider.(HasClientConn); ok { //TODO ensure both paths are tested
providerID, providerRes, err = m.serve("MedianProvider", proxy.NewProxy(grpcProvider.ClientConn()))
} else {
providerID, providerRes, err = m.serveNew("MedianProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: provider})
pb.RegisterOffchainConfigDigesterServer(s, &offchainConfigDigesterServer{impl: provider.OffchainConfigDigester()})
pb.RegisterContractConfigTrackerServer(s, &contractConfigTrackerServer{impl: provider.ContractConfigTracker()})
pb.RegisterContractTransmitterServer(s, &contractTransmitterServer{impl: provider.ContractTransmitter()})
pb.RegisterReportCodecServer(s, &reportCodecServer{impl: provider.ReportCodec()})
pb.RegisterMedianContractServer(s, &medianContractServer{impl: provider.MedianContract()})
pb.RegisterOnchainConfigCodecServer(s, &onchainConfigCodecServer{impl: provider.OnchainConfigCodec()})
})
}
if err != nil {
return 0, nil, err
}
deps.Add(providerRes)

errorLogID, errorLogRes, err := m.serve("ErrorLog", func(s *grpc.Server) {
errorLogID, errorLogRes, err := m.serveNew("ErrorLog", func(s *grpc.Server) {
pb.RegisterErrorLogServer(s, &errorLogServer{impl: errorLog})
})
if err != nil {
Expand Down Expand Up @@ -152,7 +167,7 @@ func (m *pluginMedianServer) NewMedianFactory(ctx context.Context, request *pb.N
return nil, err
}

id, _, err := m.serve("ReportingPluginProvider", func(s *grpc.Server) {
id, _, err := m.serveNew("ReportingPluginProvider", func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: factory})
pb.RegisterReportingPluginFactoryServer(s, newReportingPluginFactoryServer(factory, m.brokerExt))
}, dsRes, juelsRes, providerRes, errorLogRes)
Expand All @@ -163,6 +178,11 @@ func (m *pluginMedianServer) NewMedianFactory(ctx context.Context, request *pb.N
return &pb.NewMedianFactoryReply{ReportingPluginFactoryID: id}, nil
}

var (
_ types.MedianProvider = (*medianProviderClient)(nil)
_ HasClientConn = (*medianProviderClient)(nil)
)

type medianProviderClient struct {
*configProviderClient
contractTransmitter libocr.ContractTransmitter
Expand All @@ -171,6 +191,8 @@ type medianProviderClient struct {
onchainConfigCodec median.OnchainConfigCodec
}

func (m *medianProviderClient) ClientConn() grpc.ClientConnInterface { return m.cc }

func newMedianProviderClient(b *brokerExt, cc grpc.ClientConnInterface) *medianProviderClient {
m := &medianProviderClient{configProviderClient: newConfigProviderClient(b.named("MedianProviderClient"), cc)}
m.contractTransmitter = &contractTransmitterClient{b, pb.NewContractTransmitterClient(m.cc)}
Expand Down
8 changes: 4 additions & 4 deletions pkg/loop/internal/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewPluginRelayerClient(broker Broker, brokerCfg BrokerConfig, conn *grpc.Cl
func (p *PluginRelayerClient) NewRelayer(ctx context.Context, config string, keystore Keystore) (Relayer, error) {
cc := p.newClientConn("Relayer", func(ctx context.Context) (id uint32, deps resources, err error) {
var ksRes resource
id, ksRes, err = p.serve("Keystore", func(s *grpc.Server) {
id, ksRes, err = p.serveNew("Keystore", func(s *grpc.Server) {
pb.RegisterKeystoreServer(s, &keystoreServer{impl: keystore})
})
if err != nil {
Expand Down Expand Up @@ -100,7 +100,7 @@ func (p *pluginRelayerServer) NewRelayer(ctx context.Context, request *pb.NewRel

const name = "Relayer"
rRes := resource{r, name}
id, _, err := p.serve(name, func(s *grpc.Server) {
id, _, err := p.serveNew(name, func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: r})
pb.RegisterRelayerServer(s, newChainRelayerServer(r, p.brokerExt))
}, rRes, ksRes)
Expand Down Expand Up @@ -342,7 +342,7 @@ func (r *relayerServer) NewConfigProvider(ctx context.Context, request *pb.NewCo
}

const name = "ConfigProvider"
id, _, err := r.serve(name, func(s *grpc.Server) {
id, _, err := r.serveNew(name, func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: cp})
pb.RegisterOffchainConfigDigesterServer(s, &offchainConfigDigesterServer{impl: cp.OffchainConfigDigester()})
pb.RegisterContractConfigTrackerServer(s, &contractConfigTrackerServer{impl: cp.ContractConfigTracker()})
Expand Down Expand Up @@ -379,7 +379,7 @@ func (r *relayerServer) NewMedianProvider(ctx context.Context, request *pb.NewMe
const name = "MedianProvider"
providerRes := resource{name: name, Closer: provider}

id, _, err := r.serve(name, func(s *grpc.Server) {
id, _, err := r.serveNew(name, func(s *grpc.Server) {
pb.RegisterServiceServer(s, &serviceServer{srv: provider})
pb.RegisterOffchainConfigDigesterServer(s, &offchainConfigDigesterServer{impl: provider.OffchainConfigDigester()})
pb.RegisterContractConfigTrackerServer(s, &contractConfigTrackerServer{impl: provider.ContractConfigTracker()})
Expand Down
2 changes: 1 addition & 1 deletion pkg/loop/internal/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (r *reportingPluginFactoryServer) NewReportingPlugin(ctx context.Context, r
}

const name = "ReportingPlugin"
id, _, err := r.serve(name, func(s *grpc.Server) {
id, _, err := r.serveNew(name, func(s *grpc.Server) {
pb.RegisterReportingPluginServer(s, &reportingPluginServer{impl: rp})
}, resource{rp, name})
if err != nil {
Expand Down

0 comments on commit e9e3288

Please sign in to comment.