diff --git a/.changeset/shiny-ligers-compete.md b/.changeset/shiny-ligers-compete.md new file mode 100644 index 00000000000..d621b94183c --- /dev/null +++ b/.changeset/shiny-ligers-compete.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#internal logging of non determinism in target server diff --git a/core/capabilities/remote/target/request/client_request.go b/core/capabilities/remote/target/request/client_request.go index 3952a2f7f64..50a742c2188 100644 --- a/core/capabilities/remote/target/request/client_request.go +++ b/core/capabilities/remote/target/request/client_request.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "google.golang.org/protobuf/proto" + ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -46,7 +48,8 @@ func NewClientRequest(ctx context.Context, lggr logger.Logger, req commoncap.Cap return nil, errors.New("remote capability info missing DON") } - rawRequest, err := pb.MarshalCapabilityRequest(req) + rawRequest, err := proto.MarshalOptions{Deterministic: true}.Marshal(pb.CapabilityRequestToProto(req)) + if err != nil { return nil, fmt.Errorf("failed to marshal capability request: %w", err) } diff --git a/core/capabilities/remote/target/server.go b/core/capabilities/remote/target/server.go index 086be15f270..ea9caf81eff 100644 --- a/core/capabilities/remote/target/server.go +++ b/core/capabilities/remote/target/server.go @@ -32,9 +32,12 @@ type server struct { workflowDONs map[uint32]commoncap.DON dispatcher types.Dispatcher - requestIDToRequest map[string]*request.ServerRequest + requestIDToRequest map[string]requestAndMsgID requestTimeout time.Duration + // Used to detect messages with the same message id but different payloads + messageIDToRequestIDsCount map[string]map[string]int + receiveLock sync.Mutex stopCh services.StopChan wg sync.WaitGroup @@ -43,6 +46,11 @@ type server struct { var _ types.Receiver = &server{} var _ services.Service = &server{} +type requestAndMsgID struct { + request *request.ServerRequest + messageID string +} + func NewServer(peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, capInfo commoncap.CapabilityInfo, localDonInfo commoncap.DON, workflowDONs map[uint32]commoncap.DON, dispatcher types.Dispatcher, requestTimeout time.Duration, lggr logger.Logger) *server { return &server{ @@ -53,8 +61,9 @@ func NewServer(peerID p2ptypes.PeerID, underlying commoncap.TargetCapability, ca workflowDONs: workflowDONs, dispatcher: dispatcher, - requestIDToRequest: map[string]*request.ServerRequest{}, - requestTimeout: requestTimeout, + requestIDToRequest: map[string]requestAndMsgID{}, + messageIDToRequestIDsCount: map[string]map[string]int{}, + requestTimeout: requestTimeout, lggr: lggr.Named("TargetServer"), stopCh: make(services.StopChan), @@ -96,12 +105,13 @@ func (r *server) expireRequests() { defer r.receiveLock.Unlock() for requestID, executeReq := range r.requestIDToRequest { - if executeReq.Expired() { - err := executeReq.Cancel(types.Error_TIMEOUT, "request expired") + if executeReq.request.Expired() { + err := executeReq.request.Cancel(types.Error_TIMEOUT, "request expired") if err != nil { r.lggr.Errorw("failed to cancel request", "request", executeReq, "err", err) } delete(r.requestIDToRequest, requestID) + delete(r.messageIDToRequestIDsCount, executeReq.messageID) } } } @@ -122,6 +132,19 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { hash := sha256.Sum256(msg.Payload) requestID := messageId + hex.EncodeToString(hash[:]) + if requestIDs, ok := r.messageIDToRequestIDsCount[messageId]; ok { + requestIDs[requestID] = requestIDs[requestID] + 1 + } else { + r.messageIDToRequestIDsCount[messageId] = map[string]int{requestID: 1} + } + + requestIDs := r.messageIDToRequestIDsCount[messageId] + if len(requestIDs) > 1 { + // This is a potential attack vector as well as a situation that will occur if the client is sending non-deterministic payloads + // so a warning is logged + r.lggr.Warnw("received messages with the same id and different payloads", "messageID", messageId, "requestIDToCount", requestIDs) + } + if _, ok := r.requestIDToRequest[requestID]; !ok { callingDon, ok := r.workflowDONs[msg.CallerDonId] if !ok { @@ -129,15 +152,18 @@ func (r *server) Receive(ctx context.Context, msg *types.MessageBody) { return } - r.requestIDToRequest[requestID] = request.NewServerRequest(r.underlying, r.capInfo.ID, r.localDonInfo.ID, r.peerID, - callingDon, messageId, r.dispatcher, r.requestTimeout, r.lggr) + r.requestIDToRequest[requestID] = requestAndMsgID{ + request: request.NewServerRequest(r.underlying, r.capInfo.ID, r.localDonInfo.ID, r.peerID, + callingDon, messageId, r.dispatcher, r.requestTimeout, r.lggr), + messageID: messageId, + } } - req := r.requestIDToRequest[requestID] + reqAndMsgID := r.requestIDToRequest[requestID] - err := req.OnMessage(ctx, msg) + err := reqAndMsgID.request.OnMessage(ctx, msg) if err != nil { - r.lggr.Errorw("request failed to OnMessage new message", "request", req, "err", err) + r.lggr.Errorw("request failed to OnMessage new message", "request", reqAndMsgID, "err", err) } }