Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy/grpcproxy: fix grpc proxy hang when broadcast failed to cancel a watcher #12030

Merged
merged 3 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change.

- Fix [`panic on error`](https://github.com/etcd-io/etcd/pull/11694) for metrics handler.
- Add [gRPC keepalive related flags](https://github.com/etcd-io/etcd/pull/11711) `grpc-keepalive-min-time`, `grpc-keepalive-interval` and `grpc-keepalive-timeout`.
- [Fix grpc watch proxy hangs when failed to cancel a watcher](https://github.com/etcd-io/etcd/pull/12030) .

### Auth

Expand Down
2 changes: 1 addition & 1 deletion etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server {
}

kvp, _ := grpcproxy.NewKvProxy(client)
watchp, _ := grpcproxy.NewWatchProxy(client)
watchp, _ := grpcproxy.NewWatchProxy(lg, client)
if grpcProxyResolverPrefix != "" {
grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL)
}
Expand Down
9 changes: 8 additions & 1 deletion proxy/grpcproxy/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"

"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
Expand All @@ -44,16 +45,18 @@ type watchProxy struct {

// kv is used for permission checking
kv clientv3.KV
lg *zap.Logger
}

func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
func NewWatchProxy(lg *zap.Logger, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
cctx, cancel := context.WithCancel(c.Ctx())
wp := &watchProxy{
cw: c.Watcher,
ctx: cctx,
leader: newLeader(c.Ctx(), c.Watcher),

kv: c.KV, // for permission checking
lg: lg,
}
wp.ranges = newWatchRanges(wp)
ch := make(chan struct{})
Expand Down Expand Up @@ -99,6 +102,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
ctx: ctx,
cancel: cancel,
kv: wp.kv,
lg: wp.lg,
}

var lostLeaderC <-chan struct{}
Expand Down Expand Up @@ -181,6 +185,7 @@ type watchProxyStream struct {

// kv is used for permission checking
kv clientv3.KV
lg *zap.Logger
}

func (wps *watchProxyStream) close() {
Expand Down Expand Up @@ -262,8 +267,10 @@ func (wps *watchProxyStream) recvLoop() error {
wps.watchers[w.id] = w
wps.ranges.add(w)
wps.mu.Unlock()
wps.lg.Debug("create watcher", zap.String("key", w.wr.key), zap.String("end", w.wr.end), zap.Int64("watcherId", wps.nextWatcherID))
case *pb.WatchRequest_CancelRequest:
wps.delete(uv.CancelRequest.WatchId)
wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId))
default:
panic("not implemented")
}
Expand Down
18 changes: 16 additions & 2 deletions proxy/grpcproxy/watch_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package grpcproxy
import (
"context"
"sync"
"time"

"go.etcd.io/etcd/v3/clientv3"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"

"go.uber.org/zap"
)

// watchBroadcast broadcasts a server watcher to many client watchers.
Expand All @@ -36,15 +39,17 @@ type watchBroadcast struct {
receivers map[*watcher]struct{}
// responses counts the number of responses
responses int
lg *zap.Logger
}

func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast {
func newWatchBroadcast(lg *zap.Logger, wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast {
cctx, cancel := context.WithCancel(wp.ctx)
wb := &watchBroadcast{
cancel: cancel,
nextrev: w.nextrev,
receivers: make(map[*watcher]struct{}),
donec: make(chan struct{}),
lg: lg,
}
wb.add(w)
go func() {
Expand All @@ -61,6 +66,7 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast))
cctx = withClientAuthToken(cctx, w.wps.stream.Context())

wch := wp.cw.Watch(cctx, w.wr.key, opts...)
wp.lg.Debug("watch", zap.String("key", w.wr.key))

for wr := range wch {
wb.bcast(wr)
Expand Down Expand Up @@ -148,5 +154,13 @@ func (wb *watchBroadcast) stop() {
}

wb.cancel()
<-wb.donec

select {
case <-wb.donec:
// watchProxyStream will hold watchRanges global mutex lock all the time if client failed to cancel etcd watchers.
// and it will cause the watch proxy to not work.
// please see pr https://github.com/etcd-io/etcd/pull/12030 to get more detail info.
case <-time.After(time.Second):
wb.lg.Error("failed to cancel etcd watcher")
}
}
2 changes: 1 addition & 1 deletion proxy/grpcproxy/watch_broadcasts.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (wbs *watchBroadcasts) add(w *watcher) {
}
}
// no fit; create a bcast
wb := newWatchBroadcast(wbs.wp, w, wbs.update)
wb := newWatchBroadcast(wbs.wp.lg, wbs.wp, w, wbs.update)
wbs.watchers[w] = wb
wbs.bcasts[wb] = struct{}{}
}
Expand Down
1 change: 1 addition & 0 deletions proxy/grpcproxy/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (w *watcher) post(wr *pb.WatchResponse) bool {
case w.wps.watchCh <- wr:
case <-time.After(50 * time.Millisecond):
w.wps.cancel()
w.wps.lg.Error("failed to put a watch response on the watcher's proxy stream channel,err is timeout")
return false
}
return true
Expand Down