Skip to content

Commit

Permalink
update: rabbit enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
JulianToledano committed Oct 24, 2024
1 parent c840773 commit 5b3db1e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 30 deletions.
24 changes: 13 additions & 11 deletions client/v2/broadcast/comet/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type CometRPC interface {
) (*coretypes.ResultBlockSearch, error)
}

var _ broadcast.Broadcaster = CometBFTBroadcaster{}
var _ broadcast.Broadcaster = &CometBFTBroadcaster{}

// CometBFTBroadcaster implements the Broadcaster interface for CometBFT consensus engine.
type CometBFTBroadcaster struct {
Expand All @@ -69,7 +69,7 @@ type CometBFTBroadcaster struct {
cdc codec.JSONCodec
}

// NewCometBFTBroadcaster creates a new CometBftBroadcaster.
// NewCometBFTBroadcaster creates a new CometBFTBroadcaster.
func NewCometBFTBroadcaster(rpcURL, mode string, cdc codec.JSONCodec) (*CometBFTBroadcaster, error) {
if cdc == nil {
return nil, errors.New("codec can't be nil")
Expand All @@ -91,28 +91,30 @@ func NewCometBFTBroadcaster(rpcURL, mode string, cdc codec.JSONCodec) (*CometBFT
}, nil
}

func (c CometBFTBroadcaster) Consensus() string {
// Consensus returns the consensus engine name used by the broadcaster.
// It always returns "comet" for CometBFTBroadcaster.
func (c *CometBFTBroadcaster) Consensus() string {
return cometBFTConsensus
}

// Broadcast sends a transaction to the network and returns the result.
// returns a byte slice containing the JSON-encoded result and an error if the broadcast failed.
func (c CometBFTBroadcaster) Broadcast(ctx context.Context, txBytes []byte) ([]byte, error) {
func (c *CometBFTBroadcaster) Broadcast(ctx context.Context, txBytes []byte) ([]byte, error) {
if c.cdc == nil {
return []byte{}, fmt.Errorf("JSON codec is not initialized")
}

var fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error)
var broadcastFunc func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error)
switch c.mode {
case BroadcastSync:
fn = c.rpcClient.BroadcastTxSync
broadcastFunc = c.rpcClient.BroadcastTxSync
case BroadcastAsync:
fn = c.rpcClient.BroadcastTxAsync
broadcastFunc = c.rpcClient.BroadcastTxAsync
default:
return []byte{}, fmt.Errorf("unknown broadcast mode: %s", c.mode)
}

res, err := c.broadcast(ctx, txBytes, fn)
res, err := c.broadcast(ctx, txBytes, broadcastFunc)
if err != nil {
return []byte{}, err
}
Expand All @@ -121,11 +123,11 @@ func (c CometBFTBroadcaster) Broadcast(ctx context.Context, txBytes []byte) ([]b
}

// broadcast sends a transaction to the CometBFT network using the provided function.
func (c CometBFTBroadcaster) broadcast(ctx context.Context, txbytes []byte,
func (c *CometBFTBroadcaster) broadcast(ctx context.Context, txBytes []byte,
fn func(ctx context.Context, tx cmttypes.Tx) (*coretypes.ResultBroadcastTx, error),
) (*apiacbci.TxResponse, error) {
bResult, err := fn(ctx, txbytes)
if errRes := checkCometError(err, txbytes); err != nil {
bResult, err := fn(ctx, txBytes)
if errRes := checkCometError(err, txBytes); err != nil {
return errRes, nil
}

Expand Down
52 changes: 34 additions & 18 deletions client/v2/broadcast/comet/comet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,32 +66,48 @@ func TestCometBftBroadcaster_Broadcast(t *testing.T) {
cdc: cdc,
}
tests := []struct {
name string
mode string
want []byte
wantErr bool
name string
mode string
setupMock func(*mockrpc.MockCometRPC)
wantErr bool
}{
{
name: "sync",
mode: BroadcastSync,
setupMock: func(m *mockrpc.MockCometRPC) {
m.EXPECT().BroadcastTxSync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
},
},
{
name: "async",
mode: BroadcastAsync,
setupMock: func(m *mockrpc.MockCometRPC) {
m.EXPECT().BroadcastTxAsync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c.mode = tt.mode
cometMock.EXPECT().BroadcastTxSync(context.Background(), gomock.Any()).Return(&coretypes.ResultBroadcastTx{
Code: 0,
Data: []byte{},
Log: "",
Codespace: "",
Hash: []byte("%�����\u0010\n�T�\u0017\u0016�N^H[5�\u0006}�n�w�/Vi� "),
}, nil)
tt.setupMock(cometMock)
got, err := c.Broadcast(context.Background(), []byte{})
if (err != nil) != tt.wantErr {
t.Errorf("Broadcast() error = %v, wantErr %v", err, tt.wantErr)
return
if tt.wantErr {
require.Error(t, err)
} else {
require.NotNil(t, got)
}
require.NotNil(t, got)
})
}
}
Expand All @@ -103,21 +119,21 @@ func Test_checkCometError(t *testing.T) {
want *apiacbci.TxResponse
}{
{
name: "error in cache",
name: "tx already in cache",
err: errors.New("tx already exists in cache"),
want: &apiacbci.TxResponse{
Code: 19,
},
},
{
name: "error in cache",
name: "mempool is full",
err: mempool.ErrMempoolIsFull{},
want: &apiacbci.TxResponse{
Code: 20,
},
},
{
name: "error in cache",
name: "tx too large",
err: mempool.ErrTxTooLarge{},
want: &apiacbci.TxResponse{
Code: 21,
Expand Down
2 changes: 1 addition & 1 deletion client/v2/tx/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func GenerateOrBroadcastTxCLIWithBroadcaster(ctx client.Context, flagSet *pflag.
}

// GenerateOrBroadcastTxCLI will either generate and print an unsigned transaction
// or sign it and broadcast it using defaults cometBFT broadcaster, returning an error upon failure.
// or sign it and broadcast it using default CometBFT broadcaster, returning an error upon failure.
func GenerateOrBroadcastTxCLI(ctx client.Context, flagSet *pflag.FlagSet, msgs ...transaction.Msg) error {
cometBroadcaster, err := getCometBroadcaster(ctx, flagSet)
if err != nil {
Expand Down

0 comments on commit 5b3db1e

Please sign in to comment.