Skip to content

Commit

Permalink
Handle backend watch, if upstream closes channel
Browse files Browse the repository at this point in the history
  • Loading branch information
Gobinath Krishnamoorthy committed Jun 5, 2020
1 parent 1b8bc83 commit 5f4021d
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 297 deletions.
1 change: 0 additions & 1 deletion lib/backend/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ type WatchInterface interface {

// Returns a chan which will receive all the events. This channel is closed when:
// - Stop() is called, or
// - An error of type errors.ErrorWatchTerminated is received.
// In both cases the watcher will be cleaned up, and the client should stop receiving
// from this channel.
ResultChan() <-chan WatchEvent
Expand Down
17 changes: 3 additions & 14 deletions lib/backend/etcdv3/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package etcdv3

import (
"context"
goerrors "errors"
"strconv"
"sync/atomic"

Expand All @@ -25,7 +24,6 @@ import (

"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
"github.com/projectcalico/libcalico-go/lib/errors"
)

const (
Expand Down Expand Up @@ -99,7 +97,7 @@ func (wc *watcher) watchLoop() {
var err error
if kvps, err = wc.listCurrent(); err != nil {
log.Errorf("failed to list current with latest state: %v", err)
wc.sendError(err, true)
// This error is considered terminating, so terminate the watcher.
return
}

Expand Down Expand Up @@ -127,7 +125,6 @@ func (wc *watcher) watchLoop() {
// A watch channel error is a terminating event, so exit the loop.
err := wres.Err()
log.WithError(err).Error("Watch channel error")
wc.sendError(err, true)
return
}
for _, e := range wres.Events {
Expand All @@ -137,15 +134,13 @@ func (wc *watcher) watchLoop() {
if ae, err := convertWatchEvent(e, wc.list); ae != nil {
wc.sendEvent(ae)
} else if err != nil {
wc.sendError(err, false)
wc.sendError(err)
}
}
}

// If we exit the loop, it means the watcher has closed for some reason.
// Bubble this up as a watch termination error.
log.Warn("etcdv3 watch channel closed")
wc.sendError(goerrors.New("etcdv3 watch channel closed"), true)
}

// listCurrent retrieves the existing entries.
Expand Down Expand Up @@ -205,7 +200,7 @@ func (wc *watcher) terminateWatcher() {
}

// sendError packages up the error as an event and sends it in the results channel.
func (wc *watcher) sendError(err error, terminating bool) {
func (wc *watcher) sendError(err error) {
// The response from etcd commands may include a context.Canceled error if the context
// was cancelled before completion. Since with our Watcher we don't include that as
// an error type skip over the Canceled error, the error processing in the main
Expand All @@ -214,12 +209,6 @@ func (wc *watcher) sendError(err error, terminating bool) {
return
}

// If this is a terminating error, wrap the error up in an errors.ErrorWatchTerminated
// error type.
if terminating {
err = errors.ErrorWatchTerminated{Err: err}
}

// Wrap the error up in a WatchEvent and use sendEvent to send it.
errEvent := &api.WatchEvent{
Type: api.WatchError,
Expand Down
42 changes: 11 additions & 31 deletions lib/backend/k8s/resources/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,32 +437,18 @@ func (npw *networkPolicyWatcher) processNPEvents() {
select {
case e, ok = <-npw.crdNPWatch.ResultChan():
if !ok {
// We shouldn't get a closed channel without first getting a terminating error,
// so write a warning log and convert to a termination error.
log.Warn("Calico NP channel closed")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Calico NP watch channel closed"),
},
}
// Upstream channel is closed by k8s, hence exit from processing watch events
log.Debug("Calico NP channel closed by remote.")
return
}
log.Debug("Processing Calico NP event")
isCRDEvent = true

case e, ok = <-npw.k8sNPWatch.ResultChan():
if !ok {
// We shouldn't get a closed channel without first getting a terminating error,
// so write a warning log and convert to a termination error.
log.Warn("Kubernetes NP channel closed")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Kubernetes NP watch channel closed"),
},
}
// Upstream channel is closed by k8s, hence exit from processing watch events
log.Debug("Kubernetes NP channel closed by remote.")
return
}
log.Debug("Processing Kubernetes NP event")
isCRDEvent = false
Expand All @@ -489,11 +475,10 @@ func (npw *networkPolicyWatcher) processNPEvents() {
if !ok {
log.WithField("event", e).Error(
"Resource returned from watch does not implement the ObjectMetaAccessor interface")
// Handle this error as a WatchError event; this will force the watcher to resync.
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
Err: errors.New("Resource returned from watch does not implement the ObjectMetaAccessor interface"),
},
Type: api.WatchError,
Error: errors.New("Resource returned from watch does not implement the ObjectMetaAccessor interface"),
}
}
if isCRDEvent {
Expand All @@ -514,14 +499,9 @@ func (npw *networkPolicyWatcher) processNPEvents() {
// Send the processed event.
select {
case npw.resultChan <- e:
// If this is an error event, check to see if it's a terminating one.
// If so, terminate this watcher.
// If this is a WatchError event, bubble up the error event.
if e.Type == api.WatchError {
log.WithError(e.Error).Debug("Kubernetes event converted to backend watcher error event")
if _, ok := e.Error.(cerrors.ErrorWatchTerminated); ok {
log.Debug("Watch terminated event")
return
}
log.Debug("Kubernetes watcher error converted to backend watcher error event")
}

case <-npw.context.Done():
Expand Down
44 changes: 10 additions & 34 deletions lib/backend/k8s/resources/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,32 +393,18 @@ func (pw *profileWatcher) processProfileEvents() {
select {
case e, ok = <-pw.k8sNSWatch.ResultChan():
if !ok {
// We shouldn't get a closed channel without first getting a terminating error,
// so write a warning log and convert to a termination error.
log.Warn("Profile, namespace watch channel closed.")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Profile namespace watch channel closed."),
},
}
// The watch channel was closed by upstream, hence return from the loop.
log.Debug("Profile, namespace watch channel closed by remote.")
return
}
log.Debug("Processing Namespace event")
isNsEvent = true

case e, ok = <-pw.k8sSAWatch.ResultChan():
if !ok {
// We shouldn't get a closed channel without first getting a terminating error,
// so write a warning log and convert to a termination error.
log.Warn("Profile, serviceaccount watch channel closed.")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Profile serviceaccount watch channel closed."),
},
}
// The watch channel was closed by upstream, hence return from the loop.
log.Debug("Profile, serviceaccount watch channel closed by remote.")
return
}
log.Debug("Processing ServiceAccount event")
isNsEvent = false
Expand Down Expand Up @@ -446,13 +432,8 @@ func (pw *profileWatcher) processProfileEvents() {
if !ok {
log.WithField("event", e).Error(
"Resource returned from watch does not implement ObjectMetaAccessor interface")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Profile value does not implement ObjectMetaAccessor interface."),
},
}
// Handle this error as a watcher termination by the remote.
return
} else {
if isNsEvent {
pw.k8sNSRev = oma.GetObjectMeta().GetResourceVersion()
Expand All @@ -468,14 +449,9 @@ func (pw *profileWatcher) processProfileEvents() {
// Send the processed event.
select {
case pw.resultChan <- e:
// If this is an error event. check to see if it's a terminating one.
// If so, terminate this watcher.
// If this is an error event, bubble up the error event.
if e.Type == api.WatchError {
log.WithError(e.Error).Debug("Kubernetes event converted to backend watcher error event")
if _, ok := e.Error.(cerrors.ErrorWatchTerminated); ok {
log.Debug("Watch terminated event")
return
}
log.Debug("Kubernetes event converted to backend watcher error event")
}

case <-pw.context.Done():
Expand Down
31 changes: 6 additions & 25 deletions lib/backend/k8s/resources/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"sync/atomic"

"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
kwatch "k8s.io/apimachinery/pkg/watch"

"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
cerrors "github.com/projectcalico/libcalico-go/lib/errors"
)

const (
Expand Down Expand Up @@ -104,16 +104,9 @@ func (crw *k8sWatcherConverter) processK8sEvents() {
case event, ok := <-crw.k8sWatch.ResultChan():
var events []*api.WatchEvent
if !ok {
// The channel is closed so send a terminating watcher event indicating the watch was
// closed by the remote.
crw.logCxt.Debug("Watcher terminated by remote")
events = []*api.WatchEvent{{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
Err: fmt.Errorf("terminating error event from Kubernetes watcher: closed by remote"),
ClosedByRemote: true,
},
}}
// The watch channel was closed by the remote k8s, hence return from the loop.
crw.logCxt.Debug("Watcher channel closed by remote")
return
} else {
// We have a valid event, so convert it.
events = crw.convertEvent(event)
Expand All @@ -127,16 +120,6 @@ func (crw *k8sWatcherConverter) processK8sEvents() {
select {
case crw.resultChan <- *e:
crw.logCxt.Debug("Kubernetes event converted and sent to backend watcher")

// If this is an error event, check to see if it's a terminating one (the
// convertEvent method will decide that). If so, terminate this watcher.
if e.Type == api.WatchError {
crw.logCxt.WithError(e.Error).Debug("Watch event was an error event type")
if _, ok := e.Error.(cerrors.ErrorWatchTerminated); ok {
crw.logCxt.Debug("Watch event indicates a terminated watcher")
return
}
}
case <-crw.context.Done():
crw.logCxt.Debug("Process watcher done event during watch event in kdd client")
return
Expand All @@ -160,10 +143,8 @@ func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) []*api.WatchEv
case kwatch.Error:
// An error directly from the k8s watcher is a terminating event.
return []*api.WatchEvent{{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
Err: fmt.Errorf("terminating error event from Kubernetes watcher: %v", kevent.Object),
},
Type: api.WatchError,
Error: apierrors.FromObject(kevent.Object),
}}
case kwatch.Deleted:
fallthrough
Expand Down
Loading

0 comments on commit 5f4021d

Please sign in to comment.