Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec: new TxHandler byte decoder #4266

Merged
merged 9 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/algokey/multisig.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var multisigCmd = &cobra.Command{
}

var outBytes []byte
dec := protocol.NewDecoderBytes(txdata)
dec := protocol.NewMsgpDecoderBytes(txdata)
for {
var stxn transactions.SignedTxn
err = dec.Decode(&stxn)
Expand Down Expand Up @@ -123,7 +123,7 @@ var appendAuthAddrCmd = &cobra.Command{
}

var outBytes []byte
dec := protocol.NewDecoderBytes(txdata)
dec := protocol.NewMsgpDecoderBytes(txdata)

var stxn transactions.SignedTxn
err = dec.Decode(&stxn)
Expand Down
2 changes: 1 addition & 1 deletion cmd/algokey/sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var signCmd = &cobra.Command{
}

var outBytes []byte
dec := protocol.NewDecoderBytes(txdata)
dec := protocol.NewMsgpDecoderBytes(txdata)
for {
var stxn transactions.SignedTxn
err = dec.Decode(&stxn)
Expand Down
12 changes: 6 additions & 6 deletions cmd/goal/clerk.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ var rawsendCmd = &cobra.Command{
reportErrorf(fileReadError, txFilename, err)
}

dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)
client := ensureAlgodClient(ensureSingleDataDir())

txnIDs := make(map[transactions.Txid]transactions.SignedTxn)
Expand Down Expand Up @@ -673,7 +673,7 @@ var inspectCmd = &cobra.Command{
reportErrorf(fileReadError, txFilename, err)
}

dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)
count := 0
for {
var txn transactions.SignedTxn
Expand Down Expand Up @@ -773,7 +773,7 @@ var signCmd = &cobra.Command{
}

var outData []byte
dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)
// read the entire file and prepare in-memory copy of each signed transaction, with grouping.
txnGroups := make(map[crypto.Digest][]*transactions.SignedTxn)
var groupsOrder []crypto.Digest
Expand Down Expand Up @@ -868,7 +868,7 @@ var groupCmd = &cobra.Command{
reportErrorf(fileReadError, txFilename, err)
}

dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)

var stxns []transactions.SignedTxn
var group transactions.TxGroup
Expand Down Expand Up @@ -920,7 +920,7 @@ var splitCmd = &cobra.Command{
reportErrorf(fileReadError, txFilename, err)
}

dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)

var txns []transactions.SignedTxn
for {
Expand Down Expand Up @@ -1120,7 +1120,7 @@ var dryrunCmd = &cobra.Command{
if err != nil {
reportErrorf(fileReadError, txFilename, err)
}
dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)
stxns := make([]transactions.SignedTxn, 0, 10)
for {
var txn transactions.SignedTxn
Expand Down
4 changes: 2 additions & 2 deletions cmd/goal/multisig.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ var addSigCmd = &cobra.Command{
wh, pw := ensureWalletHandleMaybePassword(dataDir, walletName, true)

var outData []byte
dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)
for {
var stxn transactions.SignedTxn
err = dec.Decode(&stxn)
Expand Down Expand Up @@ -245,7 +245,7 @@ var mergeSigCmd = &cobra.Command{
reportErrorf(fileReadError, arg, err)
}

dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)
var txns []transactions.SignedTxn
for {
var txn transactions.SignedTxn
Expand Down
4 changes: 2 additions & 2 deletions cmd/tealdbg/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func txnGroupFromParams(dp *DebugParams) (txnGroup []transactions.SignedTxn, err
}

// 3. Attempt msgp - array of transactions
dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)
for {
var txn transactions.SignedTxn
err = dec.Decode(&txn)
Expand Down Expand Up @@ -124,7 +124,7 @@ func balanceRecordsFromParams(dp *DebugParams) (records []basics.BalanceRecord,
}

// 3. Attempt msgp - a array of records
dec := protocol.NewDecoderBytes(data)
dec := protocol.NewMsgpDecoderBytes(data)
for {
var record basics.BalanceRecord
err = dec.Decode(&record)
Expand Down
12 changes: 7 additions & 5 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (handler *TxHandler) asyncVerifySignature(arg interface{}) interface{} {
}

func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) network.OutgoingMessage {
dec := protocol.NewDecoderBytes(rawmsg.Data)
dec := protocol.NewMsgpDecoderBytes(rawmsg.Data)
ntx := 0
unverifiedTxGroup := make([]transactions.SignedTxn, 1)
for {
Expand Down Expand Up @@ -265,11 +265,13 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
// Note that this also checks the consistency of the transaction's group hash,
// which is required for safe transaction signature caching behavior.
func (handler *TxHandler) checkAlreadyCommitted(tx *txBacklogMsg) (processingDone bool) {
txids := make([]transactions.Txid, len(tx.unverifiedTxGroup))
for i := range tx.unverifiedTxGroup {
txids[i] = tx.unverifiedTxGroup[i].ID()
if logging.Base().IsLevelEnabled(logging.Debug) {
txids := make([]transactions.Txid, len(tx.unverifiedTxGroup))
for i := range tx.unverifiedTxGroup {
txids[i] = tx.unverifiedTxGroup[i].ID()
}
logging.Base().Debugf("got a tx group with IDs %v", txids)
}
logging.Base().Debugf("got a tx group with IDs %v", txids)

// do a quick test to check that this transaction could potentially be committed, to reject dup pending transactions
err := handler.txPool.Test(tx.unverifiedTxGroup)
Expand Down
94 changes: 93 additions & 1 deletion data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package data

import (
"fmt"
"io"
"math/rand"
"testing"
"time"
Expand All @@ -32,16 +33,18 @@ import (
"github.com/algorand/go-algorand/data/pools"
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/execpool"
)

func BenchmarkTxHandlerProcessDecoded(b *testing.B) {
b.StopTimer()
b.ResetTimer()
const numRounds = 10
const numUsers = 100
log := logging.TestingLog(b)
log.SetLevel(logging.Warn)
secrets := make([]*crypto.SignatureSecrets, numUsers)
addresses := make([]basics.Address, numUsers)

Expand Down Expand Up @@ -121,3 +124,92 @@ func BenchmarkTimeAfter(b *testing.B) {
}
}
}

func makeRandomTransactions(num int) ([]transactions.SignedTxn, []byte) {
stxns := make([]transactions.SignedTxn, num)
result := make([]byte, 0, num*200)
for i := 0; i < num; i++ {
var sig crypto.Signature
crypto.RandBytes(sig[:])
var addr basics.Address
crypto.RandBytes(addr[:])
stxns[i] = transactions.SignedTxn{
Sig: sig,
AuthAddr: addr,
Txn: transactions.Transaction{
Header: transactions.Header{
Sender: addr,
Fee: basics.MicroAlgos{Raw: crypto.RandUint64()},
Note: sig[:],
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: addr,
Amount: basics.MicroAlgos{Raw: crypto.RandUint64()},
},
},
}

d2 := protocol.Encode(&stxns[i])
result = append(result, d2...)
}
return stxns, result
}

func TestTxHandlerProcessIncomingTxn(t *testing.T) {
partitiontest.PartitionTest(t)

const numTxns = 11
handler := TxHandler{
backlogQueue: make(chan *txBacklogMsg, 1),
}
stxns, blob := makeRandomTransactions(numTxns)
action := handler.processIncomingTxn(network.IncomingMessage{Data: blob})
require.Equal(t, network.OutgoingMessage{Action: network.Ignore}, action)

require.Equal(t, 1, len(handler.backlogQueue))
msg := <-handler.backlogQueue
require.Equal(t, numTxns, len(msg.unverifiedTxGroup))
for i := 0; i < numTxns; i++ {
require.Equal(t, stxns[i], msg.unverifiedTxGroup[i])
}
}

const benchTxnNum = 25_000

func BenchmarkTxHandlerDecoder(b *testing.B) {
_, blob := makeRandomTransactions(benchTxnNum)
var err error
stxns := make([]transactions.SignedTxn, benchTxnNum+1)
for i := 0; i < b.N; i++ {
dec := protocol.NewDecoderBytes(blob)
var idx int
for {
err = dec.Decode(&stxns[idx])
if err == io.EOF {
break
}
require.NoError(b, err)
idx++
}
require.Equal(b, benchTxnNum, idx)
}
}

func BenchmarkTxHandlerDecoderMsgp(b *testing.B) {
_, blob := makeRandomTransactions(benchTxnNum)
var err error
stxns := make([]transactions.SignedTxn, benchTxnNum+1)
for i := 0; i < b.N; i++ {
dec := protocol.NewMsgpDecoderBytes(blob)
var idx int
for {
err = dec.Decode(&stxns[idx])
if err == io.EOF {
break
}
require.NoError(b, err)
idx++
}
require.Equal(b, benchTxnNum, idx)
}
}
29 changes: 29 additions & 0 deletions protocol/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,35 @@ func NewDecoderBytes(b []byte) Decoder {
return codec.NewDecoderBytes(b, CodecHandle)
}

// NewMsgpDecoderBytes returns a decoder object reading bytes from [b].
// that works with msgp-serialized objects
func NewMsgpDecoderBytes(b []byte) *MsgpDecoderBytes {
return &MsgpDecoderBytes{b: b, pos: 0}
}

// MsgpDecoderBytes is a []byte decoder into msgp-encoded objects
type MsgpDecoderBytes struct {
b []byte
pos int
}

// Decode an objptr from from a byte stream
func (d *MsgpDecoderBytes) Decode(objptr msgp.Unmarshaler) error {
if !objptr.CanUnmarshalMsg(objptr) {
return fmt.Errorf("object %T cannot be msgp-unmashalled", objptr)
}
if d.pos >= len(d.b) {
return io.EOF
}

rem, err := objptr.UnmarshalMsg(d.b[d.pos:])
if err != nil {
return err
}
d.pos = (len(d.b) - len(rem))
return nil
}

// encodingPool holds temporary byte slice buffers used for encoding messages.
var encodingPool = sync.Pool{
New: func() interface{} {
Expand Down
37 changes: 37 additions & 0 deletions protocol/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package protocol

import (
"fmt"
"io"
"math/rand"
"reflect"
"testing"

Expand Down Expand Up @@ -200,6 +203,40 @@ func TestEncodeJSON(t *testing.T) {
require.True(t, reflect.DeepEqual(v, sv))
}

func TestMsgpDecode(t *testing.T) {
partitiontest.PartitionTest(t)

var tag Tag = "test"
dec := NewMsgpDecoderBytes([]byte{1, 2, 3})
err := dec.Decode(&tag)
require.Error(t, err)

data := EncodeMsgp(tag)
dec = NewMsgpDecoderBytes(data)
var tag2 Tag
err = dec.Decode(&tag2)
require.Equal(t, tag, tag2)
require.NoError(t, err)

limit := rand.Intn(30)
tags := make([]Tag, limit)
buf := make([]byte, 0, limit*10)
for i := 0; i < limit; i++ {
tags[i] = Tag(fmt.Sprintf("tag_%d", i))
buf = append(buf, EncodeMsgp(tags[i])...)
}

dec = NewMsgpDecoderBytes(buf)
for i := 0; i < limit; i++ {
err = dec.Decode(&tag2)
require.NoError(t, err)
require.Equal(t, tags[i], tag2)
}
err = dec.Decode(&tag2)
require.Error(t, err)
require.ErrorIs(t, err, io.EOF)
}

func TestRandomizeObjectWithPtrField(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down
2 changes: 1 addition & 1 deletion protocol/codec_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func checkMsgpAllocBoundDirective(dataType reflect.Type) bool {
if err != nil {
continue
}
if strings.Index(string(fileBytes), fmt.Sprintf("msgp:allocbound %s", dataType.Name())) != -1 {
if strings.Contains(string(fileBytes), fmt.Sprintf("msgp:allocbound %s", dataType.Name())) {
// message pack alloc bound definition was found.
return true
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e-go/cli/goal/expect/goalFormattingTest.exp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ if { [catch {
set NON_PRINTABLE_CHARS_WARNING 1
exp_continue
}
{Cannot decode transactions from *: msgpack decode error \[pos 33\]: no matching struct field found when decoding stream map with key \[0G\[0K\[33munexpected_key\[0m} {
{Cannot decode transactions from *: Unknown field: \[0G\[0K\[33munexpected_key\[0m} {
set CANNOT_DECODE_MESSAGE 1
exp_continue
}
Expand Down
2 changes: 1 addition & 1 deletion tools/debug/algodump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (dh *dumpHandler) Handle(msg network.IncomingMessage) network.OutgoingMessa
data = fmt.Sprintf("proposal %s", shortdigest(crypto.Digest(p.Block.Hash())))

case protocol.TxnTag:
dec := protocol.NewDecoderBytes(msg.Data)
dec := protocol.NewMsgpDecoderBytes(msg.Data)
for {
var stx transactions.SignedTxn
err := dec.Decode(&stx)
Expand Down