diff --git a/agreement/asyncVoteVerifier_test.go b/agreement/asyncVoteVerifier_test.go index 8a4e819595..42378d506a 100644 --- a/agreement/asyncVoteVerifier_test.go +++ b/agreement/asyncVoteVerifier_test.go @@ -34,6 +34,9 @@ func (fp *expiredExecPool) EnqueueBacklog(enqueueCtx context.Context, t execpool // generate an error, to see if we correctly report that on the verifyVote() call. return context.Canceled } +func (fp *expiredExecPool) BufferSize() (length, capacity int) { + return +} // Test async vote verifier against a full execution pool. func TestVerificationAgainstFullExecutionPool(t *testing.T) { diff --git a/cmd/goal/clerk.go b/cmd/goal/clerk.go index 84aef5e699..49a13ae7de 100644 --- a/cmd/goal/clerk.go +++ b/cmd/goal/clerk.go @@ -422,7 +422,7 @@ var sendCmd = &cobra.Command{ CurrentProtocol: proto, }, } - groupCtx, err := verify.PrepareGroupContext([]transactions.SignedTxn{uncheckedTxn}, blockHeader, nil) + groupCtx, err := verify.PrepareGroupContext([]transactions.SignedTxn{uncheckedTxn}, &blockHeader, nil) if err == nil { err = verify.LogicSigSanityCheck(&uncheckedTxn, 0, groupCtx) } @@ -823,7 +823,7 @@ var signCmd = &cobra.Command{ } var groupCtx *verify.GroupContext if lsig.Logic != nil { - groupCtx, err = verify.PrepareGroupContext(txnGroup, contextHdr, nil) + groupCtx, err = verify.PrepareGroupContext(txnGroup, &contextHdr, nil) if err != nil { // this error has to be unsupported protocol reportErrorf("%s: %v", txFilename, err) diff --git a/crypto/batchverifier.go b/crypto/batchverifier.go index a358d65e38..b8ff87b797 100644 --- a/crypto/batchverifier.go +++ b/crypto/batchverifier.go @@ -53,7 +53,7 @@ const minBatchVerifierAlloc = 16 // Batch verifications errors var ( - ErrBatchVerificationFailed = errors.New("At least one signature didn't pass verification") + ErrBatchHasFailedSigs = errors.New("At least one signature didn't pass verification") ) //export ed25519_randombytes_unsafe @@ -104,8 +104,8 @@ func (b *BatchVerifier) expand() { b.signatures = signatures } -// getNumberOfEnqueuedSignatures returns the number of signatures current enqueue onto the batch verifier object -func (b *BatchVerifier) getNumberOfEnqueuedSignatures() int { +// GetNumberOfEnqueuedSignatures returns the number of signatures currently enqueued into the BatchVerifier +func (b *BatchVerifier) GetNumberOfEnqueuedSignatures() int { return len(b.messages) } @@ -120,18 +120,18 @@ func (b *BatchVerifier) Verify() error { // if some signatures are invalid, true will be set in failed at the corresponding indexes, and // ErrBatchVerificationFailed for err func (b *BatchVerifier) VerifyWithFeedback() (failed []bool, err error) { - if b.getNumberOfEnqueuedSignatures() == 0 { + if b.GetNumberOfEnqueuedSignatures() == 0 { return nil, nil } - var messages = make([][]byte, b.getNumberOfEnqueuedSignatures()) - for i, m := range b.messages { - messages[i] = HashRep(m) + var messages = make([][]byte, b.GetNumberOfEnqueuedSignatures()) + for i := range b.messages { + messages[i] = HashRep(b.messages[i]) } allValid, failed := batchVerificationImpl(messages, b.publicKeys, b.signatures) if allValid { return failed, nil } - return failed, ErrBatchVerificationFailed + return failed, ErrBatchHasFailedSigs } // batchVerificationImpl invokes the ed25519 batch verification algorithm. 
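A minimal usage sketch (not part of the diff above) of the feedback API renamed in crypto/batchverifier.go, assuming a caller outside the crypto package; the function name verifyAll and its parameters are illustrative placeholders, and the messages are assumed to implement crypto.Hashable, which is what EnqueueSignature expects.

package example // illustrative placeholder package

import "github.com/algorand/go-algorand/crypto"

// verifyAll enqueues the given (public key, message, signature) triples into one batch
// and returns, per entry, whether that signature failed verification.
func verifyAll(pks []crypto.SignatureVerifier, msgs []crypto.Hashable, sigs []crypto.Signature) ([]bool, error) {
	bv := crypto.MakeBatchVerifierWithHint(len(msgs))
	for i := range msgs {
		bv.EnqueueSignature(pks[i], msgs[i], sigs[i])
	}
	// failed[i] is true only for entries that did not verify; err is nil when all
	// enqueued signatures are valid, and crypto.ErrBatchHasFailedSigs otherwise.
	failed, err := bv.VerifyWithFeedback()
	return failed, err
}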
diff --git a/crypto/batchverifier_test.go b/crypto/batchverifier_test.go index 7c5455703c..6ec24249e1 100644 --- a/crypto/batchverifier_test.go +++ b/crypto/batchverifier_test.go @@ -17,6 +17,7 @@ package crypto import ( + "fmt" "math/rand" "runtime" "testing" @@ -64,7 +65,7 @@ func TestBatchVerifierBulk(t *testing.T) { sig := sigSecrets.Sign(msg) bv.EnqueueSignature(sigSecrets.SignatureVerifier, msg, sig) } - require.Equal(t, n, bv.getNumberOfEnqueuedSignatures()) + require.Equal(t, n, bv.GetNumberOfEnqueuedSignatures()) require.NoError(t, bv.Verify()) } @@ -121,6 +122,67 @@ func BenchmarkBatchVerifier(b *testing.B) { require.NoError(b, bv.Verify()) } +// BenchmarkBatchVerifierBig with b.N over 1000 will report the expected performance +// gain as the batchsize increases. All sigs are valid. +func BenchmarkBatchVerifierBig(b *testing.B) { + c := makeCurve25519Secret() + for batchSize := 1; batchSize <= 96; batchSize++ { + bv := MakeBatchVerifierWithHint(batchSize) + for i := 0; i < batchSize; i++ { + str := randString() + bv.EnqueueSignature(c.SignatureVerifier, str, c.Sign(str)) + } + b.Run(fmt.Sprintf("running batchsize %d", batchSize), func(b *testing.B) { + totalTransactions := b.N + count := totalTransactions / batchSize + if count*batchSize < totalTransactions { + count++ + } + for x := 0; x < count; x++ { + require.NoError(b, bv.Verify()) + } + }) + } +} + +// BenchmarkBatchVerifierBigWithInvalid builds over BenchmarkBatchVerifierBig by introducing +// invalid sigs to even numbered batch sizes. This shows the impact of invalid sigs on the +// performance. Basically, all the gains from batching disappear. +func BenchmarkBatchVerifierBigWithInvalid(b *testing.B) { + c := makeCurve25519Secret() + badSig := Signature{} + for batchSize := 1; batchSize <= 96; batchSize++ { + bv := MakeBatchVerifierWithHint(batchSize) + for i := 0; i < batchSize; i++ { + str := randString() + if batchSize%2 == 0 && (i == 0 || rand.Float32() < 0.1) { + bv.EnqueueSignature(c.SignatureVerifier, str, badSig) + } else { + bv.EnqueueSignature(c.SignatureVerifier, str, c.Sign(str)) + } + } + b.Run(fmt.Sprintf("running batchsize %d", batchSize), func(b *testing.B) { + totalTransactions := b.N + count := totalTransactions / batchSize + if count*batchSize < totalTransactions { + count++ + } + for x := 0; x < count; x++ { + failed, err := bv.VerifyWithFeedback() + if err != nil { + for i, f := range failed { + if bv.signatures[i] == badSig { + require.True(b, f) + } else { + require.False(b, f) + } + } + } + } + }) + } +} + func TestEmpty(t *testing.T) { partitiontest.PartitionTest(t) bv := MakeBatchVerifier() @@ -155,10 +217,10 @@ func TestBatchVerifierIndividualResults(t *testing.T) { } bv.EnqueueSignature(sigSecrets.SignatureVerifier, msg, sig) } - require.Equal(t, n, bv.getNumberOfEnqueuedSignatures()) + require.Equal(t, n, bv.GetNumberOfEnqueuedSignatures()) failed, err := bv.VerifyWithFeedback() if hasBadSig { - require.ErrorIs(t, err, ErrBatchVerificationFailed) + require.ErrorIs(t, err, ErrBatchHasFailedSigs) } else { require.NoError(t, err) } @@ -185,10 +247,10 @@ func TestBatchVerifierIndividualResultsAllValid(t *testing.T) { sig := sigSecrets.Sign(msg) bv.EnqueueSignature(sigSecrets.SignatureVerifier, msg, sig) } - require.Equal(t, n, bv.getNumberOfEnqueuedSignatures()) + require.Equal(t, n, bv.GetNumberOfEnqueuedSignatures()) failed, err := bv.VerifyWithFeedback() require.NoError(t, err) - require.Equal(t, bv.getNumberOfEnqueuedSignatures(), len(failed)) + require.Equal(t, 
bv.GetNumberOfEnqueuedSignatures(), len(failed)) for _, f := range failed { require.False(t, f) } diff --git a/data/ledger.go b/data/ledger.go index 101da721af..75150f2229 100644 --- a/data/ledger.go +++ b/data/ledger.go @@ -80,7 +80,7 @@ type roundSeed struct { func LoadLedger( log logging.Logger, dbFilenamePrefix string, memory bool, genesisProto protocol.ConsensusVersion, genesisBal bookkeeping.GenesisBalances, genesisID string, genesisHash crypto.Digest, - blockListeners []ledger.BlockListener, cfg config.Local, + blockListeners []ledgercore.BlockListener, cfg config.Local, ) (*Ledger, error) { if genesisBal.Balances == nil { genesisBal.Balances = make(map[basics.Address]basics.AccountData) diff --git a/data/transactions/verify/artifact_test.go b/data/transactions/verify/artifact_test.go index 8444dfa353..8c4f44e601 100644 --- a/data/transactions/verify/artifact_test.go +++ b/data/transactions/verify/artifact_test.go @@ -77,7 +77,7 @@ func BenchmarkTinyMan(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := TxnGroup(stxnss[i], hdr, nil, &logic.NoHeaderLedger{}) + _, err := TxnGroup(stxnss[i], &hdr, nil, &logic.NoHeaderLedger{}) require.NoError(b, err) } }) @@ -93,7 +93,7 @@ func BenchmarkTinyMan(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := TxnGroup(stxns, hdr, nil, &logic.NoHeaderLedger{}) + _, err := TxnGroup(stxns, &hdr, nil, &logic.NoHeaderLedger{}) require.NoError(b, err) } }) diff --git a/data/transactions/verify/txn.go b/data/transactions/verify/txn.go index a4fe2ebd4d..ae06b5ddc5 100644 --- a/data/transactions/verify/txn.go +++ b/data/transactions/verify/txn.go @@ -21,6 +21,9 @@ import ( "encoding/binary" "errors" "fmt" + "sync" + "sync/atomic" + "time" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" @@ -28,6 +31,8 @@ import ( "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/data/transactions/logic" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/util/execpool" "github.com/algorand/go-algorand/util/metrics" @@ -44,6 +49,8 @@ var msigLsigLessOrEqual4 = metrics.MakeCounter(metrics.MetricName{Name: "algod_v var msigLsigLessOrEqual10 = metrics.MakeCounter(metrics.MetricName{Name: "algod_verify_msig_lsig_5_10", Description: "Total transaction scripts with 5-10 msigs"}) var msigLsigMore10 = metrics.MakeCounter(metrics.MetricName{Name: "algod_verify_msig_lsig_10", Description: "Total transaction scripts with 11+ msigs"}) +var errShuttingDownError = errors.New("not verified, verifier is shutting down") + // The PaysetGroups is taking large set of transaction groups and attempt to verify their validity using multiple go-routines. // When doing so, it attempts to break these into smaller "worksets" where each workset takes about 2ms of execution time in order // to avoid context switching overhead while providing good validation cancellation responsiveness. 
Each one of these worksets is @@ -51,6 +58,19 @@ var msigLsigMore10 = metrics.MakeCounter(metrics.MetricName{Name: "algod_verify_ // show that these are realistic numbers ) const txnPerWorksetThreshold = 32 +// batchSizeBlockLimit is the limit when the batch exceeds, will be added to the exec pool, even if the pool is saturated +// and the batch verifier will block until the exec pool accepts the batch +const batchSizeBlockLimit = 1024 + +// waitForNextTxnDuration is the time to wait before sending the batch to the exec pool +// If the incoming txn rate is low, a txn in the batch may wait no less than +// waitForNextTxnDuration before it is set for verification. +// This can introduce a latency to the propagation of a transaction in the network, +// since every relay will go through this wait time before broadcasting the txn. +// However, when the incoming txn rate is high, the batch will fill up quickly and will send +// for signature evaluation before waitForNextTxnDuration. +const waitForNextTxnDuration = 2 * time.Millisecond + // When the PaysetGroups is generating worksets, it enqueues up to concurrentWorksets entries to the execution pool. This serves several // purposes : // - if the verification task need to be aborted, there are only concurrentWorksets entries that are currently redundant on the execution pool queue. @@ -124,7 +144,7 @@ func (e *TxGroupError) Unwrap() error { // PrepareGroupContext prepares a verification group parameter object for a given transaction // group. -func PrepareGroupContext(group []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature) (*GroupContext, error) { +func PrepareGroupContext(group []transactions.SignedTxn, contextHdr *bookkeeping.BlockHeader, ledger logic.LedgerForSignature) (*GroupContext, error) { if len(group) == 0 { return nil, nil } @@ -168,7 +188,7 @@ func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, } // TxnGroup verifies a []SignedTxn as being signed and having no obviously inconsistent data. -func TxnGroup(stxs []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, cache VerifiedTransactionCache, ledger logic.LedgerForSignature) (groupCtx *GroupContext, err error) { +func TxnGroup(stxs []transactions.SignedTxn, contextHdr *bookkeeping.BlockHeader, cache VerifiedTransactionCache, ledger logic.LedgerForSignature) (groupCtx *GroupContext, err error) { batchVerifier := crypto.MakeBatchVerifier() if groupCtx, err = txnGroupBatchPrep(stxs, contextHdr, ledger, batchVerifier); err != nil { @@ -188,7 +208,7 @@ func TxnGroup(stxs []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, // txnGroupBatchPrep verifies a []SignedTxn having no obviously inconsistent data. // it is the caller responsibility to call batchVerifier.Verify() -func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature, verifier *crypto.BatchVerifier) (*GroupContext, error) { +func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr *bookkeeping.BlockHeader, ledger logic.LedgerForSignature, verifier *crypto.BatchVerifier) (*GroupContext, error) { groupCtx, err := PrepareGroupContext(stxs, contextHdr, ledger) if err != nil { return nil, err @@ -229,43 +249,56 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo return groupCtx, nil } -// stxnCoreChecks runs signatures validity checks and enqueues signature into batchVerifier for verification. 
-func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) *TxGroupError { - numSigs := 0 - hasSig := false - hasMsig := false - hasLogicSig := false +type sigOrTxnType int + +const regularSig sigOrTxnType = 1 +const multiSig sigOrTxnType = 2 +const logicSig sigOrTxnType = 3 +const stateProofTxn sigOrTxnType = 4 + +// checkTxnSigTypeCounts checks the number of signature types and reports an error in case of a violation +func checkTxnSigTypeCounts(s *transactions.SignedTxn) (sigType sigOrTxnType, err *TxGroupError) { + numSigCategories := 0 if s.Sig != (crypto.Signature{}) { - numSigs++ - hasSig = true + numSigCategories++ + sigType = regularSig } if !s.Msig.Blank() { - numSigs++ - hasMsig = true + numSigCategories++ + sigType = multiSig } if !s.Lsig.Blank() { - numSigs++ - hasLogicSig = true + numSigCategories++ + sigType = logicSig } - if numSigs == 0 { + if numSigCategories == 0 { // Special case: special sender address can issue special transaction // types (state proof txn) without any signature. The well-formed // check ensures that this transaction cannot pay any fee, and // cannot have any other interesting fields, except for the state proof payload. if s.Txn.Sender == transactions.StateProofSender && s.Txn.Type == protocol.StateProofTx { - return nil + return stateProofTxn, nil } - return &TxGroupError{err: errTxnSigHasNoSig, Reason: TxGroupErrorReasonHasNoSig} + return 0, &TxGroupError{err: errTxnSigHasNoSig, Reason: TxGroupErrorReasonHasNoSig} } - if numSigs > 1 { - return &TxGroupError{err: errTxnSigNotWellFormed, Reason: TxGroupErrorReasonSigNotWellFormed} + if numSigCategories > 1 { + return 0, &TxGroupError{err: errTxnSigNotWellFormed, Reason: TxGroupErrorReasonSigNotWellFormed} + } + return sigType, nil +} + +// stxnCoreChecks runs signatures validity checks and enqueues signature into batchVerifier for verification. +func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) *TxGroupError { + sigType, err := checkTxnSigTypeCounts(s) + if err != nil { + return err } - if hasSig { + switch sigType { + case regularSig: batchVerifier.EnqueueSignature(crypto.SignatureVerifier(s.Authorizer()), s.Txn, s.Sig) return nil - } - if hasMsig { + case multiSig: if err := crypto.MultisigBatchPrep(s.Txn, crypto.Digest(s.Authorizer()), s.Msig, batchVerifier); err != nil { return &TxGroupError{err: fmt.Errorf("multisig validation failed: %w", err), Reason: TxGroupErrorReasonMsigNotWellFormed} } @@ -283,14 +316,19 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex msigMore10.Inc(nil) } return nil - } - if hasLogicSig { + + case logicSig: if err := logicSigVerify(s, txnIdx, groupCtx); err != nil { return &TxGroupError{err: err, Reason: TxGroupErrorReasonLogicSigFailed} } return nil + + case stateProofTxn: + return nil + + default: + return &TxGroupError{err: errUnknownSignature, Reason: TxGroupErrorReasonGeneric} } - return &TxGroupError{err: errUnknownSignature, Reason: TxGroupErrorReasonGeneric} } // LogicSigSanityCheck checks that the signature is valid and that the program is basically well formed. 
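The constants introduced above (txnPerWorksetThreshold, batchSizeBlockLimit, waitForNextTxnDuration) drive the batching policy that batchingLoop implements further down in this file. Below is a compact restatement of that decision logic as a sketch only, assuming it sits next to those constants in the verify package: sigsInBatch stands in for numberOfSigsInCurrent, attempts for numberOfBatchAttempts, and the names submitAction and nextAction are illustrative; groups with zero batchable signatures, which the loop submits immediately as their own task, are not modeled here.

// submitAction summarizes what the batching loop does with the signatures collected so far.
type submitAction int

const (
	keepCollecting        submitAction = iota // wait for more incoming txn groups
	submitIfPoolHasRoom                       // try the exec pool, back off if its buffer is full
	submitUnconditionally                     // enqueue even if the exec pool is saturated
)

// nextAction mirrors the two decision points in batchingLoop: a txn group arriving
// (timerFired == false) and a tick of the waitForNextTxnDuration ticker (timerFired == true).
func nextAction(sigsInBatch, attempts uint64, timerFired bool) submitAction {
	if !timerFired {
		if sigsInBatch <= txnPerWorksetThreshold {
			return keepCollecting // not yet enough signatures for an efficient batch
		}
		if sigsInBatch > batchSizeBlockLimit {
			return submitUnconditionally // cap the batch size even under exec pool pressure
		}
		return submitIfPoolHasRoom
	}
	if sigsInBatch == 0 {
		return keepCollecting // nothing batched yet, keep waiting
	}
	if attempts > 1 {
		return submitUnconditionally // bound propagation latency after repeated back-offs
	}
	return submitIfPoolHasRoom
}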
@@ -463,7 +501,7 @@ func PaysetGroups(ctx context.Context, payset [][]transactions.SignedTxn, blkHea batchVerifier := crypto.MakeBatchVerifierWithHint(len(payset)) for i, signTxnsGrp := range txnGroups { - groupCtxs[i], grpErr = txnGroupBatchPrep(signTxnsGrp, blkHeader, ledger, batchVerifier) + groupCtxs[i], grpErr = txnGroupBatchPrep(signTxnsGrp, &blkHeader, ledger, batchVerifier) // abort only if it's a non-cache error. if grpErr != nil { return grpErr @@ -536,3 +574,377 @@ func (w *worksetBuilder) next() (txnGroups [][]transactions.SignedTxn) { func (w *worksetBuilder) completed() bool { return w.idx >= len(w.payset) } + +// UnverifiedElement is the element passed to the Stream verifier +// BacklogMessage is a *txBacklogMsg from data/txHandler.go which needs to be +// passed back to that context +type UnverifiedElement struct { + TxnGroup []transactions.SignedTxn + BacklogMessage interface{} +} + +// VerificationResult is the result of the txn group verification +// BacklogMessage is the reference associated with the txn group which was +// initially passed to the stream verifier +type VerificationResult struct { + TxnGroup []transactions.SignedTxn + BacklogMessage interface{} + Err error +} + +// StreamVerifier verifies txn groups received through the stxnChan channel, and returns the +// results through the resultChan +type StreamVerifier struct { + resultChan chan<- *VerificationResult + droppedChan chan<- *UnverifiedElement + stxnChan <-chan *UnverifiedElement + verificationPool execpool.BacklogPool + ctx context.Context + cache VerifiedTransactionCache + activeLoopWg sync.WaitGroup + nbw *NewBlockWatcher + ledger logic.LedgerForSignature +} + +// NewBlockWatcher is a struct used to provide a new block header to the +// stream verifier +type NewBlockWatcher struct { + blkHeader atomic.Value +} + +// MakeNewBlockWatcher construct a new block watcher with the initial blkHdr +func MakeNewBlockWatcher(blkHdr bookkeeping.BlockHeader) (nbw *NewBlockWatcher) { + nbw = &NewBlockWatcher{} + nbw.blkHeader.Store(&blkHdr) + return nbw +} + +// OnNewBlock implements the interface to subscribe to new block notifications from the ledger +func (nbw *NewBlockWatcher) OnNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta) { + bh := nbw.blkHeader.Load().(*bookkeeping.BlockHeader) + if bh.Round >= block.BlockHeader.Round { + return + } + nbw.blkHeader.Store(&block.BlockHeader) +} + +func (nbw *NewBlockWatcher) getBlockHeader() (bh *bookkeeping.BlockHeader) { + return nbw.blkHeader.Load().(*bookkeeping.BlockHeader) +} + +type batchLoad struct { + txnGroups [][]transactions.SignedTxn + groupCtxs []*GroupContext + elementBacklogMessage []interface{} + messagesForTxn []int +} + +func makeBatchLoad(l int) (bl batchLoad) { + bl.txnGroups = make([][]transactions.SignedTxn, 0, l) + bl.groupCtxs = make([]*GroupContext, 0, l) + bl.elementBacklogMessage = make([]interface{}, 0, l) + bl.messagesForTxn = make([]int, 0, l) + return bl +} + +func (bl *batchLoad) addLoad(txngrp []transactions.SignedTxn, gctx *GroupContext, backlogMsg interface{}, numBatchableSigs int) { + bl.txnGroups = append(bl.txnGroups, txngrp) + bl.groupCtxs = append(bl.groupCtxs, gctx) + bl.elementBacklogMessage = append(bl.elementBacklogMessage, backlogMsg) + bl.messagesForTxn = append(bl.messagesForTxn, numBatchableSigs) + +} + +// LedgerForStreamVerifier defines the ledger methods used by the StreamVerifier. 
+type LedgerForStreamVerifier interface { + logic.LedgerForSignature + RegisterBlockListeners([]ledgercore.BlockListener) + Latest() basics.Round + BlockHdr(rnd basics.Round) (blk bookkeeping.BlockHeader, err error) +} + +// MakeStreamVerifier creates a new stream verifier and returns the channels used to send txn groups +// to it and to obtain the txn signature verification results from it +func MakeStreamVerifier(stxnChan <-chan *UnverifiedElement, resultChan chan<- *VerificationResult, + droppedChan chan<- *UnverifiedElement, ledger LedgerForStreamVerifier, + verificationPool execpool.BacklogPool, cache VerifiedTransactionCache) (*StreamVerifier, error) { + + latest := ledger.Latest() + latestHdr, err := ledger.BlockHdr(latest) + if err != nil { + return nil, errors.New("MakeStreamVerifier: Could not get header for previous block") + } + + nbw := MakeNewBlockWatcher(latestHdr) + ledger.RegisterBlockListeners([]ledgercore.BlockListener{nbw}) + + return &StreamVerifier{ + resultChan: resultChan, + stxnChan: stxnChan, + droppedChan: droppedChan, + verificationPool: verificationPool, + cache: cache, + nbw: nbw, + ledger: ledger, + }, nil +} + +// Start is called when the verifier is created and whenever it needs to restart after +// the ctx is canceled +func (sv *StreamVerifier) Start(ctx context.Context) { + sv.ctx = ctx + sv.activeLoopWg.Add(1) + go sv.batchingLoop() +} + +// WaitForStop waits until the batching loop terminates after the ctx is canceled +func (sv *StreamVerifier) WaitForStop() { + sv.activeLoopWg.Wait() +} + +func (sv *StreamVerifier) cleanup(pending []*UnverifiedElement) { + // report an error for the unchecked txns + // drop the messages without reporting if the receiver does not consume + for _, uel := range pending { + sv.sendResult(uel.TxnGroup, uel.BacklogMessage, errShuttingDownError) + } +} + +func (sv *StreamVerifier) batchingLoop() { + defer sv.activeLoopWg.Done() + timer := time.NewTicker(waitForNextTxnDuration) + defer timer.Stop() + var added bool + var numberOfSigsInCurrent uint64 + var numberOfBatchAttempts uint64 + ue := make([]*UnverifiedElement, 0, 8) + defer func() { sv.cleanup(ue) }() + for { + select { + case stx := <-sv.stxnChan: + numberOfBatchableSigsInGroup, err := getNumberOfBatchableSigsInGroup(stx.TxnGroup) + if err != nil { + // wrong number of signatures + sv.sendResult(stx.TxnGroup, stx.BacklogMessage, err) + continue + } + + // if no batchable signatures here, send this as a task of its own + if numberOfBatchableSigsInGroup == 0 { + err := sv.addVerificationTaskToThePoolNow([]*UnverifiedElement{stx}) + if err != nil { + return + } + continue // stx is handled, continue + } + + // add this txngrp to the list of batchable txn groups + numberOfSigsInCurrent = numberOfSigsInCurrent + numberOfBatchableSigsInGroup + ue = append(ue, stx) + if numberOfSigsInCurrent > txnPerWorksetThreshold { + // enough transactions in the batch to verify efficiently + + if numberOfSigsInCurrent > batchSizeBlockLimit { + // do not consider adding more txns to this batch.
+ // bypass the exec pool situation and queue anyway + // this is to prevent creation of very large batches + err := sv.addVerificationTaskToThePoolNow(ue) + if err != nil { + return + } + added = true + } else { + added, err = sv.tryAddVerificationTaskToThePool(ue) + if err != nil { + return + } + } + if added { + numberOfSigsInCurrent = 0 + ue = make([]*UnverifiedElement, 0, 8) + numberOfBatchAttempts = 0 + } else { + // was not added because of the exec pool buffer length + numberOfBatchAttempts++ + } + } + case <-timer.C: + // timer ticked. it is time to send the batch even if it is not full + if numberOfSigsInCurrent == 0 { + // nothing batched yet... wait some more + continue + } + var err error + if numberOfBatchAttempts > 1 { + // bypass the exec pool situation and queue anyway + // this is to prevent long delays in transaction propagation + // at least one transaction here has waited 3 x waitForNextTxnDuration + err = sv.addVerificationTaskToThePoolNow(ue) + added = true + } else { + added, err = sv.tryAddVerificationTaskToThePool(ue) + } + if err != nil { + return + } + if added { + numberOfSigsInCurrent = 0 + ue = make([]*UnverifiedElement, 0, 8) + numberOfBatchAttempts = 0 + } else { + // was not added because of the exec pool buffer length. wait for some more txns + numberOfBatchAttempts++ + } + case <-sv.ctx.Done(): + return + } + } +} + +func (sv *StreamVerifier) sendResult(veTxnGroup []transactions.SignedTxn, veBacklogMessage interface{}, err error) { + // send the txn result out the pipe + select { + case sv.resultChan <- &VerificationResult{ + TxnGroup: veTxnGroup, + BacklogMessage: veBacklogMessage, + Err: err, + }: + default: + // we failed to write to the output queue, since the queue was full. + sv.droppedChan <- &UnverifiedElement{veTxnGroup, veBacklogMessage} + } +} + +func (sv *StreamVerifier) tryAddVerificationTaskToThePool(ue []*UnverifiedElement) (added bool, err error) { + // if the exec pool buffer is full, can go back and collect + // more signatures instead of waiting in the exec pool buffer + // more signatures to the batch do not harm performance but introduce latency when delayed (see crypto.BenchmarkBatchVerifierBig) + + // if the buffer is full + if l, c := sv.verificationPool.BufferSize(); l == c { + return false, nil + } + err = sv.addVerificationTaskToThePoolNow(ue) + if err != nil { + // An error is returned when the context of the pool expires + return false, err + } + return true, nil +} + +func (sv *StreamVerifier) addVerificationTaskToThePoolNow(ue []*UnverifiedElement) error { + // if the context is canceled when the task is in the queue, it should be canceled + // copy the ctx here so that when the StreamVerifier is started again, and a new context + // is created, this task still gets canceled due to the ctx at the time of this task + taskCtx := sv.ctx + function := func(arg interface{}) interface{} { + if taskCtx.Err() != nil { + // ctx is canceled. 
the results will be returned + sv.cleanup(ue) + return nil + } + + ue := arg.([]*UnverifiedElement) + batchVerifier := crypto.MakeBatchVerifier() + + bl := makeBatchLoad(len(ue)) + // TODO: separate operations here, and get the sig verification inside the LogicSig to the batch here + blockHeader := sv.nbw.getBlockHeader() + for _, ue := range ue { + groupCtx, err := txnGroupBatchPrep(ue.TxnGroup, blockHeader, sv.ledger, batchVerifier) + if err != nil { + // verification failed, no need to add the sig to the batch, report the error + sv.sendResult(ue.TxnGroup, ue.BacklogMessage, err) + continue + } + totalBatchCount := batchVerifier.GetNumberOfEnqueuedSignatures() + bl.addLoad(ue.TxnGroup, groupCtx, ue.BacklogMessage, totalBatchCount) + } + + failed, err := batchVerifier.VerifyWithFeedback() + // this error can only be crypto.ErrBatchHasFailedSigs + if err == nil { // success, all signatures verified + for i := range bl.txnGroups { + sv.sendResult(bl.txnGroups[i], bl.elementBacklogMessage[i], nil) + } + sv.cache.AddPayset(bl.txnGroups, bl.groupCtxs) + return nil + } + + verifiedTxnGroups := make([][]transactions.SignedTxn, 0, len(bl.txnGroups)) + verifiedGroupCtxs := make([]*GroupContext, 0, len(bl.groupCtxs)) + failedSigIdx := 0 + for txgIdx := range bl.txnGroups { + txGroupSigFailed := false + for failedSigIdx < bl.messagesForTxn[txgIdx] { + if failed[failedSigIdx] { + // if there is a failed sig check, then no need to check the rest of the + // sigs for this txnGroup + failedSigIdx = bl.messagesForTxn[txgIdx] + txGroupSigFailed = true + } else { + // proceed to check the next sig belonging to this txnGroup + failedSigIdx++ + } + } + var result error + if !txGroupSigFailed { + verifiedTxnGroups = append(verifiedTxnGroups, bl.txnGroups[txgIdx]) + verifiedGroupCtxs = append(verifiedGroupCtxs, bl.groupCtxs[txgIdx]) + } else { + result = err + } + sv.sendResult(bl.txnGroups[txgIdx], bl.elementBacklogMessage[txgIdx], result) + } + // loading them all at once by locking the cache once + sv.cache.AddPayset(verifiedTxnGroups, verifiedGroupCtxs) + return nil + } + + // EnqueueBacklog returns an error when the context is canceled + err := sv.verificationPool.EnqueueBacklog(sv.ctx, function, ue, nil) + if err != nil { + logging.Base().Infof("addVerificationTaskToThePoolNow: EnqueueBacklog returned an error and StreamVerifier will stop: %v", err) + } + return err +} + +func getNumberOfBatchableSigsInGroup(stxs []transactions.SignedTxn) (batchSigs uint64, err error) { + batchSigs = 0 + for i := range stxs { + count, err := getNumberOfBatchableSigsInTxn(&stxs[i]) + if err != nil { + return 0, err + } + batchSigs = batchSigs + count + } + return +} + +func getNumberOfBatchableSigsInTxn(stx *transactions.SignedTxn) (uint64, error) { + sigType, err := checkTxnSigTypeCounts(stx) + if err != nil { + return 0, err + } + switch sigType { + case regularSig: + return 1, nil + case multiSig: + sig := stx.Msig + batchSigs := uint64(0) + for _, subsigi := range sig.Subsigs { + if (subsigi.Sig != crypto.Signature{}) { + batchSigs++ + } + } + return batchSigs, nil + case logicSig: + // Currently the sigs in here are not batched. Something to consider later. 
+ return 0, nil + case stateProofTxn: + return 0, nil + default: + // this case is impossible + return 0, nil + } +} diff --git a/data/transactions/verify/txn_test.go b/data/transactions/verify/txn_test.go index 3e23c8f352..cf3e0e037b 100644 --- a/data/transactions/verify/txn_test.go +++ b/data/transactions/verify/txn_test.go @@ -17,8 +17,14 @@ package verify import ( + "bytes" "context" + "encoding/binary" + "errors" + "fmt" "math/rand" + "runtime" + "sync" "testing" "time" @@ -30,14 +36,17 @@ import ( "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/data/transactions/logic" + "github.com/algorand/go-algorand/ledger/ledgercore" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" "github.com/algorand/go-algorand/util/execpool" + "github.com/algorand/go-algorand/util/metrics" ) var feeSink = basics.Address{0x7, 0xda, 0xcb, 0x4b, 0x6d, 0x9e, 0xd1, 0x41, 0xb1, 0x75, 0x76, 0xbd, 0x45, 0x9a, 0xe6, 0x42, 0x1d, 0x48, 0x6d, 0xa3, 0xd4, 0xef, 0x22, 0x47, 0xc4, 0x9, 0xa3, 0x96, 0xb8, 0x2e, 0xa2, 0x21} var poolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} -var blockHeader = bookkeeping.BlockHeader{ +var blockHeader = &bookkeeping.BlockHeader{ RewardsState: bookkeeping.RewardsState{ FeeSink: feeSink, RewardsPool: poolAddr, @@ -46,6 +55,8 @@ var blockHeader = bookkeeping.BlockHeader{ CurrentProtocol: protocol.ConsensusCurrentVersion, }, } +var protoMaxGroupSize = config.Consensus[protocol.ConsensusCurrentVersion].MaxTxGroupSize +var txBacklogSize = config.Consensus[protocol.ConsensusCurrentVersion].MaxTxnBytesPerBlock / 200 var spec = transactions.SpecialAddresses{ FeeSink: feeSink, @@ -62,20 +73,22 @@ func verifyTxn(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext) er } type DummyLedgerForSignature struct { + badHdr bool } func (d *DummyLedgerForSignature) BlockHdrCached(basics.Round) (bookkeeping.BlockHeader, error) { - return bookkeeping.BlockHeader{ - Round: 50, - GenesisHash: crypto.Hash([]byte{1, 2, 3, 4, 5}), - UpgradeState: bookkeeping.UpgradeState{ - CurrentProtocol: protocol.ConsensusCurrentVersion, - }, - RewardsState: bookkeeping.RewardsState{ - FeeSink: feeSink, - RewardsPool: poolAddr, - }, - }, nil + return createDummyBlockHeader(), nil +} +func (d *DummyLedgerForSignature) BlockHdr(rnd basics.Round) (blk bookkeeping.BlockHeader, err error) { + if d.badHdr { + return bookkeeping.BlockHeader{}, fmt.Errorf("test error block hdr") + } + return createDummyBlockHeader(), nil +} +func (d *DummyLedgerForSignature) Latest() basics.Round { + return 0 +} +func (d *DummyLedgerForSignature) RegisterBlockListeners([]ledgercore.BlockListener) { } func keypair() *crypto.SignatureSecrets { @@ -108,20 +121,7 @@ func generateMultiSigTxn(numTxs, numAccs int, blockRound basics.Round, t *testin exp = int(blockRound) + rand.Intn(30) } - txs[i] = transactions.Transaction{ - Type: protocol.PaymentTx, - Header: transactions.Header{ - Sender: multiAddress[s], - Fee: basics.MicroAlgos{Raw: f}, - FirstValid: basics.Round(iss), - LastValid: basics.Round(exp), - GenesisHash: crypto.Hash([]byte{1, 2, 3, 4, 5}), - }, - PaymentTxnFields: transactions.PaymentTxnFields{ - Receiver: multiAddress[r], - Amount: basics.MicroAlgos{Raw: uint64(a)}, - 
}, - } + txs[i] = createPayTransaction(f, iss, exp, a, multiAddress[s], multiAddress[r]) signed[i].Txn = txs[i] // create multi sig that 2 out of 3 has signed the txn @@ -172,7 +172,7 @@ func generateAccounts(numAccs int) ([]*crypto.SignatureSecrets, []basics.Address return secrets, addresses, pks } -func generateTestObjects(numTxs, numAccs int, blockRound basics.Round) ([]transactions.Transaction, []transactions.SignedTxn, []*crypto.SignatureSecrets, []basics.Address) { +func generateTestObjects(numTxs, numAccs, noteOffset int, blockRound basics.Round) ([]transactions.Transaction, []transactions.SignedTxn, []*crypto.SignatureSecrets, []basics.Address) { txs := make([]transactions.Transaction, numTxs) signed := make([]transactions.SignedTxn, numTxs) secrets, addresses, _ := generateAccounts(numAccs) @@ -192,20 +192,11 @@ func generateTestObjects(numTxs, numAccs int, blockRound basics.Round) ([]transa exp = int(blockRound) + rand.Intn(30) } - txs[i] = transactions.Transaction{ - Type: protocol.PaymentTx, - Header: transactions.Header{ - Sender: addresses[s], - Fee: basics.MicroAlgos{Raw: f}, - FirstValid: basics.Round(iss), - LastValid: basics.Round(exp), - GenesisHash: crypto.Hash([]byte{1, 2, 3, 4, 5}), - }, - PaymentTxnFields: transactions.PaymentTxnFields{ - Receiver: addresses[r], - Amount: basics.MicroAlgos{Raw: uint64(a)}, - }, - } + txs[i] = createPayTransaction(f, iss, exp, a, addresses[s], addresses[r]) + noteField := make([]byte, binary.MaxVarintLen64) + binary.PutUvarint(noteField, uint64(i+noteOffset)) + txs[i].Note = noteField + signed[i] = txs[i].Sign(secrets[s]) u += 100 } @@ -218,7 +209,7 @@ func TestSignedPayment(t *testing.T) { proto := config.Consensus[protocol.ConsensusCurrentVersion] - payments, stxns, secrets, addrs := generateTestObjects(1, 1, 0) + payments, stxns, secrets, addrs := generateTestObjects(1, 1, 0, 0) payment, stxn, secret, addr := payments[0], stxns[0], secrets[0], addrs[0] groupCtx, err := PrepareGroupContext(stxns, blockHeader, nil) @@ -239,7 +230,7 @@ func TestSignedPayment(t *testing.T) { func TestTxnValidationEncodeDecode(t *testing.T) { partitiontest.PartitionTest(t) - _, signed, _, _ := generateTestObjects(100, 50, 0) + _, signed, _, _ := generateTestObjects(100, 50, 0, 0) for _, txn := range signed { groupCtx, err := PrepareGroupContext([]transactions.SignedTxn{txn}, blockHeader, nil) @@ -261,7 +252,7 @@ func TestTxnValidationEncodeDecode(t *testing.T) { func TestTxnValidationEmptySig(t *testing.T) { partitiontest.PartitionTest(t) - _, signed, _, _ := generateTestObjects(100, 50, 0) + _, signed, _, _ := generateTestObjects(100, 50, 0, 0) for _, txn := range signed { groupCtx, err := PrepareGroupContext([]transactions.SignedTxn{txn}, blockHeader, nil) @@ -299,7 +290,7 @@ func TestTxnValidationStateProof(t *testing.T) { }, } - var blockHeader = bookkeeping.BlockHeader{ + var blockHeader = &bookkeeping.BlockHeader{ RewardsState: bookkeeping.RewardsState{ FeeSink: feeSink, RewardsPool: poolAddr, @@ -372,14 +363,19 @@ func TestDecodeNil(t *testing.T) { func TestPaysetGroups(t *testing.T) { partitiontest.PartitionTest(t) - _, signedTxn, secrets, addrs := generateTestObjects(10000, 20, 50) + if testing.Short() { + t.Log("this is a long test and skipping for -short") + return + } + + _, signedTxn, secrets, addrs := generateTestObjects(10000, 20, 0, 50) blkHdr := createDummyBlockHeader() execPool := execpool.MakePool(t) verificationPool := execpool.MakeBacklog(execPool, 64, execpool.LowPriority, t) defer verificationPool.Shutdown() - txnGroups := 
generateTransactionGroups(signedTxn, secrets, addrs) + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) startPaysetGroupsTime := time.Now() err := PaysetGroups(context.Background(), txnGroups, blkHdr, verificationPool, MakeVerifiedTransactionCache(50000), nil) @@ -399,9 +395,9 @@ func TestPaysetGroups(t *testing.T) { // we define a test that would take 10 seconds to execute, and try to abort at 1.5 seconds. txnCount := len(signedTxn) * 10 * int(time.Second/paysetGroupDuration) - _, signedTxn, secrets, addrs = generateTestObjects(txnCount, 20, 50) + _, signedTxn, secrets, addrs = generateTestObjects(txnCount, 20, 0, 50) - txnGroups = generateTransactionGroups(signedTxn, secrets, addrs) + txnGroups = generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) ctx, ctxCancelFunc := context.WithTimeout(context.Background(), 1500*time.Millisecond) defer ctxCancelFunc() @@ -442,14 +438,14 @@ func BenchmarkPaysetGroups(b *testing.B) { if b.N < 2000 { b.N = 2000 } - _, signedTxn, secrets, addrs := generateTestObjects(b.N, 20, 50) + _, signedTxn, secrets, addrs := generateTestObjects(b.N, 20, 0, 50) blkHdr := createDummyBlockHeader() execPool := execpool.MakePool(b) verificationPool := execpool.MakeBacklog(execPool, 64, execpool.LowPriority, b) defer verificationPool.Shutdown() - txnGroups := generateTransactionGroups(signedTxn, secrets, addrs) + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) cache := MakeVerifiedTransactionCache(50000) b.ResetTimer() @@ -461,7 +457,7 @@ func BenchmarkPaysetGroups(b *testing.B) { func TestTxnGroupMixedSignatures(t *testing.T) { partitiontest.PartitionTest(t) - _, signedTxn, secrets, addrs := generateTestObjects(1, 20, 50) + _, signedTxn, secrets, addrs := generateTestObjects(1, 20, 0, 50) blkHdr := createDummyBlockHeader() // add a simple logic that verifies this condition: @@ -472,16 +468,16 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= ==`) require.NoError(t, err) - txnGroups := generateTransactionGroups(signedTxn, secrets, addrs) + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) dummyLedger := DummyLedgerForSignature{} - _, err = TxnGroup(txnGroups[0], blkHdr, nil, &dummyLedger) + _, err = TxnGroup(txnGroups[0], &blkHdr, nil, &dummyLedger) require.NoError(t, err) ///// no sig tmpSig := txnGroups[0][0].Sig txnGroups[0][0].Sig = crypto.Signature{} - _, err = TxnGroup(txnGroups[0], blkHdr, nil, &dummyLedger) + _, err = TxnGroup(txnGroups[0], &blkHdr, nil, &dummyLedger) require.Error(t, err) require.Contains(t, err.Error(), "has no sig") txnGroups[0][0].Sig = tmpSig @@ -492,14 +488,14 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= Key: crypto.PublicKey{0x1}, Sig: crypto.Signature{0x2}, } - _, err = TxnGroup(txnGroups[0], blkHdr, nil, &dummyLedger) + _, err = TxnGroup(txnGroups[0], &blkHdr, nil, &dummyLedger) require.Error(t, err) require.Contains(t, err.Error(), "should only have one of Sig or Msig or LogicSig") txnGroups[0][0].Msig.Subsigs = nil ///// Sig + logic txnGroups[0][0].Lsig.Logic = op.Program - _, err = TxnGroup(txnGroups[0], blkHdr, nil, &dummyLedger) + _, err = TxnGroup(txnGroups[0], &blkHdr, nil, &dummyLedger) require.Error(t, err) require.Contains(t, err.Error(), "should only have one of Sig or Msig or LogicSig") txnGroups[0][0].Lsig.Logic = []byte{} @@ -512,7 +508,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= Key: crypto.PublicKey{0x1}, Sig: crypto.Signature{0x2}, } - _, err = 
TxnGroup(txnGroups[0], blkHdr, nil, &dummyLedger) + _, err = TxnGroup(txnGroups[0], &blkHdr, nil, &dummyLedger) require.Error(t, err) require.Contains(t, err.Error(), "should only have one of Sig or Msig or LogicSig") txnGroups[0][0].Lsig.Logic = []byte{} @@ -528,36 +524,45 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= Key: crypto.PublicKey{0x1}, Sig: crypto.Signature{0x2}, } - _, err = TxnGroup(txnGroups[0], blkHdr, nil, &dummyLedger) + _, err = TxnGroup(txnGroups[0], &blkHdr, nil, &dummyLedger) require.Error(t, err) require.Contains(t, err.Error(), "should only have one of Sig or Msig") } -func generateTransactionGroups(signedTxns []transactions.SignedTxn, secrets []*crypto.SignatureSecrets, addrs []basics.Address) [][]transactions.SignedTxn { +func generateTransactionGroups(maxGroupSize int, signedTxns []transactions.SignedTxn, + secrets []*crypto.SignatureSecrets, addrs []basics.Address) [][]transactions.SignedTxn { addrToSecret := make(map[basics.Address]*crypto.SignatureSecrets) for i, addr := range addrs { addrToSecret[addr] = secrets[i] } txnGroups := make([][]transactions.SignedTxn, 0, len(signedTxns)) - for i := 0; i < len(signedTxns); i++ { - txnPerGroup := 1 + rand.Intn(15) - if i+txnPerGroup >= len(signedTxns) { - txnPerGroup = len(signedTxns) - i - 1 + for i := 0; i < len(signedTxns); { + txnsInGroup := rand.Intn(protoMaxGroupSize-1) + 1 + if txnsInGroup > maxGroupSize { + txnsInGroup = maxGroupSize } - newGroup := signedTxns[i : i+txnPerGroup+1] + if i+txnsInGroup > len(signedTxns) { + txnsInGroup = len(signedTxns) - i + } + + newGroup := signedTxns[i : i+txnsInGroup] var txGroup transactions.TxGroup - for _, txn := range newGroup { - txGroup.TxGroupHashes = append(txGroup.TxGroupHashes, crypto.HashObj(txn.Txn)) + if txnsInGroup > 1 { + for _, txn := range newGroup { + txGroup.TxGroupHashes = append(txGroup.TxGroupHashes, crypto.HashObj(txn.Txn)) + } } groupHash := crypto.HashObj(txGroup) for j := range newGroup { - newGroup[j].Txn.Group = groupHash + if txnsInGroup > 1 { + newGroup[j].Txn.Group = groupHash + } newGroup[j].Sig = addrToSecret[newGroup[j].Txn.Sender].Sign(&newGroup[j].Txn) } txnGroups = append(txnGroups, newGroup) - i += txnPerGroup + i += txnsInGroup } return txnGroups @@ -566,17 +571,17 @@ func generateTransactionGroups(signedTxns []transactions.SignedTxn, secrets []*c func TestTxnGroupCacheUpdate(t *testing.T) { partitiontest.PartitionTest(t) - _, signedTxn, secrets, addrs := generateTestObjects(100, 20, 50) + _, signedTxn, secrets, addrs := generateTestObjects(100, 20, 0, 50) blkHdr := createDummyBlockHeader() - txnGroups := generateTransactionGroups(signedTxn, secrets, addrs) + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) breakSignatureFunc := func(txn *transactions.SignedTxn) { txn.Sig[0]++ } restoreSignatureFunc := func(txn *transactions.SignedTxn) { txn.Sig[0]-- } - verifyGroup(t, txnGroups, blkHdr, breakSignatureFunc, restoreSignatureFunc, crypto.ErrBatchVerificationFailed.Error()) + verifyGroup(t, txnGroups, &blkHdr, breakSignatureFunc, restoreSignatureFunc, crypto.ErrBatchHasFailedSigs.Error()) } // TestTxnGroupCacheUpdateMultiSig makes sure that a payment transaction signed with multisig @@ -598,7 +603,7 @@ func TestTxnGroupCacheUpdateMultiSig(t *testing.T) { restoreSignatureFunc := func(txn *transactions.SignedTxn) { txn.Msig.Subsigs[0].Sig[0]-- } - verifyGroup(t, txnGroups, blkHdr, breakSignatureFunc, restoreSignatureFunc, crypto.ErrBatchVerificationFailed.Error()) + verifyGroup(t, txnGroups, 
&blkHdr, breakSignatureFunc, restoreSignatureFunc, crypto.ErrBatchHasFailedSigs.Error()) } // TestTxnGroupCacheUpdateFailLogic test makes sure that a payment transaction contains a logic (and no signature) @@ -606,7 +611,7 @@ func TestTxnGroupCacheUpdateMultiSig(t *testing.T) { func TestTxnGroupCacheUpdateFailLogic(t *testing.T) { partitiontest.PartitionTest(t) - _, signedTxn, _, _ := generateTestObjects(100, 20, 50) + _, signedTxn, _, _ := generateTestObjects(100, 20, 0, 50) blkHdr := createDummyBlockHeader() // sign the transaction with logic @@ -638,7 +643,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= txn.Lsig.Args[0][0]-- } initCounter := logicCostTotal.GetUint64Value() - verifyGroup(t, txnGroups, blkHdr, breakSignatureFunc, restoreSignatureFunc, "rejected by logic") + verifyGroup(t, txnGroups, &blkHdr, breakSignatureFunc, restoreSignatureFunc, "rejected by logic") currentCounter := logicCostTotal.GetUint64Value() require.Greater(t, currentCounter, initCounter) } @@ -649,7 +654,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= func TestTxnGroupCacheUpdateLogicWithSig(t *testing.T) { partitiontest.PartitionTest(t) - _, signedTxn, secrets, addresses := generateTestObjects(100, 20, 50) + _, signedTxn, secrets, addresses := generateTestObjects(100, 20, 0, 50) blkHdr := createDummyBlockHeader() for i := 0; i < len(signedTxn); i++ { @@ -683,7 +688,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= restoreSignatureFunc := func(txn *transactions.SignedTxn) { txn.Lsig.Sig[0]-- } - verifyGroup(t, txnGroups, blkHdr, breakSignatureFunc, restoreSignatureFunc, crypto.ErrBatchVerificationFailed.Error()) + verifyGroup(t, txnGroups, &blkHdr, breakSignatureFunc, restoreSignatureFunc, crypto.ErrBatchHasFailedSigs.Error()) // signature is correct and logic fails breakSignatureFunc = func(txn *transactions.SignedTxn) { @@ -692,7 +697,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= restoreSignatureFunc = func(txn *transactions.SignedTxn) { txn.Lsig.Args[0][0]-- } - verifyGroup(t, txnGroups, blkHdr, breakSignatureFunc, restoreSignatureFunc, "rejected by logic") + verifyGroup(t, txnGroups, &blkHdr, breakSignatureFunc, restoreSignatureFunc, "rejected by logic") } // TestTxnGroupCacheUpdateLogicWithMultiSig makes sure that a payment transaction contains logicsig signed with multisig is valid @@ -714,20 +719,7 @@ func TestTxnGroupCacheUpdateLogicWithMultiSig(t *testing.T) { a := rand.Intn(1000) f := config.Consensus[protocol.ConsensusCurrentVersion].MinTxnFee + uint64(rand.Intn(10)) - signedTxn[i].Txn = transactions.Transaction{ - Type: protocol.PaymentTx, - Header: transactions.Header{ - Sender: multiAddress[s], - Fee: basics.MicroAlgos{Raw: f}, - FirstValid: basics.Round(1), - LastValid: basics.Round(100), - GenesisHash: crypto.Hash([]byte{1, 2, 3, 4, 5}), - }, - PaymentTxnFields: transactions.PaymentTxnFields{ - Receiver: multiAddress[r], - Amount: basics.MicroAlgos{Raw: uint64(a)}, - }, - } + signedTxn[i].Txn = createPayTransaction(f, 1, 100, a, multiAddress[s], multiAddress[r]) // add a simple logic that verifies this condition: // sha256(arg0) == base64decode(5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E=) op, err := logic.AssembleString(`arg 0 @@ -767,7 +759,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= txn.Lsig.Msig.Subsigs[0].Sig[0]-- } - verifyGroup(t, txnGroups, blkHdr, breakSignatureFunc, restoreSignatureFunc, crypto.ErrBatchVerificationFailed.Error()) + verifyGroup(t, txnGroups, &blkHdr, breakSignatureFunc, restoreSignatureFunc, 
crypto.ErrBatchHasFailedSigs.Error()) // signature is correct and logic fails breakSignatureFunc = func(txn *transactions.SignedTxn) { txn.Lsig.Args[0][0]++ @@ -775,7 +767,7 @@ byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= restoreSignatureFunc = func(txn *transactions.SignedTxn) { txn.Lsig.Args[0][0]-- } - verifyGroup(t, txnGroups, blkHdr, breakSignatureFunc, restoreSignatureFunc, "rejected by logic") + verifyGroup(t, txnGroups, &blkHdr, breakSignatureFunc, restoreSignatureFunc, "rejected by logic") } func createDummyBlockHeader() bookkeeping.BlockHeader { @@ -792,10 +784,27 @@ func createDummyBlockHeader() bookkeeping.BlockHeader { } } +func createPayTransaction(fee uint64, fv, lv, amount int, sender, receiver basics.Address) transactions.Transaction { + return transactions.Transaction{ + Type: protocol.PaymentTx, + Header: transactions.Header{ + Sender: sender, + Fee: basics.MicroAlgos{Raw: fee}, + FirstValid: basics.Round(fv), + LastValid: basics.Round(lv), + GenesisHash: crypto.Hash([]byte{1, 2, 3, 4, 5}), + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: receiver, + Amount: basics.MicroAlgos{Raw: uint64(amount)}, + }, + } +} + // verifyGroup uses TxnGroup to verify txns and add them to the // cache. Then makes sure that only the valid txns are verified and added to // the cache. -func verifyGroup(t *testing.T, txnGroups [][]transactions.SignedTxn, blkHdr bookkeeping.BlockHeader, breakSig func(txn *transactions.SignedTxn), restoreSig func(txn *transactions.SignedTxn), errorString string) { +func verifyGroup(t *testing.T, txnGroups [][]transactions.SignedTxn, blkHdr *bookkeeping.BlockHeader, breakSig func(txn *transactions.SignedTxn), restoreSig func(txn *transactions.SignedTxn), errorString string) { cache := MakeVerifiedTransactionCache(1000) breakSig(&txnGroups[0][0]) @@ -859,25 +868,13 @@ func BenchmarkTxn(b *testing.B) { if b.N < 2000 { b.N = 2000 } - _, signedTxn, secrets, addrs := generateTestObjects(b.N, 20, 50) - blk := bookkeeping.Block{ - BlockHeader: bookkeeping.BlockHeader{ - Round: 50, - GenesisHash: crypto.Hash([]byte{1, 2, 3, 4, 5}), - UpgradeState: bookkeeping.UpgradeState{ - CurrentProtocol: protocol.ConsensusCurrentVersion, - }, - RewardsState: bookkeeping.RewardsState{ - FeeSink: feeSink, - RewardsPool: poolAddr, - }, - }, - } - txnGroups := generateTransactionGroups(signedTxn, secrets, addrs) + _, signedTxn, secrets, addrs := generateTestObjects(b.N, 20, 0, 50) + blk := bookkeeping.Block{BlockHeader: createDummyBlockHeader()} + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) b.ResetTimer() for _, txnGroup := range txnGroups { - groupCtx, err := PrepareGroupContext(txnGroup, blk.BlockHeader, nil) + groupCtx, err := PrepareGroupContext(txnGroup, &blk.BlockHeader, nil) require.NoError(b, err) for i, txn := range txnGroup { err := verifyTxn(&txn, i, groupCtx) @@ -886,3 +883,774 @@ func BenchmarkTxn(b *testing.B) { } b.StopTimer() } + +var droppedFromPool = metrics.MakeCounter(metrics.MetricName{Name: "test_streamVerifierTestCore_messages_dropped_pool", Description: "Test streamVerifierTestCore messages dropped from pool"}) + +func streamVerifierTestCore(txnGroups [][]transactions.SignedTxn, badTxnGroups map[uint64]struct{}, + expectedError error, t *testing.T) (sv *StreamVerifier) { + + numOfTxnGroups := len(txnGroups) + verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t) + defer verificationPool.Shutdown() + + ctx, cancel := context.WithCancel(context.Background()) + cache := 
MakeVerifiedTransactionCache(50000) + + defer cancel() + + stxnChan := make(chan *UnverifiedElement) + resultChan := make(chan *VerificationResult, txBacklogSize) + droppedChan := make(chan *UnverifiedElement) + sv, err := MakeStreamVerifier(stxnChan, resultChan, droppedChan, &DummyLedgerForSignature{}, verificationPool, cache) + require.NoError(t, err) + sv.Start(ctx) + + wg := sync.WaitGroup{} + + errChan := make(chan error) + var badSigResultCounter int + var goodSigResultCounter int + + wg.Add(1) + go processResults(ctx, errChan, resultChan, numOfTxnGroups, badTxnGroups, &badSigResultCounter, &goodSigResultCounter, &wg) + + wg.Add(1) + // send txn groups to be verified + go func() { + defer wg.Done() + for _, tg := range txnGroups { + stxnChan <- &UnverifiedElement{TxnGroup: tg, BacklogMessage: nil} + } + }() + + for err := range errChan { + require.ErrorContains(t, err, expectedError.Error()) + } + + wg.Wait() + + verifyResults(txnGroups, badTxnGroups, cache, badSigResultCounter, goodSigResultCounter, t) + return sv +} + +func processResults(ctx context.Context, errChan chan<- error, resultChan <-chan *VerificationResult, + numOfTxnGroups int, badTxnGroups map[uint64]struct{}, + badSigResultCounter, goodSigResultCounter *int, wg *sync.WaitGroup) { + defer wg.Done() + defer close(errChan) + // process the results + for x := 0; x < numOfTxnGroups; x++ { + select { + case <-ctx.Done(): + case result := <-resultChan: + u, _ := binary.Uvarint(result.TxnGroup[0].Txn.Note) + if _, has := badTxnGroups[u]; has { + (*badSigResultCounter)++ + if result.Err == nil { + err := fmt.Errorf("%dth (%d) transaction verified with a bad sig", x, u) + errChan <- err + return + } + // we expected an error, but it is not the general crypto error + if result.Err != crypto.ErrBatchHasFailedSigs { + errChan <- result.Err + } + } else { + (*goodSigResultCounter)++ + if result.Err != nil { + errChan <- result.Err + } + } + } + } +} + +func verifyResults(txnGroups [][]transactions.SignedTxn, badTxnGroups map[uint64]struct{}, + cache VerifiedTransactionCache, + badSigResultCounter, goodSigResultCounter int, t *testing.T) { + // check if all txns have been checked. 
+ require.Equal(t, len(txnGroups), badSigResultCounter+goodSigResultCounter) + require.Equal(t, len(badTxnGroups), badSigResultCounter) + + // check the cached transactions + // note that the result of each verified txn group is send before the batch is added to the cache + // the test does not know if the batch is not added to the cache yet, so some elts might be missing from the cache + unverifiedGroups := cache.GetUnverifiedTransactionGroups(txnGroups, spec, protocol.ConsensusCurrentVersion) + require.GreaterOrEqual(t, len(unverifiedGroups), badSigResultCounter) + for _, txn := range unverifiedGroups { + u, _ := binary.Uvarint(txn[0].Txn.Note) + if _, has := badTxnGroups[u]; has { + delete(badTxnGroups, u) + } + } + require.Empty(t, badTxnGroups, "unverifiedGroups should have all the transactions with invalid sigs") +} + +func getSignedTransactions(numOfTxns, maxGrpSize, noteOffset int, badTxnProb float32) (txnGroups [][]transactions.SignedTxn, badTxnGroups map[uint64]struct{}) { + + _, signedTxn, secrets, addrs := generateTestObjects(numOfTxns, 20, noteOffset, 50) + txnGroups = generateTransactionGroups(maxGrpSize, signedTxn, secrets, addrs) + + badTxnGroups = make(map[uint64]struct{}) + + for tgi := range txnGroups { + if rand.Float32() < badTxnProb { + // make a bad sig + t := rand.Intn(len(txnGroups[tgi])) + txnGroups[tgi][t].Sig[0] = txnGroups[tgi][t].Sig[0] + 1 + u, _ := binary.Uvarint(txnGroups[tgi][0].Txn.Note) + badTxnGroups[u] = struct{}{} + } + } + return + +} + +// TestStreamVerifier tests the basic functionality +func TestStreamVerifier(t *testing.T) { + partitiontest.PartitionTest(t) + + numOfTxns := 4000 + txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, protoMaxGroupSize, 0, 0.5) + + sv := streamVerifierTestCore(txnGroups, badTxnGroups, nil, t) + sv.WaitForStop() +} + +// TestStreamVerifierCases tests various valid and invalid transaction signature cases +func TestStreamVerifierCases(t *testing.T) { + partitiontest.PartitionTest(t) + + numOfTxns := 10 + txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, 1, 0, 0) + mod := 1 + + // txn with 0 sigs + txnGroups[mod][0].Sig = crypto.Signature{} + u, _ := binary.Uvarint(txnGroups[mod][0].Txn.Note) + badTxnGroups[u] = struct{}{} + sv := streamVerifierTestCore(txnGroups, badTxnGroups, errTxnSigHasNoSig, t) + sv.WaitForStop() + mod++ + + _, signedTxns, secrets, addrs := generateTestObjects(numOfTxns, 20, 0, 50) + txnGroups = generateTransactionGroups(1, signedTxns, secrets, addrs) + badTxnGroups = make(map[uint64]struct{}) + + // invalid stateproof txn + txnGroups[mod][0].Sig = crypto.Signature{} + txnGroups[mod][0].Txn.Type = protocol.StateProofTx + txnGroups[mod][0].Txn.Header.Sender = transactions.StateProofSender + u, _ = binary.Uvarint(txnGroups[mod][0].Txn.Note) + badTxnGroups[u] = struct{}{} + errFeeMustBeZeroInStateproofTxn := errors.New("fee must be zero in state-proof transaction") + sv = streamVerifierTestCore(txnGroups, badTxnGroups, errFeeMustBeZeroInStateproofTxn, t) + sv.WaitForStop() + mod++ + + _, signedTxns, secrets, addrs = generateTestObjects(numOfTxns, 20, 0, 50) + txnGroups = generateTransactionGroups(1, signedTxns, secrets, addrs) + badTxnGroups = make(map[uint64]struct{}) + + // acceptable stateproof txn + txnGroups[mod][0].Sig = crypto.Signature{} + txnGroups[mod][0].Txn.Note = nil + txnGroups[mod][0].Txn.Type = protocol.StateProofTx + txnGroups[mod][0].Txn.Header.Fee = basics.MicroAlgos{Raw: 0} + txnGroups[mod][0].Txn.Header.Sender = transactions.StateProofSender + 
txnGroups[mod][0].Txn.PaymentTxnFields = transactions.PaymentTxnFields{} + sv = streamVerifierTestCore(txnGroups, badTxnGroups, nil, t) + sv.WaitForStop() + mod++ + + // multisig + _, mSigTxn, _, _ := generateMultiSigTxn(1, 6, 50, t) + txnGroups[mod] = mSigTxn + sv = streamVerifierTestCore(txnGroups, badTxnGroups, nil, t) + sv.WaitForStop() + mod++ + + _, signedTxn, secrets, addrs := generateTestObjects(numOfTxns, 20, 0, 50) + txnGroups = generateTransactionGroups(1, signedTxn, secrets, addrs) + badTxnGroups = make(map[uint64]struct{}) + + // logicsig + // add a simple logic that verifies this condition: + // sha256(arg0) == base64decode(5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E=) + op, err := logic.AssembleString(`arg 0 +sha256 +byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= +==`) + require.NoError(t, err) + s := rand.Intn(len(secrets)) + txnGroups[mod][0].Sig = crypto.Signature{} + txnGroups[mod][0].Txn.Sender = addrs[s] + txnGroups[mod][0].Lsig.Args = [][]byte{[]byte("=0\x97S\x85H\xe9\x91B\xfd\xdb;1\xf5Z\xaec?\xae\xf2I\x93\x08\x12\x94\xaa~\x06\x08\x849b")} + txnGroups[mod][0].Lsig.Logic = op.Program + program := logic.Program(op.Program) + txnGroups[mod][0].Lsig.Sig = secrets[s].Sign(program) + sv = streamVerifierTestCore(txnGroups, badTxnGroups, nil, t) + sv.WaitForStop() + mod++ + + // bad lgicsig + s = rand.Intn(len(secrets)) + txnGroups[mod][0].Sig = crypto.Signature{} + txnGroups[mod][0].Txn.Sender = addrs[s] + txnGroups[mod][0].Lsig.Args = [][]byte{[]byte("=0\x97S\x85H\xe9\x91B\xfd\xdb;1\xf5Z\xaec?\xae\xf2I\x93\x08\x12\x94\xaa~\x06\x08\x849b")} + txnGroups[mod][0].Lsig.Args[0][0]++ + txnGroups[mod][0].Lsig.Logic = op.Program + txnGroups[mod][0].Lsig.Sig = secrets[s].Sign(program) + u, _ = binary.Uvarint(txnGroups[mod][0].Txn.Note) + badTxnGroups[u] = struct{}{} + sv = streamVerifierTestCore(txnGroups, badTxnGroups, errors.New("rejected by logic"), t) + sv.WaitForStop() + mod++ + + _, signedTxn, secrets, addrs = generateTestObjects(numOfTxns, 20, 0, 50) + txnGroups = generateTransactionGroups(1, signedTxn, secrets, addrs) + badTxnGroups = make(map[uint64]struct{}) + + // txn with sig and msig + txnGroups[mod][0].Msig = mSigTxn[0].Msig + u, _ = binary.Uvarint(txnGroups[mod][0].Txn.Note) + badTxnGroups[u] = struct{}{} + sv = streamVerifierTestCore(txnGroups, badTxnGroups, errTxnSigNotWellFormed, t) + sv.WaitForStop() +} + +// TestStreamVerifierIdel starts the verifer and sends nothing, to trigger the timer, then sends a txn +func TestStreamVerifierIdel(t *testing.T) { + partitiontest.PartitionTest(t) + + numOfTxns := 1 + txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, protoMaxGroupSize, 0, 0.5) + + sv := streamVerifierTestCore(txnGroups, badTxnGroups, nil, t) + sv.WaitForStop() +} + +func TestGetNumberOfBatchableSigsInGroup(t *testing.T) { + partitiontest.PartitionTest(t) + + numOfTxns := 10 + txnGroups, _ := getSignedTransactions(numOfTxns, 1, 0, 0) + mod := 1 + + // txn with 0 sigs + txnGroups[mod][0].Sig = crypto.Signature{} + batchSigs, err := getNumberOfBatchableSigsInGroup(txnGroups[mod]) + require.ErrorIs(t, err, errTxnSigHasNoSig) + mod++ + + _, signedTxns, secrets, addrs := generateTestObjects(numOfTxns, 20, 0, 50) + txnGroups = generateTransactionGroups(1, signedTxns, secrets, addrs) + batchSigs, err = getNumberOfBatchableSigsInGroup(txnGroups[0]) + require.NoError(t, err) + require.Equal(t, uint64(1), batchSigs) + + // stateproof txn + txnGroups[mod][0].Sig = crypto.Signature{} + txnGroups[mod][0].Txn.Type = protocol.StateProofTx + 
txnGroups[mod][0].Txn.Header.Sender = transactions.StateProofSender + batchSigs, err = getNumberOfBatchableSigsInGroup(txnGroups[mod]) + require.NoError(t, err) + require.Equal(t, uint64(0), batchSigs) + mod++ + + // multisig + _, mSigTxn, _, _ := generateMultiSigTxn(1, 6, 50, t) + batchSigs, err = getNumberOfBatchableSigsInGroup(mSigTxn) + require.NoError(t, err) + require.Equal(t, uint64(2), batchSigs) + mod++ + + _, signedTxn, secrets, addrs := generateTestObjects(numOfTxns, 20, 0, 50) + txnGroups = generateTransactionGroups(1, signedTxn, secrets, addrs) + + // logicsig + op, err := logic.AssembleString(`arg 0 +sha256 +byte base64 5rZMNsevs5sULO+54aN+OvU6lQ503z2X+SSYUABIx7E= +==`) + require.NoError(t, err) + s := rand.Intn(len(secrets)) + txnGroups[mod][0].Sig = crypto.Signature{} + txnGroups[mod][0].Txn.Sender = addrs[s] + txnGroups[mod][0].Lsig.Args = [][]byte{[]byte("=0\x97S\x85H\xe9\x91B\xfd\xdb;1\xf5Z\xaec?\xae\xf2I\x93\x08\x12\x94\xaa~\x06\x08\x849b")} + txnGroups[mod][0].Lsig.Logic = op.Program + program := logic.Program(op.Program) + txnGroups[mod][0].Lsig.Sig = secrets[s].Sign(program) + batchSigs, err = getNumberOfBatchableSigsInGroup(txnGroups[mod]) + require.NoError(t, err) + require.Equal(t, uint64(0), batchSigs) + mod++ + + // txn with sig and msig + _, signedTxn, secrets, addrs = generateTestObjects(numOfTxns, 20, 0, 50) + txnGroups = generateTransactionGroups(1, signedTxn, secrets, addrs) + txnGroups[mod][0].Msig = mSigTxn[0].Msig + batchSigs, err = getNumberOfBatchableSigsInGroup(txnGroups[mod]) + require.ErrorIs(t, err, errTxnSigNotWellFormed) +} + +// TestStreamVerifierPoolShutdown tests what happens when the exec pool shuts down +func TestStreamVerifierPoolShutdown(t *testing.T) { + partitiontest.PartitionTest(t) + + // only one transaction should be sufficient for the batch verifier + // to realize the pool is terminated and to shut down + numOfTxns := 1 + txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, protoMaxGroupSize, 0, 0.5) + + // check the logged information + var logBuffer bytes.Buffer + log := logging.Base() + log.SetOutput(&logBuffer) + log.SetLevel(logging.Info) + + // prepare the stream verifier + numOfTxnGroups := len(txnGroups) + verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t) + _, buffLen := verificationPool.BufferSize() + + // make sure the pool is shut down and the buffer is full + holdTasks := make(chan interface{}) + for x := 0; x < buffLen+runtime.NumCPU(); x++ { + verificationPool.EnqueueBacklog(context.Background(), + func(arg interface{}) interface{} { <-holdTasks; return nil }, nil, nil) + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + // Shutdown will block until all tasks held by holdTasks is released + verificationPool.Shutdown() + }() + // Send more tasks to break the backlog worker after b.pool.Enqueue returns the error + for x := 0; x < 100; x++ { + verificationPool.EnqueueBacklog(context.Background(), + func(arg interface{}) interface{} { <-holdTasks; return nil }, nil, nil) + } + // release the tasks + close(holdTasks) + + // make sure the EnqueueBacklogis returning err + for x := 0; x < 10; x++ { + err := verificationPool.EnqueueBacklog(context.Background(), + func(arg interface{}) interface{} { return nil }, nil, nil) + require.Error(t, err, fmt.Sprintf("x = %d", x)) + } + + ctx, cancel := context.WithCancel(context.Background()) + cache := MakeVerifiedTransactionCache(50000) + + stxnChan := make(chan *UnverifiedElement) + resultChan := make(chan *VerificationResult, 
txBacklogSize) + droppedChan := make(chan *UnverifiedElement) + sv, err := MakeStreamVerifier(stxnChan, resultChan, droppedChan, &DummyLedgerForSignature{}, verificationPool, cache) + require.NoError(t, err) + sv.Start(ctx) + + errChan := make(chan error) + + var badSigResultCounter int + var goodSigResultCounter int + + wg.Add(1) + go processResults(ctx, errChan, resultChan, numOfTxnGroups, badTxnGroups, &badSigResultCounter, &goodSigResultCounter, &wg) + + // When the exec pool shuts down, the batch verifier should gracefully stop + // cancel the context so that the test can terminate + wg.Add(1) + go func() { + defer wg.Done() + sv.WaitForStop() + cancel() + }() + + wg.Add(1) + // send txn groups to be verified + go func() { + defer wg.Done() + for _, tg := range txnGroups { + select { + case <-ctx.Done(): + break + case stxnChan <- &UnverifiedElement{TxnGroup: tg, BacklogMessage: nil}: + } + } + }() + for err := range errChan { + require.ErrorIs(t, err, errShuttingDownError) + } + require.Contains(t, logBuffer.String(), "addVerificationTaskToThePoolNow: EnqueueBacklog returned an error and StreamVerifier will stop: context canceled") +} + +// TestStreamVerifierRestart tests what happens when the context is canceled +func TestStreamVerifierRestart(t *testing.T) { + partitiontest.PartitionTest(t) + + numOfTxns := 1000 + txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, 1, 0, 0.5) + + // prepare the stream verifier + numOfTxnGroups := len(txnGroups) + verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t) + defer verificationPool.Shutdown() + + cache := MakeVerifiedTransactionCache(50) + + stxnChan := make(chan *UnverifiedElement) + resultChan := make(chan *VerificationResult, txBacklogSize) + droppedChan := make(chan *UnverifiedElement) + + ctx, cancel := context.WithCancel(context.Background()) + sv, err := MakeStreamVerifier(stxnChan, resultChan, droppedChan, &DummyLedgerForSignature{}, verificationPool, cache) + require.NoError(t, err) + sv.Start(ctx) + + errChan := make(chan error) + + var badSigResultCounter int + var goodSigResultCounter int + + ctx2, cancel2 := context.WithCancel(context.Background()) + + wg := sync.WaitGroup{} + wg.Add(1) + go processResults(ctx2, errChan, resultChan, numOfTxnGroups, badTxnGroups, &badSigResultCounter, &goodSigResultCounter, &wg) + + wg.Add(1) + // send txn groups to be verified + go func() { + defer wg.Done() + for i, tg := range txnGroups { + if (i+1)%10 == 0 { + cancel() + sv.WaitForStop() + ctx, cancel = context.WithCancel(context.Background()) + sv.Start(ctx) + } + select { + case <-ctx2.Done(): + break + case stxnChan <- &UnverifiedElement{TxnGroup: tg, BacklogMessage: nil}: + } + } + cancel() + }() + for err := range errChan { + require.ErrorIs(t, err, errShuttingDownError) + } + wg.Wait() + sv.WaitForStop() + cancel2() // not necessary, but the golint will want to see this +} + +// TestBlockWatcher runs multiple goroutines to check the concurency and correctness of the block watcher +func TestStreamVerifierBlockWatcher(t *testing.T) { + blkHdr := createDummyBlockHeader() + nbw := MakeNewBlockWatcher(blkHdr) + startingRound := blkHdr.Round + + wg := sync.WaitGroup{} + count := 100 + + wg.Add(1) + go func() { + defer wg.Done() + for x := 0; x < 100; x++ { + blkHdr.Round++ + nbw.OnNewBlock(bookkeeping.Block{BlockHeader: blkHdr}, ledgercore.StateDelta{}) + time.Sleep(10 * time.Millisecond) + nbw.OnNewBlock(bookkeeping.Block{BlockHeader: blkHdr}, ledgercore.StateDelta{}) + } + }() + + bhStore := 
make(map[basics.Round]*bookkeeping.BlockHeader) + wg.Add(1) + go func() { + defer wg.Done() + for { + bh := nbw.getBlockHeader() + bhStore[bh.Round] = bh + if bh.Round == startingRound+10 { + break + } + } + }() + wg.Wait() + bh := nbw.getBlockHeader() + require.Equal(t, uint64(startingRound)+uint64(count), uint64(bh.Round)) + // There should be no inconsistency after new blocks are added + for r, bh := range bhStore { + require.Equal(t, r, bh.Round) + } +} + +func getSaturatedExecPool(t *testing.T) (execpool.BacklogPool, chan interface{}, execpool.BacklogPool) { + verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t) + _, buffLen := verificationPool.BufferSize() + + // make the buffer full to control when the tasks get executed + holdTasks := make(chan interface{}) + for x := 0; x < buffLen+runtime.NumCPU()+1; x++ { + verificationPool.EnqueueBacklog(context.Background(), + func(arg interface{}) interface{} { + <-holdTasks + return nil + }, nil, nil) + } + return verificationPool, holdTasks, verificationPool +} + +// TestStreamVerifierCtxCancel tests the termination when the ctx is canceled +// To make sure that the batchingLoop is still working on a batch when the +// ctx is cancled, this test first saturates the exec pool buffer, then +// sends a txn and immediately cancels the ctx so that the batch is not +// passed to the exec pool yet, but is in batchingLoop +func TestStreamVerifierCtxCancel(t *testing.T) { + partitiontest.PartitionTest(t) + + verificationPool, holdTasks, vp := getSaturatedExecPool(t) + defer vp.Shutdown() + ctx, cancel := context.WithCancel(context.Background()) + cache := MakeVerifiedTransactionCache(50) + stxnChan := make(chan *UnverifiedElement) + resultChan := make(chan *VerificationResult, txBacklogSize) + droppedChan := make(chan *UnverifiedElement) + sv, err := MakeStreamVerifier(stxnChan, resultChan, droppedChan, &DummyLedgerForSignature{}, verificationPool, cache) + require.NoError(t, err) + sv.Start(ctx) + + var result *VerificationResult + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + // no verification tasks should be executed + // one result should be returned + result = <-resultChan + }() + + // send batchSizeBlockLimit after the exec pool buffer is full + numOfTxns := 1 + txnGroups, _ := getSignedTransactions(numOfTxns, 1, 0, 0.5) + stxnChan <- &UnverifiedElement{TxnGroup: txnGroups[0], BacklogMessage: nil} + // cancel the ctx before the sig is sent to the exec pool + cancel() + + // the main loop should stop after cancel() + sv.WaitForStop() + + // release the tasks + close(holdTasks) + + wg.Wait() + require.ErrorIs(t, result.Err, errShuttingDownError) +} + +// TestStreamVerifierCtxCancelPoolQueue tests the termination when the ctx is canceled +// To make sure that the batchingLoop is still working on a batch when the +// ctx is cancled, this test first saturates the exec pool buffer, then +// sends a txn and cancels the ctx after multiple waitForNextTxnDuration +// so that the batch is sent to the pool. 
Since the pool is saturated, +// the task will be stuck waiting to be queued when the context is canceled +// everything should be gracefully terminated +func TestStreamVerifierCtxCancelPoolQueue(t *testing.T) { + partitiontest.PartitionTest(t) + + verificationPool, holdTasks, vp := getSaturatedExecPool(t) + defer vp.Shutdown() + + // check the logged information + var logBuffer bytes.Buffer + log := logging.Base() + log.SetOutput(&logBuffer) + log.SetLevel(logging.Info) + + ctx, cancel := context.WithCancel(context.Background()) + cache := MakeVerifiedTransactionCache(50) + stxnChan := make(chan *UnverifiedElement) + resultChan := make(chan *VerificationResult, txBacklogSize) + droppedChan := make(chan *UnverifiedElement) + sv, err := MakeStreamVerifier(stxnChan, resultChan, droppedChan, &DummyLedgerForSignature{}, verificationPool, cache) + require.NoError(t, err) + sv.Start(ctx) + + var result *VerificationResult + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + // no verification tasks should be executed + // one result should be returned + result = <-resultChan + }() + + // send batchSizeBlockLimit after the exec pool buffer is full + numOfTxns := 1 + txnGroups, _ := getSignedTransactions(numOfTxns, 1, 0, 0.5) + stxnChan <- &UnverifiedElement{TxnGroup: txnGroups[0], BacklogMessage: nil} + // cancel the ctx as the sig is not yet sent to the exec pool + // the test might sporadically fail if between sending the txn above + // and the cancelation, 2 x waitForNextTxnDuration elapses (10ms) + time.Sleep(6 * waitForNextTxnDuration) + cancel() + + // the main loop should stop after cancel() + sv.WaitForStop() + + // release the tasks + close(holdTasks) + + wg.Wait() + require.ErrorIs(t, result.Err, errShuttingDownError) + require.Contains(t, logBuffer.String(), "addVerificationTaskToThePoolNow: EnqueueBacklog returned an error and StreamVerifier will stop: context canceled") +} + +// TestStreamVerifierPostVBlocked tests the behavior when the return channel (result chan) of verified +// transactions is blocked, and checks droppedFromPool counter to confirm the drops +func TestStreamVerifierPostVBlocked(t *testing.T) { + + // prepare the stream verifier + verificationPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, t) + defer verificationPool.Shutdown() + errChan := make(chan error) + var badSigResultCounter int + var goodSigResultCounter int + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cache := MakeVerifiedTransactionCache(50) + + txBacklogSizeMod := txBacklogSize / 20 + + stxnChan := make(chan *UnverifiedElement) + resultChan := make(chan *VerificationResult, txBacklogSizeMod) + droppedChan := make(chan *UnverifiedElement) + sv, err := MakeStreamVerifier(stxnChan, resultChan, droppedChan, &DummyLedgerForSignature{}, verificationPool, cache) + require.NoError(t, err) + + defer close(droppedChan) + go func() { + for range droppedChan { + droppedFromPool.Inc(nil) + } + }() + + // start the verifier + sv.Start(ctx) + overflow := 3 + // send txBacklogSizeMod + 3 transactions to overflow the result buffer + numOfTxns := txBacklogSizeMod + overflow + txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, 1, 0, 0.5) + numOfTxnGroups := len(txnGroups) + for _, tg := range txnGroups { + stxnChan <- &UnverifiedElement{TxnGroup: tg, BacklogMessage: nil} + } + + var droppedPool uint64 + // wait until overflow transactions are dropped + for w := 0; w < 100; w++ { + droppedPool = droppedFromPool.GetUint64Value() + if droppedPool >= 
uint64(overflow) { + break + } + time.Sleep(time.Millisecond * 20) + } + + require.Equal(t, uint64(overflow), droppedPool) + + wg := sync.WaitGroup{} + wg.Add(1) + // make sure the other results are fine + go processResults(ctx, errChan, resultChan, numOfTxnGroups-overflow, badTxnGroups, &badSigResultCounter, &goodSigResultCounter, &wg) + + for err := range errChan { + require.ErrorIs(t, err, errShuttingDownError) + fmt.Println(badTxnGroups) + } + + // check if more transactions can be accepted + errChan = make(chan error) + + wg.Add(1) + // make sure the other results are fine + txnGroups, badTxnGroups2 := getSignedTransactions(numOfTxns, 1, numOfTxns, 0.5) + // need to combine these, since left overs from the previous one could still come out + for b := range badTxnGroups2 { + badTxnGroups[b] = struct{}{} + } + go processResults(ctx, errChan, resultChan, numOfTxnGroups, badTxnGroups, &badSigResultCounter, &goodSigResultCounter, &wg) + + for _, tg := range txnGroups { + stxnChan <- &UnverifiedElement{TxnGroup: tg, BacklogMessage: nil} + } + + for err := range errChan { + require.ErrorIs(t, err, errShuttingDownError) + fmt.Println(badTxnGroups) + } + + wg.Wait() +} + +func TestStreamVerifierMakeStreamVerifierErr(t *testing.T) { + _, err := MakeStreamVerifier(nil, nil, nil, &DummyLedgerForSignature{badHdr: true}, nil, nil) + require.Error(t, err) +} + +// TestStreamVerifierCancelWhenPooled tests the case where the ctx is cancled after the verification +// task is queued to the exec pool and before the task is executed in the pool +func TestStreamVerifierCancelWhenPooled(t *testing.T) { + partitiontest.PartitionTest(t) + numOfTxns := 1000 + txnGroups, badTxnGroups := getSignedTransactions(numOfTxns, 1, 0, 0.5) + + // prepare the stream verifier + numOfTxnGroups := len(txnGroups) + execPool := execpool.MakePool(t) + defer execPool.Shutdown() + verificationPool := execpool.MakeBacklog(execPool, 64, execpool.LowPriority, t) + defer verificationPool.Shutdown() + + cache := MakeVerifiedTransactionCache(50) + + stxnChan := make(chan *UnverifiedElement) + resultChan := make(chan *VerificationResult, txBacklogSize) + droppedChan := make(chan *UnverifiedElement) + ctx, cancel := context.WithCancel(context.Background()) + sv, err := MakeStreamVerifier(stxnChan, resultChan, droppedChan, &DummyLedgerForSignature{}, verificationPool, cache) + require.NoError(t, err) + sv.Start(ctx) + + errChan := make(chan error) + + var badSigResultCounter int + var goodSigResultCounter int + + ctx2, cancel2 := context.WithCancel(context.Background()) + + wg := sync.WaitGroup{} + wg.Add(1) + go processResults(ctx2, errChan, resultChan, numOfTxnGroups, badTxnGroups, &badSigResultCounter, &goodSigResultCounter, &wg) + + wg.Add(1) + // send txn groups to be verified + go func() { + defer wg.Done() + for _, tg := range txnGroups { + stxnChan <- &UnverifiedElement{TxnGroup: tg, BacklogMessage: nil} + } + // cancel the ctx, and expect at least one task queued to the pool but not yet executed + cancel() + }() + for err := range errChan { + require.ErrorIs(t, err, errShuttingDownError) + } + wg.Wait() + sv.WaitForStop() + cancel2() // not necessary, but the golint will want to see this +} diff --git a/data/transactions/verify/verifiedTxnCache.go b/data/transactions/verify/verifiedTxnCache.go index 82f0e9772e..58ce23ae1f 100644 --- a/data/transactions/verify/verifiedTxnCache.go +++ b/data/transactions/verify/verifiedTxnCache.go @@ -59,7 +59,7 @@ type VerifiedTransactionCache interface { // in the cache, the new entry overrides 
the old one. Add(txgroup []transactions.SignedTxn, groupCtx *GroupContext) // AddPayset works in a similar way to Add, but is intended for adding an array of transaction groups, along with their corresponding contexts. - AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) error + AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) // GetUnverifiedTransactionGroups compares the provided payset against the currently cached transactions and figure which transaction groups aren't fully cached. GetUnverifiedTransactionGroups(payset [][]transactions.SignedTxn, CurrSpecAddrs transactions.SpecialAddresses, CurrProto protocol.ConsensusVersion) [][]transactions.SignedTxn // UpdatePinned replaces the pinned entries with the one provided in the pinnedTxns map. This is typically expected to be a subset of the @@ -106,13 +106,12 @@ func (v *verifiedTransactionCache) Add(txgroup []transactions.SignedTxn, groupCt } // AddPayset works in a similar way to Add, but is intended for adding an array of transaction groups, along with their corresponding contexts. -func (v *verifiedTransactionCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) error { +func (v *verifiedTransactionCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) { v.bucketsLock.Lock() defer v.bucketsLock.Unlock() for i := range txgroup { v.add(txgroup[i], groupCtxs[i]) } - return nil } // GetUnverifiedTransactionGroups compares the provided payset against the currently cached transactions and figure which transaction groups aren't fully cached. @@ -268,8 +267,7 @@ func (v *mockedCache) Add(txgroup []transactions.SignedTxn, groupCtx *GroupConte return } -func (v *mockedCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) error { - return nil +func (v *mockedCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*GroupContext) { } func (v *mockedCache) GetUnverifiedTransactionGroups(txnGroups [][]transactions.SignedTxn, currSpecAddrs transactions.SpecialAddresses, currProto protocol.ConsensusVersion) (unverifiedGroups [][]transactions.SignedTxn) { diff --git a/data/transactions/verify/verifiedTxnCache_test.go b/data/transactions/verify/verifiedTxnCache_test.go index e3001db674..553eb33bdd 100644 --- a/data/transactions/verify/verifiedTxnCache_test.go +++ b/data/transactions/verify/verifiedTxnCache_test.go @@ -32,8 +32,8 @@ func TestAddingToCache(t *testing.T) { icache := MakeVerifiedTransactionCache(500) impl := icache.(*verifiedTransactionCache) - _, signedTxn, secrets, addrs := generateTestObjects(10, 5, 50) - txnGroups := generateTransactionGroups(signedTxn, secrets, addrs) + _, signedTxn, secrets, addrs := generateTestObjects(10, 5, 0, 50) + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) groupCtx, err := PrepareGroupContext(txnGroups[0], blockHeader, nil) require.NoError(t, err) impl.Add(txnGroups[0], groupCtx) @@ -52,7 +52,7 @@ func TestBucketCycling(t *testing.T) { entriesPerBucket := 100 icache := MakeVerifiedTransactionCache(entriesPerBucket * (bucketCount - 1)) impl := icache.(*verifiedTransactionCache) - _, signedTxn, _, _ := generateTestObjects(entriesPerBucket*bucketCount*2, bucketCount, 0) + _, signedTxn, _, _ := generateTestObjects(entriesPerBucket*bucketCount*2, bucketCount, 0, 0) require.Equal(t, entriesPerBucket*bucketCount*2, len(signedTxn)) groupCtx, err := PrepareGroupContext([]transactions.SignedTxn{signedTxn[0]}, blockHeader, nil) @@ -82,8 +82,8 @@ func 
TestGetUnverifiedTransactionGroups50(t *testing.T) { size := 300 icache := MakeVerifiedTransactionCache(size * 2) impl := icache.(*verifiedTransactionCache) - _, signedTxn, secrets, addrs := generateTestObjects(size*2, 10+size/1000, 0) - txnGroups := generateTransactionGroups(signedTxn, secrets, addrs) + _, signedTxn, secrets, addrs := generateTestObjects(size*2, 10+size/1000, 0, 0) + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) expectedUnverifiedGroups := make([][]transactions.SignedTxn, 0, len(txnGroups)/2) // add every even transaction to the cache. @@ -107,8 +107,8 @@ func BenchmarkGetUnverifiedTransactionGroups50(b *testing.B) { } icache := MakeVerifiedTransactionCache(b.N * 2) impl := icache.(*verifiedTransactionCache) - _, signedTxn, secrets, addrs := generateTestObjects(b.N*2, 10+b.N/1000, 0) - txnGroups := generateTransactionGroups(signedTxn, secrets, addrs) + _, signedTxn, secrets, addrs := generateTestObjects(b.N*2, 10+b.N/1000, 0, 0) + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) queryTxnGroups := make([][]transactions.SignedTxn, 0, b.N) // add every even transaction to the cache. @@ -140,8 +140,8 @@ func TestUpdatePinned(t *testing.T) { size := 100 icache := MakeVerifiedTransactionCache(size * 10) impl := icache.(*verifiedTransactionCache) - _, signedTxn, secrets, addrs := generateTestObjects(size*2, 10, 0) - txnGroups := generateTransactionGroups(signedTxn, secrets, addrs) + _, signedTxn, secrets, addrs := generateTestObjects(size*2, 10, 0, 0) + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) // insert some entries. for i := 0; i < len(txnGroups); i++ { @@ -169,8 +169,8 @@ func TestPinningTransactions(t *testing.T) { size := 100 icache := MakeVerifiedTransactionCache(size) impl := icache.(*verifiedTransactionCache) - _, signedTxn, secrets, addrs := generateTestObjects(size*2, 10, 0) - txnGroups := generateTransactionGroups(signedTxn, secrets, addrs) + _, signedTxn, secrets, addrs := generateTestObjects(size*2, 10, 0, 0) + txnGroups := generateTransactionGroups(protoMaxGroupSize, signedTxn, secrets, addrs) // insert half of the entries. 
for i := 0; i < len(txnGroups)/2; i++ { diff --git a/data/txDupCache.go b/data/txDupCache.go index 026a168578..83429165cf 100644 --- a/data/txDupCache.go +++ b/data/txDupCache.go @@ -109,6 +109,7 @@ type txSaltedCache struct { curSalt [4]byte prevSalt [4]byte ctx context.Context + wg sync.WaitGroup } func makeSaltedCache(size int) *txSaltedCache { @@ -117,9 +118,10 @@ func makeSaltedCache(size int) *txSaltedCache { } } -func (c *txSaltedCache) start(ctx context.Context, refreshInterval time.Duration) { +func (c *txSaltedCache) Start(ctx context.Context, refreshInterval time.Duration) { c.ctx = ctx if refreshInterval != 0 { + c.wg.Add(1) go c.salter(refreshInterval) } @@ -128,9 +130,14 @@ func (c *txSaltedCache) start(ctx context.Context, refreshInterval time.Duration c.moreSalt() } +func (c *txSaltedCache) WaitForStop() { + c.wg.Wait() +} + // salter is a goroutine refreshing the cache by schedule func (c *txSaltedCache) salter(refreshInterval time.Duration) { ticker := time.NewTicker(refreshInterval) + defer c.wg.Done() defer ticker.Stop() for { select { diff --git a/data/txDupCache_test.go b/data/txDupCache_test.go index e0bfac25a5..42c7659bfb 100644 --- a/data/txDupCache_test.go +++ b/data/txDupCache_test.go @@ -120,7 +120,7 @@ func TestTxHandlerSaltedCacheBasic(t *testing.T) { const size = 20 cache := makeSaltedCache(size) - cache.start(context.Background(), 0) + cache.Start(context.Background(), 0) require.Zero(t, cache.Len()) // add some unique random @@ -204,7 +204,7 @@ func TestTxHandlerSaltedCacheScheduled(t *testing.T) { const size = 20 updateInterval := 1000 * time.Microsecond cache := makeSaltedCache(size) - cache.start(context.Background(), updateInterval) + cache.Start(context.Background(), updateInterval) require.Zero(t, cache.Len()) // add some unique random @@ -229,7 +229,7 @@ func TestTxHandlerSaltedCacheManual(t *testing.T) { const size = 20 cache := makeSaltedCache(2 * size) - cache.start(context.Background(), 0) + cache.Start(context.Background(), 0) require.Zero(t, cache.Len()) // add some unique random @@ -294,7 +294,7 @@ func (m digestCacheMaker) make(size int) cachePusher { } func (m saltedCacheMaker) make(size int) cachePusher { scp := &saltedCachePusher{c: makeSaltedCache(size)} - scp.c.start(context.Background(), 0) + scp.c.Start(context.Background(), 0) return scp } diff --git a/data/txHandler.go b/data/txHandler.go index a43f320695..78704f0f88 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -63,6 +63,12 @@ var transactionGroupTxSyncRemember = metrics.MakeCounter(metrics.TransactionGrou var transactionGroupTxSyncAlreadyCommitted = metrics.MakeCounter(metrics.TransactionGroupTxSyncAlreadyCommitted) var txBacklogDroppedCongestionManagement = metrics.MakeCounter(metrics.TransactionMessagesTxnDroppedCongestionManagement) +// ErrInvalidTxPool is reported when nil is passed for the tx pool +var ErrInvalidTxPool = errors.New("MakeTxHandler: txPool is nil on initialization") + +// ErrInvalidLedger is reported when nil is passed for the ledger +var ErrInvalidLedger = errors.New("MakeTxHandler: ledger is nil on initialization") + var transactionMessageTxPoolRememberCounter = metrics.NewTagCounter( "algod_transaction_messages_txpool_remember_err_{TAG}", "Number of transaction messages not remembered by txpool b/c of {TAG}", txPoolRememberTagCap, txPoolRememberPendingEval, txPoolRememberTagNoSpace, txPoolRememberTagFee, txPoolRememberTagTxnDead, txPoolRememberTagTxnEarly, txPoolRememberTagTooLarge, txPoolRememberTagGroupID, @@ -111,7 +117,7 @@ type TxHandler 
struct { genesisHash crypto.Digest txVerificationPool execpool.BacklogPool backlogQueue chan *txBacklogMsg - postVerificationQueue chan *txBacklogMsg + postVerificationQueue chan *verify.VerificationResult backlogWg sync.WaitGroup net network.GossipNode msgCache *txSaltedCache @@ -119,6 +125,9 @@ type TxHandler struct { cacheConfig txHandlerConfig ctx context.Context ctxCancel context.CancelFunc + streamVerifier *verify.StreamVerifier + streamVerifierChan chan *verify.UnverifiedElement + streamVerifierDropped chan *verify.UnverifiedElement erl *util.ElasticRateLimiter } @@ -140,15 +149,14 @@ type txHandlerConfig struct { } // MakeTxHandler makes a new handler for transaction messages -func MakeTxHandler(opts TxHandlerOpts) *TxHandler { +func MakeTxHandler(opts TxHandlerOpts) (*TxHandler, error) { + if opts.TxPool == nil { - logging.Base().Fatal("MakeTxHandler: txPool is nil on initialization") - return nil + return nil, ErrInvalidTxPool } if opts.Ledger == nil { - logging.Base().Fatal("MakeTxHandler: ledger is nil on initialization") - return nil + return nil, ErrInvalidLedger } // backlog size is big enough for each peer to have its reserved capacity in the backlog, plus the config's backlogSize as a shared capacity @@ -164,12 +172,15 @@ func MakeTxHandler(opts TxHandlerOpts) *TxHandler { ledger: opts.Ledger, txVerificationPool: opts.ExecutionPool, backlogQueue: make(chan *txBacklogMsg, txBacklogSize), - postVerificationQueue: make(chan *txBacklogMsg, txBacklogSize), + postVerificationQueue: make(chan *verify.VerificationResult, txBacklogSize), net: opts.Net, msgCache: makeSaltedCache(2 * txBacklogSize), txCanonicalCache: makeDigestCache(2 * txBacklogSize), cacheConfig: txHandlerConfig{opts.Config.TxFilterRawMsgEnabled(), opts.Config.TxFilterCanonicalEnabled()}, + streamVerifierChan: make(chan *verify.UnverifiedElement), + streamVerifierDropped: make(chan *verify.UnverifiedElement), } + if opts.Config.EnableTxBacklogRateLimiting { rateLimiter := util.NewElasticRateLimiter( txBacklogSize, @@ -179,19 +190,43 @@ func MakeTxHandler(opts TxHandlerOpts) *TxHandler { ) handler.erl = rateLimiter } - return handler + + // prepare the transaction stream verifer + var err error + handler.streamVerifier, err = verify.MakeStreamVerifier(handler.streamVerifierChan, + handler.postVerificationQueue, handler.streamVerifierDropped, handler.ledger, + handler.txVerificationPool, handler.ledger.VerifiedTransactionCache()) + if err != nil { + return nil, err + } + go handler.droppedTxnWatcher() + return handler, nil +} + +func (handler *TxHandler) droppedTxnWatcher() { + for unverified := range handler.streamVerifierDropped { + // we failed to write to the output queue, since the queue was full. + // adding the metric here allows us to monitor how frequently it happens. 
+ transactionMessagesDroppedFromPool.Inc(nil) + + tx := unverified.BacklogMessage.(*txBacklogMsg) + + // delete from duplicate caches to give it a chance to be re-submitted + handler.deleteFromCaches(tx.rawmsgDataHash, tx.unverifiedTxGroupHash) + } } // Start enables the processing of incoming messages at the transaction handler func (handler *TxHandler) Start() { handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) - handler.msgCache.start(handler.ctx, 60*time.Second) + handler.msgCache.Start(handler.ctx, 60*time.Second) handler.net.RegisterHandlers([]network.TaggedMessageHandler{ {Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)}, }) handler.backlogWg.Add(2) go handler.backlogWorker() go handler.backlogGaugeThread() + handler.streamVerifier.Start(handler.ctx) if handler.erl != nil { handler.erl.Start() } @@ -204,6 +239,8 @@ func (handler *TxHandler) Stop() { handler.erl.Stop() } handler.backlogWg.Wait() + handler.streamVerifier.WaitForStop() + handler.msgCache.WaitForStop() } func reencode(stxns []transactions.SignedTxn) []byte { @@ -241,7 +278,9 @@ func (handler *TxHandler) backlogWorker() { if !ok { return } - handler.postProcessCheckedTxn(wi) + m := wi.BacklogMessage.(*txBacklogMsg) + m.verificationErr = wi.Err + handler.postProcessCheckedTxn(m) // restart the loop so that we could empty out the post verification queue. continue @@ -252,6 +291,7 @@ func (handler *TxHandler) backlogWorker() { select { case wi, ok := <-handler.backlogQueue: if !ok { + // this is never happening since handler.backlogQueue is never closed return } if wi.capguard != nil { @@ -266,18 +306,24 @@ func (handler *TxHandler) backlogWorker() { } continue } - - // enqueue the task to the verification pool. - handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, wi, nil) + // handler.streamVerifierChan does not receive if ctx is cancled + select { + case handler.streamVerifierChan <- &verify.UnverifiedElement{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi}: + case <-handler.ctx.Done(): + transactionMessagesDroppedFromBacklog.Inc(nil) + return + } if wi.capguard != nil { wi.capguard.Served() } - case wi, ok := <-handler.postVerificationQueue: if !ok { + // this is never happening since handler.postVerificationQueue is never closed return } - handler.postProcessCheckedTxn(wi) + m := wi.BacklogMessage.(*txBacklogMsg) + m.verificationErr = wi.Err + handler.postProcessCheckedTxn(m) case <-handler.ctx.Done(): return @@ -286,7 +332,7 @@ func (handler *TxHandler) backlogWorker() { } func (handler *TxHandler) postProcessReportErrors(err error) { - if errors.Is(err, crypto.ErrBatchVerificationFailed) { + if errors.Is(err, crypto.ErrBatchHasFailedSigs) { transactionMessagesTxnSigVerificationFailed.Inc(nil) return } @@ -450,36 +496,6 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { handler.net.Relay(handler.ctx, protocol.TxnTag, reencode(verifiedTxGroup), false, wi.rawmsg.Sender) } -// asyncVerifySignature verifies that the given transaction group is valid, and update the txBacklogMsg data structure accordingly. 
-func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} { - tx := arg.(*txBacklogMsg) - - // build the transaction verification context - latest := handler.ledger.Latest() - latestHdr, err := handler.ledger.BlockHdr(latest) - if err != nil { - tx.verificationErr = fmt.Errorf("could not get header for previous block %d: %w", latest, err) - logging.Base().Warnf("Could not get header for previous block %d: %v", latest, err) - } else { - // we can't use PaysetGroups here since it's using a execpool like this go-routine and we don't want to deadlock. - _, tx.verificationErr = verify.TxnGroup(tx.unverifiedTxGroup, latestHdr, handler.ledger.VerifiedTransactionCache(), handler.ledger) - } - - select { - case handler.postVerificationQueue <- tx: - default: - // we failed to write to the output queue, since the queue was full. - // adding the metric here allows us to monitor how frequently it happens. - transactionMessagesDroppedFromPool.Inc(nil) - - // delete from duplicate caches to give it chance to be re-submitted - // this relatively rare operation and implementation is expensive (requires re-hashing) - handler.deleteFromCaches(tx.rawmsgDataHash, tx.unverifiedTxGroupHash) - - } - return nil -} - func (handler *TxHandler) deleteFromCaches(msgKey *crypto.Digest, canonicalKey *crypto.Digest) { if handler.cacheConfig.enableFilteringCanonical && canonicalKey != nil { handler.txCanonicalCache.Delete(canonicalKey) diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 3090fd7b50..0339ee2e4a 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -45,7 +45,6 @@ import ( "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/data/transactions/verify" "github.com/algorand/go-algorand/data/txntest" - realledger "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/network" @@ -108,8 +107,10 @@ func BenchmarkTxHandlerProcessing(b *testing.B) { cfg.TxPoolSize = 75000 cfg.EnableProcessBlockStats = false - txHandler := makeTestTxHandler(l, cfg) + txHandler, err := makeTestTxHandler(l, cfg) + require.NoError(b, err) defer txHandler.txVerificationPool.Shutdown() + defer close(txHandler.streamVerifierDropped) makeTxns := func(N int) [][]transactions.SignedTxn { ret := make([][]transactions.SignedTxn, 0, N) @@ -154,7 +155,7 @@ func BenchmarkTxHandlerProcessing(b *testing.B) { b.Logf("verifying %d signedTransactionGroups", len(signedTransactionGroups)) b.ResetTimer() for i := range signedTransactionGroups { - verify.TxnGroup(signedTransactionGroups[i], hdr, vtc, l) + verify.TxnGroup(signedTransactionGroups[i], &hdr, vtc, l) } }) } @@ -163,8 +164,8 @@ func BenchmarkTxHandlerProcessing(b *testing.B) { type vtCache struct{} func (vtCache) Add(txgroup []transactions.SignedTxn, groupCtx *verify.GroupContext) {} -func (vtCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*verify.GroupContext) error { - return nil +func (vtCache) AddPayset(txgroup [][]transactions.SignedTxn, groupCtxs []*verify.GroupContext) { + return } func (vtCache) GetUnverifiedTransactionGroups(payset [][]transactions.SignedTxn, CurrSpecAddrs transactions.SpecialAddresses, CurrProto protocol.ConsensusVersion) [][]transactions.SignedTxn { return nil @@ -787,11 +788,11 @@ func makeTestTxHandlerOrphanedWithContext(ctx context.Context, backlogSize int, txCanonicalCache: makeDigestCache(cacheSize), cacheConfig: txHandlerConfig, } - 
handler.msgCache.start(ctx, refreshInterval) + handler.msgCache.Start(ctx, refreshInterval) return handler } -func makeTestTxHandler(dl *Ledger, cfg config.Local) *TxHandler { +func makeTestTxHandler(dl *Ledger, cfg config.Local) (*TxHandler, error) { tp := pools.MakeTransactionPool(dl.Ledger, cfg, logging.Base()) backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) opts := TxHandlerOpts{ @@ -952,7 +953,6 @@ func TestTxHandlerProcessIncomingCacheRotation(t *testing.T) { // TestTxHandlerProcessIncomingCacheBacklogDrop checks if dropped messages are also removed from caches func TestTxHandlerProcessIncomingCacheBacklogDrop(t *testing.T) { partitiontest.PartitionTest(t) - t.Parallel() handler := makeTestTxHandlerOrphanedWithContext(context.Background(), 1, 20, txHandlerConfig{true, true}, 0) @@ -983,6 +983,7 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) { const numUsers = 100 log := logging.TestingLog(t) + log.SetLevel(logging.Panic) // prepare the accounts addresses, secrets, genesis := makeTestGenesisAccounts(t, numUsers) @@ -998,9 +999,20 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) { defer ledger.Close() l := ledger - handler := makeTestTxHandler(l, cfg) + handler, err := makeTestTxHandler(l, cfg) + require.NoError(t, err) defer handler.txVerificationPool.Shutdown() - handler.postVerificationQueue = make(chan *txBacklogMsg) + defer close(handler.streamVerifierDropped) + + // saturate the postVerificationQueue +loop: + for { + select { + case handler.postVerificationQueue <- &verify.VerificationResult{}: + default: + break loop + } + } makeTxns := func(sendIdx, recvIdx int) ([]transactions.SignedTxn, []byte) { tx := transactions.Transaction{ @@ -1035,8 +1047,22 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) { require.Equal(t, stxns, msg.unverifiedTxGroup) initialCount := transactionMessagesDroppedFromPool.GetUint64Value() - handler.asyncVerifySignature(msg) - currentCount := transactionMessagesDroppedFromPool.GetUint64Value() + + // emulate handler.Start() without the backlog + handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) + handler.streamVerifier.Start(handler.ctx) + defer handler.streamVerifier.WaitForStop() + defer handler.ctxCancel() + handler.streamVerifierChan <- &verify.UnverifiedElement{ + TxnGroup: msg.unverifiedTxGroup, BacklogMessage: msg} + var currentCount uint64 + for x := 0; x < 1000; x++ { + currentCount = transactionMessagesDroppedFromPool.GetUint64Value() + if currentCount > 0 { + break + } + time.Sleep(10 * time.Millisecond) + } require.Equal(t, initialCount+1, currentCount) require.Equal(t, 0, handler.msgCache.Len()) require.Equal(t, 0, handler.txCanonicalCache.Len()) @@ -1127,6 +1153,7 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t const numUsers = 100 log := logging.TestingLog(t) + log.SetLevel(logging.Warn) // prepare the accounts addresses, secrets, genesis := makeTestGenesisAccounts(t, numUsers) @@ -1140,21 +1167,26 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t require.NoError(t, err) defer ledger.Close() - handler := makeTestTxHandler(ledger, cfg) + handler, err := makeTestTxHandler(ledger, cfg) + require.NoError(t, err) defer handler.txVerificationPool.Shutdown() + defer close(handler.streamVerifierDropped) // since Start is not called, set the context here handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) defer handler.ctxCancel() - outChan := make(chan 
*txBacklogMsg, 10) + // emulate handler.Start() without the backlog + handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) + handler.streamVerifier.Start(handler.ctx) + + testResultChan := make(chan *txBacklogMsg, 10) wg := sync.WaitGroup{} wg.Add(1) // Make a test backlog worker, which is similar to backlogWorker, but sends the results - // through the outChan instead of passing it to postProcessCheckedTxn + // through the testResultChan instead of passing it to postProcessCheckedTxn go func() { defer wg.Done() - defer close(outChan) for { // prioritize the postVerificationQueue select { @@ -1162,7 +1194,10 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t if !ok { return } - outChan <- wi + txBLMsg := wi.BacklogMessage.(*txBacklogMsg) + txBLMsg.verificationErr = wi.Err + testResultChan <- txBLMsg + // restart the loop so that we could empty out the post verification queue. continue default: @@ -1172,29 +1207,20 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t select { case wi, ok := <-handler.backlogQueue: if !ok { - // shut down to end the test - handler.txVerificationPool.Shutdown() - close(handler.postVerificationQueue) - // wait until all the pending responses are obtained. - // this is not in backlogWorker, maybe should be - for wi := range handler.postVerificationQueue { - outChan <- wi - } return } if handler.checkAlreadyCommitted(wi) { // this is not expected during the test continue } - - // enqueue the task to the verification pool. - handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, wi, nil) - + handler.streamVerifierChan <- &verify.UnverifiedElement{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi} case wi, ok := <-handler.postVerificationQueue: if !ok { return } - outChan <- wi + txBLMsg := wi.BacklogMessage.(*txBacklogMsg) + txBLMsg.verificationErr = wi.Err + testResultChan <- txBLMsg case <-handler.ctx.Done(): return @@ -1234,7 +1260,7 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t timer := time.NewTicker(250 * time.Millisecond) for { select { - case wi := <-outChan: + case wi := <-testResultChan: txnCounter = txnCounter + len(wi.unverifiedTxGroup) groupCounter++ u, _ := binary.Uvarint(wi.unverifiedTxGroup[0].Txn.Note) @@ -1658,8 +1684,10 @@ func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBac cfg.IncomingConnectionsLimit = 10 ledger := txGen.makeLedger(b, cfg, log, fmt.Sprintf("%s-%d", b.Name(), b.N)) defer ledger.Close() - handler := makeTestTxHandler(ledger, cfg) + handler, err := makeTestTxHandler(ledger, cfg) + require.NoError(b, err) defer handler.txVerificationPool.Shutdown() + defer close(handler.streamVerifierDropped) // The benchmark generates only 1000 txns, and reuses them. This is done for faster benchmark time and the // ability to have long runs without being limited to the memory. 
The dedup will block the txns once the same @@ -1672,12 +1700,15 @@ func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBac handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) defer handler.ctxCancel() - testResultChan := handler.postVerificationQueue + // emulate handler.Start() without the backlog + handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) + handler.streamVerifier.Start(handler.ctx) + + testResultChan := make(chan *txBacklogMsg, 10) wg := sync.WaitGroup{} if useBacklogWorker { wg.Add(1) - testResultChan = make(chan *txBacklogMsg, 10) // Make a test backlog worker, which is similar to backlogWorker, but sends the results // through the testResultChan instead of passing it to postProcessCheckedTxn go func() { @@ -1689,7 +1720,9 @@ func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBac if !ok { return } - testResultChan <- wi + txBLMsg := wi.BacklogMessage.(*txBacklogMsg) + txBLMsg.verificationErr = wi.Err + testResultChan <- txBLMsg // restart the loop so that we could empty out the post verification queue. continue @@ -1706,13 +1739,14 @@ func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBac // this is not expected during the test continue } - handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, wi, nil) - + handler.streamVerifierChan <- &verify.UnverifiedElement{TxnGroup: wi.unverifiedTxGroup, BacklogMessage: wi} case wi, ok := <-handler.postVerificationQueue: if !ok { return } - testResultChan <- wi + txBLMsg := wi.BacklogMessage.(*txBacklogMsg) + txBLMsg.verificationErr = wi.Err + testResultChan <- txBLMsg case <-handler.ctx.Done(): return @@ -1751,6 +1785,7 @@ func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBac groupCounter := uint64(0) var txnCounter uint64 invalidCounter := 0 + // report the results defer func() { if groupCounter > 1 { timeSinceStart := time.Since(tt) @@ -1768,6 +1803,7 @@ func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBac }() counterMutex := deadlock.Mutex{} stopChan := make(chan interface{}) + // monitor the counters to tell when everything is processed and the checker should stop wg.Add(1) go func() { defer wg.Done() @@ -1784,28 +1820,55 @@ func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBac } } }() - - for { - select { - case wi := <-testResultChan: - txnCounter = txnCounter + uint64(len(wi.unverifiedTxGroup)) - counterMutex.Lock() - groupCounter++ - counterMutex.Unlock() - u, _ := binary.Uvarint(wi.unverifiedTxGroup[0].Txn.Note) - _, inBad := badTxnGroups[u] - if wi.verificationErr == nil { - require.False(b, inBad, "No error for invalid signature") - } else { - invalidCounter++ - require.True(b, inBad, "Error for good signature") + // pick up each output from the verifier and check it is was correctly decided + // since the data paths differ, distinguish between useBacklogWorker or not + if useBacklogWorker { + for { + select { + case wi := <-testResultChan: + txnCounter = txnCounter + uint64(len(wi.unverifiedTxGroup)) + counterMutex.Lock() + groupCounter++ + counterMutex.Unlock() + u, _ := binary.Uvarint(wi.unverifiedTxGroup[0].Txn.Note) + _, inBad := badTxnGroups[u] + if wi.verificationErr == nil { + require.False(b, inBad, "No error for invalid signature") + } else { + invalidCounter++ + require.True(b, inBad, "Error for good signature") + } + if groupCounter == uint64(b.N) { + // all the benchmark txns 
processed + return + } + case <-stopChan: + return } - if groupCounter == uint64(b.N) { - // all the benchmark txns processed + } + } else { + for { + select { + case wi := <-handler.postVerificationQueue: + txnCounter = txnCounter + uint64(len(wi.TxnGroup)) + counterMutex.Lock() + groupCounter++ + counterMutex.Unlock() + u, _ := binary.Uvarint(wi.TxnGroup[0].Txn.Note) + _, inBad := badTxnGroups[u] + if wi.Err == nil { + require.False(b, inBad, "No error for invalid signature") + } else { + invalidCounter++ + require.True(b, inBad, "Error for good signature") + } + if groupCounter == uint64(b.N) { + // all the benchmark txns processed + return + } + case <-stopChan: return } - case <-stopChan: - return } } }() @@ -1827,7 +1890,7 @@ func runHandlerBenchmarkWithBacklog(b *testing.B, txGen txGenIf, tps int, useBac } else { stxngrp := signedTransactionGroups[i] blm := txBacklogMsg{rawmsg: nil, unverifiedTxGroup: stxngrp} - handler.txVerificationPool.EnqueueBacklog(handler.ctx, handler.asyncVerifySignature, &blm, nil) + handler.streamVerifierChan <- &verify.UnverifiedElement{TxnGroup: stxngrp, BacklogMessage: &blm} } c++ if c == b.N { @@ -1906,7 +1969,7 @@ func TestTxHandlerPostProcessError(t *testing.T) { const expected = int(verify.TxGroupErrorReasonNumValues) - 3 require.Len(t, result, expected) - errVerify := crypto.ErrBatchVerificationFailed + errVerify := crypto.ErrBatchHasFailedSigs txh.postProcessReportErrors(errVerify) result = collect() require.Len(t, result, expected+1) @@ -1928,7 +1991,7 @@ func TestTxHandlerPostProcessErrorWithVerify(t *testing.T) { CurrentProtocol: protocol.ConsensusCurrentVersion, }, } - _, err := verify.TxnGroup([]transactions.SignedTxn{stxn}, hdr, nil, nil) + _, err := verify.TxnGroup([]transactions.SignedTxn{stxn}, &hdr, nil, nil) var txGroupErr *verify.TxGroupError require.ErrorAs(t, err, &txGroupErr) @@ -2092,8 +2155,10 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { require.NoError(t, err) defer ledger.Close() - handler := makeTestTxHandler(ledger, cfg) + handler, err := makeTestTxHandler(ledger, cfg) + require.NoError(t, err) defer handler.txVerificationPool.Shutdown() + defer close(handler.streamVerifierDropped) // since Start is not called, set the context here handler.ctx, handler.ctxCancel = context.WithCancel(context.Background()) defer handler.ctxCancel() @@ -2215,7 +2280,7 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { // make an invalid block to fail recompute pool and expose transactionMessageTxGroupRememberNoPendingEval metric blockTicker := makeBlockTicker() - blockListeners := []realledger.BlockListener{ + blockListeners := []ledgercore.BlockListener{ handler.txPool, blockTicker, } @@ -2244,3 +2309,167 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { handler.postProcessCheckedTxn(&wi) require.Equal(t, 1, getMetricCounter(txPoolRememberPendingEval)) } + +func TestMakeTxHandlerErrors(t *testing.T) { + opts := TxHandlerOpts{ + nil, nil, nil, &mocks.MockNetwork{}, "", crypto.Digest{}, config.Local{}, + } + _, err := MakeTxHandler(opts) + require.Error(t, err, ErrInvalidTxPool) + + opts = TxHandlerOpts{ + &pools.TransactionPool{}, nil, nil, &mocks.MockNetwork{}, "", crypto.Digest{}, config.Local{}, + } + _, err = MakeTxHandler(opts) + require.Error(t, err, ErrInvalidLedger) + + // it is not possible to test MakeStreamVerifier returning an error, because it is not possible to + // get the leger return an error for returining the header of its latest round +} + +// 
TestTxHandlerRestartWithBacklogAndTxPool starts txHandler, sends transactions, +// stops, starts in a loop, sends more transactions, and makes sure all the transactions +// are accounted for. It uses the production backlog worker +func TestTxHandlerRestartWithBacklogAndTxPool(t *testing.T) { + transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) + transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool) + transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) + transactionMessagesBacklogErr = metrics.MakeCounter(metrics.TransactionMessagesBacklogErr) + transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted) + transactionMessagesRemember = metrics.MakeCounter(metrics.TransactionMessagesRemember) + transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled) + + defer func() { + // reset the counters + transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) + transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool) + transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) + transactionMessagesBacklogErr = metrics.MakeCounter(metrics.TransactionMessagesBacklogErr) + transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted) + transactionMessagesRemember = metrics.MakeCounter(metrics.TransactionMessagesRemember) + transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled) + }() + + const numUsers = 100 + log := logging.TestingLog(t) + log.SetLevel(logging.Warn) + addresses := make([]basics.Address, numUsers) + secrets := make([]*crypto.SignatureSecrets, numUsers) + + // avoid printing the warning messages + origLevel := logging.Base().GetLevel() + defer func() { logging.Base().SetLevel(origLevel) }() + logging.Base().SetLevel(logging.Error) + + // prepare the accounts + genesis := make(map[basics.Address]basics.AccountData) + for i := 0; i < numUsers; i++ { + secret := keypair() + addr := basics.Address(secret.SignatureVerifier) + secrets[i] = secret + addresses[i] = addr + genesis[addr] = basics.AccountData{ + Status: basics.Online, + MicroAlgos: basics.MicroAlgos{Raw: 10000000000000}, + } + } + genesis[poolAddr] = basics.AccountData{ + Status: basics.NotParticipating, + MicroAlgos: basics.MicroAlgos{Raw: config.Consensus[protocol.ConsensusCurrentVersion].MinBalance}, + } + + // setup the ledger + require.Equal(t, len(genesis), numUsers+1) + genBal := bookkeeping.MakeGenesisBalances(genesis, sinkAddr, poolAddr) + ledgerName := fmt.Sprintf("%s-mem", t.Name()) + const inMem = true + cfg := config.GetDefaultLocal() + cfg.Archival = true + ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg) + require.NoError(t, err) + defer ledger.Ledger.Close() + + handler, err := makeTestTxHandler(ledger, cfg) + require.NoError(t, err) + defer handler.txVerificationPool.Shutdown() + defer close(handler.streamVerifierDropped) + // prepare the transactions + numTxns := 3000 + maxGroupSize := 1 + tps := 40000 + invalidRate := float32(0.5) + rateAdjuster := time.Second / time.Duration(tps) + signedTransactionGroups, badTxnGroups := makeSignedTxnGroups(numTxns, numUsers, maxGroupSize, 
invalidRate, addresses, secrets)
+ var encodedSignedTransactionGroups []network.IncomingMessage
+
+ encodedSignedTransactionGroups = make([]network.IncomingMessage, 0, numTxns)
+ for _, stxngrp := range signedTransactionGroups {
+ data := make([]byte, 0)
+ for _, stxn := range stxngrp {
+ data = append(data, protocol.Encode(&stxn)...)
+ }
+ encodedSignedTransactionGroups =
+ append(encodedSignedTransactionGroups, network.IncomingMessage{Data: data})
+ }
+
+ // start the handler
+ handler.Start()
+
+ // send the transactions to the backlog worker
+ for _, tg := range encodedSignedTransactionGroups[0 : numTxns/2] {
+ handler.processIncomingTxn(tg)
+ time.Sleep(rateAdjuster)
+ }
+ // stop in a loop to test for possible race conditions
+ for x := 0; x < 1000; x++ {
+ handler.Stop()
+ handler.Start()
+ }
+ handler.Stop()
+
+ // send the second half after stopping the txHandler
+ for _, tg := range encodedSignedTransactionGroups[numTxns/2:] {
+ handler.processIncomingTxn(tg)
+ time.Sleep(rateAdjuster)
+ }
+
+ // check that all the incoming transactions are accounted for
+ droppeda, droppedb := getDropped()
+ dropped := droppeda + droppedb
+ stuckInBLQueue := uint64(len(handler.backlogQueue))
+ resultBadTxnCount := transactionMessagesTxnSigVerificationFailed.GetUint64Value()
+ resultGoodTxnCount := transactionMessagesHandled.GetUint64Value()
+ shutdownDropCount := transactionMessagesBacklogErr.GetUint64Value()
+ require.Equal(t, numTxns, int(dropped+resultGoodTxnCount+resultBadTxnCount+stuckInBLQueue+shutdownDropCount))
+
+ // start the handler again
+ handler.Start()
+ defer handler.Stop()
+
+ // no duplicates are sent at this point
+ require.Equal(t, 0, int(transactionMessagesAlreadyCommitted.GetUint64Value()))
+
+ // send the same set of transactions again
+ for _, tg := range encodedSignedTransactionGroups {
+ handler.processIncomingTxn(tg)
+ time.Sleep(rateAdjuster)
+ }
+
+ inputGoodTxnCount := len(signedTransactionGroups) - len(badTxnGroups)
+ tp := handler.txPool
+ // Wait until all the expected transactions are in the pool
+ for x := 0; x < 100; x++ {
+ if len(tp.PendingTxGroups()) == inputGoodTxnCount {
+ break
+ }
+ time.Sleep(40 * time.Millisecond)
+ }
+
+ // check the counters and the accepted transactions
+ require.Equal(t, inputGoodTxnCount, len(tp.PendingTxGroups()))
+ for _, txg := range tp.PendingTxGroups() {
+ u, _ := binary.Uvarint(txg[0].Txn.Note)
+ _, inBad := badTxnGroups[u]
+ require.False(t, inBad, "invalid transaction accepted")
+ }
+}
diff --git a/ledger/ledger.go b/ledger/ledger.go
index 12c4148cf7..c25007221c 100644
--- a/ledger/ledger.go
+++ b/ledger/ledger.go
@@ -399,7 +399,7 @@ func (l *Ledger) Close() {
 // RegisterBlockListeners registers listeners that will be called when a
 // new block is added to the ledger.
-func (l *Ledger) RegisterBlockListeners(listeners []BlockListener) {
+func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) {
 l.notifier.register(listeners)
 }
diff --git a/ledger/ledgercore/misc.go b/ledger/ledgercore/misc.go
index 5bc39cd1cc..22ea6b5e74 100644
--- a/ledger/ledgercore/misc.go
+++ b/ledger/ledgercore/misc.go
@@ -28,3 +28,8 @@ type InitState struct {
 Accounts map[basics.Address]basics.AccountData
 GenesisHash crypto.Digest
 }
+
+// BlockListener represents an object that needs to get notified on new blocks.
+type BlockListener interface { + OnNewBlock(block bookkeeping.Block, delta StateDelta) +} diff --git a/ledger/notifier.go b/ledger/notifier.go index b7d220ac32..b34aeb4728 100644 --- a/ledger/notifier.go +++ b/ledger/notifier.go @@ -28,11 +28,6 @@ import ( "github.com/algorand/go-algorand/ledger/ledgercore" ) -// BlockListener represents an object that needs to get notified on new blocks. -type BlockListener interface { - OnNewBlock(block bookkeeping.Block, delta ledgercore.StateDelta) -} - type blockDeltaPair struct { block bookkeeping.Block delta ledgercore.StateDelta @@ -41,7 +36,7 @@ type blockDeltaPair struct { type blockNotifier struct { mu deadlock.Mutex cond *sync.Cond - listeners []BlockListener + listeners []ledgercore.BlockListener pendingBlocks []blockDeltaPair running bool // closing is the waitgroup used to synchronize closing the worker goroutine. It's being increased during loadFromDisk, and the worker is responsible to call Done on it once it's aborting it's goroutine. The close function waits on this to complete. @@ -96,7 +91,7 @@ func (bn *blockNotifier) loadFromDisk(l ledgerForTracker, _ basics.Round) error return nil } -func (bn *blockNotifier) register(listeners []BlockListener) { +func (bn *blockNotifier) register(listeners []ledgercore.BlockListener) { bn.mu.Lock() defer bn.mu.Unlock() diff --git a/ledger/simulation/simulator.go b/ledger/simulation/simulator.go index a6b12b6832..b8a136a4b7 100644 --- a/ledger/simulation/simulator.go +++ b/ledger/simulation/simulator.go @@ -150,7 +150,7 @@ func (s Simulator) check(hdr bookkeeping.BlockHeader, txgroup []transactions.Sig } // Verify the signed transactions are well-formed and have valid signatures - _, err = verify.TxnGroup(txnsToVerify, hdr, nil, s.ledger) + _, err = verify.TxnGroup(txnsToVerify, &hdr, nil, s.ledger) if err != nil { return false, InvalidTxGroupError{SimulatorError{err}} } diff --git a/logging/log.go b/logging/log.go index 1849774edd..14759d1cdc 100644 --- a/logging/log.go +++ b/logging/log.go @@ -134,6 +134,9 @@ type Logger interface { // Set the logging version (Info by default) SetLevel(Level) + // Get the logging version + GetLevel() Level + // Sets the output target SetOutput(io.Writer) @@ -285,6 +288,10 @@ func (l logger) WithFields(fields Fields) Logger { } } +func (l logger) GetLevel() (lvl Level) { + return Level(l.entry.Logger.Level) +} + func (l logger) SetLevel(lvl Level) { l.entry.Logger.Level = logrus.Level(lvl) } diff --git a/logging/log_test.go b/logging/log_test.go index 2f4365ccb6..f95027b71e 100644 --- a/logging/log_test.go +++ b/logging/log_test.go @@ -60,6 +60,15 @@ func TestFileOutputNewLogger(t *testing.T) { } +func TestSetGetLevel(t *testing.T) { + partitiontest.PartitionTest(t) + + nl := NewLogger() + require.Equal(t, Info, nl.GetLevel()) + nl.SetLevel(Error) + require.Equal(t, Error, nl.GetLevel()) +} + func TestSetLevelNewLogger(t *testing.T) { partitiontest.PartitionTest(t) a := require.New(t) diff --git a/node/node.go b/node/node.go index 155c5c2ede..0c81191691 100644 --- a/node/node.go +++ b/node/node.go @@ -214,7 +214,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd node.cryptoPool = execpool.MakePool(node) node.lowPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.LowPriority, node) node.highPriorityCryptoVerificationPool = execpool.MakeBacklog(node.cryptoPool, 2*node.cryptoPool.GetParallelism(), execpool.HighPriority, node) - node.ledger, err = 
data.LoadLedger(node.log, ledgerPathnamePrefix, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledger.BlockListener{}, cfg) + node.ledger, err = data.LoadLedger(node.log, ledgerPathnamePrefix, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledgercore.BlockListener{}, cfg) if err != nil { log.Errorf("Cannot initialize ledger (%s): %v", ledgerPathnamePrefix, err) return nil, err @@ -222,7 +222,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd node.transactionPool = pools.MakeTransactionPool(node.ledger.Ledger, cfg, node.log) - blockListeners := []ledger.BlockListener{ + blockListeners := []ledgercore.BlockListener{ node.transactionPool, node, } @@ -240,7 +240,11 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd GenesisHash: node.genesisHash, Config: cfg, } - node.txHandler = data.MakeTxHandler(txHandlerOpts) + node.txHandler, err = data.MakeTxHandler(txHandlerOpts) + if err != nil { + log.Errorf("Cannot initialize TxHandler: %v", err) + return nil, err + } // Indexer setup if cfg.IsIndexerActive && cfg.Archival { @@ -514,7 +518,7 @@ func (node *AlgorandFullNode) broadcastSignedTxGroup(txgroup []transactions.Sign return err } - _, err = verify.TxnGroup(txgroup, b, node.ledger.VerifiedTransactionCache(), node.ledger) + _, err = verify.TxnGroup(txgroup, &b, node.ledger.VerifiedTransactionCache(), node.ledger) if err != nil { node.log.Warnf("malformed transaction: %v", err) return err diff --git a/util/execpool/backlog.go b/util/execpool/backlog.go index 966aafe97f..125fb6f923 100644 --- a/util/execpool/backlog.go +++ b/util/execpool/backlog.go @@ -43,6 +43,7 @@ type backlogItemTask struct { type BacklogPool interface { ExecutionPool EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg interface{}, out chan interface{}) error + BufferSize() (length, capacity int) } // MakeBacklog creates a backlog @@ -94,6 +95,11 @@ func (b *backlog) Enqueue(enqueueCtx context.Context, t ExecFunc, arg interface{ } } +// BufferSize returns the length and the capacity of the buffer +func (b *backlog) BufferSize() (length, capacity int) { + return len(b.buffer), cap(b.buffer) +} + // Enqueue enqueues a single task into the backlog func (b *backlog) EnqueueBacklog(enqueueCtx context.Context, t ExecFunc, arg interface{}, out chan interface{}) error { select {