Commit

etcd backend not passing watcher errors up the stack, and errors not triggering full resync in watcher cache
Rob Brockbank committed Oct 30, 2020
1 parent 728c26d commit eddd417
Showing 5 changed files with 12 additions and 44 deletions.
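
In outline, the behaviour this commit establishes: the etcdv3 watcher surfaces terminal errors to its consumer as WatchError events instead of exiting silently, and the watcher cache reacts to any WatchError by discarding its cached watch revision and performing a full resync. The following is a minimal, self-contained Go sketch of that cache-side reaction; the event and cache types are simplified stand-ins for the libcalico-go api and watchersyncer types, not the library's real API.

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the api types referenced in the diff below.
type WatchEventType string

const WatchError WatchEventType = "WatchError"

type WatchEvent struct {
	Type  WatchEventType
	Error error
}

// cache mimics the watcherCache behaviour introduced by this commit: any
// WatchError clears the cached watch revision, which forces the next
// resync to perform a full list rather than resuming from a (possibly
// compacted) revision.
type cache struct {
	currentWatchRevision string
}

func (c *cache) handle(event WatchEvent) {
	if event.Type == WatchError {
		// Log at info only: the error may just be a compacted revision,
		// which is handled by a full resync.
		fmt.Printf("watch error received, forcing full resync: %v\n", event.Error)
		c.currentWatchRevision = ""
		c.resync()
	}
}

func (c *cache) resync() {
	if c.currentWatchRevision == "" {
		fmt.Println("performing full resync (list + re-watch)")
		c.currentWatchRevision = "101" // revision returned by the fresh list
	}
}

func main() {
	c := &cache{currentWatchRevision: "57"}
	// The etcdv3 watcher now surfaces terminal errors instead of dropping them.
	c.handle(WatchEvent{Type: WatchError, Error: errors.New("etcdserver: mvcc: required revision has been compacted")})
	fmt.Println("revision after resync:", c.currentWatchRevision)
}
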
2 changes: 2 additions & 0 deletions lib/backend/etcdv3/watcher.go
@@ -98,6 +98,7 @@ func (wc *watcher) watchLoop() {
if kvps, err = wc.listCurrent(); err != nil {
log.Errorf("failed to list current with latest state: %v", err)
// Error considered as terminating error, hence terminate watcher.
+ wc.sendError(err)
return
}

@@ -125,6 +126,7 @@ func (wc *watcher) watchLoop() {
// A watch channel error is a terminating event, so exit the loop.
err := wres.Err()
log.WithError(err).Error("Watch channel error")
+ wc.sendError(err)
return
}
for _, e := range wres.Events {
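
The sendError helper called in both hunks above is defined elsewhere in watcher.go and is not shown in this commit. As an assumption about its shape (illustrative only, with stand-in types rather than the real api.WatchEvent), it wraps the terminal error in a WatchError event and pushes it onto the watcher's result channel so the consumer learns why the watch loop exited:

package etcdv3sketch

// Stand-in for the library's api.WatchEvent (assumption, not the real type).
type watchEvent struct {
	eventType string
	err       error
}

// Stand-in watcher carrying only the field this sketch needs.
type watcher struct {
	resultChan chan watchEvent
}

// sendError: assumed shape of the helper referenced in the hunks above.
func (wc *watcher) sendError(err error) {
	wc.resultChan <- watchEvent{eventType: "WatchError", err: err}
}
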
31 changes: 7 additions & 24 deletions lib/backend/watchersyncer/watchercache.go
@@ -115,13 +115,12 @@ mainLoop:
wc.handleWatchListEvent(kvp)
case api.WatchError:
// Handle a WatchError. This error triggered from upstream, all type
- // of WatchError are treated equally,log the Error and trigger a full resync.
-
- wc.logger.WithField("EventType", event.Type).Errorf("Watch error received from Upstream")
- wc.onWaitForDatastore()
+ // of WatchError are treated equally,log the Error and trigger a full resync. We only log at info
+ // because errors may occur due to compaction causing revisions to no longer be valid - in this case
+ // we simply need to do a full resync.
+ wc.logger.WithError(event.Error).Infof("Watch error received from Upstream")
wc.currentWatchRevision = ""
wc.resyncAndCreateWatcher(ctx)
-
default:
// Unknown event type - not much we can do other than log.
wc.logger.WithField("EventType", event.Type).Errorf("Unknown event type received from the datastore")
@@ -172,7 +171,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
}

if performFullResync {
- wc.logger.Debug("Full resync is required")
+ wc.logger.Info("Full resync is required")

// Notify the converter that we are resyncing.
if wc.resourceType.UpdateProcessor != nil {
@@ -185,7 +184,6 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
if err != nil {
// Failed to perform the list. Pause briefly (so we don't tight loop) and retry.
wc.logger.WithError(err).Info("Failed to perform list of current data during resync")
- wc.onWaitForDatastore()
select {
case <-time.After(ListRetryInterval):
continue
@@ -242,11 +240,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
}

// We hit an error creating the Watch. Trigger a full resync.
- // TODO We should be able to improve this by handling specific error cases with another
- // watch retry. This would require some care to ensure the correct errors are captured
- // for the different datastore drivers.
wc.logger.WithError(err).WithField("performFullResync", performFullResync).Info("Failed to create watcher")
- wc.onWaitForDatastore()
performFullResync = true
continue
}
@@ -271,9 +265,8 @@ func (wc *watcherCache) cleanExistingWatcher() {
// We may also need to send deleted messages for old resources that were not validated in the
// resync (i.e. they must have since been deleted).
func (wc *watcherCache) finishResync() {
- // If we haven't already sent an InSync event (or signalled the inverse by sending a WaitForDatastore event),
- // then send a synced notification. The watcherSyncer will send a Synced event when it has received synced
- // events from each cache.
+ // If we haven't already sent an InSync event then send a synced notification. The watcherSyncer will send a Synced
+ // event when it has received synced events from each cache. Once in-sync the cache remains in-sync.
if !wc.hasSynced {
wc.logger.Info("Sending synced update")
wc.results <- api.InSync
@@ -408,13 +401,3 @@ func (wc *watcherCache) markAsValid(resourceKey string) {
}
}
}
-
- // If a syncer is in-sync state, the onWaitForDatastore method, sends WaitForDatastore event.
- // See finishResync() for how the watcherCache goes back to in-sync.
- func (wc *watcherCache) onWaitForDatastore() {
- wc.logger.Debug("Send WaitforDatastore event if sync isn't in-sync")
- if wc.hasSynced {
- wc.results <- api.WaitForDatastore
- wc.hasSynced = false
- }
- }
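
Net effect of the watchercache.go changes: the cache no longer emits WaitForDatastore, so once it has sent InSync it stays in-sync, and list or watch failures are handled purely by retrying after ListRetryInterval or by forcing another full resync. A small self-contained sketch of the "in-sync once" guard described in the finishResync comment above; the type, field, and status names are stand-ins, and the hasSynced assignment is an assumption rather than the verbatim function:

package watchercachesketch

// Minimal stand-ins (assumptions) for the syncer status plumbing.
type syncStatus string

const inSync syncStatus = "in-sync"

type cacheSketch struct {
	hasSynced bool
	results   chan interface{}
}

// finishResyncGuard mirrors the guard shown in the diff: InSync is sent at
// most once, and because WaitForDatastore signalling was removed nothing
// ever sets hasSynced back to false.
func (c *cacheSketch) finishResyncGuard() {
	if !c.hasSynced {
		c.results <- inSync
		c.hasSynced = true
	}
}
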
7 changes: 0 additions & 7 deletions lib/backend/watchersyncer/watchersyncer.go
@@ -222,13 +222,6 @@ func (ws *watcherSyncer) processResult(updates []api.Update, result interface{})
updates = ws.sendUpdates(updates)
ws.sendStatusUpdate(api.InSync)
}
- } else if r == api.WaitForDatastore {
- // If we received a WaitForDatastore from a watcherCache and we're in-sync or re-syncing, send a status
- // update signalling that we're not in-sync.
- if ws.status == api.InSync || ws.status == api.ResyncInProgress {
- ws.sendStatusUpdate(api.WaitForDatastore)
- }
- ws.numSynced--
}
}

6 changes: 3 additions & 3 deletions lib/backend/watchersyncer/watchersyncer_test.go
@@ -248,7 +248,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
rs.clientWatchResponse(r3, nil)
rs.sendEvent(r3, api.WatchEvent{
Type: api.WatchError,
- Error: cerrors.ErrorWatchTerminated{Err: dsError},
+ Error: dsError,
})
rs.ExpectStatusUpdate(api.WaitForDatastore)
rs.clientListResponse(r3, emptyList)
@@ -308,7 +308,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
rs.clientWatchResponse(r3, nil)
rs.sendEvent(r3, api.WatchEvent{
Type: api.WatchError,
- Error: cerrors.ErrorWatchTerminated{Err: dsError},
+ Error: dsError,
})
rs.ExpectStatusUpdate(api.WaitForDatastore)
rs.clientListResponse(r3, emptyList)
@@ -433,7 +433,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
By("Failing the watch, and resyncing with another modified entry")
rs.sendEvent(r1, api.WatchEvent{
Type: api.WatchError,
- Error: cerrors.ErrorWatchTerminated{Err: dsError},
+ Error: dsError,
})
rs.ExpectStatusUpdate(api.WaitForDatastore)
rs.clientListResponse(r1, &model.KVPairList{
10 changes: 0 additions & 10 deletions lib/errors/errors.go
@@ -171,16 +171,6 @@ func UpdateErrorIdentifier(err error, id interface{}) error {
return err
}

- // Error indicating the watcher has been terminated.
- type ErrorWatchTerminated struct {
- Err error
- ClosedByRemote bool
- }
-
- func (e ErrorWatchTerminated) Error() string {
- return fmt.Sprintf("watch terminated (closedByRemote:%v): %v", e.ClosedByRemote, e.Err)
- }
-
// Error indicating the datastore has failed to parse an entry.
type ErrorParsingDatastoreEntry struct {
RawKey string
