diff --git a/cmd/algokey/multisig.go b/cmd/algokey/multisig.go index 7c8ae81040..97b5c17319 100644 --- a/cmd/algokey/multisig.go +++ b/cmd/algokey/multisig.go @@ -73,7 +73,7 @@ var multisigCmd = &cobra.Command{ } var outBytes []byte - dec := protocol.NewDecoderBytes(txdata) + dec := protocol.NewMsgpDecoderBytes(txdata) for { var stxn transactions.SignedTxn err = dec.Decode(&stxn) @@ -123,7 +123,7 @@ var appendAuthAddrCmd = &cobra.Command{ } var outBytes []byte - dec := protocol.NewDecoderBytes(txdata) + dec := protocol.NewMsgpDecoderBytes(txdata) var stxn transactions.SignedTxn err = dec.Decode(&stxn) diff --git a/cmd/algokey/sign.go b/cmd/algokey/sign.go index 9afa5a3d34..226df50948 100644 --- a/cmd/algokey/sign.go +++ b/cmd/algokey/sign.go @@ -59,7 +59,7 @@ var signCmd = &cobra.Command{ } var outBytes []byte - dec := protocol.NewDecoderBytes(txdata) + dec := protocol.NewMsgpDecoderBytes(txdata) for { var stxn transactions.SignedTxn err = dec.Decode(&stxn) diff --git a/cmd/goal/clerk.go b/cmd/goal/clerk.go index 2be5ff3322..8ab3293267 100644 --- a/cmd/goal/clerk.go +++ b/cmd/goal/clerk.go @@ -544,7 +544,7 @@ var rawsendCmd = &cobra.Command{ reportErrorf(fileReadError, txFilename, err) } - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) client := ensureAlgodClient(ensureSingleDataDir()) txnIDs := make(map[transactions.Txid]transactions.SignedTxn) @@ -673,7 +673,7 @@ var inspectCmd = &cobra.Command{ reportErrorf(fileReadError, txFilename, err) } - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) count := 0 for { var txn transactions.SignedTxn @@ -773,7 +773,7 @@ var signCmd = &cobra.Command{ } var outData []byte - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) // read the entire file and prepare in-memory copy of each signed transaction, with grouping. txnGroups := make(map[crypto.Digest][]*transactions.SignedTxn) var groupsOrder []crypto.Digest @@ -868,7 +868,7 @@ var groupCmd = &cobra.Command{ reportErrorf(fileReadError, txFilename, err) } - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) var stxns []transactions.SignedTxn var group transactions.TxGroup @@ -920,7 +920,7 @@ var splitCmd = &cobra.Command{ reportErrorf(fileReadError, txFilename, err) } - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) var txns []transactions.SignedTxn for { @@ -1120,7 +1120,7 @@ var dryrunCmd = &cobra.Command{ if err != nil { reportErrorf(fileReadError, txFilename, err) } - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) stxns := make([]transactions.SignedTxn, 0, 10) for { var txn transactions.SignedTxn diff --git a/cmd/goal/multisig.go b/cmd/goal/multisig.go index 6e55fcc9e1..905ae035ca 100644 --- a/cmd/goal/multisig.go +++ b/cmd/goal/multisig.go @@ -96,7 +96,7 @@ var addSigCmd = &cobra.Command{ wh, pw := ensureWalletHandleMaybePassword(dataDir, walletName, true) var outData []byte - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) for { var stxn transactions.SignedTxn err = dec.Decode(&stxn) @@ -245,7 +245,7 @@ var mergeSigCmd = &cobra.Command{ reportErrorf(fileReadError, arg, err) } - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) var txns []transactions.SignedTxn for { var txn transactions.SignedTxn diff --git a/cmd/tealdbg/local.go b/cmd/tealdbg/local.go index c9cba4de39..fa3c5d6fc5 100644 --- a/cmd/tealdbg/local.go +++ b/cmd/tealdbg/local.go @@ -73,7 +73,7 @@ func txnGroupFromParams(dp *DebugParams) (txnGroup []transactions.SignedTxn, err } // 3. Attempt msgp - array of transactions - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) for { var txn transactions.SignedTxn err = dec.Decode(&txn) @@ -124,7 +124,7 @@ func balanceRecordsFromParams(dp *DebugParams) (records []basics.BalanceRecord, } // 3. Attempt msgp - a array of records - dec := protocol.NewDecoderBytes(data) + dec := protocol.NewMsgpDecoderBytes(data) for { var record basics.BalanceRecord err = dec.Decode(&record) diff --git a/data/txHandler.go b/data/txHandler.go index 46248b4edb..cd4c25c8e0 100644 --- a/data/txHandler.go +++ b/data/txHandler.go @@ -219,7 +219,7 @@ func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} { } func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage { - dec := protocol.NewDecoderBytes(rawmsg.Data) + dec := protocol.NewMsgpDecoderBytes(rawmsg.Data) ntx := 0 unverifiedTxGroup := make([]transactions.SignedTxn, 1) for { @@ -265,11 +265,13 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net // Note that this also checks the consistency of the transaction's group hash, // which is required for safe transaction signature caching behavior. func (handler *TxHandler) checkAlreadyCommitted(tx *txBacklogMsg) (processingDone bool) { - txids := make([]transactions.Txid, len(tx.unverifiedTxGroup)) - for i := range tx.unverifiedTxGroup { - txids[i] = tx.unverifiedTxGroup[i].ID() + if logging.Base().IsLevelEnabled(logging.Debug) { + txids := make([]transactions.Txid, len(tx.unverifiedTxGroup)) + for i := range tx.unverifiedTxGroup { + txids[i] = tx.unverifiedTxGroup[i].ID() + } + logging.Base().Debugf("got a tx group with IDs %v", txids) } - logging.Base().Debugf("got a tx group with IDs %v", txids) // do a quick test to check that this transaction could potentially be committed, to reject dup pending transactions err := handler.txPool.Test(tx.unverifiedTxGroup) diff --git a/data/txHandler_test.go b/data/txHandler_test.go index 653cd51e9e..f8ef5f99ce 100644 --- a/data/txHandler_test.go +++ b/data/txHandler_test.go @@ -18,6 +18,7 @@ package data import ( "fmt" + "io" "math/rand" "testing" "time" @@ -32,16 +33,18 @@ import ( "github.com/algorand/go-algorand/data/pools" "github.com/algorand/go-algorand/data/transactions" "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/network" "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/test/partitiontest" "github.com/algorand/go-algorand/util/execpool" ) func BenchmarkTxHandlerProcessDecoded(b *testing.B) { b.StopTimer() b.ResetTimer() - const numRounds = 10 const numUsers = 100 log := logging.TestingLog(b) + log.SetLevel(logging.Warn) secrets := make([]*crypto.SignatureSecrets, numUsers) addresses := make([]basics.Address, numUsers) @@ -121,3 +124,92 @@ func BenchmarkTimeAfter(b *testing.B) { } } } + +func makeRandomTransactions(num int) ([]transactions.SignedTxn, []byte) { + stxns := make([]transactions.SignedTxn, num) + result := make([]byte, 0, num*200) + for i := 0; i < num; i++ { + var sig crypto.Signature + crypto.RandBytes(sig[:]) + var addr basics.Address + crypto.RandBytes(addr[:]) + stxns[i] = transactions.SignedTxn{ + Sig: sig, + AuthAddr: addr, + Txn: transactions.Transaction{ + Header: transactions.Header{ + Sender: addr, + Fee: basics.MicroAlgos{Raw: crypto.RandUint64()}, + Note: sig[:], + }, + PaymentTxnFields: transactions.PaymentTxnFields{ + Receiver: addr, + Amount: basics.MicroAlgos{Raw: crypto.RandUint64()}, + }, + }, + } + + d2 := protocol.Encode(&stxns[i]) + result = append(result, d2...) + } + return stxns, result +} + +func TestTxHandlerProcessIncomingTxn(t *testing.T) { + partitiontest.PartitionTest(t) + + const numTxns = 11 + handler := TxHandler{ + backlogQueue: make(chan *txBacklogMsg, 1), + } + stxns, blob := makeRandomTransactions(numTxns) + action := handler.processIncomingTxn(network.IncomingMessage{Data: blob}) + require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action) + + require.Equal(t, 1, len(handler.backlogQueue)) + msg := <-handler.backlogQueue + require.Equal(t, numTxns, len(msg.unverifiedTxGroup)) + for i := 0; i < numTxns; i++ { + require.Equal(t, stxns[i], msg.unverifiedTxGroup[i]) + } +} + +const benchTxnNum = 25_000 + +func BenchmarkTxHandlerDecoder(b *testing.B) { + _, blob := makeRandomTransactions(benchTxnNum) + var err error + stxns := make([]transactions.SignedTxn, benchTxnNum+1) + for i := 0; i < b.N; i++ { + dec := protocol.NewDecoderBytes(blob) + var idx int + for { + err = dec.Decode(&stxns[idx]) + if err == io.EOF { + break + } + require.NoError(b, err) + idx++ + } + require.Equal(b, benchTxnNum, idx) + } +} + +func BenchmarkTxHandlerDecoderMsgp(b *testing.B) { + _, blob := makeRandomTransactions(benchTxnNum) + var err error + stxns := make([]transactions.SignedTxn, benchTxnNum+1) + for i := 0; i < b.N; i++ { + dec := protocol.NewMsgpDecoderBytes(blob) + var idx int + for { + err = dec.Decode(&stxns[idx]) + if err == io.EOF { + break + } + require.NoError(b, err) + idx++ + } + require.Equal(b, benchTxnNum, idx) + } +} diff --git a/protocol/codec.go b/protocol/codec.go index 1153b7c612..e74b4b3e8e 100644 --- a/protocol/codec.go +++ b/protocol/codec.go @@ -246,6 +246,35 @@ func NewDecoderBytes(b []byte) Decoder { return codec.NewDecoderBytes(b, CodecHandle) } +// NewMsgpDecoderBytes returns a decoder object reading bytes from [b]. +// that works with msgp-serialized objects +func NewMsgpDecoderBytes(b []byte) *MsgpDecoderBytes { + return &MsgpDecoderBytes{b: b, pos: 0} +} + +// MsgpDecoderBytes is a []byte decoder into msgp-encoded objects +type MsgpDecoderBytes struct { + b []byte + pos int +} + +// Decode an objptr from from a byte stream +func (d *MsgpDecoderBytes) Decode(objptr msgp.Unmarshaler) error { + if !objptr.CanUnmarshalMsg(objptr) { + return fmt.Errorf("object %T cannot be msgp-unmashalled", objptr) + } + if d.pos >= len(d.b) { + return io.EOF + } + + rem, err := objptr.UnmarshalMsg(d.b[d.pos:]) + if err != nil { + return err + } + d.pos = (len(d.b) - len(rem)) + return nil +} + // encodingPool holds temporary byte slice buffers used for encoding messages. var encodingPool = sync.Pool{ New: func() interface{} { diff --git a/protocol/codec_test.go b/protocol/codec_test.go index 1b1d4e6a60..e623f9024b 100644 --- a/protocol/codec_test.go +++ b/protocol/codec_test.go @@ -17,6 +17,9 @@ package protocol import ( + "fmt" + "io" + "math/rand" "reflect" "testing" @@ -200,6 +203,40 @@ func TestEncodeJSON(t *testing.T) { require.True(t, reflect.DeepEqual(v, sv)) } +func TestMsgpDecode(t *testing.T) { + partitiontest.PartitionTest(t) + + var tag Tag = "test" + dec := NewMsgpDecoderBytes([]byte{1, 2, 3}) + err := dec.Decode(&tag) + require.Error(t, err) + + data := EncodeMsgp(tag) + dec = NewMsgpDecoderBytes(data) + var tag2 Tag + err = dec.Decode(&tag2) + require.Equal(t, tag, tag2) + require.NoError(t, err) + + limit := rand.Intn(30) + tags := make([]Tag, limit) + buf := make([]byte, 0, limit*10) + for i := 0; i < limit; i++ { + tags[i] = Tag(fmt.Sprintf("tag_%d", i)) + buf = append(buf, EncodeMsgp(tags[i])...) + } + + dec = NewMsgpDecoderBytes(buf) + for i := 0; i < limit; i++ { + err = dec.Decode(&tag2) + require.NoError(t, err) + require.Equal(t, tags[i], tag2) + } + err = dec.Decode(&tag2) + require.Error(t, err) + require.ErrorIs(t, err, io.EOF) +} + func TestRandomizeObjectWithPtrField(t *testing.T) { partitiontest.PartitionTest(t) diff --git a/protocol/codec_tester.go b/protocol/codec_tester.go index d6d2c375e7..3f441c3c96 100644 --- a/protocol/codec_tester.go +++ b/protocol/codec_tester.go @@ -150,7 +150,7 @@ func checkMsgpAllocBoundDirective(dataType reflect.Type) bool { if err != nil { continue } - if strings.Index(string(fileBytes), fmt.Sprintf("msgp:allocbound %s", dataType.Name())) != -1 { + if strings.Contains(string(fileBytes), fmt.Sprintf("msgp:allocbound %s", dataType.Name())) { // message pack alloc bound definition was found. return true } diff --git a/test/e2e-go/cli/goal/expect/goalFormattingTest.exp b/test/e2e-go/cli/goal/expect/goalFormattingTest.exp index 054406479d..cfa0af63f8 100644 --- a/test/e2e-go/cli/goal/expect/goalFormattingTest.exp +++ b/test/e2e-go/cli/goal/expect/goalFormattingTest.exp @@ -26,7 +26,7 @@ if { [catch { set NON_PRINTABLE_CHARS_WARNING 1 exp_continue } - {Cannot decode transactions from *: msgpack decode error \[pos 33\]: no matching struct field found when decoding stream map with key \[0G\[0K\[33munexpected_key\[0m} { + {Cannot decode transactions from *: Unknown field: \[0G\[0K\[33munexpected_key\[0m} { set CANNOT_DECODE_MESSAGE 1 exp_continue } diff --git a/tools/debug/algodump/main.go b/tools/debug/algodump/main.go index e3ebba9223..f61944ff1a 100644 --- a/tools/debug/algodump/main.go +++ b/tools/debug/algodump/main.go @@ -99,7 +99,7 @@ func (dh *dumpHandler) Handle(msg network.IncomingMessage) network.OutgoingMessa data = fmt.Sprintf("proposal %s", shortdigest(crypto.Digest(p.Block.Hash()))) case protocol.TxnTag: - dec := protocol.NewDecoderBytes(msg.Data) + dec := protocol.NewMsgpDecoderBytes(msg.Data) for { var stx transactions.SignedTxn err := dec.Decode(&stx)