Skip to content

Commit

Permalink
pool: resolve review issues (2 of x).
Browse files Browse the repository at this point in the history
  • Loading branch information
dnldd committed Oct 9, 2020
1 parent 20710dc commit da67aa3
Showing 1 changed file with 13 additions and 26 deletions.
39 changes: 13 additions & 26 deletions pool/paymentmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type TxBroadcaster interface {
PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error)
}

// confNotifMsg represents a tx confirmation notification messege.
// confNotifMsg represents a tx confirmation notification message.
type confNotifMsg struct {
resp *walletrpc.ConfirmationNotificationsResponse
err error
Expand Down Expand Up @@ -136,7 +136,7 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) {
binary.LittleEndian.PutUint64(b, 0)
err := pbkt.Put(lastPaymentPaidOn, b)
if err != nil {
desc := fmt.Sprintf("%s: unable to load last payment "+
desc := fmt.Sprintf("%s: unable to persist last payment "+
"paid-on time: %v", funcName, err)
return dbError(ErrPersistEntry, desc)
}
Expand All @@ -152,7 +152,7 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) {
binary.LittleEndian.PutUint32(b, 0)
err := pbkt.Put(lastPaymentHeight, b)
if err != nil {
desc := fmt.Sprintf("%s: unable to load last payment "+
desc := fmt.Sprintf("%s: unable to persist last payment "+
"height: %v", funcName, err)
return dbError(ErrPersistEntry, desc)
}
Expand All @@ -168,7 +168,7 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) {
binary.LittleEndian.PutUint64(b, 0)
err := pbkt.Put(lastPaymentCreatedOn, b)
if err != nil {
desc := fmt.Sprintf("%s: unable to load last payment "+
desc := fmt.Sprintf("%s: unable to persist last payment "+
"created-on time: %v", funcName, err)
return dbError(ErrPersistEntry, desc)
}
Expand Down Expand Up @@ -903,7 +903,9 @@ func (pm *PaymentMgr) applyTxFees(inputs []chainjson.TransactionInput, outputs m
}

// fetchTxConfNotifications is a helper function for fetching tx confirmation
// notifications without blocking.
// notifications. It will return when either a notification or error is
// received from the provided notification source, or when the provided
// context is cancelled.
func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrpc.ConfirmationNotificationsResponse, error)) (*walletrpc.ConfirmationNotificationsResponse, error) {
funcName := "fetchTxConfNotifications"
notifCh := make(chan confNotifMsg)
Expand All @@ -917,7 +919,8 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp

select {
case <-ctx.Done():
log.Tracef("%s: unable to fx tx confirmation notifications", funcName)
log.Tracef("%s: unable to fetch tx confirmation notifications",
funcName)
return nil, ErrContextCancelled
case notif := <-notifCh:
close(notifCh)
Expand Down Expand Up @@ -951,21 +954,13 @@ func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string]
// Wait for coinbase tx confirmations from the wallet.
maxSpendableConfs := int32(pm.cfg.ActiveNet.CoinbaseMaturity) + 1

txConfs:
for {
select {
case <-ctx.Done():
break txConfs

default:
// Non-blocking receive fallthrough.
}

resp, err := fetchTxConfNotifications(ctx, notifSource)
if err != nil {
if errors.Is(err, ErrContextCancelled) {
// Terminate the tx confirmation process.
continue
desc := fmt.Sprintf("%s: cancelled confirming %d coinbase "+
"transaction(s)", funcName, len(txHashes))
return poolError(ErrContextCancelled, desc)
}
return err
}
Expand All @@ -987,17 +982,9 @@ txConfs:
}

if len(txHashes) == 0 {
break
return nil
}
}

if len(txHashes) != 0 {
desc := fmt.Sprintf("%s: cancelled confirming %d coinbase "+
"transaction(s)", funcName, len(txHashes))
return poolError(ErrContextCancelled, desc)
}

return nil
}

// generatePayoutTxDetails creates the payout transaction inputs and outputs
Expand Down

0 comments on commit da67aa3

Please sign in to comment.