Skip to content

Commit

Permalink
tests: fix flaky TestStreamVerifierCtxCancelPoolQueue (#4980)
Browse files Browse the repository at this point in the history
  • Loading branch information
algonautshant authored Jan 9, 2023
1 parent 19fa6c2 commit 787d386
Showing 1 changed file with 39 additions and 15 deletions.
54 changes: 39 additions & 15 deletions data/transactions/verify/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,7 +1389,7 @@ func TestStreamVerifierBlockWatcher(t *testing.T) {
}
}

func getSaturatedExecPool(t *testing.T) (execpool.BacklogPool, chan interface{}, execpool.BacklogPool) {
func getSaturatedExecPool(t *testing.T) (execpool.BacklogPool, chan interface{}) {
verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t)
_, buffLen := verificationPool.BufferSize()

Expand All @@ -1402,7 +1402,7 @@ func getSaturatedExecPool(t *testing.T) (execpool.BacklogPool, chan interface{},
return nil
}, nil, nil)
}
return verificationPool, holdTasks, verificationPool
return verificationPool, holdTasks
}

// TestStreamVerifierCtxCancel tests the termination when the ctx is canceled
Expand All @@ -1413,8 +1413,8 @@ func getSaturatedExecPool(t *testing.T) (execpool.BacklogPool, chan interface{},
func TestStreamVerifierCtxCancel(t *testing.T) {
partitiontest.PartitionTest(t)

verificationPool, holdTasks, vp := getSaturatedExecPool(t)
defer vp.Shutdown()
verificationPool, holdTasks := getSaturatedExecPool(t)
defer verificationPool.Shutdown()
ctx, cancel := context.WithCancel(context.Background())
cache := MakeVerifiedTransactionCache(50)
stxnChan := make(chan *UnverifiedElement)
Expand Down Expand Up @@ -1461,8 +1461,7 @@ func TestStreamVerifierCtxCancel(t *testing.T) {
func TestStreamVerifierCtxCancelPoolQueue(t *testing.T) {
partitiontest.PartitionTest(t)

verificationPool, holdTasks, vp := getSaturatedExecPool(t)
defer vp.Shutdown()
verificationPool, holdTasks := getSaturatedExecPool(t)

// check the logged information
var logBuffer bytes.Buffer
Expand All @@ -1484,26 +1483,51 @@ func TestStreamVerifierCtxCancelPoolQueue(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
// no verification tasks should be executed
// one result should be returned
result = <-resultChan
for {
result = <-resultChan
// at least one errShuttingDownError is expected
if result.Err != errShuttingDownError {
continue
}
break
}
}()

// send batchSizeBlockLimit after the exec pool buffer is full
numOfTxns := 1
txnGroups, _ := getSignedTransactions(numOfTxns, 1, 0, 0.5)
stxnChan <- &UnverifiedElement{TxnGroup: txnGroups[0], BacklogMessage: nil}

wg.Add(1)
// run in separate goroutine because the exec pool is blocked here, and this will not advance
// until holdTasks are closed
go func() {
defer wg.Done()
for {
select {
// Normally, a single txn is sufficient, but the goroutines could be scheduled is such a way that
// the single transaction slips through and passes the batch verifier before the exec pool shuts down.
// this happens when close(holdTasks) runs and frees the exec pool, and lets the txns get verified, before
// verificationPool.Shutdown() executes.
case stxnChan <- &UnverifiedElement{TxnGroup: txnGroups[0], BacklogMessage: nil}:
case <-ctx.Done():
return
}
}
}()
// cancel the ctx as the sig is not yet sent to the exec pool
// the test might sporadically fail if between sending the txn above
// and the cancelation, 2 x waitForNextTxnDuration elapses (10ms)
time.Sleep(6 * waitForNextTxnDuration)
cancel()
go func() {
// wait a bit before releasing the tasks, so that the verificationPool ctx first gets canceled
time.Sleep(20 * time.Millisecond)
close(holdTasks)
}()
verificationPool.Shutdown()

// the main loop should stop after cancel()
// the main loop should stop before calling cancel() when the exec pool shuts down and returns an error
sv.WaitForStop()

// release the tasks
close(holdTasks)
cancel()

wg.Wait()
require.ErrorIs(t, result.Err, errShuttingDownError)
Expand Down

0 comments on commit 787d386

Please sign in to comment.