From cb4343df96920a2bc375076d0c145c5426878b1b Mon Sep 17 00:00:00 2001 From: Doctor Vince Date: Wed, 4 Dec 2024 15:41:45 -0500 Subject: [PATCH] handle cases where a message redelivery can corrupt running state (#47) --- controller_nats.go | 40 +++++++++++++++++----------- status.go | 35 +++++++++++++++++++----- status_test.go | 66 +++++++++++++++++++--------------------------- 3 files changed, 80 insertions(+), 61 deletions(-) diff --git a/controller_nats.go b/controller_nats.go index 13f4032..2f1613c 100644 --- a/controller_nats.go +++ b/controller_nats.go @@ -373,27 +373,35 @@ func (n *NatsController) processConditionFromEvent(ctx context.Context, msg even n.stream.(*events.NatsJetstream), n.logger, ) - if err != nil { - msg := "publisher init failed" - n.logger.WithError(err).WithFields(logrus.Fields{ - "conditionID": cond.ID.String(), - }).Error(msg) - - // send this message back on the bus to be redelivered, or atleast attempt to. - eventAcknowleger.nak() - metricsEventsCounter(true, "nack") - spanEvent( - span, - cond, - n.ID(), - fmt.Sprintf("sent nack, info: %s, err: %s", msg, err.Error()), - ) + if err == nil { + n.processCondition(ctx, cond, publisher, eventAcknowleger) + return + } + if errors.Is(err, errInProgress) { + n.logger.WithError(err). + WithFields(logrus.Fields{ + "conditionID": cond.ID.String(), + }).Warn("this controller is already handling the event") + eventAcknowleger.inProgress() return } - n.processCondition(ctx, cond, publisher, eventAcknowleger) + n.logger.WithError(err).WithFields(logrus.Fields{ + "conditionID": cond.ID.String(), + }).Error("failed to create NATS publisher") + + // send this message back on the bus to be redelivered, or atleast attempt to. 
+ eventAcknowleger.nak() + + metricsEventsCounter(true, "nack") + spanEvent( + span, + cond, + n.ID(), + fmt.Sprintf("sent nack, info: %s", err.Error()), + ) } func conditionFromEvent(e events.Message) (*condition.Condition, error) { diff --git a/status.go b/status.go index 8a5a8ab..f726e91 100644 --- a/status.go +++ b/status.go @@ -23,12 +23,14 @@ import ( ) var ( - kvTTL = 10 * 24 * time.Hour - errKV = errors.New("unable to bind to status KV bucket") - errGetKey = errors.New("error fetching existing key, value for update") - errUnmarshalKey = errors.New("error unmarshal key, value for update") - errStatusValue = errors.New("condition status value error") - errStatusPublish = errors.New("condition status publish error") + kvTTL = 10 * 24 * time.Hour + errKV = errors.New("unable to bind to status KV bucket") + errGetKey = errors.New("error fetching existing key, value for update") + errUnmarshalKey = errors.New("error unmarshal key, value for update") + errStatusValue = errors.New("condition status value error") + errStatusPublish = errors.New("condition status publish error") + errOtherController = errors.New("condition being handled by another controller") + errInProgress = errors.New("condition is in progress") ) // ConditionStatusPublisher defines an interface for publishing status updates for conditions. 
@@ -69,6 +71,27 @@ func NewNatsConditionStatusPublisher( return nil, errors.Wrap(errKV, err.Error()) } + // before we return, check the status (if any) of this condition and bail if the + // condition is in progress somewhere else + key := condition.StatusValueKVKey(facilityCode, conditionID) + entry, err := statusKV.Get(key) + + switch err { + case nil: + existing := &condition.StatusValue{} + if err := json.Unmarshal(entry.Value(), existing); err != nil { + return nil, fmt.Errorf("%w: %s", errStatusValue, err.Error()) + } + if existing.WorkerID != controllerID.String() { + return nil, fmt.Errorf("%w: %s", errOtherController, existing.WorkerID) + } + return nil, fmt.Errorf("%w: last update %s", errInProgress, existing.UpdatedAt.String()) + case nats.ErrKeyNotFound: + // no existing state, we can proceed + default: + return nil, fmt.Errorf("%w: %s", errGetKey, err.Error()) + } + return &NatsConditionStatusPublisher{ facilityCode: facilityCode, conditionID: conditionID, diff --git a/status_test.go b/status_test.go index b5cfe29..87b60cc 100644 --- a/status_test.go +++ b/status_test.go @@ -83,7 +83,7 @@ func TestNewNatsConditionStatusPublisher(t *testing.T) { logger: logrus.New(), } - // test happy case + // test happy case -- no existing status p, err := NewNatsConditionStatusPublisher( "test", cond.ID.String(), @@ -95,62 +95,50 @@ func TestNewNatsConditionStatusPublisher(t *testing.T) { controller.logger, ) - publisher, ok := p.(*NatsConditionStatusPublisher) - assert.True(t, ok) + require.NotNil(t, p, "publisher constructor") + require.NoError(t, err) - require.Nil(t, err) - require.NotNil(t, publisher, "publisher constructor") + publisher := p.(*NatsConditionStatusPublisher) assert.Equal(t, controller.facilityCode, publisher.facilityCode) assert.Equal(t, cond.ID.String(), publisher.conditionID) assert.Equal(t, controller.logger, publisher.log) - // Test re-initialized publisher will set lastRev to KV revision and subsequent publishes work + // Write a status 
value to the KV to simulate a condition in progress serverID := uuid.New() - require.NotPanics(t, - func() { - errP := publisher.Publish( - context.Background(), - serverID.String(), - condition.Pending, - []byte(`{"pending": "true"}`), - false, - ) - require.NoError(t, errP) - }, - "publish 1", - ) + errP := publisher.Publish(context.Background(), serverID.String(), condition.Pending, + []byte(`{"pending": "true"}`), false) - p, err = NewNatsConditionStatusPublisher( + require.NoError(t, errP, "writing status") + + // create a new publisher with a different controller id, expect failure + otherID := registry.GetID("other") + _, err = NewNatsConditionStatusPublisher( "test", cond.ID.String(), facilityCode, cond.Kind, - controllerID, + otherID, 0, evJS, controller.logger, ) + require.Error(t, err) + require.ErrorIs(t, err, errOtherController, "new controller id") - publisher, ok = p.(*NatsConditionStatusPublisher) - assert.True(t, ok) - - require.Nil(t, err) - require.NotNil(t, publisher, "publisher constructor") - - require.NotPanics(t, - func() { - errP := publisher.Publish( - context.Background(), - serverID.String(), - condition.Active, - []byte(`{"some work...": "true"}`), - false, - ) - require.NoError(t, errP) - }, - "publish 2", + // create a new publisher with our own controller id, expect failure to avoid having multiple goroutines racing + _, err = NewNatsConditionStatusPublisher( + "test", + cond.ID.String(), + facilityCode, + cond.Kind, + controllerID, + 0, + evJS, + controller.logger, ) + require.Error(t, err) + require.ErrorIs(t, err, errInProgress, "same controller id") } func TestPublish(t *testing.T) {