Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client/v2): broadcast logic #22282

Merged
merged 22 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/v2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [#18461](https://github.com/cosmos/cosmos-sdk/pull/18461) Support governance proposals.
* [#20623](https://github.com/cosmos/cosmos-sdk/pull/20623) Introduce client/v2 tx factory.
* [#20623](https://github.com/cosmos/cosmos-sdk/pull/20623) Extend client/v2 keyring interface with `KeyType` and `KeyInfo`.
* [#22282](https://github.com/cosmos/cosmos-sdk/pull/22282) Added custom broadcast logic.

### Improvements

Expand Down
15 changes: 15 additions & 0 deletions client/v2/broadcast/broadcaster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package broadcast

import "context"

// Broadcaster defines an interface for broadcasting transactions to the consensus engine.
type Broadcaster interface {
// Broadcast sends a transaction to the network and returns the result.
//
// It returns a byte slice containing the formatted result that will be
// passed to the output writer, and an error if the broadcast failed.
Broadcast(ctx context.Context, txBytes []byte) ([]byte, error)

// Consensus returns the consensus engine identifier for this Broadcaster.
Consensus() string
}
197 changes: 197 additions & 0 deletions client/v2/broadcast/comet/comet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package comet

import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

"github.com/cometbft/cometbft/mempool"
rpcclient "github.com/cometbft/cometbft/rpc/client"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
cmttypes "github.com/cometbft/cometbft/types"

apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1"
"cosmossdk.io/client/v2/broadcast"

"github.com/cosmos/cosmos-sdk/codec"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

const (
// BroadcastSync defines a tx broadcasting mode where the client waits for
// a CheckTx execution response only.
BroadcastSync = "sync"
// BroadcastAsync defines a tx broadcasting mode where the client returns
// immediately.
BroadcastAsync = "async"

// cometBftConsensus is the identifier for the CometBFT consensus engine.
cometBFTConsensus = "comet"
)

// CometRPC defines the interface of a CometBFT RPC client needed for
// queries and transaction handling.
type CometRPC interface {
rpcclient.ABCIClient

Validators(ctx context.Context, height *int64, page, perPage *int) (*coretypes.ResultValidators, error)
Status(context.Context) (*coretypes.ResultStatus, error)
Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error)
BlockByHash(ctx context.Context, hash []byte) (*coretypes.ResultBlock, error)
BlockResults(ctx context.Context, height *int64) (*coretypes.ResultBlockResults, error)
BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*coretypes.ResultBlockchainInfo, error)
Commit(ctx context.Context, height *int64) (*coretypes.ResultCommit, error)
Tx(ctx context.Context, hash []byte, prove bool) (*coretypes.ResultTx, error)
TxSearch(
ctx context.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*coretypes.ResultTxSearch, error)
BlockSearch(
ctx context.Context,
query string,
page, perPage *int,
orderBy string,
) (*coretypes.ResultBlockSearch, error)
}

var _ broadcast.Broadcaster = &CometBFTBroadcaster{}

// CometBFTBroadcaster implements the Broadcaster interface for CometBFT consensus engine.
type CometBFTBroadcaster struct {
rpcClient CometRPC
mode string
cdc codec.JSONCodec
}

// NewCometBFTBroadcaster creates a new CometBFTBroadcaster.
func NewCometBFTBroadcaster(rpcURL, mode string, cdc codec.JSONCodec) (*CometBFTBroadcaster, error) {
if cdc == nil {
return nil, errors.New("codec can't be nil")
}

if mode == "" {
mode = BroadcastSync
}

rpcClient, err := rpchttp.New(rpcURL)
if err != nil {
return nil, fmt.Errorf("failed to create CometBft RPC client: %w", err)
}

return &CometBFTBroadcaster{
rpcClient: rpcClient,
mode: mode,
cdc: cdc,
}, nil
}

// Consensus returns the consensus engine name used by the broadcaster.
// It always returns "comet" for CometBFTBroadcaster.
func (c *CometBFTBroadcaster) Consensus() string {
return cometBFTConsensus
}

// Broadcast sends a transaction to the network and returns the result.
// returns a byte slice containing the JSON-encoded result and an error if the broadcast failed.
func (c *CometBFTBroadcaster) Broadcast(ctx context.Context, txBytes []byte) ([]byte, error) {
if c.cdc == nil {
return []byte{}, fmt.Errorf("JSON codec is not initialized")
}

var broadcastFunc func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error)
switch c.mode {
case BroadcastSync:
broadcastFunc = c.rpcClient.BroadcastTxSync
case BroadcastAsync:
broadcastFunc = c.rpcClient.BroadcastTxAsync
default:
return []byte{}, fmt.Errorf("unknown broadcast mode: %s", c.mode)
}

res, err := c.broadcast(ctx, txBytes, broadcastFunc)
if err != nil {
return []byte{}, err
}

return c.cdc.MarshalJSON(res)
}

// broadcast sends a transaction to the CometBFT network using the provided function.
func (c *CometBFTBroadcaster) broadcast(ctx context.Context, txBytes []byte,
fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error),
) (*apiacbci.TxResponse, error) {
bResult, err := fn(ctx, txBytes)
if errRes := checkCometError(err, txBytes); errRes != nil {
return errRes, nil
}

return newResponseFormatBroadcastTx(bResult), err
}
Comment on lines +129 to +135
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix inconsistent error handling in broadcast method.

The method returns both the formatted response and the error on line 134, but the error was already handled in the previous if statement. This could lead to returning a successful response with an error.

func (c *CometBFTBroadcaster) broadcast(ctx context.Context, txBytes []byte,
	fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error),
) (*apiacbci.TxResponse, error) {
	bResult, err := fn(ctx, txBytes)
	if errRes := checkCometError(err, txBytes); errRes != nil {
		return errRes, nil
	}
+	if err != nil {
+		return nil, err
+	}
-	return newResponseFormatBroadcastTx(bResult), err
+	return newResponseFormatBroadcastTx(bResult), nil
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
bResult, err := fn(ctx, txBytes)
if errRes := checkCometError(err, txBytes); errRes != nil {
return errRes, nil
}
return newResponseFormatBroadcastTx(bResult), err
}
bResult, err := fn(ctx, txBytes)
if errRes := checkCometError(err, txBytes); errRes != nil {
return errRes, nil
}
if err != nil {
return nil, err
}
return newResponseFormatBroadcastTx(bResult), nil
}


// checkCometError checks for errors returned by the CometBFT network and returns an appropriate TxResponse.
// It extracts error information and constructs a TxResponse with the error details.
func checkCometError(err error, tx cmttypes.Tx) *apiacbci.TxResponse {
if err == nil {
return nil
}

errStr := strings.ToLower(err.Error())
txHash := fmt.Sprintf("%X", tx.Hash())

switch {
case strings.Contains(errStr, strings.ToLower(mempool.ErrTxInCache.Error())):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxInMempoolCache.ABCICode(),
Codespace: sdkerrors.ErrTxInMempoolCache.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "mempool is full"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrMempoolIsFull.ABCICode(),
Codespace: sdkerrors.ErrMempoolIsFull.Codespace(),
Txhash: txHash,
}

case strings.Contains(errStr, "tx too large"):
return &apiacbci.TxResponse{
Code: sdkerrors.ErrTxTooLarge.ABCICode(),
Codespace: sdkerrors.ErrTxTooLarge.Codespace(),
Txhash: txHash,
}

default:
return nil
}
}

// newResponseFormatBroadcastTx returns a TxResponse given a ResultBroadcastTx from cometbft
func newResponseFormatBroadcastTx(res *coretypes.ResultBroadcastTx) *apiacbci.TxResponse {
if res == nil {
return nil
}

parsedLogs, _ := parseABCILogs(res.Log)

Comment on lines +180 to +181
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Consider handling log parsing errors.

The unmarshal error from parseABCILogs is silently ignored. Consider either logging the error or including it in the response for better debugging.

-	parsedLogs, _ := parseABCILogs(res.Log)
+	parsedLogs, err := parseABCILogs(res.Log)
+	if err != nil {
+		// Either log the error or include it in the response
+		parsedLogs = []*apiacbci.ABCIMessageLog{{
+			Log: fmt.Sprintf("failed to parse logs: %v", err),
+		}}
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
parsedLogs, _ := parseABCILogs(res.Log)
parsedLogs, err := parseABCILogs(res.Log)
if err != nil {
// Either log the error or include it in the response
parsedLogs = []*apiacbci.ABCIMessageLog{{
Log: fmt.Sprintf("failed to parse logs: %v", err),
}}
}

return &apiacbci.TxResponse{
Code: res.Code,
Codespace: res.Codespace,
Data: res.Data.String(),
RawLog: res.Log,
Logs: parsedLogs,
Txhash: res.Hash.String(),
}
}

// parseABCILogs attempts to parse a stringified ABCI tx log into a slice of
// ABCIMessageLog types. It returns an error upon JSON decoding failure.
func parseABCILogs(logs string) (res []*apiacbci.ABCIMessageLog, err error) {
err = json.Unmarshal([]byte(logs), &res)
return res, err
}
149 changes: 149 additions & 0 deletions client/v2/broadcast/comet/comet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package comet

import (
"context"
"errors"
"testing"

"github.com/cometbft/cometbft/mempool"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

apiacbci "cosmossdk.io/api/cosmos/base/abci/v1beta1"
mockrpc "cosmossdk.io/client/v2/broadcast/comet/testutil"

"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/codec/testutil"
)

var cdc = testutil.CodecOptions{}.NewCodec()

func TestNewCometBftBroadcaster(t *testing.T) {
tests := []struct {
name string
cdc codec.JSONCodec
mode string
want *CometBFTBroadcaster
wantErr bool
}{
{
name: "constructor",
mode: BroadcastSync,
cdc: cdc,
want: &CometBFTBroadcaster{
mode: BroadcastSync,
cdc: cdc,
},
},
{
name: "nil codec",
mode: BroadcastSync,
cdc: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewCometBFTBroadcaster("localhost:26657", tt.mode, tt.cdc)
if tt.wantErr {
require.Error(t, err)
require.Nil(t, got)
} else {
require.Equal(t, got.mode, tt.want.mode)
require.Equal(t, got.cdc, tt.want.cdc)
}
})
}
}

func TestCometBftBroadcaster_Broadcast(t *testing.T) {
ctrl := gomock.NewController(t)
cometMock := mockrpc.NewMockCometRPC(ctrl)
c := CometBFTBroadcaster{
rpcClient: cometMock,
mode: BroadcastSync,
cdc: cdc,
}
tests := []struct {
name string
mode string
setupMock func(*mockrpc.MockCometRPC)
wantErr bool
}{
{
name: "sync",
mode: BroadcastSync,
setupMock: func(m *mockrpc.MockCometRPC) {
m.EXPECT().BroadcastTxSync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
},
},
{
name: "async",
mode: BroadcastAsync,
setupMock: func(m *mockrpc.MockCometRPC) {
m.EXPECT().BroadcastTxAsync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c.mode = tt.mode
tt.setupMock(cometMock)
got, err := c.Broadcast(context.Background(), []byte{})
if tt.wantErr {
require.Error(t, err)
} else {
require.NotNil(t, got)
}
})
}
}

func Test_checkCometError(t *testing.T) {
tests := []struct {
name string
err error
want *apiacbci.TxResponse
}{
{
name: "tx already in cache",
err: errors.New("tx already exists in cache"),
want: &apiacbci.TxResponse{
Code: 19,
},
},
{
name: "mempool is full",
err: mempool.ErrMempoolIsFull{},
want: &apiacbci.TxResponse{
Code: 20,
},
},
{
name: "tx too large",
err: mempool.ErrTxTooLarge{},
want: &apiacbci.TxResponse{
Code: 21,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := checkCometError(tt.err, []byte{})
require.Equal(t, got.Code, tt.want.Code)
})
}
}
Loading
Loading