etcd backend not passing watcher errors up the stack, and errors not … #1337

Merged 1 commit on Oct 30, 2020

2 changes: 2 additions & 0 deletions lib/backend/etcdv3/watcher.go
@@ -98,6 +98,7 @@ func (wc *watcher) watchLoop() {
if kvps, err = wc.listCurrent(); err != nil {
log.Errorf("failed to list current with latest state: %v", err)
// Error considered as terminating error, hence terminate watcher.
wc.sendError(err)
return
}

@@ -125,6 +126,7 @@ func (wc *watcher) watchLoop() {
// A watch channel error is a terminating event, so exit the loop.
err := wres.Err()
log.WithError(err).Error("Watch channel error")
wc.sendError(err)
return
}
for _, e := range wres.Events {
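The fix in this file is simply to forward the terminating error before the watch loop returns. The sendError helper itself is not part of this diff; the sketch below is only an illustration of what it presumably does, assuming the watcher owns a result channel of api.WatchEvent (the channel name is made up for illustration):

func (wc *watcher) sendError(err error) {
	// Wrap the terminating error in a WatchError event so that downstream
	// consumers (e.g. the watcherCache) see the failure instead of the
	// watch loop exiting silently.
	wc.resultChan <- api.WatchEvent{
		Type:  api.WatchError,
		Error: err,
	}
}
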
31 changes: 7 additions & 24 deletions lib/backend/watchersyncer/watchercache.go
@@ -115,13 +115,12 @@ mainLoop:
wc.handleWatchListEvent(kvp)
case api.WatchError:
// Handle a WatchError. This error triggered from upstream, all type
// of WatchError are treated equally,log the Error and trigger a full resync.

wc.logger.WithField("EventType", event.Type).Errorf("Watch error received from Upstream")
wc.onWaitForDatastore()
// of WatchError are treated equally,log the Error and trigger a full resync. We only log at info
// because errors may occur due to compaction causing revisions to no longer be valid - in this case
// we simply need to do a full resync.
wc.logger.WithError(event.Error).Infof("Watch error received from Upstream")
wc.currentWatchRevision = ""
wc.resyncAndCreateWatcher(ctx)

default:
// Unknown event type - not much we can do other than log.
wc.logger.WithField("EventType", event.Type).Errorf("Unknown event type received from the datastore")
@@ -172,7 +171,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
}

if performFullResync {
wc.logger.Debug("Full resync is required")
wc.logger.Info("Full resync is required")

// Notify the converter that we are resyncing.
if wc.resourceType.UpdateProcessor != nil {
@@ -185,7 +184,6 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
if err != nil {
// Failed to perform the list. Pause briefly (so we don't tight loop) and retry.
wc.logger.WithError(err).Info("Failed to perform list of current data during resync")
wc.onWaitForDatastore()
select {
case <-time.After(ListRetryInterval):
continue
@@ -242,11 +240,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
}

// We hit an error creating the Watch. Trigger a full resync.
// TODO We should be able to improve this by handling specific error cases with another
// watch retry. This would require some care to ensure the correct errors are captured
// for the different datastore drivers.
wc.logger.WithError(err).WithField("performFullResync", performFullResync).Info("Failed to create watcher")
wc.onWaitForDatastore()
performFullResync = true
continue
}
@@ -271,9 +265,8 @@ func (wc *watcherCache) cleanExistingWatcher() {
// We may also need to send deleted messages for old resources that were not validated in the
// resync (i.e. they must have since been deleted).
func (wc *watcherCache) finishResync() {
// If we haven't already sent an InSync event (or signalled the inverse by sending a WaitForDatastore event),
// then send a synced notification. The watcherSyncer will send a Synced event when it has received synced
// events from each cache.
// If we haven't already sent an InSync event then send a synced notification. The watcherSyncer will send a Synced
// event when it has received synced events from each cache. Once in-sync the cache remains in-sync.
if !wc.hasSynced {
wc.logger.Info("Sending synced update")
wc.results <- api.InSync
@@ -408,13 +401,3 @@ func (wc *watcherCache) markAsValid(resourceKey string) {
}
}
}

// If a syncer is in-sync state, the onWaitForDatastore method, sends WaitForDatastore event.
// See finishResync() for how the watcherCache goes back to in-sync.
func (wc *watcherCache) onWaitForDatastore() {
wc.logger.Debug("Send WaitforDatastore event if sync isn't in-sync")
if wc.hasSynced {
wc.results <- api.WaitForDatastore
wc.hasSynced = false
}
}
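The practical effect of deleting onWaitForDatastore (together with the watchersyncer change below) is that a watch failure on a cache that has already synced no longer bounces the syncer's reported status through WaitForDatastore and ResyncInProgress; the cache re-lists and re-watches internally while the externally visible status stays InSync. Roughly, using the helpers from watchersyncer_test.go further down (illustrative sequence only):

// Cache r1 is already in-sync; its watch channel reports a terminating error.
rs.sendEvent(r1, api.WatchEvent{Type: api.WatchError, Error: dsError})
rs.ExpectStatusUnchanged() // previously ExpectStatusUpdate(api.WaitForDatastore)

// The cache silently re-lists and re-creates its watch.
rs.clientListResponse(r1, emptyList)
rs.ExpectStatusUnchanged() // previously ExpectStatusUpdate(api.ResyncInProgress)
rs.clientWatchResponse(r1, nil)
rs.ExpectStatusUnchanged() // previously ExpectStatusUpdate(api.InSync)
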
7 changes: 0 additions & 7 deletions lib/backend/watchersyncer/watchersyncer.go
@@ -222,13 +222,6 @@ func (ws *watcherSyncer) processResult(updates []api.Update, result interface{})
updates = ws.sendUpdates(updates)
ws.sendStatusUpdate(api.InSync)
}
} else if r == api.WaitForDatastore {
// If we received a WaitForDatastore from a watcherCache and we're in-sync or re-syncing, send a status
// update signalling that we're not in-sync.
if ws.status == api.InSync || ws.status == api.ResyncInProgress {
ws.sendStatusUpdate(api.WaitForDatastore)
}
ws.numSynced--
}
}

37 changes: 17 additions & 20 deletions lib/backend/watchersyncer/watchersyncer_test.go
@@ -174,10 +174,9 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
rs.clientListResponse(r1, emptyList)
rs.ExpectStatusUpdate(api.ResyncInProgress)
rs.clientWatchResponse(r1, genError)
rs.ExpectStatusUpdate(api.WaitForDatastore)

rs.ExpectStatusUnchanged()
rs.clientListResponse(r1, emptyList)
rs.ExpectStatusUpdate(api.ResyncInProgress)
rs.ExpectStatusUnchanged()
//rs.ExpectStatusUpdate(api.InSync)
rs.clientWatchResponse(r1, nil)
rs.clientListResponse(r2, emptyList)
@@ -248,13 +247,13 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
rs.clientWatchResponse(r3, nil)
rs.sendEvent(r3, api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{Err: dsError},
Error: dsError,
})
rs.ExpectStatusUpdate(api.WaitForDatastore)
rs.ExpectStatusUnchanged()
rs.clientListResponse(r3, emptyList)
rs.ExpectStatusUpdate(api.ResyncInProgress)
rs.ExpectStatusUnchanged()
rs.clientWatchResponse(r3, nil)
rs.ExpectStatusUpdate(api.InSync)
rs.ExpectStatusUnchanged()

// Watch fails, but gets created again immediately. This should happen without
// additional pauses.
@@ -291,9 +290,9 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
By("Syncing no results for resource 2, failing to create a watch, retrying successfully.")
rs.clientListResponse(r2, emptyList)
rs.clientWatchResponse(r2, genError)
rs.ExpectStatusUpdate(api.WaitForDatastore)
rs.ExpectStatusUnchanged()
rs.clientListResponse(r2, emptyList)
rs.ExpectStatusUpdate(api.ResyncInProgress)
rs.ExpectStatusUnchanged()
rs.clientWatchResponse(r2, nil)
time.Sleep(130 * watchersyncer.WatchPollInterval / 100)
rs.expectAllEventsHandled()
@@ -308,13 +307,13 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
rs.clientWatchResponse(r3, nil)
rs.sendEvent(r3, api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{Err: dsError},
Error: dsError,
})
rs.ExpectStatusUpdate(api.WaitForDatastore)
rs.ExpectStatusUnchanged()
rs.clientListResponse(r3, emptyList)
rs.ExpectStatusUpdate(api.ResyncInProgress)
rs.ExpectStatusUnchanged()
rs.clientWatchResponse(r3, nil)
rs.ExpectStatusUpdate(api.InSync)
rs.ExpectStatusUnchanged()
rs.clientWatchResponse(r3, nil)
// All events should be handled.
rs.expectAllEventsHandled()
@@ -378,7 +377,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
// The retry thread will be blocked for the watch poll interval.
rs.clientWatchResponse(r1, genError)
time.Sleep(watchersyncer.WatchPollInterval)
rs.ExpectStatusUpdate(api.WaitForDatastore)
rs.ExpectStatusUnchanged()

By("returning a sync list with one entry removed and a new one added")
rs.clientListResponse(r1, &model.KVPairList{
@@ -390,8 +389,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
},
})

rs.ExpectStatusUpdate(api.ResyncInProgress)
rs.ExpectStatusUpdate(api.InSync)
rs.ExpectStatusUnchanged()

rs.clientWatchResponse(r1, nil)

@@ -433,18 +431,17 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
By("Failing the watch, and resyncing with another modified entry")
rs.sendEvent(r1, api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{Err: dsError},
Error: dsError,
})
rs.ExpectStatusUpdate(api.WaitForDatastore)
rs.ExpectStatusUnchanged()
rs.clientListResponse(r1, &model.KVPairList{
Revision: "12347",
KVPairs: []*model.KVPair{
eventL1Added3.New,
eventL1Modified4_2.New,
},
})
rs.ExpectStatusUpdate(api.ResyncInProgress)
rs.ExpectStatusUpdate(api.InSync)
rs.ExpectStatusUnchanged()

By("Expecting mod, delete, mod updates")
rs.ExpectUpdates([]api.Update{
10 changes: 0 additions & 10 deletions lib/errors/errors.go
@@ -171,16 +171,6 @@ func UpdateErrorIdentifier(err error, id interface{}) error {
return err
}

// Error indicating the watcher has been terminated.
type ErrorWatchTerminated struct {
Err error
ClosedByRemote bool
}

func (e ErrorWatchTerminated) Error() string {
return fmt.Sprintf("watch terminated (closedByRemote:%v): %v", e.ClosedByRemote, e.Err)
}

// Error indicating the datastore has failed to parse an entry.
type ErrorParsingDatastoreEntry struct {
RawKey string