Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle backend watch, if upstream closes channel[ClosedByRemote] #1247

Merged
merged 1 commit into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/backend/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ type WatchInterface interface {

// Returns a chan which will receive all the events. This channel is closed when:
// - Stop() is called, or
// - An error of type errors.ErrorWatchTerminated is received.
// In both cases the watcher will be cleaned up, and the client should stop receiving
// from this channel.
ResultChan() <-chan WatchEvent
Expand Down
17 changes: 3 additions & 14 deletions lib/backend/etcdv3/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package etcdv3

import (
"context"
goerrors "errors"
"strconv"
"sync/atomic"

Expand All @@ -25,7 +24,6 @@ import (

"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
"github.com/projectcalico/libcalico-go/lib/errors"
)

const (
Expand Down Expand Up @@ -99,7 +97,7 @@ func (wc *watcher) watchLoop() {
var err error
if kvps, err = wc.listCurrent(); err != nil {
log.Errorf("failed to list current with latest state: %v", err)
wc.sendError(err, true)
// Error considered as terminating error, hence terminate watcher.
return
}

Expand Down Expand Up @@ -127,7 +125,6 @@ func (wc *watcher) watchLoop() {
// A watch channel error is a terminating event, so exit the loop.
err := wres.Err()
log.WithError(err).Error("Watch channel error")
wc.sendError(err, true)
return
}
for _, e := range wres.Events {
Expand All @@ -137,15 +134,13 @@ func (wc *watcher) watchLoop() {
if ae, err := convertWatchEvent(e, wc.list); ae != nil {
wc.sendEvent(ae)
} else if err != nil {
wc.sendError(err, false)
wc.sendError(err)
}
}
}

// If we exit the loop, it means the watcher has closed for some reason.
// Bubble this up as a watch termination error.
log.Warn("etcdv3 watch channel closed")
wc.sendError(goerrors.New("etcdv3 watch channel closed"), true)
}

// listCurrent retrieves the existing entries.
Expand Down Expand Up @@ -205,7 +200,7 @@ func (wc *watcher) terminateWatcher() {
}

// sendError packages up the error as an event and sends it in the results channel.
func (wc *watcher) sendError(err error, terminating bool) {
func (wc *watcher) sendError(err error) {
// The response from etcd commands may include a context.Canceled error if the context
// was cancelled before completion. Since with our Watcher we don't include that as
// an error type skip over the Canceled error, the error processing in the main
Expand All @@ -214,12 +209,6 @@ func (wc *watcher) sendError(err error, terminating bool) {
return
}

// If this is a terminating error, wrap the error up in an errors.ErrorWatchTerminated
// error type.
if terminating {
err = errors.ErrorWatchTerminated{Err: err}
}

// Wrap the error up in a WatchEvent and use sendEvent to send it.
errEvent := &api.WatchEvent{
Type: api.WatchError,
Expand Down
42 changes: 11 additions & 31 deletions lib/backend/k8s/resources/networkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,32 +437,18 @@ func (npw *networkPolicyWatcher) processNPEvents() {
select {
case e, ok = <-npw.crdNPWatch.ResultChan():
if !ok {
// We shouldn't get a closed channel without first getting a terminating error,
// so write a warning log and convert to a termination error.
log.Warn("Calico NP channel closed")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Calico NP watch channel closed"),
},
}
// Upstream channel is closed by k8s, hence exit from processing watch events
log.Debug("Calico NP channel closed by remote.")
return
}
log.Debug("Processing Calico NP event")
isCRDEvent = true

case e, ok = <-npw.k8sNPWatch.ResultChan():
if !ok {
// We shouldn't get a closed channel without first getting a terminating error,
// so write a warning log and convert to a termination error.
log.Warn("Kubernetes NP channel closed")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Kubernetes NP watch channel closed"),
},
}
// Upstream channel is closed by k8s, hence exit from processing watch events
log.Debug("Kubernetes NP channel closed by remote.")
return
}
log.Debug("Processing Kubernetes NP event")
isCRDEvent = false
Expand All @@ -489,11 +475,10 @@ func (npw *networkPolicyWatcher) processNPEvents() {
if !ok {
log.WithField("event", e).Error(
"Resource returned from watch does not implement the ObjectMetaAccessor interface")
// Handle this error as WatchError, this will force to resync the watcher
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
Err: errors.New("Resource returned from watch does not implement the ObjectMetaAccessor interface"),
},
Type: api.WatchError,
Error: errors.New("Resource returned from watch does not implement the ObjectMetaAccessor interface"),
}
}
if isCRDEvent {
Expand All @@ -514,14 +499,9 @@ func (npw *networkPolicyWatcher) processNPEvents() {
// Send the processed event.
select {
case npw.resultChan <- e:
// If this is an error event, check to see if it's a terminating one.
// If so, terminate this watcher.
// If this is a Watcherror event, bubble up error event.
if e.Type == api.WatchError {
log.WithError(e.Error).Debug("Kubernetes event converted to backend watcher error event")
if _, ok := e.Error.(cerrors.ErrorWatchTerminated); ok {
log.Debug("Watch terminated event")
return
}
log.Debug("Kubernetes watcher error converted to backend watcher error event")
}

case <-npw.context.Done():
Expand Down
44 changes: 10 additions & 34 deletions lib/backend/k8s/resources/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,32 +393,18 @@ func (pw *profileWatcher) processProfileEvents() {
select {
case e, ok = <-pw.k8sNSWatch.ResultChan():
if !ok {
// We shouldn't get a closed channel without first getting a terminating error,
// so write a warning log and convert to a termination error.
log.Warn("Profile, namespace watch channel closed.")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Profile namespace watch channel closed."),
},
}
// Watch channel is closed by upstream hence, return from loop.
log.Debug("Profile, namespace watch channel closed by remote.")
return
}
log.Debug("Processing Namespace event")
isNsEvent = true

case e, ok = <-pw.k8sSAWatch.ResultChan():
if !ok {
// We shouldn't get a closed channel without first getting a terminating error,
// so write a warning log and convert to a termination error.
log.Warn("Profile, serviceaccount watch channel closed.")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Profile serviceaccount watch channel closed."),
},
}
// Watch channel is closed by upstream hence, return from loop.
log.Debug("Profile, serviceaccount watch channel closed by remote.")
return
}
log.Debug("Processing ServiceAccount event")
isNsEvent = false
Expand Down Expand Up @@ -446,13 +432,8 @@ func (pw *profileWatcher) processProfileEvents() {
if !ok {
log.WithField("event", e).Error(
"Resource returned from watch does not implement ObjectMetaAccessor interface")
e = api.WatchEvent{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
ClosedByRemote: true,
Err: errors.New("Profile value does not implement ObjectMetaAccessor interface."),
},
}
// handle this Error as a Watcher termination by remote.
return
} else {
if isNsEvent {
pw.k8sNSRev = oma.GetObjectMeta().GetResourceVersion()
Expand All @@ -468,14 +449,9 @@ func (pw *profileWatcher) processProfileEvents() {
// Send the processed event.
select {
case pw.resultChan <- e:
// If this is an error event. check to see if it's a terminating one.
// If so, terminate this watcher.
// If this is an error event. bubble up the error event.
if e.Type == api.WatchError {
log.WithError(e.Error).Debug("Kubernetes event converted to backend watcher error event")
if _, ok := e.Error.(cerrors.ErrorWatchTerminated); ok {
log.Debug("Watch terminated event")
return
}
log.Debug("Kubernetes event converted to backend watcher error event")
}

case <-pw.context.Done():
Expand Down
31 changes: 6 additions & 25 deletions lib/backend/k8s/resources/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ import (
"sync/atomic"

"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
kwatch "k8s.io/apimachinery/pkg/watch"

"github.com/projectcalico/libcalico-go/lib/backend/api"
"github.com/projectcalico/libcalico-go/lib/backend/model"
cerrors "github.com/projectcalico/libcalico-go/lib/errors"
)

const (
Expand Down Expand Up @@ -104,16 +104,9 @@ func (crw *k8sWatcherConverter) processK8sEvents() {
case event, ok := <-crw.k8sWatch.ResultChan():
var events []*api.WatchEvent
if !ok {
// The channel is closed so send a terminating watcher event indicating the watch was
// closed by the remote.
crw.logCxt.Debug("Watcher terminated by remote")
events = []*api.WatchEvent{{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
Err: fmt.Errorf("terminating error event from Kubernetes watcher: closed by remote"),
ClosedByRemote: true,
},
}}
// Watch channel is closed by remote k8s hence, return from loop.
crw.logCxt.Debug("Watcher channel closed by remote")
return
} else {
// We have a valid event, so convert it.
events = crw.convertEvent(event)
Expand All @@ -127,16 +120,6 @@ func (crw *k8sWatcherConverter) processK8sEvents() {
select {
case crw.resultChan <- *e:
crw.logCxt.Debug("Kubernetes event converted and sent to backend watcher")

// If this is an error event, check to see if it's a terminating one (the
// convertEvent method will decide that). If so, terminate this watcher.
if e.Type == api.WatchError {
crw.logCxt.WithError(e.Error).Debug("Watch event was an error event type")
if _, ok := e.Error.(cerrors.ErrorWatchTerminated); ok {
crw.logCxt.Debug("Watch event indicates a terminated watcher")
return
}
}
case <-crw.context.Done():
crw.logCxt.Debug("Process watcher done event during watch event in kdd client")
return
Expand All @@ -160,10 +143,8 @@ func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) []*api.WatchEv
case kwatch.Error:
// An error directly from the k8s watcher is a terminating event.
return []*api.WatchEvent{{
Type: api.WatchError,
Error: cerrors.ErrorWatchTerminated{
Err: fmt.Errorf("terminating error event from Kubernetes watcher: %v", kevent.Object),
},
Type: api.WatchError,
Error: apierrors.FromObject(kevent.Object),
}}
case kwatch.Deleted:
fallthrough
Expand Down
Loading