From f79a4451972da3913731ddd7d0d44f825196404b Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 14 Nov 2022 10:35:55 -0500 Subject: [PATCH 01/13] txhandler: add more metric --- data/transactions/verify/txn.go | 74 ++++++++++++++++++++++++++------- data/txHandler.go | 55 ++++++++++++++++++++++-- network/wsNetwork.go | 23 ++++++---- network/wsNetwork_test.go | 10 +++-- util/metrics/metrics.go | 14 +++++++ 5 files changed, 148 insertions(+), 28 deletions(-) diff --git a/data/transactions/verify/txn.go b/data/transactions/verify/txn.go index 4d0bd2717c..b81a798ca7 100644 --- a/data/transactions/verify/txn.go +++ b/data/transactions/verify/txn.go @@ -69,6 +69,46 @@ type GroupContext struct { ledger logic.LedgerForSignature } +type ErrTxGroupInvalidFee struct { + err error +} + +func (e *ErrTxGroupInvalidFee) Error() string { + return e.err.Error() +} + +type ErrTxnBadFormed struct { + err error +} + +func (e *ErrTxnBadFormed) Error() string { + return e.err.Error() +} + +type ErrTxnSigBadFormed struct { + err error +} + +func (e *ErrTxnSigBadFormed) Error() string { + return e.err.Error() +} + +type ErrTxnMsigBadFormed struct { + err error +} + +func (e *ErrTxnMsigBadFormed) Error() string { + return e.err.Error() +} + +type ErrTxnLogicSig struct { + err error +} + +func (e *ErrTxnLogicSig) Error() string { + return e.err.Error() +} + // PrepareGroupContext prepares a verification group parameter object for a given transaction // group. 
func PrepareGroupContext(group []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature) (*GroupContext, error) { @@ -108,7 +148,7 @@ func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, } if err := s.Txn.WellFormed(groupCtx.specAddrs, groupCtx.consensusParams); err != nil { - return err + return &ErrTxnBadFormed{err: err} } return stxnCoreChecks(s, txnIdx, groupCtx, verifier) @@ -147,7 +187,7 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo err = txnBatchPrep(&stxn, i, groupCtx, verifier) if err != nil { err = fmt.Errorf("transaction %+v invalid : %w", stxn, err) - return + return nil, err } if stxn.Txn.Type != protocol.StateProofTx { minFeeCount++ @@ -156,19 +196,22 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo } feeNeeded, overflow := basics.OMul(groupCtx.consensusParams.MinTxnFee, minFeeCount) if overflow { - err = fmt.Errorf("txgroup fee requirement overflow") - return + err = &ErrTxGroupInvalidFee{err: fmt.Errorf("txgroup fee requirement overflow")} + return nil, err } // feesPaid may have saturated. That's ok. Since we know // feeNeeded did not overflow, simple comparison tells us // feesPaid was enough. 
if feesPaid < feeNeeded { - err = fmt.Errorf("txgroup had %d in fees, which is less than the minimum %d * %d", - feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee) - return + err = &ErrTxGroupInvalidFee{ + err: fmt.Errorf( + "txgroup had %d in fees, which is less than the minimum %d * %d", + feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee), + } + return nil, err } - return + return groupCtx, nil } func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) error { @@ -197,10 +240,10 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex return nil } - return errors.New("signedtxn has no sig") + return &ErrTxnSigBadFormed{err: errors.New("signedtxn has no sig")} } if numSigs > 1 { - return errors.New("signedtxn should only have one of Sig or Msig or LogicSig") + return &ErrTxnSigBadFormed{err: errors.New("signedtxn should only have one of Sig or Msig or LogicSig")} } if hasSig { @@ -209,12 +252,15 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex } if hasMsig { if err := crypto.MultisigBatchPrep(s.Txn, crypto.Digest(s.Authorizer()), s.Msig, batchVerifier); err != nil { - return fmt.Errorf("multisig validation failed: %w", err) + return &ErrTxnMsigBadFormed{err: fmt.Errorf("multisig validation failed: %w", err)} } return nil } if hasLogicSig { - return logicSigVerify(s, txnIdx, groupCtx) + if err := logicSigVerify(s, txnIdx, groupCtx); err != nil { + return &ErrTxnLogicSig{err: err} + } + return nil } return errors.New("has one mystery sig. 
WAT?") } @@ -254,7 +300,7 @@ func logicSigSanityCheckBatchPrep(txn *transactions.SignedTxn, groupIndex int, g } if groupIndex < 0 { - return errors.New("Negative groupIndex") + return errors.New("negative groupIndex") } txngroup := transactions.WrapSignedTxnsWithAD(groupCtx.signedGroupTxns) ep := logic.EvalParams{ @@ -310,7 +356,7 @@ func logicSigVerify(txn *transactions.SignedTxn, groupIndex int, groupCtx *Group } if groupIndex < 0 { - return errors.New("Negative groupIndex") + return errors.New("negative groupIndex") } ep := logic.EvalParams{ Proto: &groupCtx.consensusParams, diff --git a/data/txHandler.go b/data/txHandler.go index 8e4a717ede..4cef49d449 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -19,6 +19,7 @@ package data import ( "bytes" "context" + "errors" "fmt" "io" "sync" @@ -45,6 +46,13 @@ var txBacklogSize = config.Consensus[protocol.ConsensusCurrentVersion].MaxTxnByt var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled) var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) var transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool) +var transactionMessagesDupPreBacklog = metrics.MakeCounter(metrics.TransactionMessagesDupPreBacklog) +var transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee) +var transactionMessagesTxnBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnBadFormed) +var transactionMessagesTxnSigBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigBadFormed) +var transactionMessagesTxnMsigBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigBadFormed) +var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig) +var transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) // The txBacklogMsg structure used to 
track a single incoming transaction from the gossip network, type txBacklogMsg struct { @@ -132,7 +140,7 @@ func (handler *TxHandler) backlogWorker() { if !ok { return } - handler.postprocessCheckedTxn(wi) + handler.postProcessCheckedTxn(wi) // restart the loop so that we could empty out the post verification queue. continue @@ -146,6 +154,7 @@ func (handler *TxHandler) backlogWorker() { return } if handler.checkAlreadyCommitted(wi) { + transactionMessagesDupPreBacklog.Inc(nil) continue } @@ -156,7 +165,7 @@ func (handler *TxHandler) backlogWorker() { if !ok { return } - handler.postprocessCheckedTxn(wi) + handler.postProcessCheckedTxn(wi) case <-handler.ctx.Done(): return @@ -164,9 +173,47 @@ func (handler *TxHandler) backlogWorker() { } } -func (handler *TxHandler) postprocessCheckedTxn(wi *txBacklogMsg) { +func (handler *TxHandler) postProcessReportErrors(err error) { + var feeError *verify.ErrTxGroupInvalidFee + if errors.As(err, &feeError) { + transactionMessagesTxGroupInvalidFee.Inc(nil) + return + } + + var badFormed *verify.ErrTxnBadFormed + if errors.As(err, &badFormed) { + transactionMessagesTxnBadFormed.Inc(nil) + return + } + + var sigBadFormed *verify.ErrTxnSigBadFormed + if errors.As(err, &sigBadFormed) { + transactionMessagesTxnSigBadFormed.Inc(nil) + return + } + + var msigBadFormed *verify.ErrTxnMsigBadFormed + if errors.As(err, &msigBadFormed) { + transactionMessagesTxnMsigBadFormed.Inc(nil) + return + } + + var logicSig *verify.ErrTxnLogicSig + if errors.As(err, &logicSig) { + transactionMessagesTxnLogicSig.Inc(nil) + return + } + + if errors.Is(err, crypto.ErrBatchVerificationFailed) { + transactionMessagesTxnSigVerificationFailed.Inc(nil) + return + } +} + +func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { if wi.verificationErr != nil { // disconnect from peer. 
+ handler.postProcessReportErrors(wi.verificationErr) logging.Base().Warnf("Received a malformed tx group %v: %v", wi.unverifiedTxGroup, wi.verificationErr) handler.net.Disconnect(wi.rawmsg.Sender) return @@ -203,7 +250,7 @@ func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} { latest := handler.ledger.Latest() latestHdr, err := handler.ledger.BlockHdr(latest) if err != nil { - tx.verificationErr = fmt.Errorf("Could not get header for previous block %d: %w", latest, err) + tx.verificationErr = fmt.Errorf("could not get header for previous block %d: %w", latest, err) logging.Base().Warnf("Could not get header for previous block %d: %v", latest, err) } else { // we can't use PaysetGroups here since it's using a execpool like this go-routine and we don't want to deadlock. diff --git a/network/wsNetwork.go b/network/wsNetwork.go index d5f8b111e8..4b6d25bbd6 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -1440,11 +1440,13 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) { // preparePeerData prepares batches of data for sending. 
// It performs optional zstd compression for proposal massages -func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest) { +func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, map[protocol.Tag]struct{}) { // determine if there is a payload proposal and peers supporting compressed payloads wantCompression := false + var messageTags map[protocol.Tag]struct{} if prio { wantCompression = checkCanCompress(request, peers) + messageTags = make(map[protocol.Tag]struct{}, 1) } digests := make([]crypto.Digest, len(request.data)) @@ -1463,8 +1465,11 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, digests[i] = crypto.Hash(mbytes) } - if prio && request.tags[i] == protocol.ProposalPayloadTag { - networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil) + if prio { + if request.tags[i] == protocol.ProposalPayloadTag { + networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil) + } + messageTags[request.tags[i]] = struct{}{} } if wantCompression { @@ -1482,7 +1487,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, } } } - return data, dataCompressed, digests + return data, dataCompressed, digests, messageTags } // prio is set if the broadcast is a high-priority broadcast. @@ -1499,7 +1504,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, } start := time.Now() - data, dataWithCompression, digests := wn.preparePeerData(request, prio, peers) + data, dataWithCompression, digests, seenPrioTags := wn.preparePeerData(request, prio, peers) // first send to all the easy outbound peers who don't block, get them started. 
sentMessageCount := 0 @@ -1515,12 +1520,16 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, // if this peer supports compressed proposals and compressed data batch is filled out, use it ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime) if prio { - networkPrioBatchesPPWithCompression.Inc(nil) + if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok { + networkPrioBatchesPPWithCompression.Inc(nil) + } } } else { ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime) if prio { - networkPrioBatchesPPWithoutCompression.Inc(nil) + if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok { + networkPrioBatchesPPWithoutCompression.Inc(nil) + } } } if ok { diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 37dd646aa3..65eb943301 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -2690,7 +2690,7 @@ func TestParseHostOrURL(t *testing.T) { func TestPreparePeerData(t *testing.T) { partitiontest.PartitionTest(t) - // no comression + // no compression req := broadcastRequest{ tags: []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag}, data: [][]byte{[]byte("test"), []byte("data")}, @@ -2698,12 +2698,13 @@ func TestPreparePeerData(t *testing.T) { peers := []*wsPeer{} wn := WebsocketNetwork{} - data, comp, digests := wn.preparePeerData(req, false, peers) + data, comp, digests, seenPrioTags := wn.preparePeerData(req, false, peers) require.NotEmpty(t, data) require.Empty(t, comp) require.NotEmpty(t, digests) require.Equal(t, len(req.data), len(digests)) require.Equal(t, len(data), len(digests)) + require.Empty(t, seenPrioTags) for i := range data { require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i]) @@ -2717,13 +2718,16 @@ func TestPreparePeerData(t *testing.T) { features: pfCompressedProposal, } peers = []*wsPeer{&peer1, &peer2} - data, comp, digests = wn.preparePeerData(req, true, peers) + 
data, comp, digests, seenPrioTags = wn.preparePeerData(req, true, peers) require.NotEmpty(t, data) require.NotEmpty(t, comp) require.NotEmpty(t, digests) require.Equal(t, len(req.data), len(digests)) require.Equal(t, len(data), len(digests)) require.Equal(t, len(comp), len(digests)) + require.NotEmpty(t, seenPrioTags) + require.Len(t, seenPrioTags, 1) + require.Contains(t, seenPrioTags, protocol.ProposalPayloadTag) for i := range data { require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i]) diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index f9965d0013..b3916b78f7 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -91,4 +91,18 @@ var ( TransactionMessagesDroppedFromBacklog = MetricName{Name: "algod_transaction_messages_dropped_backlog", Description: "Number of transaction messages dropped from backlog"} // TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool" TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"} + // TransactionMessagesDupPreBacklog "Number of duplicate transaction messages before placing into a backlog" + TransactionMessagesDupPreBacklog = MetricName{Name: "algod_transaction_messages_dup_prebacklog", Description: "Number of duplicate transaction messages before placing into a backlog"} + // TransactionMessagesTxGroupInvalidFee "Number of transaction messages with invalid txgroup fee" + TransactionMessagesTxGroupInvalidFee = MetricName{Name: "algod_transaction_messages_txgroup_invalid_fee", Description: "Number of transaction messages with invalid txgroup fee"} + // TransactionMessagesTxnBadFormed "Number of transaction messages not well formed" + TransactionMessagesTxnBadFormed = MetricName{Name: "algod_transaction_messages_txn_bad_formed", Description: "Number of transaction messages not well formed"} + // TransactionMessagesTxnSigBadFormed "Number of 
transaction messages with bad formed signature" + TransactionMessagesTxnSigBadFormed = MetricName{Name: "algod_transaction_messages_sig_bad_formed", Description: "Number of transaction messages with bad formed signature"} + // TransactionMessagesTxnMsigBadFormed "Number of transaction messages with bad formed multisig" + TransactionMessagesTxnMsigBadFormed = MetricName{Name: "algod_transaction_messages_msig_bad_formed", Description: "Number of transaction messages with bad formed multisig"} + // TransactionMessagesTxnLogicSig "Number of transaction messages with invalid logic sig" + TransactionMessagesTxnLogicSig = MetricName{Name: "algod_transaction_messages_logic_sig_failed", Description: "Number of transaction messages with invalid logic sig"} + // TransactionMessagesTxnSigVerificationFailed "Number of transaction messages with signature verification failed" + TransactionMessagesTxnSigVerificationFailed = MetricName{Name: "algod_transaction_messages_sig_verify_failed", Description: "Number of transaction messages with signature verification failed"} ) From 6d1f0ad5bb75e855b8157ac0729c020db0eeaeec Mon Sep 17 00:00:00 2001 From: chris erway Date: Mon, 14 Nov 2022 12:49:59 -0500 Subject: [PATCH 02/13] add TxHandler backlog size gauge --- data/txHandler.go | 19 ++++++++++++++++++- util/metrics/metrics.go | 2 ++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/data/txHandler.go b/data/txHandler.go index 4cef49d449..3569a19bd9 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "sync" + "time" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" @@ -53,6 +54,7 @@ var transactionMessagesTxnSigBadFormed = metrics.MakeCounter(metrics.Transaction var transactionMessagesTxnMsigBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigBadFormed) var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig) var 
transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) +var transactionMessagesBacklogSizeGauge = metrics.MakeGauge(metrics.TransactionMessagesBacklogSize) // The txBacklogMsg structure used to track a single incoming transaction from the gossip network, type txBacklogMsg struct { @@ -109,8 +111,9 @@ func (handler *TxHandler) Start() { handler.net.RegisterHandlers([]network.TaggedMessageHandler{ {Tag: protocol.TxnTag, MessageHandler: network.HandlerFunc(handler.processIncomingTxn)}, }) - handler.backlogWg.Add(1) + handler.backlogWg.Add(2) go handler.backlogWorker() + go handler.backlogGaugeThread() } // Stop suspends the processing of incoming messages at the transaction handler @@ -127,6 +130,20 @@ func reencode(stxns []transactions.SignedTxn) []byte { return bytes.Join(result, nil) } +func (handler *TxHandler) backlogGaugeThread() { + defer handler.backlogWg.Done() + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + transactionMessagesBacklogSizeGauge.Set(float64(len(handler.backlogQueue))) + case <-handler.ctx.Done(): + return + } + } +} + // backlogWorker is the worker go routine that process the incoming messages from the postVerificationQueue and backlogQueue channels // and dispatches them further. 
func (handler *TxHandler) backlogWorker() { diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index b3916b78f7..98baccc407 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -105,4 +105,6 @@ var ( TransactionMessagesTxnLogicSig = MetricName{Name: "algod_transaction_messages_logic_sig_failed", Description: "Number of transaction messages with invalid logic sig"} // TransactionMessagesTxnSigVerificationFailed "Number of transaction messages with signature verification failed" TransactionMessagesTxnSigVerificationFailed = MetricName{Name: "algod_transaction_messages_sig_verify_failed", Description: "Number of transaction messages with signature verification failed"} + // TransactionMessagesBacklogSize "Number of transaction messages in the TX handler backlog queue" + TransactionMessagesBacklogSize = MetricName{Name: "algod_transaction_messages_backlog_size", Description: "Number of transaction messages in the TX handler backlog queue"} ) From f234c96f9e9f0564a4a00fe9dbad52fafa3c52e3 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 14 Nov 2022 16:26:00 -0500 Subject: [PATCH 03/13] Make less allocations in postProcessReportErrors --- data/transactions/verify/txn.go | 105 ++++++++++++++++++-------------- data/txHandler.go | 60 ++++++++---------- data/txHandler_test.go | 8 +++ network/wsNetwork_test.go | 2 +- util/metrics/metrics.go | 16 ++--- 5 files changed, 101 insertions(+), 90 deletions(-) diff --git a/data/transactions/verify/txn.go b/data/transactions/verify/txn.go index b81a798ca7..87920955a4 100644 --- a/data/transactions/verify/txn.go +++ b/data/transactions/verify/txn.go @@ -69,44 +69,52 @@ type GroupContext struct { ledger logic.LedgerForSignature } -type ErrTxGroupInvalidFee struct { - err error -} - -func (e *ErrTxGroupInvalidFee) Error() string { - return e.err.Error() -} - -type ErrTxnBadFormed struct { - err error -} - -func (e *ErrTxnBadFormed) Error() string { - return e.err.Error() -} - -type ErrTxnSigBadFormed 
struct { - err error -} - -func (e *ErrTxnSigBadFormed) Error() string { - return e.err.Error() -} +var errTxGroupInvalidFee = errors.New("txgroup fee requirement overflow") +var errTxnSigHasNoSig = errors.New("signedtxn has no sig") +var errTxnSigNotWellFormed = errors.New("signedtxn should only have one of Sig or Msig or LogicSig") +var errRekeyingNotSupported = errors.New("nonempty AuthAddr but rekeying is not supported") +var errUnknownSignature = errors.New("has one mystery sig. WAT?") + +// TxGroupErrorReason is reason code for ErrTxGroupError +type TxGroupErrorReason int + +const ( + // TxGroupErrorReasonGeneric is a generic (not tracked) reason code + TxGroupErrorReasonGeneric TxGroupErrorReason = iota + // TxGroupErrorReasonNotWellFormed is txn.WellFormed failure + TxGroupErrorReasonNotWellFormed + // TxGroupErrorReasonInvalidFee is invalid fee pooling in transaction group + TxGroupErrorReasonInvalidFee + // TxGroupErrorReasonHasNoSig is for transaction without any signature + TxGroupErrorReasonHasNoSig + // TxGroupErrorReasonSigNotWellFormed defines signature format errors + TxGroupErrorReasonSigNotWellFormed + // TxGroupErrorReasonMsigNotWellFormed defines multisig format errors + TxGroupErrorReasonMsigNotWellFormed + // TxGroupErrorReasonLogicSigFailed defines logic sig validation errors + TxGroupErrorReasonLogicSigFailed +) -type ErrTxnMsigBadFormed struct { - err error +// ErrTxGroupError is an error from txn pre-validation (well form-ness, signature format, etc). +// It can be unwrapped into underlying error, as well as has a specific failure reason code. 
+type ErrTxGroupError struct { + err error + reason TxGroupErrorReason } -func (e *ErrTxnMsigBadFormed) Error() string { +// Error returns an error message from the underlying error +func (e *ErrTxGroupError) Error() string { return e.err.Error() } -type ErrTxnLogicSig struct { - err error +// Unwrap returns the underlying error +func (e *ErrTxGroupError) Unwrap() error { + return e.err } -func (e *ErrTxnLogicSig) Error() string { - return e.err.Error() +// Reason returns the reason code +func (e *ErrTxGroupError) Reason() TxGroupErrorReason { + return e.reason } // PrepareGroupContext prepares a verification group parameter object for a given transaction @@ -141,14 +149,14 @@ func (g *GroupContext) Equal(other *GroupContext) bool { // txnBatchPrep verifies a SignedTxn having no obviously inconsistent data. // Block-assembly time checks of LogicSig and accounting rules may still block the txn. -// it is the caller responsibility to call batchVerifier.Verify() -func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, verifier *crypto.BatchVerifier) error { +// It is the caller's responsibility to call batchVerifier.Verify().
+func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, verifier *crypto.BatchVerifier) *ErrTxGroupError { if !groupCtx.consensusParams.SupportRekeying && (s.AuthAddr != basics.Address{}) { - return errors.New("nonempty AuthAddr but rekeying is not supported") + return &ErrTxGroupError{err: errRekeyingNotSupported, reason: TxGroupErrorReasonGeneric} } if err := s.Txn.WellFormed(groupCtx.specAddrs, groupCtx.consensusParams); err != nil { - return &ErrTxnBadFormed{err: err} + return &ErrTxGroupError{err: err, reason: TxGroupErrorReasonNotWellFormed} } return stxnCoreChecks(s, txnIdx, groupCtx, verifier) @@ -184,9 +192,13 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo minFeeCount := uint64(0) feesPaid := uint64(0) for i, stxn := range stxs { - err = txnBatchPrep(&stxn, i, groupCtx, verifier) - if err != nil { - err = fmt.Errorf("transaction %+v invalid : %w", stxn, err) + prepErr := txnBatchPrep(&stxn, i, groupCtx, verifier) + if prepErr != nil { + // re-wrap the error, take underlying one and copy the reason code + err = &ErrTxGroupError{ + err: fmt.Errorf("transaction %+v invalid : %w", stxn, prepErr.err), + reason: prepErr.reason, + } return nil, err } if stxn.Txn.Type != protocol.StateProofTx { @@ -196,17 +208,18 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo } feeNeeded, overflow := basics.OMul(groupCtx.consensusParams.MinTxnFee, minFeeCount) if overflow { - err = &ErrTxGroupInvalidFee{err: fmt.Errorf("txgroup fee requirement overflow")} + err = &ErrTxGroupError{err: errTxGroupInvalidFee, reason: TxGroupErrorReasonInvalidFee} return nil, err } // feesPaid may have saturated. That's ok. Since we know // feeNeeded did not overflow, simple comparison tells us // feesPaid was enough. 
if feesPaid < feeNeeded { - err = &ErrTxGroupInvalidFee{ + err = &ErrTxGroupError{ err: fmt.Errorf( "txgroup had %d in fees, which is less than the minimum %d * %d", feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee), + reason: TxGroupErrorReasonInvalidFee, } return nil, err } @@ -214,7 +227,8 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo return groupCtx, nil } -func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) error { +// stxnCoreChecks runs signatures validity checks and enqueues signature into batchVerifier for verification. +func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, batchVerifier *crypto.BatchVerifier) *ErrTxGroupError { numSigs := 0 hasSig := false hasMsig := false @@ -239,11 +253,10 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex if s.Txn.Sender == transactions.StateProofSender && s.Txn.Type == protocol.StateProofTx { return nil } - - return &ErrTxnSigBadFormed{err: errors.New("signedtxn has no sig")} + return &ErrTxGroupError{err: errTxnSigHasNoSig, reason: TxGroupErrorReasonHasNoSig} } if numSigs > 1 { - return &ErrTxnSigBadFormed{err: errors.New("signedtxn should only have one of Sig or Msig or LogicSig")} + return &ErrTxGroupError{err: errTxnSigNotWellFormed, reason: TxGroupErrorReasonSigNotWellFormed} } if hasSig { @@ -252,17 +265,17 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex } if hasMsig { if err := crypto.MultisigBatchPrep(s.Txn, crypto.Digest(s.Authorizer()), s.Msig, batchVerifier); err != nil { - return &ErrTxnMsigBadFormed{err: fmt.Errorf("multisig validation failed: %w", err)} + return &ErrTxGroupError{err: fmt.Errorf("multisig validation failed: %w", err), reason: TxGroupErrorReasonMsigNotWellFormed} } return nil } if hasLogicSig { if err := logicSigVerify(s, txnIdx, groupCtx); err != nil { - return 
&ErrTxnLogicSig{err: err} + return &ErrTxGroupError{err: err, reason: TxGroupErrorReasonLogicSigFailed} } return nil } - return errors.New("has one mystery sig. WAT?") + return &ErrTxGroupError{err: errUnknownSignature, reason: TxGroupErrorReasonGeneric} } // LogicSigSanityCheck checks that the signature is valid and that the program is basically well formed. diff --git a/data/txHandler.go b/data/txHandler.go index 3569a19bd9..f1ae93ec76 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -47,11 +47,11 @@ var txBacklogSize = config.Consensus[protocol.ConsensusCurrentVersion].MaxTxnByt var transactionMessagesHandled = metrics.MakeCounter(metrics.TransactionMessagesHandled) var transactionMessagesDroppedFromBacklog = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromBacklog) var transactionMessagesDroppedFromPool = metrics.MakeCounter(metrics.TransactionMessagesDroppedFromPool) -var transactionMessagesDupPreBacklog = metrics.MakeCounter(metrics.TransactionMessagesDupPreBacklog) +var transactionMessagesAlreadyCommitted = metrics.MakeCounter(metrics.TransactionMessagesAlreadyCommitted) var transactionMessagesTxGroupInvalidFee = metrics.MakeCounter(metrics.TransactionMessagesTxGroupInvalidFee) -var transactionMessagesTxnBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnBadFormed) -var transactionMessagesTxnSigBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigBadFormed) -var transactionMessagesTxnMsigBadFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigBadFormed) +var transactionMessagesTxnNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnNotWellFormed) +var transactionMessagesTxnSigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigNotWellFormed) +var transactionMessagesTxnMsigNotWellFormed = metrics.MakeCounter(metrics.TransactionMessagesTxnMsigNotWellFormed) var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig) var 
transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) var transactionMessagesBacklogSizeGauge = metrics.MakeGauge(metrics.TransactionMessagesBacklogSize) @@ -171,7 +171,7 @@ func (handler *TxHandler) backlogWorker() { return } if handler.checkAlreadyCommitted(wi) { - transactionMessagesDupPreBacklog.Inc(nil) + transactionMessagesAlreadyCommitted.Inc(nil) continue } @@ -191,40 +191,30 @@ func (handler *TxHandler) backlogWorker() { } func (handler *TxHandler) postProcessReportErrors(err error) { - var feeError *verify.ErrTxGroupInvalidFee - if errors.As(err, &feeError) { - transactionMessagesTxGroupInvalidFee.Inc(nil) - return - } - - var badFormed *verify.ErrTxnBadFormed - if errors.As(err, &badFormed) { - transactionMessagesTxnBadFormed.Inc(nil) - return - } - - var sigBadFormed *verify.ErrTxnSigBadFormed - if errors.As(err, &sigBadFormed) { - transactionMessagesTxnSigBadFormed.Inc(nil) - return - } - - var msigBadFormed *verify.ErrTxnMsigBadFormed - if errors.As(err, &msigBadFormed) { - transactionMessagesTxnMsigBadFormed.Inc(nil) - return - } - - var logicSig *verify.ErrTxnLogicSig - if errors.As(err, &logicSig) { - transactionMessagesTxnLogicSig.Inc(nil) - return - } - if errors.Is(err, crypto.ErrBatchVerificationFailed) { transactionMessagesTxnSigVerificationFailed.Inc(nil) return } + + var txGroupErr *verify.ErrTxGroupError + if errors.As(err, &txGroupErr) { + txGroupErr = err.(*verify.ErrTxGroupError) + switch txGroupErr.Reason() { + case verify.TxGroupErrorReasonNotWellFormed: + transactionMessagesTxnNotWellFormed.Inc(nil) + case verify.TxGroupErrorReasonInvalidFee: + transactionMessagesTxGroupInvalidFee.Inc(nil) + case verify.TxGroupErrorReasonHasNoSig: + fallthrough + case verify.TxGroupErrorReasonSigNotWellFormed: + transactionMessagesTxnSigNotWellFormed.Inc(nil) + case verify.TxGroupErrorReasonMsigNotWellFormed: + transactionMessagesTxnMsigNotWellFormed.Inc(nil) + case 
verify.TxGroupErrorReasonLogicSigFailed: + transactionMessagesTxnLogicSig.Inc(nil) + default: + } + } } func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { diff --git a/data/txHandler_test.go b/data/txHandler_test.go index da8fe6469d..693b067876 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -18,6 +18,7 @@ package data import ( "encoding/binary" + "errors" "fmt" "io" "math/rand" @@ -568,3 +569,10 @@ func runHandlerBenchmark(maxGroupSize int, b *testing.B) { close(handler.backlogQueue) wg.Wait() } + +func BenchmarkPostProcessError(b *testing.B) { + var txh TxHandler + + err := errors.New("couldn't find latest resources") + txh.postProcessReportErrors(err) +} diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 65eb943301..61b2c31caa 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -2726,7 +2726,7 @@ func TestPreparePeerData(t *testing.T) { require.Equal(t, len(data), len(digests)) require.Equal(t, len(comp), len(digests)) require.NotEmpty(t, seenPrioTags) - require.Len(t, seenPrioTags, 1) + require.Len(t, seenPrioTags, 2) require.Contains(t, seenPrioTags, protocol.ProposalPayloadTag) for i := range data { diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index 98baccc407..86d5211029 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -91,16 +91,16 @@ var ( TransactionMessagesDroppedFromBacklog = MetricName{Name: "algod_transaction_messages_dropped_backlog", Description: "Number of transaction messages dropped from backlog"} // TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool" TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"} - // TransactionMessagesDupPreBacklog "Number of duplicate transaction messages before placing into a backlog" - TransactionMessagesDupPreBacklog = MetricName{Name: 
"algod_transaction_messages_dup_prebacklog", Description: "Number of duplicate transaction messages before placing into a backlog"} + // TransactionMessagesAlreadyCommitted "Number of duplicate or error transaction messages after txhandler backlog" + TransactionMessagesAlreadyCommitted = MetricName{Name: "algod_transaction_messages_already_committed", Description: "Number of duplicate or error transaction messages after txhandler backlog"} // TransactionMessagesTxGroupInvalidFee "Number of transaction messages with invalid txgroup fee" TransactionMessagesTxGroupInvalidFee = MetricName{Name: "algod_transaction_messages_txgroup_invalid_fee", Description: "Number of transaction messages with invalid txgroup fee"} - // TransactionMessagesTxnBadFormed "Number of transaction messages not well formed" - TransactionMessagesTxnBadFormed = MetricName{Name: "algod_transaction_messages_txn_bad_formed", Description: "Number of transaction messages not well formed"} - // TransactionMessagesTxnSigBadFormed "Number of transaction messages with bad formed signature" - TransactionMessagesTxnSigBadFormed = MetricName{Name: "algod_transaction_messages_sig_bad_formed", Description: "Number of transaction messages with bad formed signature"} - // TransactionMessagesTxnMsigBadFormed "Number of transaction messages with bad formed multisig" - TransactionMessagesTxnMsigBadFormed = MetricName{Name: "algod_transaction_messages_msig_bad_formed", Description: "Number of transaction messages with bad formed multisig"} + // TransactionMessagesTxnNotWellFormed "Number of transaction messages not well formed" + TransactionMessagesTxnNotWellFormed = MetricName{Name: "algod_transaction_messages_txn_notwell_formed", Description: "Number of transaction messages not well formed"} + // TransactionMessagesTxnSigNotWellFormed "Number of transaction messages with bad formed signature" + TransactionMessagesTxnSigNotWellFormed = MetricName{Name: "algod_transaction_messages_sig_bad_formed", Description: 
"Number of transaction messages with bad formed signature"} + // TransactionMessagesTxnMsigNotWellFormed "Number of transaction messages with bad formed multisig" + TransactionMessagesTxnMsigNotWellFormed = MetricName{Name: "algod_transaction_messages_msig_bas_formed", Description: "Number of transaction messages with bad formed msig"} // TransactionMessagesTxnLogicSig "Number of transaction messages with invalid logic sig" TransactionMessagesTxnLogicSig = MetricName{Name: "algod_transaction_messages_logic_sig_failed", Description: "Number of transaction messages with invalid logic sig"} // TransactionMessagesTxnSigVerificationFailed "Number of transaction messages with signature verification failed" From a12fe42067ff399ce51f5740352b45388b7b38f4 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 14 Nov 2022 19:18:43 -0500 Subject: [PATCH 04/13] Add unit tests --- data/transactions/verify/txn.go | 30 ++++++------- data/txHandler.go | 8 ++-- data/txHandler_test.go | 77 ++++++++++++++++++++++++++++++++- 3 files changed, 93 insertions(+), 22 deletions(-) diff --git a/data/transactions/verify/txn.go b/data/transactions/verify/txn.go index 87920955a4..8ff8f494ba 100644 --- a/data/transactions/verify/txn.go +++ b/data/transactions/verify/txn.go @@ -93,13 +93,16 @@ const ( TxGroupErrorReasonMsigNotWellFormed // TxGroupErrorReasonLogicSigFailed defines logic sig validation errors TxGroupErrorReasonLogicSigFailed + + // TxGroupErrorReasonNumValues is number of enum values + TxGroupErrorReasonNumValues ) // ErrTxGroupError is an error from txn pre-validation (well form-ness, signature format, etc). // It can be unwrapped into underlying error, as well as has a specific failure reason code. 
type ErrTxGroupError struct { err error - reason TxGroupErrorReason + Reason TxGroupErrorReason } // Error returns an error message from the underlying error @@ -112,11 +115,6 @@ func (e *ErrTxGroupError) Unwrap() error { return e.err } -// Reason returns a reason code -func (e *ErrTxGroupError) Reason() TxGroupErrorReason { - return e.reason -} - // PrepareGroupContext prepares a verification group parameter object for a given transaction // group. func PrepareGroupContext(group []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature) (*GroupContext, error) { @@ -152,11 +150,11 @@ func (g *GroupContext) Equal(other *GroupContext) bool { // It is the caller responsibility to call batchVerifier.Verify(). func txnBatchPrep(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContext, verifier *crypto.BatchVerifier) *ErrTxGroupError { if !groupCtx.consensusParams.SupportRekeying && (s.AuthAddr != basics.Address{}) { - return &ErrTxGroupError{err: errRekeyingNotSupported, reason: TxGroupErrorReasonGeneric} + return &ErrTxGroupError{err: errRekeyingNotSupported, Reason: TxGroupErrorReasonGeneric} } if err := s.Txn.WellFormed(groupCtx.specAddrs, groupCtx.consensusParams); err != nil { - return &ErrTxGroupError{err: err, reason: TxGroupErrorReasonNotWellFormed} + return &ErrTxGroupError{err: err, Reason: TxGroupErrorReasonNotWellFormed} } return stxnCoreChecks(s, txnIdx, groupCtx, verifier) @@ -197,7 +195,7 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo // re-wrap the error, take underlying one and copy the reason code err = &ErrTxGroupError{ err: fmt.Errorf("transaction %+v invalid : %w", stxn, prepErr.err), - reason: prepErr.reason, + Reason: prepErr.Reason, } return nil, err } @@ -208,7 +206,7 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo } feeNeeded, overflow := basics.OMul(groupCtx.consensusParams.MinTxnFee, minFeeCount) if overflow { - err = 
&ErrTxGroupError{err: errTxGroupInvalidFee, reason: TxGroupErrorReasonInvalidFee} + err = &ErrTxGroupError{err: errTxGroupInvalidFee, Reason: TxGroupErrorReasonInvalidFee} return nil, err } // feesPaid may have saturated. That's ok. Since we know @@ -219,7 +217,7 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo err: fmt.Errorf( "txgroup had %d in fees, which is less than the minimum %d * %d", feesPaid, minFeeCount, groupCtx.consensusParams.MinTxnFee), - reason: TxGroupErrorReasonInvalidFee, + Reason: TxGroupErrorReasonInvalidFee, } return nil, err } @@ -253,10 +251,10 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex if s.Txn.Sender == transactions.StateProofSender && s.Txn.Type == protocol.StateProofTx { return nil } - return &ErrTxGroupError{err: errTxnSigHasNoSig, reason: TxGroupErrorReasonHasNoSig} + return &ErrTxGroupError{err: errTxnSigHasNoSig, Reason: TxGroupErrorReasonHasNoSig} } if numSigs > 1 { - return &ErrTxGroupError{err: errTxnSigNotWellFormed, reason: TxGroupErrorReasonSigNotWellFormed} + return &ErrTxGroupError{err: errTxnSigNotWellFormed, Reason: TxGroupErrorReasonSigNotWellFormed} } if hasSig { @@ -265,17 +263,17 @@ func stxnCoreChecks(s *transactions.SignedTxn, txnIdx int, groupCtx *GroupContex } if hasMsig { if err := crypto.MultisigBatchPrep(s.Txn, crypto.Digest(s.Authorizer()), s.Msig, batchVerifier); err != nil { - return &ErrTxGroupError{err: fmt.Errorf("multisig validation failed: %w", err), reason: TxGroupErrorReasonMsigNotWellFormed} + return &ErrTxGroupError{err: fmt.Errorf("multisig validation failed: %w", err), Reason: TxGroupErrorReasonMsigNotWellFormed} } return nil } if hasLogicSig { if err := logicSigVerify(s, txnIdx, groupCtx); err != nil { - return &ErrTxGroupError{err: err, reason: TxGroupErrorReasonLogicSigFailed} + return &ErrTxGroupError{err: err, Reason: TxGroupErrorReasonLogicSigFailed} } return nil } - return &ErrTxGroupError{err: errUnknownSignature, 
reason: TxGroupErrorReasonGeneric} + return &ErrTxGroupError{err: errUnknownSignature, Reason: TxGroupErrorReasonGeneric} } // LogicSigSanityCheck checks that the signature is valid and that the program is basically well formed. diff --git a/data/txHandler.go b/data/txHandler.go index f1ae93ec76..7b2112454a 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -124,8 +124,8 @@ func (handler *TxHandler) Stop() { func reencode(stxns []transactions.SignedTxn) []byte { var result [][]byte - for _, stxn := range stxns { - result = append(result, protocol.Encode(&stxn)) + for i := range stxns { + result = append(result, protocol.Encode(&stxns[i])) } return bytes.Join(result, nil) } @@ -198,8 +198,8 @@ func (handler *TxHandler) postProcessReportErrors(err error) { var txGroupErr *verify.ErrTxGroupError if errors.As(err, &txGroupErr) { - txGroupErr = err.(*verify.ErrTxGroupError) - switch txGroupErr.Reason() { + // txGroupErr = err.(*verify.ErrTxGroupError) + switch txGroupErr.Reason { case verify.TxGroupErrorReasonNotWellFormed: transactionMessagesTxnNotWellFormed.Inc(nil) case verify.TxGroupErrorReasonInvalidFee: diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 693b067876..f15be9e24a 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -570,9 +570,82 @@ func runHandlerBenchmark(maxGroupSize int, b *testing.B) { wg.Wait() } -func BenchmarkPostProcessError(b *testing.B) { +func TestPostProcessError(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + collect := func() map[string]float64 { + result := map[string]float64{} + transactionMessagesTxnSigVerificationFailed.AddMetric(result) + transactionMessagesAlreadyCommitted.AddMetric(result) + transactionMessagesTxGroupInvalidFee.AddMetric(result) + // exclude TxGroupErrorReasonNotWellFormed, tested in TestPostProcessErrorWithVerify + // transactionMessagesTxnNotWellFormed.AddMetric(result) + transactionMessagesTxnSigNotWellFormed.AddMetric(result) + 
transactionMessagesTxnMsigNotWellFormed.AddMetric(result) + transactionMessagesTxnLogicSig.AddMetric(result) + return result + } var txh TxHandler - err := errors.New("couldn't find latest resources") + errSome := errors.New("some error") + txh.postProcessReportErrors(errSome) + result := collect() + require.Len(t, result, 0) + + counter := 0 + for i := verify.TxGroupErrorReasonGeneric; i <= verify.TxGroupErrorReasonLogicSigFailed; i++ { + if i == verify.TxGroupErrorReasonNotWellFormed { + // skip TxGroupErrorReasonNotWellFormed, tested in TestPostProcessErrorWithVerify. + // the test uses global metric counters, skipping makes the test deterministic + continue + } + + errTxGroup := &verify.ErrTxGroupError{Reason: i} + txh.postProcessReportErrors(errTxGroup) + result = collect() + if i == verify.TxGroupErrorReasonSigNotWellFormed { + // TxGroupErrorReasonSigNotWellFormed and TxGroupErrorReasonHasNoSig increment the same metric + counter-- + require.Equal(t, result[metrics.TransactionMessagesTxnSigNotWellFormed.Name], float64(2)) + } + require.Len(t, result, counter) + counter++ + } + + // there are one less metrics than number of tracked values, + // plus one generic non-tracked value, plus skipped TxGroupErrorReasonNotWellFormed + const expected = int(verify.TxGroupErrorReasonNumValues) - 3 + require.Len(t, result, expected) + + errVerify := crypto.ErrBatchVerificationFailed + txh.postProcessReportErrors(errVerify) + result = collect() + require.Len(t, result, expected+1) +} + +func TestPostProcessErrorWithVerify(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + txn := transactions.Transaction{} + stxn := transactions.SignedTxn{Txn: txn} + + hdr := bookkeeping.BlockHeader{ + UpgradeState: bookkeeping.UpgradeState{ + CurrentProtocol: protocol.ConsensusCurrentVersion, + }, + } + _, err := verify.TxnGroup([]transactions.SignedTxn{stxn}, hdr, nil, nil) + var txGroupErr *verify.ErrTxGroupError + require.ErrorAs(t, err, &txGroupErr) + + result := 
map[string]float64{} + transactionMessagesTxnNotWellFormed.AddMetric(result) + require.Len(t, result, 0) + + var txh TxHandler txh.postProcessReportErrors(err) + transactionMessagesTxnNotWellFormed.AddMetric(result) + require.Len(t, result, 1) } From 5fd63cb1dcb7efe96cedf90066dcae4a672011b4 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Mon, 14 Nov 2022 21:26:38 -0500 Subject: [PATCH 05/13] Add txsync counters --- data/txHandler.go | 6 ++++++ util/metrics/metrics.go | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/data/txHandler.go b/data/txHandler.go index 7b2112454a..5d60a5cf34 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -56,6 +56,9 @@ var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMess var transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) var transactionMessagesBacklogSizeGauge = metrics.MakeGauge(metrics.TransactionMessagesBacklogSize) +var transactionGroupTxSyncRemember = metrics.MakeCounter(metrics.TransactionGroupTxSyncRemember) +var transactionGroupTxSyncAlreadyCommitted = metrics.MakeCounter(metrics.TransactionGroupTxSyncAlreadyCommitted) + // The txBacklogMsg structure used to track a single incoming transaction from the gossip network, type txBacklogMsg struct { rawmsg *network.IncomingMessage // the raw message from the network @@ -343,6 +346,7 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed unverifiedTxGroup: unverifiedTxGroup, } if handler.checkAlreadyCommitted(tx) { + transactionGroupTxSyncAlreadyCommitted.Inc(nil) return network.OutgoingMessage{}, true } @@ -373,6 +377,8 @@ func (handler *TxHandler) processDecoded(unverifiedTxGroup []transactions.Signed return network.OutgoingMessage{}, true } + transactionGroupTxSyncRemember.Inc(nil) + // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. 
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup) if err != nil { diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index 86d5211029..91ff056854 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -107,4 +107,9 @@ var ( TransactionMessagesTxnSigVerificationFailed = MetricName{Name: "algod_transaction_messages_sig_verify_failed", Description: "Number of transaction messages with signature verification failed"} // TransactionMessagesBacklogSize "Number of transaction messages in the TX handler backlog queue" TransactionMessagesBacklogSize = MetricName{Name: "algod_transaction_messages_backlog_size", Description: "Number of transaction messages in the TX handler backlog queue"} + + // TransactionGroupTxSyncRemember "Number of transaction groups remembered via tx sync" + TransactionGroupTxSyncRemember = MetricName{Name: "algod_transaction_group_txsync_remember", Description: "Number of transaction groups remembered via txsync"} + // TransactionGroupTxSyncAlreadyCommitted "Number of duplicate or error transaction groups received via txsync" + TransactionGroupTxSyncAlreadyCommitted = MetricName{Name: "algod_transaction_group_txsync_already_committed", Description: "Number of duplicate or error transaction groups received via txsync"} ) From 96d46e5c21d89b1b07c5fc272c30d48407d94690 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Wed, 16 Nov 2022 14:21:37 -0500 Subject: [PATCH 06/13] Add generic algod_transaction_messages_backlog_err metric --- data/txHandler.go | 5 ++++- data/txHandler_test.go | 9 ++++++--- util/metrics/metrics.go | 6 ++++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/data/txHandler.go b/data/txHandler.go index 5d60a5cf34..41fdca28bf 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -54,6 +54,7 @@ var transactionMessagesTxnSigNotWellFormed = metrics.MakeCounter(metrics.Transac var transactionMessagesTxnMsigNotWellFormed = 
metrics.MakeCounter(metrics.TransactionMessagesTxnMsigNotWellFormed) var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig) var transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) +var transactionMessagesBacklogErr = metrics.MakeCounter(metrics.TransactionMessagesBacklogErr) var transactionMessagesBacklogSizeGauge = metrics.MakeGauge(metrics.TransactionMessagesBacklogSize) var transactionGroupTxSyncRemember = metrics.MakeCounter(metrics.TransactionGroupTxSyncRemember) @@ -201,7 +202,6 @@ func (handler *TxHandler) postProcessReportErrors(err error) { var txGroupErr *verify.ErrTxGroupError if errors.As(err, &txGroupErr) { - // txGroupErr = err.(*verify.ErrTxGroupError) switch txGroupErr.Reason { case verify.TxGroupErrorReasonNotWellFormed: transactionMessagesTxnNotWellFormed.Inc(nil) @@ -216,7 +216,10 @@ func (handler *TxHandler) postProcessReportErrors(err error) { case verify.TxGroupErrorReasonLogicSigFailed: transactionMessagesTxnLogicSig.Inc(nil) default: + transactionMessagesBacklogErr.Inc(nil) } + } else { + transactionMessagesBacklogErr.Inc(nil) } } diff --git a/data/txHandler_test.go b/data/txHandler_test.go index f15be9e24a..0be0aab0ee 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -570,16 +570,17 @@ func runHandlerBenchmark(maxGroupSize int, b *testing.B) { wg.Wait() } -func TestPostProcessError(t *testing.T) { +func TestTxHandlerPostProcessError(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() collect := func() map[string]float64 { + // collect all specific error reason metrics except TxGroupErrorReasonNotWellFormed, + // it is tested in TestPostProcessErrorWithVerify result := map[string]float64{} transactionMessagesTxnSigVerificationFailed.AddMetric(result) transactionMessagesAlreadyCommitted.AddMetric(result) transactionMessagesTxGroupInvalidFee.AddMetric(result) - // exclude TxGroupErrorReasonNotWellFormed, tested 
in TestPostProcessErrorWithVerify // transactionMessagesTxnNotWellFormed.AddMetric(result) transactionMessagesTxnSigNotWellFormed.AddMetric(result) transactionMessagesTxnMsigNotWellFormed.AddMetric(result) @@ -592,6 +593,8 @@ func TestPostProcessError(t *testing.T) { txh.postProcessReportErrors(errSome) result := collect() require.Len(t, result, 0) + transactionMessagesBacklogErr.AddMetric(result) + require.Len(t, result, 1) counter := 0 for i := verify.TxGroupErrorReasonGeneric; i <= verify.TxGroupErrorReasonLogicSigFailed; i++ { @@ -624,7 +627,7 @@ func TestPostProcessError(t *testing.T) { require.Len(t, result, expected+1) } -func TestPostProcessErrorWithVerify(t *testing.T) { +func TestTxHandlerPostProcessErrorWithVerify(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index 91ff056854..3a5cad23f6 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -92,7 +92,7 @@ var ( // TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool" TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"} // TransactionMessagesAlreadyCommitted "Number of duplicate or error transaction messages before placing into a backlog" - TransactionMessagesAlreadyCommitted = MetricName{Name: "algod_transaction_messages_already_committed", Description: "Number of duplicate or error transaction messages after txhandler backlog"} + TransactionMessagesAlreadyCommitted = MetricName{Name: "algod_transaction_messages_err_or_committed", Description: "Number of duplicate or error transaction messages after txhandler backlog"} // TransactionMessagesTxGroupInvalidFee "Number of transaction messages with invalid txgroup fee" TransactionMessagesTxGroupInvalidFee = MetricName{Name: "algod_transaction_messages_txgroup_invalid_fee", Description: "Number of transaction messages with 
invalid txgroup fee"} // TransactionMessagesTxnNotWellFormed "Number of transaction messages not well formed" @@ -105,11 +105,13 @@ var ( TransactionMessagesTxnLogicSig = MetricName{Name: "algod_transaction_messages_logic_sig_failed", Description: "Number of transaction messages with invalid logic sig"} // TransactionMessagesTxnSigVerificationFailed "Number of transaction messages with signature verification failed" TransactionMessagesTxnSigVerificationFailed = MetricName{Name: "algod_transaction_messages_sig_verify_failed", Description: "Number of transaction messages with signature verification failed"} + // TransactionMessagesBacklogErr "Number of transaction messages with some validation error" + TransactionMessagesBacklogErr = MetricName{Name: "algod_transaction_messages_backlog_err", Description: "Number of transaction messages with some validation error"} // TransactionMessagesBacklogSize "Number of transaction messages in the TX handler backlog queue" TransactionMessagesBacklogSize = MetricName{Name: "algod_transaction_messages_backlog_size", Description: "Number of transaction messages in the TX handler backlog queue"} // TransactionGroupTxSyncRemember "Number of transaction groups remembered via tx sync" TransactionGroupTxSyncRemember = MetricName{Name: "algod_transaction_group_txsync_remember", Description: "Number of transaction groups remembered via txsync"} // TransactionGroupTxSyncAlreadyCommitted "Number of duplicate or error transaction groups received via txsync" - TransactionGroupTxSyncAlreadyCommitted = MetricName{Name: "algod_transaction_group_txsync_already_committed", Description: "Number of duplicate or error transaction groups received via txsync"} + TransactionGroupTxSyncAlreadyCommitted = MetricName{Name: "algod_transaction_group_txsync_err_or_committed", Description: "Number of duplicate or error transaction groups received via txsync"} ) From e5bcf911405d913c9d9da5c539ecdd9f8b1df123 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: 
Wed, 16 Nov 2022 17:44:13 -0500 Subject: [PATCH 07/13] Add algod_transaction_messages_remember --- data/txHandler.go | 3 +++ util/metrics/metrics.go | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/data/txHandler.go b/data/txHandler.go index 41fdca28bf..1be331a2c4 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -55,6 +55,7 @@ var transactionMessagesTxnMsigNotWellFormed = metrics.MakeCounter(metrics.Transa var transactionMessagesTxnLogicSig = metrics.MakeCounter(metrics.TransactionMessagesTxnLogicSig) var transactionMessagesTxnSigVerificationFailed = metrics.MakeCounter(metrics.TransactionMessagesTxnSigVerificationFailed) var transactionMessagesBacklogErr = metrics.MakeCounter(metrics.TransactionMessagesBacklogErr) +var transactionMessagesRemember = metrics.MakeCounter(metrics.TransactionMessagesRemember) var transactionMessagesBacklogSizeGauge = metrics.MakeGauge(metrics.TransactionMessagesBacklogSize) var transactionGroupTxSyncRemember = metrics.MakeCounter(metrics.TransactionGroupTxSyncRemember) @@ -245,6 +246,8 @@ func (handler *TxHandler) postProcessCheckedTxn(wi *txBacklogMsg) { return } + transactionMessagesRemember.Inc(nil) + // if we remembered without any error ( i.e. txpool wasn't full ), then we should pin these transactions. 
err = handler.ledger.VerifiedTransactionCache().Pin(verifiedTxGroup) if err != nil { diff --git a/util/metrics/metrics.go b/util/metrics/metrics.go index 3a5cad23f6..0752ef62ae 100644 --- a/util/metrics/metrics.go +++ b/util/metrics/metrics.go @@ -92,7 +92,7 @@ var ( // TransactionMessagesDroppedFromPool "Number of transaction messages dropped from pool" TransactionMessagesDroppedFromPool = MetricName{Name: "algod_transaction_messages_dropped_pool", Description: "Number of transaction messages dropped from pool"} // TransactionMessagesAlreadyCommitted "Number of duplicate or error transaction messages before placing into a backlog" - TransactionMessagesAlreadyCommitted = MetricName{Name: "algod_transaction_messages_err_or_committed", Description: "Number of duplicate or error transaction messages after txhandler backlog"} + TransactionMessagesAlreadyCommitted = MetricName{Name: "algod_transaction_messages_err_or_committed", Description: "Number of duplicate or error transaction messages after TX handler backlog"} // TransactionMessagesTxGroupInvalidFee "Number of transaction messages with invalid txgroup fee" TransactionMessagesTxGroupInvalidFee = MetricName{Name: "algod_transaction_messages_txgroup_invalid_fee", Description: "Number of transaction messages with invalid txgroup fee"} // TransactionMessagesTxnNotWellFormed "Number of transaction messages not well formed" @@ -107,6 +107,8 @@ var ( TransactionMessagesTxnSigVerificationFailed = MetricName{Name: "algod_transaction_messages_sig_verify_failed", Description: "Number of transaction messages with signature verification failed"} // TransactionMessagesBacklogErr "Number of transaction messages with some validation error" TransactionMessagesBacklogErr = MetricName{Name: "algod_transaction_messages_backlog_err", Description: "Number of transaction messages with some validation error"} + // TransactionMessagesRemember "Number of transaction messages remembered in TX handler" + TransactionMessagesRemember = 
MetricName{Name: "algod_transaction_messages_remember", Description: "Number of transaction messages remembered in TX handler"} // TransactionMessagesBacklogSize "Number of transaction messages in the TX handler backlog queue" TransactionMessagesBacklogSize = MetricName{Name: "algod_transaction_messages_backlog_size", Description: "Number of transaction messages in the TX handler backlog queue"} From 17a100bc7caa0e3f1a942ac82929bb35aa5501a4 Mon Sep 17 00:00:00 2001 From: chris erway Date: Thu, 17 Nov 2022 12:00:05 -0500 Subject: [PATCH 08/13] add TX, AV, PP, MI counters to DisconnectPeerEventDetails and PeerConnectionDetails --- logging/telemetryspec/event.go | 4 ++++ network/wsNetwork.go | 13 ++++++++++++- network/wsPeer.go | 10 ++++++++++ 3 files changed, 26 insertions(+), 1 deletion(-) diff --git a/logging/telemetryspec/event.go b/logging/telemetryspec/event.go index b98830bc0e..eb7340e2d7 100644 --- a/logging/telemetryspec/event.go +++ b/logging/telemetryspec/event.go @@ -230,6 +230,8 @@ const DisconnectPeerEvent Event = "DisconnectPeer" type DisconnectPeerEventDetails struct { PeerEventDetails Reason string + // Received message counters for this peer while it was connected + TXCount, MICount, AVCount, PPCount uint64 } // ErrorOutputEvent event @@ -304,6 +306,8 @@ type PeerConnectionDetails struct { MessageDelay int64 `json:",omitempty"` // DuplicateFilterCount is the number of times this peer has sent us a message hash to filter that it had already sent before. DuplicateFilterCount uint64 + // These message counters count received messages from this peer. + TXCount, MICount, AVCount, PPCount uint64 // TCPInfo provides connection measurements from TCP. 
TCP util.TCPInfo `json:",omitempty"` } diff --git a/network/wsNetwork.go b/network/wsNetwork.go index d9ec65d8fc..abf426357c 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -1837,7 +1837,11 @@ func (wn *WebsocketNetwork) getPeerConnectionTelemetryDetails(now time.Time, pee ConnectionDuration: uint(now.Sub(peer.createTime).Seconds()), TelemetryGUID: peer.TelemetryGUID, InstanceName: peer.InstanceName, - DuplicateFilterCount: peer.duplicateFilterCount, + DuplicateFilterCount: atomic.LoadUint64(&peer.duplicateFilterCount), + TXCount: atomic.LoadUint64(&peer.txMessageCount), + MICount: atomic.LoadUint64(&peer.miMessageCount), + AVCount: atomic.LoadUint64(&peer.avMessageCount), + PPCount: atomic.LoadUint64(&peer.ppMessageCount), } // unwrap websocket.Conn, requestTrackedConnection, rejectingLimitListenerConn var uconn net.Conn = peer.conn.UnderlyingConn() @@ -2323,6 +2327,10 @@ func (wn *WebsocketNetwork) removePeer(peer *wsPeer, reason disconnectReason) { telemetryspec.DisconnectPeerEventDetails{ PeerEventDetails: eventDetails, Reason: string(reason), + TXCount: atomic.LoadUint64(&peer.txMessageCount), + MICount: atomic.LoadUint64(&peer.miMessageCount), + AVCount: atomic.LoadUint64(&peer.avMessageCount), + PPCount: atomic.LoadUint64(&peer.ppMessageCount), }) peers.Set(float64(wn.NumPeers())) @@ -2454,6 +2462,7 @@ func (wn *WebsocketNetwork) updateMessagesOfInterestEnc() { atomic.AddUint32(&wn.messagesOfInterestGeneration, 1) var peers []*wsPeer peers, _ = wn.peerSnapshot(peers) + wn.log.Infof("updateMessagesOfInterestEnc maybe sending messagesOfInterest %v", wn.messagesOfInterest) for _, peer := range peers { wn.maybeSendMessagesOfInterest(peer, wn.messagesOfInterestEnc) } @@ -2465,9 +2474,11 @@ func (wn *WebsocketNetwork) postMessagesOfInterestThread() { // if we're not a relay, and not participating, we don't need txn pool wantTXGossip := wn.nodeInfo.IsParticipating() if wantTXGossip && (wn.wantTXGossip != wantTXGossipYes) { + 
wn.log.Infof("postMessagesOfInterestThread: enabling TX gossip") wn.RegisterMessageInterest(protocol.TxnTag) atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipYes) } else if !wantTXGossip && (wn.wantTXGossip != wantTXGossipNo) { + wn.log.Infof("postMessagesOfInterestThread: disabling TX gossip") wn.DeregisterMessageInterest(protocol.TxnTag) atomic.StoreUint32(&wn.wantTXGossip, wantTXGossipNo) } diff --git a/network/wsPeer.go b/network/wsPeer.go index d551cf1a66..94a1bd2b79 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -190,6 +190,9 @@ type wsPeer struct { // this needs to be 64-bit aligned for use with atomic.AddUint64 on 32-bit platforms. duplicateFilterCount uint64 + // These message counters need to be 64-bit aligned as well. + txMessageCount, miMessageCount, ppMessageCount, avMessageCount uint64 + wsPeerCore // conn will be *websocket.Conn (except in testing) @@ -496,6 +499,7 @@ func (wp *wsPeer) readLoop() { switch msg.Tag { case protocol.MsgOfInterestTag: // try to decode the message-of-interest + atomic.AddUint64(&wp.miMessageCount, 1) if wp.handleMessageOfInterest(msg) { return } @@ -529,6 +533,12 @@ func (wp *wsPeer) readLoop() { // network maintenance message handled immediately instead of handing off to general handlers wp.handleFilterMessage(msg) continue + case protocol.TxnTag: + atomic.AddUint64(&wp.txMessageCount, 1) + case protocol.AgreementVoteTag: + atomic.AddUint64(&wp.avMessageCount, 1) + case protocol.ProposalPayloadTag: + atomic.AddUint64(&wp.ppMessageCount, 1) } if len(msg.Data) > 0 && wp.incomingMsgFilter != nil && dedupSafeTag(msg.Tag) { if wp.incomingMsgFilter.CheckIncomingMessage(msg.Tag, msg.Data, true, true) { From 918869037bfd8de6dc86c08ac71fa1e4ccbd4f8a Mon Sep 17 00:00:00 2001 From: chris erway Date: Thu, 17 Nov 2022 12:42:58 -0500 Subject: [PATCH 09/13] use bool instead of seenPrioTags --- network/wsNetwork.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/network/wsNetwork.go 
b/network/wsNetwork.go index abf426357c..bfda37621d 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -1440,13 +1440,12 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) { // preparePeerData prepares batches of data for sending. // It performs optional zstd compression for proposal massages -func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, map[protocol.Tag]struct{}) { +func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, bool) { // determine if there is a payload proposal and peers supporting compressed payloads wantCompression := false - var messageTags map[protocol.Tag]struct{} + containsCompressableData := false if prio { wantCompression = checkCanCompress(request, peers) - messageTags = make(map[protocol.Tag]struct{}, 1) } digests := make([]crypto.Digest, len(request.data)) @@ -1469,7 +1468,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, if request.tags[i] == protocol.ProposalPayloadTag { networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil) } - messageTags[request.tags[i]] = struct{}{} + containsCompressableData = true } if wantCompression { @@ -1487,7 +1486,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, } } } - return data, dataCompressed, digests, messageTags + return data, dataCompressed, digests, containsCompressableData } // prio is set if the broadcast is a high-priority broadcast. 
@@ -1504,7 +1503,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, } start := time.Now() - data, dataWithCompression, digests, seenPrioTags := wn.preparePeerData(request, prio, peers) + data, dataWithCompression, digests, containsCompressableData := wn.preparePeerData(request, prio, peers) // first send to all the easy outbound peers who don't block, get them started. sentMessageCount := 0 @@ -1520,14 +1519,14 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, // if this peer supports compressed proposals and compressed data batch is filled out, use it ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime) if prio { - if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok { + if containsCompressableData { networkPrioBatchesPPWithCompression.Inc(nil) } } } else { ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime) if prio { - if _, ok := seenPrioTags[protocol.ProposalPayloadTag]; ok { + if containsCompressableData { networkPrioBatchesPPWithoutCompression.Inc(nil) } } From 937818e78806520b1b7d2f3aa7144843437aed9a Mon Sep 17 00:00:00 2001 From: chris erway Date: Thu, 17 Nov 2022 13:56:16 -0500 Subject: [PATCH 10/13] fix --- network/wsNetwork.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index bfda37621d..135e3309bd 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -1467,8 +1467,8 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, if prio { if request.tags[i] == protocol.ProposalPayloadTag { networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil) + containsCompressableData = true } - containsCompressableData = true } if wantCompression { From 2c29d72a4b65518de38d0ac862e34edc7e5300b2 Mon Sep 17 00:00:00 2001 From: chris erway Date: Thu, 17 Nov 2022 14:03:34 -0500 Subject: [PATCH 11/13] add containsPPTag --- 
network/wsNetwork.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 135e3309bd..7394c4a913 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -1443,7 +1443,7 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) { func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, bool) { // determine if there is a payload proposal and peers supporting compressed payloads wantCompression := false - containsCompressableData := false + containsPPTag := false if prio { wantCompression = checkCanCompress(request, peers) } @@ -1467,7 +1467,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, if prio { if request.tags[i] == protocol.ProposalPayloadTag { networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil) - containsCompressableData = true + containsPPTag = true } } @@ -1486,7 +1486,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, } } } - return data, dataCompressed, digests, containsCompressableData + return data, dataCompressed, digests, containsPPTag } // prio is set if the broadcast is a high-priority broadcast. @@ -1503,7 +1503,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, } start := time.Now() - data, dataWithCompression, digests, containsCompressableData := wn.preparePeerData(request, prio, peers) + data, dataWithCompression, digests, containsPPTag := wn.preparePeerData(request, prio, peers) // first send to all the easy outbound peers who don't block, get them started. 
sentMessageCount := 0 @@ -1519,14 +1519,14 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, // if this peer supports compressed proposals and compressed data batch is filled out, use it ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime) if prio { - if containsCompressableData { + if containsPPTag { networkPrioBatchesPPWithCompression.Inc(nil) } } } else { ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime) if prio { - if containsCompressableData { + if containsPPTag { networkPrioBatchesPPWithoutCompression.Inc(nil) } } From ab035caa3d26426c7b1adc07be05945b058c8708 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 17 Nov 2022 15:03:35 -0500 Subject: [PATCH 12/13] Fix tests --- network/wsNetwork.go | 12 ++++++------ network/wsNetwork_test.go | 10 ++++------ 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 7394c4a913..d6778a132f 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -1443,7 +1443,7 @@ func (wn *WebsocketNetwork) peerSnapshot(dest []*wsPeer) ([]*wsPeer, int32) { func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, bool) { // determine if there is a payload proposal and peers supporting compressed payloads wantCompression := false - containsPPTag := false + containsPrioPPTag := false if prio { wantCompression = checkCanCompress(request, peers) } @@ -1467,7 +1467,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, if prio { if request.tags[i] == protocol.ProposalPayloadTag { networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil) - containsPPTag = true + containsPrioPPTag = true } } @@ -1486,7 +1486,7 @@ func (wn *WebsocketNetwork) preparePeerData(request broadcastRequest, prio bool, } } } - return data, dataCompressed, digests, containsPPTag + 
return data, dataCompressed, digests, containsPrioPPTag } // prio is set if the broadcast is a high-priority broadcast. @@ -1503,7 +1503,7 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, } start := time.Now() - data, dataWithCompression, digests, containsPPTag := wn.preparePeerData(request, prio, peers) + data, dataWithCompression, digests, containsPrioPPTag := wn.preparePeerData(request, prio, peers) // first send to all the easy outbound peers who don't block, get them started. sentMessageCount := 0 @@ -1519,14 +1519,14 @@ func (wn *WebsocketNetwork) innerBroadcast(request broadcastRequest, prio bool, // if this peer supports compressed proposals and compressed data batch is filled out, use it ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime) if prio { - if containsPPTag { + if containsPrioPPTag { networkPrioBatchesPPWithCompression.Inc(nil) } } } else { ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime) if prio { - if containsPPTag { + if containsPrioPPTag { networkPrioBatchesPPWithoutCompression.Inc(nil) } } diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index c637bde645..e5f6ec6da5 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -2634,13 +2634,13 @@ func TestPreparePeerData(t *testing.T) { peers := []*wsPeer{} wn := WebsocketNetwork{} - data, comp, digests, seenPrioTags := wn.preparePeerData(req, false, peers) + data, comp, digests, seenPrioPPTag := wn.preparePeerData(req, false, peers) require.NotEmpty(t, data) require.Empty(t, comp) require.NotEmpty(t, digests) require.Equal(t, len(req.data), len(digests)) require.Equal(t, len(data), len(digests)) - require.Empty(t, seenPrioTags) + require.False(t, seenPrioPPTag) for i := range data { require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i]) @@ -2654,16 +2654,14 @@ func TestPreparePeerData(t *testing.T) { features: pfCompressedProposal, 
} peers = []*wsPeer{&peer1, &peer2} - data, comp, digests, seenPrioTags = wn.preparePeerData(req, true, peers) + data, comp, digests, seenPrioPPTag = wn.preparePeerData(req, true, peers) require.NotEmpty(t, data) require.NotEmpty(t, comp) require.NotEmpty(t, digests) require.Equal(t, len(req.data), len(digests)) require.Equal(t, len(data), len(digests)) require.Equal(t, len(comp), len(digests)) - require.NotEmpty(t, seenPrioTags) - require.Len(t, seenPrioTags, 2) - require.Contains(t, seenPrioTags, protocol.ProposalPayloadTag) + require.True(t, seenPrioPPTag) for i := range data { require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i]) From d9989172ac8dc6bc782cfa117c23cc09a7157bf7 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 17 Nov 2022 15:16:19 -0500 Subject: [PATCH 13/13] do no re-allocate error --- data/transactions/verify/txn.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/data/transactions/verify/txn.go b/data/transactions/verify/txn.go index 8ff8f494ba..115bd95362 100644 --- a/data/transactions/verify/txn.go +++ b/data/transactions/verify/txn.go @@ -181,8 +181,8 @@ func TxnGroup(stxs []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, // txnGroupBatchPrep verifies a []SignedTxn having no obviously inconsistent data. 
// it is the caller responsibility to call batchVerifier.Verify() -func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature, verifier *crypto.BatchVerifier) (groupCtx *GroupContext, err error) { - groupCtx, err = PrepareGroupContext(stxs, contextHdr, ledger) +func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.BlockHeader, ledger logic.LedgerForSignature, verifier *crypto.BatchVerifier) (*GroupContext, error) { + groupCtx, err := PrepareGroupContext(stxs, contextHdr, ledger) if err != nil { return nil, err } @@ -192,12 +192,9 @@ func txnGroupBatchPrep(stxs []transactions.SignedTxn, contextHdr bookkeeping.Blo for i, stxn := range stxs { prepErr := txnBatchPrep(&stxn, i, groupCtx, verifier) if prepErr != nil { - // re-wrap the error, take underlying one and copy the reason code - err = &ErrTxGroupError{ - err: fmt.Errorf("transaction %+v invalid : %w", stxn, prepErr.err), - Reason: prepErr.Reason, - } - return nil, err + // re-wrap the error with more details + prepErr.err = fmt.Errorf("transaction %+v invalid : %w", stxn, prepErr.err) + return nil, prepErr } if stxn.Txn.Type != protocol.StateProofTx { minFeeCount++