Skip to content

Commit

Permalink
lease,integration: add checkpoint scheduling after leader change
Browse files Browse the repository at this point in the history
Current checkpointing mechanism is buggy. New checkpoints for any lease
are scheduled only until the first leader change. Added fix for that
and a test that will check it.
  • Loading branch information
michaljasionowski authored and serathius committed Dec 2, 2021
1 parent eac7f98 commit 21634a9
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 47 deletions.
1 change: 1 addition & 0 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func (le *lessor) Promote(extend time.Duration) {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
le.scheduleCheckpointIfNeeded(l)
}

if len(le.leaseMap) < leaseRevokeRate {
Expand Down
5 changes: 2 additions & 3 deletions server/lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer be.Close()

le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
defer le.Stop()
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
Expand All @@ -543,13 +544,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
}
})
defer le.Stop()
le.Promote(0)

_, err := le.Grant(1, 2)
if err != nil {
t.Fatal(err)
}
le.Promote(0)

// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
select {
Expand Down
128 changes: 84 additions & 44 deletions tests/integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,56 +229,96 @@ func TestV3LeaseKeepAlive(t *testing.T) {
// TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
// across leader elections.
func TestV3LeaseCheckpoint(t *testing.T) {
BeforeTest(t)

var ttl int64 = 300
leaseInterval := 2 * time.Second
clus := NewClusterV3(t, &ClusterConfig{
Size: 3,
EnableLeaseCheckpoint: true,
LeaseCheckpointInterval: leaseInterval,
UseBridge: true,
})
defer clus.Terminate(t)

// create lease
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := toGRPC(clus.RandClient())
lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
if err != nil {
t.Fatal(err)
}
tcs := []struct {
name string
checkpointingEnabled bool
ttl time.Duration
checkpointingInterval time.Duration
leaderChanges int
expectTTLIsGT time.Duration
expectTTLIsLT time.Duration
}{
{
name: "Checkpointing disabled, lease TTL is reset",
ttl: 300 * time.Second,
leaderChanges: 1,
expectTTLIsGT: 298 * time.Second,
},
{
name: "Checkpointing enabled 10s, lease TTL is preserved after leader change",
ttl: 300 * time.Second,
checkpointingEnabled: true,
checkpointingInterval: 10 * time.Second,
leaderChanges: 1,
expectTTLIsLT: 290 * time.Second,
},
{
// Checking if checkpointing continues after the first leader change.
name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
ttl: 300 * time.Second,
checkpointingEnabled: true,
checkpointingInterval: 10 * time.Second,
leaderChanges: 2,
expectTTLIsLT: 280 * time.Second,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
BeforeTest(t)
config := &ClusterConfig{
Size: 3,
EnableLeaseCheckpoint: tc.checkpointingEnabled,
LeaseCheckpointInterval: tc.checkpointingInterval,
}
clus := NewClusterV3(t, config)
defer clus.Terminate(t)

// create lease
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := toGRPC(clus.RandClient())
lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
if err != nil {
t.Fatal(err)
}

// wait for a checkpoint to occur
time.Sleep(leaseInterval + 1*time.Second)
for i := 0; i < tc.leaderChanges; i++ {
// wait for a checkpoint to occur
time.Sleep(tc.checkpointingInterval + 1*time.Second)

// Force a leader election
leaderId := clus.WaitLeader(t)
leader := clus.Members[leaderId]
leader.Stop(t)
time.Sleep(time.Duration(3*electionTicks) * tickDuration)
leader.Restart(t)
newLeaderId := clus.WaitLeader(t)
c2 := toGRPC(clus.Client(newLeaderId))
// Force a leader election
leaderId := clus.WaitLeader(t)
leader := clus.Members[leaderId]
leader.Stop(t)
time.Sleep(time.Duration(3*electionTicks) * tickDuration)
leader.Restart(t)
}

time.Sleep(250 * time.Millisecond)
newLeaderId := clus.WaitLeader(t)
c2 := toGRPC(clus.Client(newLeaderId))

time.Sleep(250 * time.Millisecond)

// Check the TTL of the new leader
var ttlresp *pb.LeaseTimeToLiveResponse
for i := 0; i < 10; i++ {
if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
time.Sleep(time.Millisecond * 250)
} else {
t.Fatal(err)
}
}
}

// Check the TTL of the new leader
var ttlresp *pb.LeaseTimeToLiveResponse
for i := 0; i < 10; i++ {
if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
time.Sleep(time.Millisecond * 250)
} else {
t.Fatal(err)
if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
}
}
}

expectedTTL := ttl - int64(leaseInterval.Seconds())
if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
}
})
}
}

Expand Down

0 comments on commit 21634a9

Please sign in to comment.