Skip to content

Commit

Permalink
[R4R]api: expose memo and source (#39)
Browse files Browse the repository at this point in the history
* api: expose memo and source

* rpc: use atomic value to avoid currently read/write

* use options
  • Loading branch information
unclezoro authored Jun 3, 2019
1 parent cea7274 commit 6442ea1
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 68 deletions.
51 changes: 32 additions & 19 deletions client/rpc/ws_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"net"
"net/http"
"sync/atomic"

"fmt"
"strings"
Expand Down Expand Up @@ -492,7 +493,7 @@ type WSClient struct {
wg sync.WaitGroup

mtx sync.RWMutex
dialing bool
dialing atomic.Value

// Maximum reconnect attempts (0 or greater; default: 25).
// Less than 0 means always try to reconnect.
Expand Down Expand Up @@ -532,8 +533,8 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli
writeWait: defaultWriteWait,
pingPeriod: defaultPingPeriod,
protocol: protocol,
dialing: true,
}
c.dialing.Store(true)
c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
for _, option := range options {
option(c)
Expand Down Expand Up @@ -572,7 +573,7 @@ func (c *WSClient) OnStart() error {
c.wg.Add(1)
go c.dialRoutine()
} else {
c.dialing = false
c.dialing.Store(false)
}

c.startReadWriteRoutines()
Expand All @@ -596,7 +597,7 @@ func (c *WSClient) Stop() error {

// IsDialing returns true if the client is dialing right now.
func (c *WSClient) IsDialing() bool {
return c.dialing
return c.dialing.Load().(bool)
}

// IsActive returns true if the client is running and not dialing.
Expand Down Expand Up @@ -639,6 +640,18 @@ func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, param
return c.Send(ctx, request)
}

func (c *WSClient) GetConnection() *websocket.Conn {
c.mtx.RLock()
defer c.mtx.RUnlock()
return c.conn
}

func (c *WSClient) SetConnection(conn *websocket.Conn) {
c.mtx.Lock()
defer c.mtx.Unlock()
c.conn = conn
}

func (c *WSClient) Codec() *amino.Codec {
return c.cdc
}
Expand Down Expand Up @@ -672,7 +685,7 @@ func (c *WSClient) dial() error {
if err != nil {
return err
}
c.conn = conn
c.SetConnection(conn)
return nil
}

Expand All @@ -681,9 +694,9 @@ func (c *WSClient) dial() error {
func (c *WSClient) reconnect() error {
attempt := 0

c.dialing = true
c.dialing.Store(true)
defer func() {
c.dialing = false
c.dialing.Store(false)
}()
backOffDuration := 1 * time.Second
for {
Expand Down Expand Up @@ -722,9 +735,9 @@ func (c *WSClient) startReadWriteRoutines() {
func (c *WSClient) dialRoutine() {
dialTicker := time.NewTicker(defaultDialPeriod)
defer dialTicker.Stop()
c.dialing = true
c.dialing.Store(true)
defer func() {
c.dialing = false
c.dialing.Store(false)
c.wg.Done()
}()
for {
Expand Down Expand Up @@ -780,7 +793,7 @@ func (c *WSClient) writeRoutine() {

defer func() {
ticker.Stop()
if err := c.conn.Close(); err != nil {
if err := c.GetConnection().Close(); err != nil {
// ignore error; it will trigger in tests
// likely because it's closing an already closed connection
}
Expand All @@ -790,28 +803,28 @@ func (c *WSClient) writeRoutine() {
select {
case request := <-c.send:
if c.writeWait > 0 {
if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
if err := c.GetConnection().SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
c.Logger.Error("failed to set write deadline", "err", err)
}
}
if err := c.conn.WriteJSON(request); err != nil {
if err := c.GetConnection().WriteJSON(request); err != nil {
c.Logger.Error("failed to send request", "err", err)
c.reconnectAfter <- err
}
case <-ticker.C:
if c.writeWait > 0 {
if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
if err := c.GetConnection().SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
c.Logger.Error("failed to set write deadline", "err", err)
}
}
if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
if err := c.GetConnection().WriteMessage(websocket.PingMessage, []byte{}); err != nil {
c.Logger.Error("failed to write ping", "err", err)
c.reconnectAfter <- err
continue
}
c.Logger.Debug("sent ping")
case <-c.Quit():
if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
if err := c.GetConnection().WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
c.Logger.Error("failed to write message", "err", err)
}
return
Expand All @@ -823,25 +836,25 @@ func (c *WSClient) writeRoutine() {
// executing all reads from this goroutine.
func (c *WSClient) readRoutine() {
defer func() {
if err := c.conn.Close(); err != nil {
if err := c.GetConnection().Close(); err != nil {
// ignore error; it will trigger in tests
// likely because it's closing an already closed connection
}
}()
c.wg.Wait()
c.conn.SetPongHandler(func(string) error {
c.GetConnection().SetPongHandler(func(string) error {
c.Logger.Debug("got pong")
return nil
})

for {
// reset deadline for every message type (control or data)
if c.readWait > 0 {
if err := c.conn.SetReadDeadline(time.Now().Add(c.readWait)); err != nil {
if err := c.GetConnection().SetReadDeadline(time.Now().Add(c.readWait)); err != nil {
c.Logger.Error("failed to set read deadline", "err", err)
}
}
_, data, err := c.conn.ReadMessage()
_, data, err := c.GetConnection().ReadMessage()
if err != nil {
c.Logger.Error("failed to read response", "err", err)
c.reconnectAfter <- err
Expand Down
4 changes: 2 additions & 2 deletions client/transaction/burn_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type BurnTokenResult struct {
tx.TxCommitResult
}

func (c *client) BurnToken(symbol string, amount int64, sync bool) (*BurnTokenResult, error) {
func (c *client) BurnToken(symbol string, amount int64, sync bool, options ...Option) (*BurnTokenResult, error) {
if symbol == "" {
return nil, fmt.Errorf("Burn token symbol can'c be empty ")
}
Expand All @@ -26,7 +26,7 @@ func (c *client) BurnToken(symbol string, amount int64, sync bool) (*BurnTokenRe
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(burnMsg, sync)
commit, err := c.broadcastMsg(burnMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction/cancel_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type CancelOrderResult struct {
tx.TxCommitResult
}

func (c *client) CancelOrder(baseAssetSymbol, quoteAssetSymbol, refId string, sync bool) (*CancelOrderResult, error) {
func (c *client) CancelOrder(baseAssetSymbol, quoteAssetSymbol, refId string, sync bool, options ...Option) (*CancelOrderResult, error) {
if baseAssetSymbol == "" || quoteAssetSymbol == "" {
return nil, fmt.Errorf("BaseAssetSymbol or QuoteAssetSymbol is missing. ")
}
Expand All @@ -27,7 +27,7 @@ func (c *client) CancelOrder(baseAssetSymbol, quoteAssetSymbol, refId string, sy
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(cancelOrderMsg, sync)
commit, err := c.broadcastMsg(cancelOrderMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction/create_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type CreateOrderResult struct {
OrderId string
}

func (c *client) CreateOrder(baseAssetSymbol, quoteAssetSymbol string, op int8, price, quantity int64, sync bool) (*CreateOrderResult, error) {
func (c *client) CreateOrder(baseAssetSymbol, quoteAssetSymbol string, op int8, price, quantity int64, sync bool, options ...Option) (*CreateOrderResult, error) {
if baseAssetSymbol == "" || quoteAssetSymbol == "" {
return nil, fmt.Errorf("BaseAssetSymbol or QuoteAssetSymbol is missing. ")
}
Expand All @@ -37,7 +37,7 @@ func (c *client) CreateOrder(baseAssetSymbol, quoteAssetSymbol string, op int8,
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(newOrderMsg, sync)
commit, err := c.broadcastMsg(newOrderMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction/deposit_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ type DepositProposalResult struct {
tx.TxCommitResult
}

func (c *client) DepositProposal(proposalID int64, amount int64, sync bool) (*DepositProposalResult, error) {
func (c *client) DepositProposal(proposalID int64, amount int64, sync bool, options ...Option) (*DepositProposalResult, error) {
fromAddr := c.keyManager.GetAddr()
coins := ctypes.Coins{ctypes.Coin{Denom: types.NativeSymbol, Amount: amount}}
depositMsg := msg.NewDepositMsg(fromAddr, proposalID, coins)
err := depositMsg.ValidateBasic()
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(depositMsg, sync)
commit, err := c.broadcastMsg(depositMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction/freeze_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type FreezeTokenResult struct {
tx.TxCommitResult
}

func (c *client) FreezeToken(symbol string, amount int64, sync bool) (*FreezeTokenResult, error) {
func (c *client) FreezeToken(symbol string, amount int64, sync bool, options ...Option) (*FreezeTokenResult, error) {
if symbol == "" {
return nil, fmt.Errorf("Freeze token symbol can'c be empty ")
}
Expand All @@ -26,7 +26,7 @@ func (c *client) FreezeToken(symbol string, amount int64, sync bool) (*FreezeTok
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(freezeMsg, sync)
commit, err := c.broadcastMsg(freezeMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction/issue_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type IssueTokenValue struct {
Owner string `json:"owner"`
}

func (c *client) IssueToken(name, symbol string, supply int64, sync bool, mintable bool) (*IssueTokenResult, error) {
func (c *client) IssueToken(name, symbol string, supply int64, sync bool, mintable bool, options ...Option) (*IssueTokenResult, error) {
if symbol == "" {
return nil, fmt.Errorf("Freeze token symbol can'c be empty ")
}
Expand All @@ -38,7 +38,7 @@ func (c *client) IssueToken(name, symbol string, supply int64, sync bool, mintab
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(issueMsg, sync)
commit, err := c.broadcastMsg(issueMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction/list_pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ type ListPairResult struct {
tx.TxCommitResult
}

func (c *client) ListPair(proposalId int64, baseAssetSymbol string, quoteAssetSymbol string, initPrice int64, sync bool) (*ListPairResult, error) {
func (c *client) ListPair(proposalId int64, baseAssetSymbol string, quoteAssetSymbol string, initPrice int64, sync bool, options ...Option) (*ListPairResult, error) {
fromAddr := c.keyManager.GetAddr()

burnMsg := msg.NewDexListMsg(fromAddr, proposalId, baseAssetSymbol, quoteAssetSymbol, initPrice)
err := burnMsg.ValidateBasic()
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(burnMsg, sync)
commit, err := c.broadcastMsg(burnMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction/mint_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type MintTokenResult struct {
tx.TxCommitResult
}

func (c *client) MintToken(symbol string, amount int64, sync bool) (*MintTokenResult, error) {
func (c *client) MintToken(symbol string, amount int64, sync bool, options ...Option) (*MintTokenResult, error) {
if symbol == "" {
return nil, fmt.Errorf("Freeze token symbol can'c be empty ")
}
Expand All @@ -26,7 +26,7 @@ func (c *client) MintToken(symbol string, amount int64, sync bool) (*MintTokenRe
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(mintMsg, sync)
commit, err := c.broadcastMsg(mintMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions client/transaction/send_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type SendTokenResult struct {
tx.TxCommitResult
}

func (c *client) SendToken(transfers []msg.Transfer, sync bool) (*SendTokenResult, error) {
func (c *client) SendToken(transfers []msg.Transfer, sync bool, options ...Option) (*SendTokenResult, error) {
fromAddr := c.keyManager.GetAddr()
fromCoins := types.Coins{}
for _, t := range transfers {
Expand All @@ -21,7 +21,7 @@ func (c *client) SendToken(transfers []msg.Transfer, sync bool) (*SendTokenResul
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(sendMsg, sync)
commit, err := c.broadcastMsg(sendMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions client/transaction/submit_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@ type SubmitProposalResult struct {
ProposalId int64 `json:"proposal_id"`
}

func (c *client) SubmitListPairProposal(title string, param msg.ListTradingPairParams, initialDeposit int64, votingPeriod time.Duration, sync bool) (*SubmitProposalResult, error) {
func (c *client) SubmitListPairProposal(title string, param msg.ListTradingPairParams, initialDeposit int64, votingPeriod time.Duration, sync bool, options ...Option) (*SubmitProposalResult, error) {
bz, err := json.Marshal(&param)
if err != nil {
return nil, err
}
return c.SubmitProposal(title, string(bz), msg.ProposalTypeListTradingPair, initialDeposit, votingPeriod, sync)
return c.SubmitProposal(title, string(bz), msg.ProposalTypeListTradingPair, initialDeposit, votingPeriod, sync, options...)
}

func (c *client) SubmitProposal(title string, description string, proposalType msg.ProposalKind, initialDeposit int64, votingPeriod time.Duration, sync bool) (*SubmitProposalResult, error) {
func (c *client) SubmitProposal(title string, description string, proposalType msg.ProposalKind, initialDeposit int64, votingPeriod time.Duration, sync bool, options ...Option) (*SubmitProposalResult, error) {
fromAddr := c.keyManager.GetAddr()
coins := ctypes.Coins{ctypes.Coin{Denom: types.NativeSymbol, Amount: initialDeposit}}
proposalMsg := msg.NewMsgSubmitProposal(title, description, proposalType, fromAddr, coins, votingPeriod)
err := proposalMsg.ValidateBasic()
if err != nil {
return nil, err
}
commit, err := c.broadcastMsg(proposalMsg, sync)
commit, err := c.broadcastMsg(proposalMsg, sync, options...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 6442ea1

Please sign in to comment.