Skip to content

Commit

Permalink
handle cases where a message redelivery can corrupt running state (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
DoctorVin authored Dec 4, 2024
1 parent d13d83d commit cb4343d
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 61 deletions.
40 changes: 24 additions & 16 deletions controller_nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,27 +373,35 @@ func (n *NatsController) processConditionFromEvent(ctx context.Context, msg even
n.stream.(*events.NatsJetstream),
n.logger,
)
if err != nil {
msg := "publisher init failed"
n.logger.WithError(err).WithFields(logrus.Fields{
"conditionID": cond.ID.String(),
}).Error(msg)

// send this message back on the bus to be redelivered, or atleast attempt to.
eventAcknowleger.nak()

metricsEventsCounter(true, "nack")
spanEvent(
span,
cond,
n.ID(),
fmt.Sprintf("sent nack, info: %s, err: %s", msg, err.Error()),
)
if err == nil {
n.processCondition(ctx, cond, publisher, eventAcknowleger)
return
}

if errors.Is(err, errInProgress) {
n.logger.WithError(err).
WithFields(logrus.Fields{
"conditionID": cond.ID.String(),
}).Warn("this controller is already handling the event")
eventAcknowleger.inProgress()
return
}

n.processCondition(ctx, cond, publisher, eventAcknowleger)
n.logger.WithError(err).WithFields(logrus.Fields{
"conditionID": cond.ID.String(),
}).Error("failed to create NATS publisher")

// send this message back on the bus to be redelivered, or atleast attempt to.
eventAcknowleger.nak()

metricsEventsCounter(true, "nack")
spanEvent(
span,
cond,
n.ID(),
fmt.Sprintf("sent nack, info: %s", err.Error()),
)
}

func conditionFromEvent(e events.Message) (*condition.Condition, error) {
Expand Down
35 changes: 29 additions & 6 deletions status.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
)

var (
kvTTL = 10 * 24 * time.Hour
errKV = errors.New("unable to bind to status KV bucket")
errGetKey = errors.New("error fetching existing key, value for update")
errUnmarshalKey = errors.New("error unmarshal key, value for update")
errStatusValue = errors.New("condition status value error")
errStatusPublish = errors.New("condition status publish error")
kvTTL = 10 * 24 * time.Hour
errKV = errors.New("unable to bind to status KV bucket")
errGetKey = errors.New("error fetching existing key, value for update")
errUnmarshalKey = errors.New("error unmarshal key, value for update")
errStatusValue = errors.New("condition status value error")
errStatusPublish = errors.New("condition status publish error")
errOtherController = errors.New("condition being handled by another controller")
errInProgress = errors.New("condition is in progress")
)

// ConditionStatusPublisher defines an interface for publishing status updates for conditions.
Expand Down Expand Up @@ -69,6 +71,27 @@ func NewNatsConditionStatusPublisher(
return nil, errors.Wrap(errKV, err.Error())
}

// before we return, check the status (if any) of this condition and bail if the
// condition is in progress somewhere else
key := condition.StatusValueKVKey(facilityCode, conditionID)
entry, err := statusKV.Get(key)

switch err {
case nil:
existing := &condition.StatusValue{}
if err := json.Unmarshal(entry.Value(), existing); err != nil {
return nil, fmt.Errorf("%w: %s", errStatusValue, err.Error())
}
if existing.WorkerID != controllerID.String() {
return nil, fmt.Errorf("%w: %s", errOtherController, existing.WorkerID)
}
return nil, fmt.Errorf("%w: last update %s", errInProgress, existing.UpdatedAt.String())
case nats.ErrKeyNotFound:
// no existing state, we can proceed
default:
return nil, fmt.Errorf("%w: %s", errGetKey, err.Error())
}

return &NatsConditionStatusPublisher{
facilityCode: facilityCode,
conditionID: conditionID,
Expand Down
66 changes: 27 additions & 39 deletions status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestNewNatsConditionStatusPublisher(t *testing.T) {
logger: logrus.New(),
}

// test happy case
// test happy case -- no existing status
p, err := NewNatsConditionStatusPublisher(
"test",
cond.ID.String(),
Expand All @@ -95,62 +95,50 @@ func TestNewNatsConditionStatusPublisher(t *testing.T) {
controller.logger,
)

publisher, ok := p.(*NatsConditionStatusPublisher)
assert.True(t, ok)
require.NotNil(t, p, "publisher constructor")
require.NoError(t, err)

require.Nil(t, err)
require.NotNil(t, publisher, "publisher constructor")
publisher := p.(*NatsConditionStatusPublisher)

assert.Equal(t, controller.facilityCode, publisher.facilityCode)
assert.Equal(t, cond.ID.String(), publisher.conditionID)
assert.Equal(t, controller.logger, publisher.log)

// Test re-initialized publisher will set lastRev to KV revision and subsequent publishes work
// Write a status value to the KV to simulate a condition in progress
serverID := uuid.New()
require.NotPanics(t,
func() {
errP := publisher.Publish(
context.Background(),
serverID.String(),
condition.Pending,
[]byte(`{"pending": "true"}`),
false,
)
require.NoError(t, errP)
},
"publish 1",
)
errP := publisher.Publish(context.Background(), serverID.String(), condition.Pending,
[]byte(`{"pending": "true"}`), false)

p, err = NewNatsConditionStatusPublisher(
require.NoError(t, errP, "writing status")

// create a new publisher with a different controller id, expect failure
otherID := registry.GetID("other")
_, err = NewNatsConditionStatusPublisher(
"test",
cond.ID.String(),
facilityCode,
cond.Kind,
controllerID,
otherID,
0,
evJS,
controller.logger,
)
require.Error(t, err)
require.ErrorIs(t, err, errOtherController, "new controller id")

publisher, ok = p.(*NatsConditionStatusPublisher)
assert.True(t, ok)

require.Nil(t, err)
require.NotNil(t, publisher, "publisher constructor")

require.NotPanics(t,
func() {
errP := publisher.Publish(
context.Background(),
serverID.String(),
condition.Active,
[]byte(`{"some work...": "true"}`),
false,
)
require.NoError(t, errP)
},
"publish 2",
// create a new publisher with our own controller id, expect failure to avoid having multiple go-routines racing
_, err = NewNatsConditionStatusPublisher(
"test",
cond.ID.String(),
facilityCode,
cond.Kind,
controllerID,
0,
evJS,
controller.logger,
)
require.Error(t, err)
require.ErrorIs(t, err, errInProgress, "same controller id")
}

func TestPublish(t *testing.T) {
Expand Down

0 comments on commit cb4343d

Please sign in to comment.