Skip to content

Commit

Permalink
remote target wait for initiated threads to finish on close (#13524)
Browse files Browse the repository at this point in the history
  • Loading branch information
ettec authored Jun 14, 2024
1 parent 0228243 commit d736d9e
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 69 deletions.
5 changes: 5 additions & 0 deletions .changeset/sour-pigs-develop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal remote target wait until initiated threads exit on close
74 changes: 44 additions & 30 deletions core/capabilities/remote/target/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,48 +52,61 @@ func NewClient(remoteCapabilityInfo commoncap.CapabilityInfo, localDonInfo commo
}
}

func (c *client) expireRequests() {
c.mutex.Lock()
defer c.mutex.Unlock()

for messageID, req := range c.messageIDToCallerRequest {
if req.Expired() {
req.Cancel(errors.New("request expired"))
delete(c.messageIDToCallerRequest, messageID)
}
}
}

func (c *client) Start(ctx context.Context) error {
return c.StartOnce(c.Name(), func() error {
c.wg.Add(1)
go func() {
defer c.wg.Done()
ticker := time.NewTicker(c.requestTimeout)
defer ticker.Stop()
c.lggr.Info("TargetClient started")
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
c.expireRequests()
}
}
c.checkForExpiredRequests()
}()
c.lggr.Info("TargetClient started")
return nil
})
}

func (c *client) Close() error {
return c.StopOnce(c.Name(), func() error {
close(c.stopCh)
c.cancelAllRequests(errors.New("client closed"))
c.wg.Wait()
c.lggr.Info("TargetClient closed")
return nil
})
}

func (c *client) checkForExpiredRequests() {
ticker := time.NewTicker(c.requestTimeout)
defer ticker.Stop()
for {
select {
case <-c.stopCh:
return
case <-ticker.C:
c.expireRequests()
}
}
}

func (c *client) expireRequests() {
c.mutex.Lock()
defer c.mutex.Unlock()

for messageID, req := range c.messageIDToCallerRequest {
if req.Expired() {
req.Cancel(errors.New("request expired"))
delete(c.messageIDToCallerRequest, messageID)
}
}
}

func (c *client) cancelAllRequests(err error) {
c.mutex.Lock()
defer c.mutex.Unlock()
for _, req := range c.messageIDToCallerRequest {
req.Cancel(err)
}
}

func (c *client) Info(ctx context.Context) (commoncap.CapabilityInfo, error) {
return c.remoteCapabilityInfo, nil
}
Expand Down Expand Up @@ -121,8 +134,11 @@ func (c *client) Execute(ctx context.Context, capReq commoncap.CapabilityRequest
return nil, fmt.Errorf("request for message ID %s already exists", messageID)
}

cCtx, _ := c.stopCh.NewCtx()
req, err := request.NewClientRequest(cCtx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
// TODO confirm reasons for below workaround and see if can be resolved
// The context passed in by the workflow engine is cancelled prior to the results being read from the response channel
// The wrapping of the context with 'WithoutCancel' is a workaround for that behaviour.
requestCtx := context.WithoutCancel(ctx)
req, err := request.NewClientRequest(requestCtx, c.lggr, capReq, messageID, c.remoteCapabilityInfo, c.localDONInfo, c.dispatcher,
c.requestTimeout)
if err != nil {
return nil, fmt.Errorf("failed to create client request: %w", err)
Expand All @@ -146,11 +162,9 @@ func (c *client) Receive(msg *types.MessageBody) {
return
}

go func() {
if err := req.OnMessage(ctx, msg); err != nil {
c.lggr.Errorw("failed to add response to request", "messageID", messageID, "err", err)
}
}()
if err := req.OnMessage(ctx, msg); err != nil {
c.lggr.Errorw("failed to add response to request", "messageID", messageID, "err", err)
}
}

func GetMessageIDForRequest(req commoncap.CapabilityRequest) (string, error) {
Expand Down
4 changes: 3 additions & 1 deletion core/capabilities/remote/target/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
ID: "workflow-don",
}

broker := newTestMessageBroker()
broker := newTestAsyncMessageBroker(100)

receivers := make([]remotetypes.Receiver, numCapabilityPeers)
for i := 0; i < numCapabilityPeers; i++ {
Expand All @@ -172,6 +172,8 @@ func testClient(ctx context.Context, t *testing.T, numWorkflowPeers int, workflo
callers[i] = caller
}

servicetest.Run(t, broker)

executeInputs, err := values.NewMap(
map[string]any{
"executeValue1": "aValue1",
Expand Down
91 changes: 71 additions & 20 deletions core/capabilities/remote/target/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"

commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/target"
Expand Down Expand Up @@ -214,7 +215,7 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
F: workflowDonF,
}

broker := newTestMessageBroker()
broker := newTestAsyncMessageBroker(1000)

workflowDONs := map[string]commoncap.DON{
workflowDonInfo.ID: workflowDonInfo,
Expand All @@ -240,6 +241,8 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
workflowNodes[i] = workflowNode
}

servicetest.Run(t, broker)

executeInputs, err := values.NewMap(
map[string]any{
"executeValue1": "aValue1",
Expand Down Expand Up @@ -271,49 +274,97 @@ func testRemoteTarget(ctx context.Context, t *testing.T, underlying commoncap.Ta
wg.Wait()
}

type testMessageBroker struct {
type testAsyncMessageBroker struct {
services.StateMachine
nodes map[p2ptypes.PeerID]remotetypes.Receiver

sendCh chan *remotetypes.MessageBody

stopCh services.StopChan
wg sync.WaitGroup
}

func (a *testAsyncMessageBroker) HealthReport() map[string]error {
return nil
}

func (a *testAsyncMessageBroker) Name() string {
return "testAsyncMessageBroker"
}

func newTestMessageBroker() *testMessageBroker {
return &testMessageBroker{
nodes: make(map[p2ptypes.PeerID]remotetypes.Receiver),
func newTestAsyncMessageBroker(sendChBufferSize int) *testAsyncMessageBroker {
return &testAsyncMessageBroker{
nodes: make(map[p2ptypes.PeerID]remotetypes.Receiver),
stopCh: make(services.StopChan),
sendCh: make(chan *remotetypes.MessageBody, sendChBufferSize),
}
}

func (r *testMessageBroker) NewDispatcherForNode(nodePeerID p2ptypes.PeerID) remotetypes.Dispatcher {
func (a *testAsyncMessageBroker) Start(ctx context.Context) error {
return a.StartOnce("testAsyncMessageBroker", func() error {
a.wg.Add(1)
go func() {
defer a.wg.Done()

for {
select {
case <-a.stopCh:
return
case msg := <-a.sendCh:
receiverId := toPeerID(msg.Receiver)

receiver, ok := a.nodes[receiverId]
if !ok {
panic("server not found for peer id")
}

receiver.Receive(msg)
}
}
}()
return nil
})
}

func (a *testAsyncMessageBroker) Close() error {
return a.StopOnce("testAsyncMessageBroker", func() error {
close(a.stopCh)

a.wg.Wait()
return nil
})
}

func (a *testAsyncMessageBroker) NewDispatcherForNode(nodePeerID p2ptypes.PeerID) remotetypes.Dispatcher {
return &nodeDispatcher{
callerPeerID: nodePeerID,
broker: r,
broker: a,
}
}

func (r *testMessageBroker) RegisterReceiverNode(nodePeerID p2ptypes.PeerID, node remotetypes.Receiver) {
if _, ok := r.nodes[nodePeerID]; ok {
func (a *testAsyncMessageBroker) RegisterReceiverNode(nodePeerID p2ptypes.PeerID, node remotetypes.Receiver) {
if _, ok := a.nodes[nodePeerID]; ok {
panic("node already registered")
}

r.nodes[nodePeerID] = node
a.nodes[nodePeerID] = node
}

func (r *testMessageBroker) Send(msg *remotetypes.MessageBody) {
receiverId := toPeerID(msg.Receiver)

receiver, ok := r.nodes[receiverId]
if !ok {
panic("server not found for peer id")
}

receiver.Receive(msg)
func (a *testAsyncMessageBroker) Send(msg *remotetypes.MessageBody) {
a.sendCh <- msg
}

func toPeerID(id []byte) p2ptypes.PeerID {
return [32]byte(id)
}

type broker interface {
Send(msg *remotetypes.MessageBody)
}

type nodeDispatcher struct {
callerPeerID p2ptypes.PeerID
broker *testMessageBroker
broker broker
}

func (t *nodeDispatcher) Send(peerID p2ptypes.PeerID, msgBody *remotetypes.MessageBody) error {
Expand Down
17 changes: 14 additions & 3 deletions core/capabilities/remote/target/request/client_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

type ClientRequest struct {
cancelFn context.CancelFunc
responseCh chan commoncap.CapabilityResponse
createdAt time.Time
responseIDCount map[[32]byte]int
Expand All @@ -33,6 +34,7 @@ type ClientRequest struct {

respSent bool
mux sync.Mutex
wg *sync.WaitGroup
}

func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.CapabilityRequest, messageID string,
Expand All @@ -56,9 +58,14 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap
lggr.Debugw("sending request to peers", "execID", req.Metadata.WorkflowExecutionID, "schedule", peerIDToTransmissionDelay)

responseReceived := make(map[p2ptypes.PeerID]bool)

ctxWithCancel, cancelFn := context.WithCancel(ctx)
wg := &sync.WaitGroup{}
for peerID, delay := range peerIDToTransmissionDelay {
responseReceived[peerID] = false
go func(peerID ragep2ptypes.PeerID, delay time.Duration) {
wg.Add(1)
go func(ctx context.Context, peerID ragep2ptypes.PeerID, delay time.Duration) {
defer wg.Done()
message := &types.MessageBody{
CapabilityId: remoteCapabilityInfo.ID,
CapabilityDonId: remoteCapabilityDonInfo.ID,
Expand All @@ -69,7 +76,7 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap
}

select {
case <-ctx.Done():
case <-ctxWithCancel.Done():
lggr.Debugw("context done, not sending request to peer", "execID", req.Metadata.WorkflowExecutionID, "peerID", peerID)
return
case <-time.After(delay):
Expand All @@ -79,17 +86,19 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap
lggr.Errorw("failed to send message", "peerID", peerID, "err", err)
}
}
}(peerID, delay)
}(ctxWithCancel, peerID, delay)
}

return &ClientRequest{
cancelFn: cancelFn,
createdAt: time.Now(),
requestTimeout: requestTimeout,
requiredIdenticalResponses: int(remoteCapabilityDonInfo.F + 1),
responseIDCount: make(map[[32]byte]int),
errorCount: make(map[string]int),
responseReceived: responseReceived,
responseCh: make(chan commoncap.CapabilityResponse, 1),
wg: wg,
}, nil
}

Expand All @@ -102,6 +111,8 @@ func (c *ClientRequest) Expired() bool {
}

func (c *ClientRequest) Cancel(err error) {
c.cancelFn()
c.wg.Wait()
c.mux.Lock()
defer c.mux.Unlock()
if !c.respSent {
Expand Down
Loading

0 comments on commit d736d9e

Please sign in to comment.