diff --git a/integration/cluster.go b/integration/cluster.go index e3924e64d11f..a3db95e960aa 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -148,6 +148,8 @@ type ClusterConfig struct { // UseIP is true to use only IP for gRPC requests. UseIP bool + + LeaseCheckpointInterval time.Duration } type cluster struct { @@ -290,6 +292,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member { clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, useIP: c.cfg.UseIP, + leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, }) m.DiscoveryURL = c.cfg.DiscoveryURL if c.cfg.UseGRPC { @@ -575,6 +578,7 @@ type memberConfig struct { clientMaxCallSendMsgSize int clientMaxCallRecvMsgSize int useIP bool + leaseCheckpointInterval time.Duration } // mustNewMember return an inited member with the given name. If peerTLS is @@ -665,6 +669,7 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member { m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize m.useIP = mcfg.useIP + m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval m.InitialCorruptCheck = true diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 7ec2d3c764e0..4e47e6c0a07a 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -25,7 +25,9 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" "github.com/coreos/etcd/pkg/testutil" + "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" ) // TestV3LeasePrmote ensures the newly elected leader can promote itself @@ -222,6 +224,56 @@ func TestV3LeaseKeepAlive(t *testing.T) { }) } +// TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted +// across leader elections. +func TestV3LeaseCheckpoint(t *testing.T) { + var ttl int64 = 300 + leaseInterval := 2 * time.Second + defer testutil.AfterTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, LeaseCheckpointInterval: leaseInterval}) + defer clus.Terminate(t) + + // create lease + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := toGRPC(clus.RandClient()) + lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl}) + if err != nil { + t.Fatal(err) + } + + // wait for a checkpoint to occur + time.Sleep(leaseInterval + 1*time.Second) + + // Force a leader election + leaderId := clus.WaitLeader(t) + leader := clus.Members[leaderId] + leader.Stop(t) + time.Sleep(time.Duration(3*electionTicks) * tickDuration) + leader.Restart(t) + newLeaderId := clus.WaitLeader(t) + c2 := toGRPC(clus.Client(newLeaderId)) + + time.Sleep(250 * time.Millisecond) + + // Check the TTL of the new leader + var ttlresp *pb.LeaseTimeToLiveResponse + for i := 0; i < 10; i++ { + if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil { + if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable { + time.Sleep(time.Millisecond * 250) + } else { + t.Fatal(err) + } + } + } + + expectedTTL := ttl - int64(leaseInterval.Seconds()) + if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL { + t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL) + } +} + // TestV3LeaseExists creates a lease on a random client and confirms it exists in the cluster. func TestV3LeaseExists(t *testing.T) { defer testutil.AfterTest(t) diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 4beb6a336268..b034ca0a1494 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -15,6 +15,7 @@ package lease import ( + "context" "fmt" "io/ioutil" "os" @@ -25,6 +26,7 @@ import ( "testing" "time" + pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/mvcc/backend" "go.uber.org/zap" ) @@ -476,6 +478,62 @@ func TestLessorMaxTTL(t *testing.T) { } } +func TestLessorCheckpointScheduling(t *testing.T) { + lg := zap.NewNop() + + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second}) + le.minLeaseTTL = 1 + checkpointedC := make(chan struct{}) + le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) { + close(checkpointedC) + if len(lc.Checkpoints) != 1 { + t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints)) + } + c := lc.Checkpoints[0] + if c.Remaining_TTL != 1 { + t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL) + } + }) + defer le.Stop() + le.Promote(0) + + _, err := le.Grant(1, 2) + if err != nil { + t.Fatal(err) + } + + // TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds. + select { + case <-checkpointedC: + case <-time.After(2 * time.Second): + t.Fatal("expected checkpointer to be called, but it was not") + } +} + +func TestLessorCheckpointsRestoredOnPromote(t *testing.T) { + lg := zap.NewNop() + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL}) + defer le.Stop() + l, err := le.Grant(1, 10) + if err != nil { + t.Fatal(err) + } + le.Checkpoint(l.ID, 5) + le.Promote(0) + remaining := l.Remaining().Seconds() + if !(remaining > 4 && remaining < 5) { + t.Fatalf("expected expiry to be less than 1s in the future, but got %f seconds", remaining) + } +} + type fakeDeleter struct { deleted []string tx backend.BatchTx