Skip to content

Commit

Permalink
Merge pull request #12889 from ptabor/20210423-deflake-TestFirstCommi…
Browse files Browse the repository at this point in the history
…tNotification

Deflake: TestFirstCommitNotification
  • Loading branch information
ptabor authored Apr 23, 2021
2 parents bd4f8e2 + 9f55977 commit 9a3aff6
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions tests/integration/v3_leadership_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func TestTransferLeadershipWithLearner(t *testing.T) {

func TestFirstCommitNotification(t *testing.T) {
BeforeTest(t)
ctx := context.Background()
clusterSize := 3
cluster := NewClusterV3(t, &ClusterConfig{Size: clusterSize})
defer cluster.Terminate(t)
Expand All @@ -205,17 +206,26 @@ func TestFirstCommitNotification(t *testing.T) {
t.Errorf("got error during leadership transfer: %v", err)
}

t.Logf("Leadership transferred.")
t.Logf("Submitting write to make sure empty and 'foo' index entry was already flushed")
cli := cluster.RandClient()

if _, err := cli.Put(ctx, "foo", "bar"); err != nil {
t.Fatalf("Failed to put kv pair.")
}

// It's guaranteed now that leader contains the 'foo'->'bar' index entry.
leaderAppliedIndex := cluster.Members[newLeaderIdx].s.AppliedIndex()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

group, groupContext := errgroup.WithContext(ctx)

for i, notifier := range notifiers {
member, notifier := cluster.Members[i], notifier
group.Go(func() error {
return checkFirstCommitNotification(groupContext, member, leaderAppliedIndex, notifier)
return checkFirstCommitNotification(groupContext, t, member, leaderAppliedIndex, notifier)
})
}

Expand All @@ -227,20 +237,21 @@ func TestFirstCommitNotification(t *testing.T) {

func checkFirstCommitNotification(
ctx context.Context,
t testing.TB,
member *member,
leaderAppliedIndex uint64,
notifier <-chan struct{},
) error {
// wait until server applies all the changes of leader
for member.s.AppliedIndex() < leaderAppliedIndex {
t.Logf("member.s.AppliedIndex():%v <= leaderAppliedIndex:%v", member.s.AppliedIndex(), leaderAppliedIndex)
select {
case <-ctx.Done():
return ctx.Err()
default:
time.Sleep(100 * time.Millisecond)
}
}

select {
case msg, ok := <-notifier:
if ok {
Expand All @@ -251,6 +262,7 @@ func checkFirstCommitNotification(
)
}
default:
t.Logf("member.s.AppliedIndex():%v >= leaderAppliedIndex:%v", member.s.AppliedIndex(), leaderAppliedIndex)
return fmt.Errorf(
"notification was not triggered, member ID: %d",
member.ID(),
Expand Down

0 comments on commit 9a3aff6

Please sign in to comment.