From 5f4021d242f8206d62560e74b8d47439e9afb2a2 Mon Sep 17 00:00:00 2001
From: Gobinath Krishnamoorthy
Date: Sat, 18 Apr 2020 13:09:08 -0700
Subject: [PATCH] Handle backend watch if upstream closes channel

Previously, a watch channel closed by the upstream datastore was wrapped
in an ErrorWatchTerminated event and counted against an error threshold.
Now the backend watchers simply exit when the upstream closes the channel,
and the watcherCache treats every WatchError as a trigger for a
WaitForDatastore status update followed by a full resync.
---
 lib/backend/api/api.go                     |   1 -
 lib/backend/etcdv3/watcher.go              |  17 +-
 lib/backend/k8s/resources/networkpolicy.go |  42 ++---
 lib/backend/k8s/resources/profile.go       |  44 +----
 lib/backend/k8s/resources/watcher.go       |  31 +---
 lib/backend/watchersyncer/watchercache.go  |  64 ++-----
 .../watchersyncer/watchersyncer_test.go    | 175 +++--------------
 7 files changed, 77 insertions(+), 297 deletions(-)

diff --git a/lib/backend/api/api.go b/lib/backend/api/api.go
index a8e0a0a44..d12af1eed 100644
--- a/lib/backend/api/api.go
+++ b/lib/backend/api/api.go
@@ -178,7 +178,6 @@
 	// Returns a chan which will receive all the events. This channel is closed when:
 	// - Stop() is called, or
-	// - An error of type errors.ErrorWatchTerminated is received.
 	// In both cases the watcher will be cleaned up, and the client should stop receiving
 	// from this channel.
 	ResultChan() <-chan WatchEvent
diff --git a/lib/backend/etcdv3/watcher.go b/lib/backend/etcdv3/watcher.go
index 76e409cdf..5fd3af566 100644
--- a/lib/backend/etcdv3/watcher.go
+++ b/lib/backend/etcdv3/watcher.go
@@ -16,7 +16,6 @@ package etcdv3
 
 import (
 	"context"
-	goerrors "errors"
 	"strconv"
 	"sync/atomic"
 
@@ -25,7 +24,6 @@
 
 	"github.com/projectcalico/libcalico-go/lib/backend/api"
 	"github.com/projectcalico/libcalico-go/lib/backend/model"
-	"github.com/projectcalico/libcalico-go/lib/errors"
 )
 
 const (
@@ -99,7 +97,7 @@ func (wc *watcher) watchLoop() {
 	var err error
 	if kvps, err = wc.listCurrent(); err != nil {
 		log.Errorf("failed to list current with latest state: %v", err)
-		wc.sendError(err, true)
+		// A list failure is treated as a terminating error, so terminate the watcher.
 		return
 	}
@@ -127,7 +125,6 @@
 			// A watch channel error is a terminating event, so exit the loop.
 			err := wres.Err()
 			log.WithError(err).Error("Watch channel error")
-			wc.sendError(err, true)
 			return
 		}
 		for _, e := range wres.Events {
@@ -137,15 +134,13 @@
 			if ae, err := convertWatchEvent(e, wc.list); ae != nil {
 				wc.sendEvent(ae)
 			} else if err != nil {
-				wc.sendError(err, false)
+				wc.sendError(err)
 			}
 		}
 	}
 
 	// If we exit the loop, it means the watcher has closed for some reason.
-	// Bubble this up as a watch termination error.
 	log.Warn("etcdv3 watch channel closed")
-	wc.sendError(goerrors.New("etcdv3 watch channel closed"), true)
 }
 
 // listCurrent retrieves the existing entries.
@@ -205,7 +200,7 @@ func (wc *watcher) terminateWatcher() {
 }
 
 // sendError packages up the error as an event and sends it in the results channel.
-func (wc *watcher) sendError(err error, terminating bool) {
+func (wc *watcher) sendError(err error) {
 	// The response from etcd commands may include a context.Canceled error if the context
 	// was cancelled before completion. Since with our Watcher we don't include that as
 	// an error type skip over the Canceled error, the error processing in the main
@@ -214,12 +209,6 @@
 		return
 	}
 
-	// If this is a terminating error, wrap the error up in an errors.ErrorWatchTerminated
-	// error type.
-	if terminating {
-		err = errors.ErrorWatchTerminated{Err: err}
-	}
-
 	// Wrap the error up in a WatchEvent and use sendEvent to send it.
 	errEvent := &api.WatchEvent{
 		Type:  api.WatchError,
diff --git a/lib/backend/k8s/resources/networkpolicy.go b/lib/backend/k8s/resources/networkpolicy.go
index efc801b00..afd30f2d9 100644
--- a/lib/backend/k8s/resources/networkpolicy.go
+++ b/lib/backend/k8s/resources/networkpolicy.go
@@ -437,32 +437,18 @@ func (npw *networkPolicyWatcher) processNPEvents() {
 		select {
 		case e, ok = <-npw.crdNPWatch.ResultChan():
 			if !ok {
-				// We shouldn't get a closed channel without first getting a terminating error,
-				// so write a warning log and convert to a termination error.
-				log.Warn("Calico NP channel closed")
-				e = api.WatchEvent{
-					Type: api.WatchError,
-					Error: cerrors.ErrorWatchTerminated{
-						ClosedByRemote: true,
-						Err:            errors.New("Calico NP watch channel closed"),
-					},
-				}
+				// The upstream channel was closed by Kubernetes, so stop processing watch events.
+				log.Debug("Calico NP channel closed by remote.")
+				return
 			}
 			log.Debug("Processing Calico NP event")
 			isCRDEvent = true
 		case e, ok = <-npw.k8sNPWatch.ResultChan():
 			if !ok {
-				// We shouldn't get a closed channel without first getting a terminating error,
-				// so write a warning log and convert to a termination error.
-				log.Warn("Kubernetes NP channel closed")
-				e = api.WatchEvent{
-					Type: api.WatchError,
-					Error: cerrors.ErrorWatchTerminated{
-						ClosedByRemote: true,
-						Err:            errors.New("Kubernetes NP watch channel closed"),
-					},
-				}
+				// The upstream channel was closed by Kubernetes, so stop processing watch events.
+				log.Debug("Kubernetes NP channel closed by remote.")
+				return
 			}
 			log.Debug("Processing Kubernetes NP event")
 			isCRDEvent = false
@@ -489,11 +475,10 @@
 		if !ok {
 			log.WithField("event", e).Error(
 				"Resource returned from watch does not implement the ObjectMetaAccessor interface")
+			// Handle this as a WatchError; it will force the watcher to resync.
 			e = api.WatchEvent{
-				Type: api.WatchError,
-				Error: cerrors.ErrorWatchTerminated{
-					Err: errors.New("Resource returned from watch does not implement the ObjectMetaAccessor interface"),
-				},
+				Type:  api.WatchError,
+				Error: errors.New("Resource returned from watch does not implement the ObjectMetaAccessor interface"),
 			}
 		}
 		if isCRDEvent {
@@ -514,14 +499,9 @@
 		// Send the processed event.
 		select {
 		case npw.resultChan <- e:
-			// If this is an error event, check to see if it's a terminating one.
-			// If so, terminate this watcher.
+			// If this is a WatchError event, bubble up the error event.
 			if e.Type == api.WatchError {
-				log.WithError(e.Error).Debug("Kubernetes event converted to backend watcher error event")
-				if _, ok := e.Error.(cerrors.ErrorWatchTerminated); ok {
-					log.Debug("Watch terminated event")
-					return
-				}
+				log.Debug("Kubernetes watcher error converted to backend watcher error event")
 			}
 
 		case <-npw.context.Done():
diff --git a/lib/backend/k8s/resources/profile.go b/lib/backend/k8s/resources/profile.go
index e829aef0d..a976ea7ab 100644
--- a/lib/backend/k8s/resources/profile.go
+++ b/lib/backend/k8s/resources/profile.go
@@ -393,32 +393,18 @@ func (pw *profileWatcher) processProfileEvents() {
 		select {
 		case e, ok = <-pw.k8sNSWatch.ResultChan():
 			if !ok {
-				// We shouldn't get a closed channel without first getting a terminating error,
-				// so write a warning log and convert to a termination error.
- log.Warn("Profile, namespace watch channel closed.") - e = api.WatchEvent{ - Type: api.WatchError, - Error: cerrors.ErrorWatchTerminated{ - ClosedByRemote: true, - Err: errors.New("Profile namespace watch channel closed."), - }, - } + // Watch channel is closed by upstream hence, return from loop. + log.Debug("Profile, namespace watch channel closed by remote.") + return } log.Debug("Processing Namespace event") isNsEvent = true case e, ok = <-pw.k8sSAWatch.ResultChan(): if !ok { - // We shouldn't get a closed channel without first getting a terminating error, - // so write a warning log and convert to a termination error. - log.Warn("Profile, serviceaccount watch channel closed.") - e = api.WatchEvent{ - Type: api.WatchError, - Error: cerrors.ErrorWatchTerminated{ - ClosedByRemote: true, - Err: errors.New("Profile serviceaccount watch channel closed."), - }, - } + // Watch channel is closed by upstream hence, return from loop. + log.Debug("Profile, serviceaccount watch channel closed by remote.") + return } log.Debug("Processing ServiceAccount event") isNsEvent = false @@ -446,13 +432,8 @@ func (pw *profileWatcher) processProfileEvents() { if !ok { log.WithField("event", e).Error( "Resource returned from watch does not implement ObjectMetaAccessor interface") - e = api.WatchEvent{ - Type: api.WatchError, - Error: cerrors.ErrorWatchTerminated{ - ClosedByRemote: true, - Err: errors.New("Profile value does not implement ObjectMetaAccessor interface."), - }, - } + // handle this Error as a Watcher termination by remote. + return } else { if isNsEvent { pw.k8sNSRev = oma.GetObjectMeta().GetResourceVersion() @@ -468,14 +449,9 @@ func (pw *profileWatcher) processProfileEvents() { // Send the processed event. select { case pw.resultChan <- e: - // If this is an error event. check to see if it's a terminating one. - // If so, terminate this watcher. + // If this is an error event. bubble up the error event. if e.Type == api.WatchError { - log.WithError(e.Error).Debug("Kubernetes event converted to backend watcher error event") - if _, ok := e.Error.(cerrors.ErrorWatchTerminated); ok { - log.Debug("Watch terminated event") - return - } + log.Debug("Kubernetes event converted to backend watcher error event") } case <-pw.context.Done(): diff --git a/lib/backend/k8s/resources/watcher.go b/lib/backend/k8s/resources/watcher.go index 59e570145..045b0263c 100644 --- a/lib/backend/k8s/resources/watcher.go +++ b/lib/backend/k8s/resources/watcher.go @@ -20,11 +20,11 @@ import ( "sync/atomic" "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" kwatch "k8s.io/apimachinery/pkg/watch" "github.com/projectcalico/libcalico-go/lib/backend/api" "github.com/projectcalico/libcalico-go/lib/backend/model" - cerrors "github.com/projectcalico/libcalico-go/lib/errors" ) const ( @@ -104,16 +104,9 @@ func (crw *k8sWatcherConverter) processK8sEvents() { case event, ok := <-crw.k8sWatch.ResultChan(): var events []*api.WatchEvent if !ok { - // The channel is closed so send a terminating watcher event indicating the watch was - // closed by the remote. - crw.logCxt.Debug("Watcher terminated by remote") - events = []*api.WatchEvent{{ - Type: api.WatchError, - Error: cerrors.ErrorWatchTerminated{ - Err: fmt.Errorf("terminating error event from Kubernetes watcher: closed by remote"), - ClosedByRemote: true, - }, - }} + // Watch channel is closed by remote k8s hence, return from loop. 
+				crw.logCxt.Debug("Watcher channel closed by remote")
+				return
 			} else {
 				// We have a valid event, so convert it.
 				events = crw.convertEvent(event)
@@ -127,16 +120,6 @@
 			select {
 			case crw.resultChan <- *e:
 				crw.logCxt.Debug("Kubernetes event converted and sent to backend watcher")
-
-				// If this is an error event, check to see if it's a terminating one (the
-				// convertEvent method will decide that). If so, terminate this watcher.
-				if e.Type == api.WatchError {
-					crw.logCxt.WithError(e.Error).Debug("Watch event was an error event type")
-					if _, ok := e.Error.(cerrors.ErrorWatchTerminated); ok {
-						crw.logCxt.Debug("Watch event indicates a terminated watcher")
-						return
-					}
-				}
 			case <-crw.context.Done():
 				crw.logCxt.Debug("Process watcher done event during watch event in kdd client")
 				return
@@ -160,10 +143,8 @@ func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) []*api.WatchEv
 	case kwatch.Error:
 		// An error directly from the k8s watcher is a terminating event.
 		return []*api.WatchEvent{{
-			Type: api.WatchError,
-			Error: cerrors.ErrorWatchTerminated{
-				Err: fmt.Errorf("terminating error event from Kubernetes watcher: %v", kevent.Object),
-			},
+			Type:  api.WatchError,
+			Error: apierrors.FromObject(kevent.Object),
 		}}
 	case kwatch.Deleted:
 		fallthrough
diff --git a/lib/backend/watchersyncer/watchercache.go b/lib/backend/watchersyncer/watchercache.go
index b4e17e8e4..a398695d9 100644
--- a/lib/backend/watchersyncer/watchercache.go
+++ b/lib/backend/watchersyncer/watchercache.go
@@ -42,10 +42,8 @@ type watcherCache struct {
 	oldResources         map[string]cacheEntry
 	results              chan<- interface{}
 	hasSynced            bool
-	errors               int
 	resourceType         ResourceType
 	currentWatchRevision string
-	errorThreshold       int
 }
 
 var (
@@ -66,12 +64,11 @@ type cacheEntry struct {
 // Create a new watcherCache.
 func newWatcherCache(client api.Client, resourceType ResourceType, results chan<- interface{}) *watcherCache {
 	return &watcherCache{
-		logger:         logrus.WithField("ListRoot", model.ListOptionsToDefaultPathRoot(resourceType.ListInterface)),
-		client:         client,
-		resourceType:   resourceType,
-		results:        results,
-		resources:      make(map[string]cacheEntry, 0),
-		errorThreshold: DefaultErrorThreshold,
+		logger:       logrus.WithField("ListRoot", model.ListOptionsToDefaultPathRoot(resourceType.ListInterface)),
+		client:       client,
+		resourceType: resourceType,
+		results:      results,
+		resources:    make(map[string]cacheEntry, 0),
 	}
 }
@@ -117,30 +114,14 @@ mainLoop:
 				kvp.Value = nil
 				wc.handleWatchListEvent(kvp)
 			case api.WatchError:
-				// Handle a WatchError. First determine if the error type indicates that the
-				// watch has closed, and if so we'll need to resync and create a new watcher.
-				wc.results <- event.Error
-
-				if e, ok := event.Error.(cerrors.ErrorWatchTerminated); ok {
-					wc.logger.Debug("Received watch terminated error - recreate watcher")
-					if !e.ClosedByRemote {
-						// If the watcher was not closed by remote, reset the currentWatchRevision. This will
-						// trigger a full resync rather than simply trying to watch from the last event
-						// revision.
-						wc.logger.Debug("Watch was not closed by remote - full resync required")
-						wc.currentWatchRevision = ""
-						wc.onError()
-					}
-					wc.resyncAndCreateWatcher(ctx)
-				} else {
-					wc.onError()
-				}
+				// Handle a WatchError. The error comes from upstream; all WatchErrors
+				// are treated equally: log the error and trigger a full resync.
+
+				wc.logger.WithField("EventType", event.Type).Errorf("Watch error received from upstream")
+				wc.onWaitForDatastore()
+				wc.currentWatchRevision = ""
+				wc.resyncAndCreateWatcher(ctx)
 
-				if wc.errors > wc.errorThreshold {
-					// Trigger a full resync if we're past the error threshold.
-					wc.currentWatchRevision = ""
-					wc.resyncAndCreateWatcher(ctx)
-				}
 			default:
 				// Unknown event type - not much we can do other than log.
 				wc.logger.WithField("EventType", event.Type).Errorf("Unknown event type received from the datastore")
@@ -204,7 +185,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
 		if err != nil {
 			// Failed to perform the list. Pause briefly (so we don't tight loop) and retry.
 			wc.logger.WithError(err).Info("Failed to perform list of current data during resync")
-			wc.onError()
+			wc.onWaitForDatastore()
 			select {
 			case <-time.After(ListRetryInterval):
 				continue
@@ -265,7 +246,7 @@
 			// watch retry. This would require some care to ensure the correct errors are captured
 			// for the different datastore drivers.
 			wc.logger.WithError(err).WithField("performFullResync", performFullResync).Info("Failed to create watcher")
-			wc.onError()
+			wc.onWaitForDatastore()
 			performFullResync = true
 			continue
 		}
@@ -297,7 +278,6 @@ func (wc *watcherCache) finishResync() {
 		wc.logger.Info("Sending synced update")
 		wc.results <- api.InSync
 		wc.hasSynced = true
-		wc.errors = 0
 	}
 
 	// If the watcher failed at any time, we end up recreating a watcher and storing off
@@ -330,7 +310,6 @@ func (wc *watcherCache) handleWatchListEvent(kvp *model.KVPair) {
 	if wc.resourceType.UpdateProcessor == nil {
 		// No update processor - handle immediately.
 		wc.handleConvertedWatchEvent(kvp)
-		wc.errors = 0
 		return
 	}
@@ -343,8 +322,6 @@
 	// If we hit a conversion error, notify the main syncer.
 	if err != nil {
 		wc.results <- err
-	} else {
-		wc.errors = 0
 	}
 }
@@ -432,13 +409,12 @@ func (wc *watcherCache) markAsValid(resourceKey string) {
 	}
 }
 
-// onError signals to the syncer that this watcherCache is not in-sync if the number of consecutive errors
-// exceeds the error threshold. See finishResync() for how the watcherCache goes back to in-sync.
-func (wc *watcherCache) onError() {
-	wc.errors++
-	if wc.hasSynced && wc.errors > wc.errorThreshold {
-		wc.logger.WithFields(logrus.Fields{"errors": wc.errors, "threshold": wc.errorThreshold}).Debugf("Exceeded error threshold")
-		wc.hasSynced = false
+// If the syncer is in-sync, the onWaitForDatastore method sends a WaitForDatastore event.
+// See finishResync() for how the watcherCache goes back to in-sync.
+func (wc *watcherCache) onWaitForDatastore() {
+	wc.logger.Debug("Send WaitForDatastore event if the syncer is currently in-sync")
+	if wc.hasSynced {
 		wc.results <- api.WaitForDatastore
+		wc.hasSynced = false
 	}
 }
diff --git a/lib/backend/watchersyncer/watchersyncer_test.go b/lib/backend/watchersyncer/watchersyncer_test.go
index 13ff73b26..201967ef3 100644
--- a/lib/backend/watchersyncer/watchersyncer_test.go
+++ b/lib/backend/watchersyncer/watchersyncer_test.go
@@ -135,53 +135,7 @@ var _ = Describe("Test the backend datastore multi-watch syncer", func() {
 		rs.ExpectStatusUnchanged()
 	})
 
-	It("should return WaitForDatastore on multiple consecutive watch errors", func() {
-		By("Sending errors to trigger a WaitForDatastore status update")
-
-		defer setErrorThreshold(watchersyncer.DefaultErrorThreshold)
-		setErrorThreshold(6)
-
-		rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1})
-		rs.ExpectStatusUpdate(api.WaitForDatastore)
-		rs.clientListResponse(r1, emptyList)
-		rs.ExpectStatusUpdate(api.ResyncInProgress)
-		rs.ExpectStatusUpdate(api.InSync)
-
-		rs.clientWatchResponse(r1, genError)
-		rs.ExpectStatusUnchanged()
-		for i := 0; i < watchersyncer.DefaultErrorThreshold-1; i++ {
-			rs.clientListResponse(r1, genError)
-			rs.ExpectStatusUnchanged()
-		}
-
-		rs.clientListResponse(r1, genError)
-		rs.ExpectStatusUpdate(api.WaitForDatastore)
-
-		rs.clientListResponse(r1, emptyList)
-		rs.ExpectStatusUpdate(api.ResyncInProgress)
-		rs.ExpectStatusUpdate(api.InSync)
-
-		By("Going back in-sync and driving WatchError events up to the error threshold")
-		genWatchError := api.WatchEvent{
-			Type:  api.WatchError,
-			Error: genError,
-		}
-
-		rs.clientWatchResponse(r1, nil)
-
-		// Watch is set but is now returning a generic WatchErrors
-		for i := 0; i < watchersyncer.DefaultErrorThreshold; i++ {
-			rs.sendEvent(r1, genWatchError)
-			rs.ExpectStatusUnchanged()
-		}
-
-		rs.sendEvent(r1, genWatchError)
-		rs.ExpectStatusUpdate(api.WaitForDatastore)
-	})
-
 	It("should handle reconnection if watchers fail to be created", func() {
-		defer setErrorThreshold(watchersyncer.DefaultErrorThreshold)
-		setErrorThreshold(3)
 
 		rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1, r2, r3})
 		rs.ExpectStatusUpdate(api.WaitForDatastore)
@@ -220,16 +174,10 @@
 		rs.clientListResponse(r1, emptyList)
 		rs.ExpectStatusUpdate(api.ResyncInProgress)
 		rs.clientWatchResponse(r1, genError)
-
-		for i := 0; i < watchersyncer.DefaultErrorThreshold; i++ {
-			rs.clientListResponse(r1, genError)
-		}
-
-		// We've exceeded the default error threshold
 		rs.ExpectStatusUpdate(api.WaitForDatastore)
 
 		rs.clientListResponse(r1, emptyList)
 		rs.ExpectStatusUpdate(api.ResyncInProgress)
 		rs.clientWatchResponse(r1, nil)
 		rs.clientListResponse(r2, emptyList)
 		rs.clientWatchResponse(r2, notSupported)
@@ -286,9 +235,6 @@
 
 	It("Should handle reconnection and syncing when the watcher sends a watch terminated error", func() {
-		defer setErrorThreshold(watchersyncer.DefaultErrorThreshold)
-		setErrorThreshold(0)
-
 		rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1, r2, r3})
 		rs.ExpectStatusUpdate(api.WaitForDatastore)
 		rs.clientListResponse(r1, emptyList)
@@ -315,26 +261,6 @@
 		rs.expectAllEventsHandled()
 	})
 
-	It("Should not return WaitForDatastore on multiple watch errors due to ClosedByRemote exceeding error threshold", func() {
-		defer setErrorThreshold(watchersyncer.DefaultErrorThreshold)
-		setErrorThreshold(0)
-
-		rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1})
-		rs.ExpectStatusUpdate(api.WaitForDatastore)
-		rs.clientListResponse(r1, emptyList)
-		rs.ExpectStatusUpdate(api.ResyncInProgress)
-		rs.ExpectStatusUpdate(api.InSync)
-		rs.clientWatchResponse(r1, nil)
-
-		rs.sendEvent(r1, api.WatchEvent{
-			Type:  api.WatchError,
-			Error: cerrors.ErrorWatchTerminated{Err: dsError, ClosedByRemote: true},
-		})
-
-		rs.ExpectStatusUnchanged()
-		rs.expectAllEventsHandled()
-	})
-
 	It("Should handle receiving events while one watcher fails and fails to recreate", func() {
 		rs := newWatcherSyncerTester([]watchersyncer.ResourceType{r1, r2, r3})
 		eventL1Added1 := addEvent(l1Key1)
@@ -365,9 +291,9 @@
 		By("Syncing no results for resource 2, failing to create a watch, retrying successfully.")
 		rs.clientListResponse(r2, emptyList)
 		rs.clientWatchResponse(r2, genError)
-		rs.ExpectStatusUnchanged()
+		rs.ExpectStatusUpdate(api.WaitForDatastore)
 		rs.clientListResponse(r2, emptyList)
-		rs.ExpectStatusUnchanged()
+		rs.ExpectStatusUpdate(api.ResyncInProgress)
 		rs.clientWatchResponse(r2, nil)
 		time.Sleep(130 * watchersyncer.WatchPollInterval / 100)
 		rs.expectAllEventsHandled()
@@ -384,7 +310,12 @@
 			Type:  api.WatchError,
 			Error: cerrors.ErrorWatchTerminated{Err: dsError},
 		})
-		rs.ExpectStatusUnchanged()
+		rs.ExpectStatusUpdate(api.WaitForDatastore)
+		rs.clientListResponse(r3, emptyList)
+		rs.ExpectStatusUpdate(api.ResyncInProgress)
+		rs.clientWatchResponse(r3, nil)
+		rs.ExpectStatusUpdate(api.InSync)
+		rs.clientWatchResponse(r3, nil)
 
 		// All events should be handled.
 		rs.expectAllEventsHandled()
@@ -446,8 +377,8 @@
 		// The retry thread will be blocked for the watch poll interval.
 		rs.clientWatchResponse(r1, genError)
-		rs.ExpectStatusUnchanged()
 		time.Sleep(watchersyncer.WatchPollInterval)
+		rs.ExpectStatusUpdate(api.WaitForDatastore)
 
 		By("returning a sync list with one entry removed and a new one added")
 		rs.clientListResponse(r1, &model.KVPairList{
@@ -458,6 +389,10 @@
 				eventL1Added4.New,
 			},
 		})
+
+		rs.ExpectStatusUpdate(api.ResyncInProgress)
+		rs.ExpectStatusUpdate(api.InSync)
+		rs.clientWatchResponse(r1, nil)
 
 		By("Expecting new events for the first three entries followed by an add and then the delete")
@@ -500,7 +435,7 @@
 			Type:  api.WatchError,
 			Error: cerrors.ErrorWatchTerminated{Err: dsError},
 		})
-		rs.ExpectStatusUnchanged()
+		rs.ExpectStatusUpdate(api.WaitForDatastore)
 		rs.clientListResponse(r1, &model.KVPairList{
 			Revision: "12347",
 			KVPairs: []*model.KVPair{
@@ -508,8 +443,8 @@
 				eventL1Modified4_2.New,
 			},
 		})
-		rs.ExpectStatusUnchanged()
-		rs.clientWatchResponse(r1, nil)
+		rs.ExpectStatusUpdate(api.ResyncInProgress)
+		rs.ExpectStatusUpdate(api.InSync)
 
 		By("Expecting mod, delete, mod updates")
 		rs.ExpectUpdates([]api.Update{
@@ -537,8 +472,6 @@
 		eventL2Added1 := addEvent(l2Key1)
 		eventL2Added2 := addEvent(l2Key2)
 		eventL2Modified1 := modifiedEvent(l2Key1)
-		eventL2Modified1_2 := modifiedEvent(l2Key1)
-		eventL2Modified2 := modifiedEvent(l2Key2)
 		eventL1Delete1 := deleteEvent(l1Key1)
 
 		rs.ExpectStatusUpdate(api.WaitForDatastore)
@@ -597,53 +530,6 @@
 				UpdateType: api.UpdateTypeKVDeleted,
 			},
 		}})
-
-		// Send in: an add, an update, a parse error and another update.
-		// - An OnUpdate with 2 updates (the error will spit up the update)
-		// - An OnUpdate with 1 update
-		// - A parse error
-		rs.sendEvent(r1, eventL1Added1)
-		// Pause a little here because the previous watch event is on a different goroutine so
-		// we need to ensure it's been processed in a reliable order.
-		time.Sleep(100 * time.Millisecond)
-		rs.sendEvent(r2, eventL2Modified1_2)
-		rs.sendEvent(r2, eventL2Modified2)
-		rs.sendEvent(r2, api.WatchEvent{
-			Type: api.WatchError,
-			Error: cerrors.ErrorParsingDatastoreEntry{
-				RawKey:   "abcdef",
-				RawValue: "aabbccdd",
-			},
-		})
-		rs.sendEvent(r2, eventL2Modified1)
-
-		// Pause a little before unblocking the handler to allow the updates to be
-		// consolidated.
-		time.Sleep(100 * time.Millisecond)
-		rs.UnblockUpdateHandling()
-		rs.ExpectOnUpdates([][]api.Update{
-			{
-				{
-					KVPair:     *eventL1Added1.New,
-					UpdateType: api.UpdateTypeKVNew,
-				},
-				{
-					KVPair:     *eventL2Modified1_2.New,
-					UpdateType: api.UpdateTypeKVUpdated,
-				},
-				{
-					KVPair:     *eventL2Modified2.New,
-					UpdateType: api.UpdateTypeKVUpdated,
-				},
-			},
-			{
-				{
-					KVPair:     *eventL2Modified1.New,
-					UpdateType: api.UpdateTypeKVUpdated,
-				},
-			},
-		})
-		rs.ExpectParseError("abcdef", "aabbccdd")
 	})
 
 	It("should emit all events when stop is called", func() {
@@ -827,10 +713,6 @@ func setWatchIntervals(listRetryInterval, watchPollInterval time.Duration) {
 	watchersyncer.WatchPollInterval = watchPollInterval
 }
 
-func setErrorThreshold(t int) {
-	watchersyncer.DefaultErrorThreshold = t
-}
-
 // Fake converter used to cover error and update handling paths.
 type fakeConverter struct {
 	i int
@@ -977,7 +859,7 @@ type watcherSyncerTester struct {
 
 // Call to test that all of the client and watcher events have been processed.
 // Note that an unhandled event could easily be a problem with the test rather
-// than the WatcherSyncer.
+// than the watcherSyncer.
 func (rst *watcherSyncerTester) expectAllEventsHandled() {
 	log.Infof("Expecting all events to have been handled")
 	for _, l := range rst.lws {
@@ -1014,13 +896,12 @@ func (rst *watcherSyncerTester) sendEvent(r watchersyncer.ResourceType, event ap
 	log.Info("Previous watcher terminated (if any)")
 
 	if event.Type == api.WatchError {
-		if _, ok := event.Error.(cerrors.ErrorWatchTerminated); ok {
-			// This is a terminating event. Our test framework will shut down the previous
-			// watcher as part of the creation of the new one. Increment the init wait group
-			// in the watcher which will be decremented once the old one has fully terminated.
-			log.WithField("Name", name).Info("Watcher error will trigger restart - increment termination count")
-			rst.lws[name].termWg.Add(1)
-		}
+		// A watch error is treated as a terminating event. Our test framework will shut down the previous
+		// watcher as part of the creation of the new one. Increment the init wait group
+		// in the watcher, which will be decremented once the old one has fully terminated.
+		log.WithField("Name", name).Info("Watcher error will trigger restart - increment termination count")
+		rst.lws[name].termWg.Add(1)
 	}
 
 	log.WithField("Name", name).Info("Sending event")
@@ -1029,11 +910,9 @@
 	if event.Type == api.WatchError {
 		// Finally, since this is a terminating event then we expect a corresponding Stop()
 		// invocation (now that the event has been sent).
-		if _, ok := event.Error.(cerrors.ErrorWatchTerminated); ok {
-			log.WithField("Name", name).Info("Expecting a stop invocation")
-			rst.expectStop(r)
-			log.WithField("Name", name).Info("Stop invoked")
-		}
+		log.WithField("Name", name).Info("Expecting a stop invocation")
+		rst.expectStop(r)
+		log.WithField("Name", name).Info("Stop invoked")
 	}
 }
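
Note: with this change, any WatchError surfaced by a backend watcher results in a
WaitForDatastore status update (when the cache was previously in-sync) followed by a
full relist. The sketch below (not part of the patch) shows how a syncer consumer
observes the new flow; api.SyncerCallbacks is the existing libcalico-go interface,
while the statusLogger type and log wording are illustrative only:

	package main

	import (
		"log"

		"github.com/projectcalico/libcalico-go/lib/backend/api"
	)

	// statusLogger is a hypothetical api.SyncerCallbacks implementation.
	// After this patch, a single watch error shows up here as the status
	// sequence WaitForDatastore -> ResyncInProgress -> InSync once the
	// watcherCache completes its full relist.
	type statusLogger struct{}

	// Compile-time check that statusLogger satisfies api.SyncerCallbacks.
	var _ api.SyncerCallbacks = (*statusLogger)(nil)

	func (s *statusLogger) OnStatusUpdated(status api.SyncStatus) {
		log.Printf("sync status: %v", status)
	}

	// OnUpdates receives the KV updates replayed during and after a resync.
	func (s *statusLogger) OnUpdates(updates []api.Update) {
		log.Printf("received %d updates", len(updates))
	}

	func main() {
		// In real code the callbacks are wired into a syncer, e.g.:
		//   syncer := watchersyncer.New(client, resourceTypes, &statusLogger{})
		//   syncer.Start()
	}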