diff --git a/lib/backend/etcdv3/watcher.go b/lib/backend/etcdv3/watcher.go index 7d2eea0c5..6ec723c79 100644 --- a/lib/backend/etcdv3/watcher.go +++ b/lib/backend/etcdv3/watcher.go @@ -98,6 +98,7 @@ func (wc *watcher) watchLoop() { if kvps, err = wc.listCurrent(); err != nil { log.Errorf("failed to list current with latest state: %v", err) // Error considered as terminating error, hence terminate watcher. + wc.sendError(err) return } @@ -125,6 +126,7 @@ func (wc *watcher) watchLoop() { // A watch channel error is a terminating event, so exit the loop. err := wres.Err() log.WithError(err).Error("Watch channel error") + wc.sendError(err) return } for _, e := range wres.Events { diff --git a/lib/backend/watchersyncer/watchercache.go b/lib/backend/watchersyncer/watchercache.go index a398695d9..fb836197c 100644 --- a/lib/backend/watchersyncer/watchercache.go +++ b/lib/backend/watchersyncer/watchercache.go @@ -115,13 +115,12 @@ mainLoop: wc.handleWatchListEvent(kvp) case api.WatchError: // Handle a WatchError. This error triggered from upstream, all type - // of WatchError are treated equally,log the Error and trigger a full resync. - - wc.logger.WithField("EventType", event.Type).Errorf("Watch error received from Upstream") - wc.onWaitForDatastore() + // of WatchError are treated equally,log the Error and trigger a full resync. We only log at info + // because errors may occur due to compaction causing revisions to no longer be valid - in this case + // we simply need to do a full resync. + wc.logger.WithError(event.Error).Infof("Watch error received from Upstream") wc.currentWatchRevision = "" wc.resyncAndCreateWatcher(ctx) - default: // Unknown event type - not much we can do other than log. wc.logger.WithField("EventType", event.Type).Errorf("Unknown event type received from the datastore") @@ -172,7 +171,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { } if performFullResync { - wc.logger.Debug("Full resync is required") + wc.logger.Info("Full resync is required") // Notify the converter that we are resyncing. if wc.resourceType.UpdateProcessor != nil { @@ -185,7 +184,6 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { if err != nil { // Failed to perform the list. Pause briefly (so we don't tight loop) and retry. wc.logger.WithError(err).Info("Failed to perform list of current data during resync") - wc.onWaitForDatastore() select { case <-time.After(ListRetryInterval): continue @@ -242,11 +240,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { } // We hit an error creating the Watch. Trigger a full resync. - // TODO We should be able to improve this by handling specific error cases with another - // watch retry. This would require some care to ensure the correct errors are captured - // for the different datastore drivers. wc.logger.WithError(err).WithField("performFullResync", performFullResync).Info("Failed to create watcher") - wc.onWaitForDatastore() performFullResync = true continue } @@ -271,9 +265,8 @@ func (wc *watcherCache) cleanExistingWatcher() { // We may also need to send deleted messages for old resources that were not validated in the // resync (i.e. they must have since been deleted). func (wc *watcherCache) finishResync() { - // If we haven't already sent an InSync event (or signalled the inverse by sending a WaitForDatastore event), - // then send a synced notification. The watcherSyncer will send a Synced event when it has received synced - // events from each cache. + // If we haven't already sent an InSync event then send a synced notification. The watcherSyncer will send a Synced + // event when it has received synced events from each cache. Once in-sync the cache remains in-sync. if !wc.hasSynced { wc.logger.Info("Sending synced update") wc.results <- api.InSync @@ -408,13 +401,3 @@ func (wc *watcherCache) markAsValid(resourceKey string) { } } } - -// If a syncer is in-sync state, the onWaitForDatastore method, sends WaitForDatastore event. -// See finishResync() for how the watcherCache goes back to in-sync. -func (wc *watcherCache) onWaitForDatastore() { - wc.logger.Debug("Send WaitforDatastore event if sync isn't in-sync") - if wc.hasSynced { - wc.results <- api.WaitForDatastore - wc.hasSynced = false - } -} diff --git a/lib/backend/watchersyncer/watchersyncer.go b/lib/backend/watchersyncer/watchersyncer.go index 1b01a23dc..00f568ea2 100644 --- a/lib/backend/watchersyncer/watchersyncer.go +++ b/lib/backend/watchersyncer/watchersyncer.go @@ -222,13 +222,6 @@ func (ws *watcherSyncer) processResult(updates []api.Update, result interface{}) updates = ws.sendUpdates(updates) ws.sendStatusUpdate(api.InSync) } - } else if r == api.WaitForDatastore { - // If we received a WaitForDatastore from a watcherCache and we're in-sync or re-syncing, send a status - // update signalling that we're not in-sync. - if ws.status == api.InSync || ws.status == api.ResyncInProgress { - ws.sendStatusUpdate(api.WaitForDatastore) - } - ws.numSynced-- } } diff --git a/lib/backend/watchersyncer/watchersyncer_test.go b/lib/backend/watchersyncer/watchersyncer_test.go index 201967ef3..a50e45bd7 100644 --- a/lib/backend/watchersyncer/watchersyncer_test.go +++ b/lib/backend/watchersyncer/watchersyncer_test.go @@ -174,10 +174,9 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { rs.clientListResponse(r1, emptyList) rs.ExpectStatusUpdate(api.ResyncInProgress) rs.clientWatchResponse(r1, genError) - rs.ExpectStatusUpdate(api.WaitForDatastore) - + rs.ExpectStatusUnchanged() rs.clientListResponse(r1, emptyList) - rs.ExpectStatusUpdate(api.ResyncInProgress) + rs.ExpectStatusUnchanged() //rs.ExpectStatusUpdate(api.InSync) rs.clientWatchResponse(r1, nil) rs.clientListResponse(r2, emptyList) @@ -248,13 +247,13 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { rs.clientWatchResponse(r3, nil) rs.sendEvent(r3, api.WatchEvent{ Type: api.WatchError, - Error: cerrors.ErrorWatchTerminated{Err: dsError}, + Error: dsError, }) - rs.ExpectStatusUpdate(api.WaitForDatastore) + rs.ExpectStatusUnchanged() rs.clientListResponse(r3, emptyList) - rs.ExpectStatusUpdate(api.ResyncInProgress) + rs.ExpectStatusUnchanged() rs.clientWatchResponse(r3, nil) - rs.ExpectStatusUpdate(api.InSync) + rs.ExpectStatusUnchanged() // Watch fails, but gets created again immediately. This should happen without // additional pauses. @@ -291,9 +290,9 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { By("Syncing no results for resource 2, failing to create a watch, retrying successfully.") rs.clientListResponse(r2, emptyList) rs.clientWatchResponse(r2, genError) - rs.ExpectStatusUpdate(api.WaitForDatastore) + rs.ExpectStatusUnchanged() rs.clientListResponse(r2, emptyList) - rs.ExpectStatusUpdate(api.ResyncInProgress) + rs.ExpectStatusUnchanged() rs.clientWatchResponse(r2, nil) time.Sleep(130 * watchersyncer.WatchPollInterval / 100) rs.expectAllEventsHandled() @@ -308,13 +307,13 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { rs.clientWatchResponse(r3, nil) rs.sendEvent(r3, api.WatchEvent{ Type: api.WatchError, - Error: cerrors.ErrorWatchTerminated{Err: dsError}, + Error: dsError, }) - rs.ExpectStatusUpdate(api.WaitForDatastore) + rs.ExpectStatusUnchanged() rs.clientListResponse(r3, emptyList) - rs.ExpectStatusUpdate(api.ResyncInProgress) + rs.ExpectStatusUnchanged() rs.clientWatchResponse(r3, nil) - rs.ExpectStatusUpdate(api.InSync) + rs.ExpectStatusUnchanged() rs.clientWatchResponse(r3, nil) // All events should be handled. rs.expectAllEventsHandled() @@ -378,7 +377,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { // The retry thread will be blocked for the watch poll interval. rs.clientWatchResponse(r1, genError) time.Sleep(watchersyncer.WatchPollInterval) - rs.ExpectStatusUpdate(api.WaitForDatastore) + rs.ExpectStatusUnchanged() By("returning a sync list with one entry removed and a new one added") rs.clientListResponse(r1, &model.KVPairList{ @@ -390,8 +389,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { }, }) - rs.ExpectStatusUpdate(api.ResyncInProgress) - rs.ExpectStatusUpdate(api.InSync) + rs.ExpectStatusUnchanged() rs.clientWatchResponse(r1, nil) @@ -433,9 +431,9 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { By("Failing the watch, and resyncing with another modified entry") rs.sendEvent(r1, api.WatchEvent{ Type: api.WatchError, - Error: cerrors.ErrorWatchTerminated{Err: dsError}, + Error: dsError, }) - rs.ExpectStatusUpdate(api.WaitForDatastore) + rs.ExpectStatusUnchanged() rs.clientListResponse(r1, &model.KVPairList{ Revision: "12347", KVPairs: []*model.KVPair{ @@ -443,8 +441,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() { eventL1Modified4_2.New, }, }) - rs.ExpectStatusUpdate(api.ResyncInProgress) - rs.ExpectStatusUpdate(api.InSync) + rs.ExpectStatusUnchanged() By("Expecting mod, delete, mod updates") rs.ExpectUpdates([]api.Update{ diff --git a/lib/errors/errors.go b/lib/errors/errors.go index 12f99d533..edea40db2 100644 --- a/lib/errors/errors.go +++ b/lib/errors/errors.go @@ -171,16 +171,6 @@ func UpdateErrorIdentifier(err error, id interface{}) error { return err } -// Error indicating the watcher has been terminated. -type ErrorWatchTerminated struct { - Err error - ClosedByRemote bool -} - -func (e ErrorWatchTerminated) Error() string { - return fmt.Sprintf("watch terminated (closedByRemote:%v): %v", e.ClosedByRemote, e.Err) -} - // Error indicating the datastore has failed to parse an entry. type ErrorParsingDatastoreEntry struct { RawKey string