Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: fix watch metrics #11375

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()

ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err()

ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
Expand Down
17 changes: 13 additions & 4 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import (
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/pkg/types"
"go.etcd.io/etcd/raft"

"go.uber.org/zap"
"google.golang.org/grpc/metadata"
)

type watchServer struct {
Expand Down Expand Up @@ -189,19 +192,25 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
}
}()

defer sws.close()
select {
case err = <-errc:
close(sws.ctrlStream)

case <-stream.Context().Done():
err = stream.Context().Err()
// the only server-side cancellation is noleader for now.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a better way to decide if the cancel is from server side or from client side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, good question I dug pretty hard but let me review again.

if err == context.Canceled {
err = rpctypes.ErrGRPCNoLeader
md, ok := metadata.FromIncomingContext(stream.Context())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test case for this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure thing

if ok {
if rl := md[rpctypes.MetadataRequireLeaderKey]; len(rl) > 0 && rl[0] == rpctypes.MetadataHasLeader {
if sws.sg.Leader() == types.ID(raft.None) {
return rpctypes.ErrGRPCNoLeader
}
}
}
return rpctypes.ErrGRPCWatchCanceled
}
}

sws.close()
return err
}

Expand Down
4 changes: 4 additions & 0 deletions tests/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) {
{"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")},
{"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)},
{"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))},
{"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)},
{"/health", `{"health":"true"}`},
} {
i++
Expand All @@ -59,6 +60,9 @@ func metricsTest(cx ctlCtx) {
cx.t.Fatal(err)
}

if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
cx.t.Fatal(err)
}
if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
Expand Down