Skip to content

Commit

Permalink
pool: cleanly terminate tx confirmation streams.
Browse files Browse the repository at this point in the history
This upates the tx confirmation streaming process to be
non-blocking in order to terminate cleanly on context cancellation.
  • Loading branch information
dnldd committed Oct 6, 2020
1 parent b310c0d commit 9828e99
Showing 1 changed file with 40 additions and 4 deletions.
44 changes: 40 additions & 4 deletions pool/paymentmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ type TxBroadcaster interface {
PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error)
}

// confNotifMsg represents a tx confirmation notification messege.
type confNotifMsg struct {
resp *walletrpc.ConfirmationNotificationsResponse
err error
}

// PaymentMgrConfig contains all of the configuration values which should be
// provided when creating a new instance of PaymentMgr.
type PaymentMgrConfig struct {
Expand Down Expand Up @@ -896,6 +902,34 @@ func (pm *PaymentMgr) applyTxFees(inputs []chainjson.TransactionInput, outputs m
return sansFees, estFee, nil
}

// fetchTxConfNotifications is a helper function for fetching tx confirmation
// notifications without blocking.
func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrpc.ConfirmationNotificationsResponse, error)) (*walletrpc.ConfirmationNotificationsResponse, error) {
funcName := "fetchTxConfNotifications"
notifCh := make(chan confNotifMsg)
go func(ch chan confNotifMsg) {
resp, err := notifSource()
ch <- confNotifMsg{
resp: resp,
err: err,
}
}(notifCh)

select {
case <-ctx.Done():
log.Tracef("%s: unable to fx tx confirmation notifications", funcName)
return nil, ErrContextCancelled
case notif := <-notifCh:
close(notifCh)
if notif.err != nil {
desc := fmt.Sprintf("%s: unable to fetch tx confirmation "+
"notifications, %s", funcName, notif.err)
return nil, poolError(ErrTxConf, desc)
}
return notif.resp, nil
}
}

// confirmCoinbases ensures the coinbases referenced by the provided
// transaction hashes are spendable by the expected maximum spendable height.
//
Expand Down Expand Up @@ -927,11 +961,13 @@ txConfs:
// Non-blocking receive fallthrough.
}

resp, err := notifSource()
resp, err := fetchTxConfNotifications(ctx, notifSource)
if err != nil {
desc := fmt.Sprintf("%s: unable to fetch tx confirmations: %v",
funcName, err)
return poolError(ErrTxConf, desc)
if errors.Is(err, ErrContextCancelled) {
// Terminate the tx confirmation process.
continue
}
return err
}

// Ensure all coinbases being spent are spendable.
Expand Down

0 comments on commit 9828e99

Please sign in to comment.