diff --git a/clientv3/lease.go b/clientv3/lease.go index e476db5be2e..aa9ea2d78aa 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -22,7 +22,6 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" - "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -183,72 +182,55 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati } func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) { - for { - r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(ctx, r) - if err == nil { - gresp := &LeaseGrantResponse{ - ResponseHeader: resp.GetHeader(), - ID: LeaseID(resp.ID), - TTL: resp.TTL, - Error: resp.Error, - } - return gresp, nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + r := &pb.LeaseGrantRequest{TTL: ttl} + resp, err := l.remote.LeaseGrant(ctx, r) + if err == nil { + gresp := &LeaseGrantResponse{ + ResponseHeader: resp.GetHeader(), + ID: LeaseID(resp.ID), + TTL: resp.TTL, + Error: resp.Error, } + return gresp, nil } + return nil, toErr(ctx, err) } func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { - for { - r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(ctx, r) - - if err == nil { - return (*LeaseRevokeResponse)(resp), nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) - } + r := &pb.LeaseRevokeRequest{ID: int64(id)} + resp, err := l.remote.LeaseRevoke(ctx, r) + if err == nil { + return (*LeaseRevokeResponse)(resp), nil } + return nil, toErr(ctx, err) } func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { - for { - r := toLeaseTimeToLiveRequest(id, opts...) - resp, err := l.remote.LeaseTimeToLive(ctx, r, grpc.FailFast(false)) - if err == nil { - gresp := &LeaseTimeToLiveResponse{ - ResponseHeader: resp.GetHeader(), - ID: LeaseID(resp.ID), - TTL: resp.TTL, - GrantedTTL: resp.GrantedTTL, - Keys: resp.Keys, - } - return gresp, nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + r := toLeaseTimeToLiveRequest(id, opts...) + resp, err := l.remote.LeaseTimeToLive(ctx, r) + if err == nil { + gresp := &LeaseTimeToLiveResponse{ + ResponseHeader: resp.GetHeader(), + ID: LeaseID(resp.ID), + TTL: resp.TTL, + GrantedTTL: resp.GrantedTTL, + Keys: resp.Keys, } + return gresp, nil } + return nil, toErr(ctx, err) } func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) { - for { - resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, grpc.FailFast(false)) - if err == nil { - leases := make([]LeaseStatus, len(resp.Leases)) - for i := range resp.Leases { - leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)} - } - return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil - } - if isHaltErr(ctx, err) { - return nil, toErr(ctx, err) + resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}) + if err == nil { + leases := make([]LeaseStatus, len(resp.Leases)) + for i := range resp.Leases { + leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)} } + return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil } + return nil, toErr(ctx, err) } func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) { @@ -389,7 +371,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive cctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := l.remote.LeaseKeepAlive(cctx, grpc.FailFast(false)) + stream, err := l.remote.LeaseKeepAlive(cctx) if err != nil { return nil, toErr(ctx, err) } @@ -433,7 +415,6 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { } else { for { resp, err := stream.Recv() - if err != nil { if canceledByCaller(l.stopCtx, err) { return err @@ -461,7 +442,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { // resetRecv opens a new lease stream and starts sending keep alive requests. func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx, grpc.FailFast(false)) + stream, err := l.remote.LeaseKeepAlive(sctx) if err != nil { cancel() return nil, err diff --git a/clientv3/retry.go b/clientv3/retry.go index 27bacf1d5ee..408383d2798 100644 --- a/clientv3/retry.go +++ b/clientv3/retry.go @@ -164,11 +164,11 @@ func (rkv *retryWriteKVClient) Compact(ctx context.Context, in *pb.CompactionReq } type retryLeaseClient struct { - pb.LeaseClient - readRetry retryRPCFunc + lc pb.LeaseClient + repeatableRetry retryRPCFunc } -// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy. +// RetryLeaseClient implements a LeaseClient. func RetryLeaseClient(c *Client) pb.LeaseClient { retry := &retryLeaseClient{ pb.NewLeaseClient(c.conn), @@ -177,9 +177,25 @@ func RetryLeaseClient(c *Client) pb.LeaseClient { return &retryLeaseClient{retry, c.newAuthRetryWrapper()} } +func (rlc *retryLeaseClient) LeaseTimeToLive(ctx context.Context, in *pb.LeaseTimeToLiveRequest, opts ...grpc.CallOption) (resp *pb.LeaseTimeToLiveResponse, err error) { + err = rlc.repeatableRetry(ctx, func(rctx context.Context) error { + resp, err = rlc.lc.LeaseTimeToLive(rctx, in, opts...) + return err + }) + return resp, err +} + +func (rlc *retryLeaseClient) LeaseLeases(ctx context.Context, in *pb.LeaseLeasesRequest, opts ...grpc.CallOption) (resp *pb.LeaseLeasesResponse, err error) { + err = rlc.repeatableRetry(ctx, func(rctx context.Context) error { + resp, err = rlc.lc.LeaseLeases(rctx, in, opts...) + return err + }) + return resp, err +} + func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) { - err = rlc.readRetry(ctx, func(rctx context.Context) error { - resp, err = rlc.LeaseClient.LeaseGrant(rctx, in, opts...) + err = rlc.repeatableRetry(ctx, func(rctx context.Context) error { + resp, err = rlc.lc.LeaseGrant(rctx, in, opts...) return err }) return resp, err @@ -187,13 +203,21 @@ func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRe } func (rlc *retryLeaseClient) LeaseRevoke(ctx context.Context, in *pb.LeaseRevokeRequest, opts ...grpc.CallOption) (resp *pb.LeaseRevokeResponse, err error) { - err = rlc.readRetry(ctx, func(rctx context.Context) error { - resp, err = rlc.LeaseClient.LeaseRevoke(rctx, in, opts...) + err = rlc.repeatableRetry(ctx, func(rctx context.Context) error { + resp, err = rlc.lc.LeaseRevoke(rctx, in, opts...) return err }) return resp, err } +func (rlc *retryLeaseClient) LeaseKeepAlive(ctx context.Context, opts ...grpc.CallOption) (stream pb.Lease_LeaseKeepAliveClient, err error) { + err = rlc.repeatableRetry(ctx, func(rctx context.Context) error { + stream, err = rlc.lc.LeaseKeepAlive(rctx, opts...) + return err + }) + return stream, err +} + type retryClusterClient struct { pb.ClusterClient writeRetry retryRPCFunc