Skip to content

Commit

Permalink
Merge pull request #9772 from hashicorp/streamin-fix-bad-cached-snapshot
Browse files Browse the repository at this point in the history
streaming: fix snapshot cache bug
  • Loading branch information
dnephin authored and dizzyup committed Apr 21, 2021
1 parent d2758e6 commit e8cdc33
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 18 deletions.
4 changes: 4 additions & 0 deletions .changelog/9772.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:bug
streaming: fixes a bug caused by caching an incorrect snapshot, that would cause clients
to error until the cache expired.
```
35 changes: 17 additions & 18 deletions agent/consul/stream/event_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,28 +176,27 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
}

snapFromCache := e.getCachedSnapshotLocked(req)
if req.Index == 0 && snapFromCache != nil {
return e.subscriptions.add(req, snapFromCache.First), nil
if snapFromCache == nil {
snap := newEventSnapshot()
snap.appendAndSplice(*req, handler, topicHead)
e.setCachedSnapshotLocked(req, snap)
snapFromCache = snap
}
snap := newEventSnapshot()

// if the request has an Index the client view is stale and must be reset
// with a NewSnapshotToFollow event.
if req.Index > 0 {
snap.buffer.Append([]Event{{
Topic: req.Topic,
Payload: newSnapshotToFollow{},
}})

if snapFromCache != nil {
snap.buffer.AppendItem(snapFromCache.First)
return e.subscriptions.add(req, snap.First), nil
}
// If the request.Index is 0 the client has no view, send a full snapshot.
if req.Index == 0 {
return e.subscriptions.add(req, snapFromCache.First), nil
}

snap.appendAndSplice(*req, handler, topicHead)
e.setCachedSnapshotLocked(req, snap)
return e.subscriptions.add(req, snap.First), nil
// otherwise the request has an Index, the client view is stale and must be reset
// with a NewSnapshotToFollow event.
result := newEventSnapshot()
result.buffer.Append([]Event{{
Topic: req.Topic,
Payload: newSnapshotToFollow{},
}})
result.buffer.AppendItem(snapFromCache.First)
return e.subscriptions.add(req, result.First), nil
}

func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
Expand Down
75 changes: 75 additions & 0 deletions agent/consul/stream/event_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,81 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
})
}

func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
Index: 1,
}

nextEvent := Event{
Topic: testTopic,
Index: 3,
Payload: simplePayload{key: "sub-key", value: "event-3"},
}

handlers := SnapshotHandlers{
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
if req.Topic != testTopic {
return 0, fmt.Errorf("unexpected topic: %v", req.Topic)
}
buf.Append([]Event{testSnapshotEvent})
buf.Append([]Event{nextEvent})
return 3, nil
},
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

publisher := NewEventPublisher(handlers, time.Second)
go publisher.Run(ctx)
// Include the same events in the topicBuffer
publisher.publishEvent([]Event{testSnapshotEvent})
publisher.publishEvent([]Event{nextEvent})

runStep(t, "start a subscription and unsub", func(t *testing.T) {
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()

eventCh := runSubscription(ctx, sub)
next := getNextEvent(t, eventCh)
require.True(t, next.IsNewSnapshotToFollow(), next)

next = getNextEvent(t, eventCh)
require.Equal(t, testSnapshotEvent, next)

next = getNextEvent(t, eventCh)
require.Equal(t, nextEvent, next)

next = getNextEvent(t, eventCh)
require.True(t, next.IsEndOfSnapshot(), next)
require.Equal(t, uint64(3), next.Index)
})

publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used")
}

runStep(t, "resume the subscription", func(t *testing.T) {
newReq := *req
newReq.Index = 0
sub, err := publisher.Subscribe(&newReq)
require.NoError(t, err)

eventCh := runSubscription(ctx, sub)
next := getNextEvent(t, eventCh)
require.Equal(t, testSnapshotEvent, next)

next = getNextEvent(t, eventCh)
require.Equal(t, nextEvent, next)

next = getNextEvent(t, eventCh)
require.True(t, next.IsEndOfSnapshot())
})
}

func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
Expand Down

0 comments on commit e8cdc33

Please sign in to comment.