diff --git a/server/lease/lessor.go b/server/lease/lessor.go
index 7236515f2b3..6687529742a 100644
--- a/server/lease/lessor.go
+++ b/server/lease/lessor.go
@@ -446,6 +446,7 @@ func (le *lessor) Promote(extend time.Duration) {
 		l.refresh(extend)
 		item := &LeaseWithTime{id: l.ID, time: l.expiry}
 		le.leaseExpiredNotifier.RegisterOrUpdate(item)
+		le.scheduleCheckpointIfNeeded(l)
 	}
 
 	if len(le.leaseMap) < leaseRevokeRate {
diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go
index 58a4ad29086..5d6d28782f3 100644
--- a/server/lease/lessor_test.go
+++ b/server/lease/lessor_test.go
@@ -531,6 +531,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 	defer be.Close()
 	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+	defer le.Stop()
 	le.minLeaseTTL = 1
 	checkpointedC := make(chan struct{})
 	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@@ -543,13 +544,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 			t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
 		}
 	})
-	defer le.Stop()
-	le.Promote(0)
-
 	_, err := le.Grant(1, 2)
 	if err != nil {
 		t.Fatal(err)
 	}
+	le.Promote(0)
 
 	// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
 	select {
diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go
index 1727da65cce..6adc2f2b3b1 100644
--- a/tests/integration/v3_lease_test.go
+++ b/tests/integration/v3_lease_test.go
@@ -229,56 +229,96 @@ func TestV3LeaseKeepAlive(t *testing.T) {
 // TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
 // across leader elections.
 func TestV3LeaseCheckpoint(t *testing.T) {
-	BeforeTest(t)
-
-	var ttl int64 = 300
-	leaseInterval := 2 * time.Second
-	clus := NewClusterV3(t, &ClusterConfig{
-		Size:                    3,
-		EnableLeaseCheckpoint:   true,
-		LeaseCheckpointInterval: leaseInterval,
-		UseBridge:               true,
-	})
-	defer clus.Terminate(t)
-
-	// create lease
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	c := toGRPC(clus.RandClient())
-	lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
-	if err != nil {
-		t.Fatal(err)
-	}
+	tcs := []struct {
+		name                  string
+		checkpointingEnabled  bool
+		ttl                   time.Duration
+		checkpointingInterval time.Duration
+		leaderChanges         int
+		expectTTLIsGT         time.Duration
+		expectTTLIsLT         time.Duration
+	}{
+		{
+			name:          "Checkpointing disabled, lease TTL is reset",
+			ttl:           300 * time.Second,
+			leaderChanges: 1,
+			expectTTLIsGT: 298 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s, lease TTL is preserved after leader change",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			leaderChanges:         1,
+			expectTTLIsLT:         290 * time.Second,
+		},
+		{
+			// Checking if checkpointing continues after the first leader change.
+ name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes", + ttl: 300 * time.Second, + checkpointingEnabled: true, + checkpointingInterval: 10 * time.Second, + leaderChanges: 2, + expectTTLIsLT: 280 * time.Second, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + BeforeTest(t) + config := &ClusterConfig{ + Size: 3, + EnableLeaseCheckpoint: tc.checkpointingEnabled, + LeaseCheckpointInterval: tc.checkpointingInterval, + } + clus := NewClusterV3(t, config) + defer clus.Terminate(t) + + // create lease + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := toGRPC(clus.RandClient()) + lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())}) + if err != nil { + t.Fatal(err) + } - // wait for a checkpoint to occur - time.Sleep(leaseInterval + 1*time.Second) + for i := 0; i < tc.leaderChanges; i++ { + // wait for a checkpoint to occur + time.Sleep(tc.checkpointingInterval + 1*time.Second) - // Force a leader election - leaderId := clus.WaitLeader(t) - leader := clus.Members[leaderId] - leader.Stop(t) - time.Sleep(time.Duration(3*electionTicks) * tickDuration) - leader.Restart(t) - newLeaderId := clus.WaitLeader(t) - c2 := toGRPC(clus.Client(newLeaderId)) + // Force a leader election + leaderId := clus.WaitLeader(t) + leader := clus.Members[leaderId] + leader.Stop(t) + time.Sleep(time.Duration(3*electionTicks) * tickDuration) + leader.Restart(t) + } - time.Sleep(250 * time.Millisecond) + newLeaderId := clus.WaitLeader(t) + c2 := toGRPC(clus.Client(newLeaderId)) + + time.Sleep(250 * time.Millisecond) + + // Check the TTL of the new leader + var ttlresp *pb.LeaseTimeToLiveResponse + for i := 0; i < 10; i++ { + if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil { + if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable { + time.Sleep(time.Millisecond * 250) + } else { + t.Fatal(err) + } + } + } - // Check the TTL of the new leader - var ttlresp *pb.LeaseTimeToLiveResponse - for i := 0; i < 10; i++ { - if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil { - if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable { - time.Sleep(time.Millisecond * 250) - } else { - t.Fatal(err) + if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT { + t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT) } - } - } - expectedTTL := ttl - int64(leaseInterval.Seconds()) - if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL { - t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL) + if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT { + t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT) + } + }) } }