From 9d15c9dba6e94763b53cd327a742bd284f857572 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Mon, 21 Sep 2020 22:36:28 +0000 Subject: [PATCH 1/8] pool: update payment manager errors. This updates payment manager related errors to leverage the updated error types. --- pool/paymentmgr.go | 127 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 99 insertions(+), 28 deletions(-) diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 9f7a95db..b6e6a517 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -139,23 +139,32 @@ func (pm *PaymentMgr) fetchLastPaymentHeight() uint32 { // persistLastPaymentHeight saves the last payment height to the db. func (pm *PaymentMgr) persistLastPaymentHeight(tx *bolt.Tx) error { + funcName := "persistLastPaymentHeight" pbkt := tx.Bucket(poolBkt) if pbkt == nil { - desc := fmt.Sprintf("bucket %s not found", string(poolBkt)) + desc := fmt.Sprintf("%s: bucket %s not found", funcName, + string(poolBkt)) return dbError(ErrBucketNotFound, desc) } height := atomic.LoadUint32(&pm.lastPaymentHeight) b := make([]byte, 4) binary.LittleEndian.PutUint32(b, height) err := pbkt.Put(lastPaymentHeight, b) - return err + if err != nil { + desc := fmt.Sprintf("%s: unable to persist last payment height: %v", + funcName, err) + return dbError(ErrPersistEntry, desc) + } + return nil } // loadLastPaymentHeight fetches the last payment height from the db. func (pm *PaymentMgr) loadLastPaymentHeight(tx *bolt.Tx) error { + funcName := "loadLastPaymentHeight" pbkt := tx.Bucket(poolBkt) if pbkt == nil { - desc := fmt.Sprintf("bucket %s not found", string(poolBkt)) + desc := fmt.Sprintf("%s: bucket %s not found", funcName, + string(poolBkt)) return dbError(ErrBucketNotFound, desc) } lastPaymentHeightB := pbkt.Get(lastPaymentHeight) @@ -163,7 +172,13 @@ func (pm *PaymentMgr) loadLastPaymentHeight(tx *bolt.Tx) error { pm.setLastPaymentHeight(0) b := make([]byte, 4) binary.LittleEndian.PutUint32(b, 0) - return pbkt.Put(lastPaymentHeight, b) + err := pbkt.Put(lastPaymentHeight, b) + if err != nil { + desc := fmt.Sprintf("%s: unable to load last payment "+ + "height: %v", funcName, err) + return dbError(ErrPersistEntry, desc) + } + return nil } pm.setLastPaymentHeight(binary.LittleEndian.Uint32(lastPaymentHeightB)) return nil @@ -181,17 +196,26 @@ func (pm *PaymentMgr) fetchLastPaymentPaidOn() uint64 { // persistLastPaymentPaidOn saves the last payment paid on time to the db. func (pm *PaymentMgr) persistLastPaymentPaidOn(tx *bolt.Tx) error { + funcName := "persistLastPaymentPaidOn" pbkt := tx.Bucket(poolBkt) if pbkt == nil { - desc := fmt.Sprintf("bucket %s not found", string(poolBkt)) + desc := fmt.Sprintf("%s: bucket %s not found", funcName, + string(poolBkt)) return dbError(ErrBucketNotFound, desc) } - return pbkt.Put(lastPaymentPaidOn, + err := pbkt.Put(lastPaymentPaidOn, nanoToBigEndianBytes(int64(pm.lastPaymentPaidOn))) + if err != nil { + desc := fmt.Sprintf("%s: unable to persist last payment "+ + "paid on time: %v", funcName, err) + return dbError(ErrPersistEntry, desc) + } + return nil } // pruneShares removes invalidated shares from the db. func (pm *PaymentMgr) pruneShares(tx *bolt.Tx, minNano int64) error { + funcName := "pruneShares" minB := nanoToBigEndianBytes(minNano) bkt, err := fetchShareBucket(tx) if err != nil { @@ -203,9 +227,10 @@ func (pm *PaymentMgr) pruneShares(tx *bolt.Tx, minNano int64) error { for k, _ := cursor.First(); k != nil; k, _ = cursor.Next() { _, err := hex.Decode(createdOnB, k[:16]) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to decode share created-on "+ + "bytes: %v", funcName, err) + return dbError(ErrDecode, desc) } - if bytes.Compare(minB, createdOnB) > 0 { toDelete = append(toDelete, k) } @@ -221,9 +246,11 @@ func (pm *PaymentMgr) pruneShares(tx *bolt.Tx, minNano int64) error { // fetchPoolBucket is a helper function for getting the pool bucket. func fetchPoolBucket(tx *bolt.Tx) (*bolt.Bucket, error) { + funcName := "fetchPoolBucket" pbkt := tx.Bucket(poolBkt) if pbkt == nil { - desc := fmt.Sprintf("bucket %s not found", string(poolBkt)) + desc := fmt.Sprintf("%s: bucket %s not found", funcName, + string(poolBkt)) return nil, dbError(ErrBucketNotFound, desc) } return pbkt, nil @@ -237,6 +264,7 @@ func bigEndianBytesToNano(b []byte) uint64 { // loadLastPaymentPaidOn fetches the last payment paid on time from the db. func (pm *PaymentMgr) loadLastPaymentPaidOn(tx *bolt.Tx) error { + funcName := "loadLastPaymentPaidOn" pbkt, err := fetchPoolBucket(tx) if err != nil { return err @@ -246,7 +274,13 @@ func (pm *PaymentMgr) loadLastPaymentPaidOn(tx *bolt.Tx) error { pm.setLastPaymentPaidOn(0) b := make([]byte, 8) binary.LittleEndian.PutUint64(b, 0) - return pbkt.Put(lastPaymentPaidOn, b) + err := pbkt.Put(lastPaymentPaidOn, b) + if err != nil { + desc := fmt.Sprintf("%s: unable to load last payment "+ + "paid-on time: %v", funcName, err) + return dbError(ErrPersistEntry, desc) + } + return nil } pm.setLastPaymentPaidOn(bigEndianBytesToNano(lastPaymentPaidOnB)) return nil @@ -264,16 +298,24 @@ func (pm *PaymentMgr) fetchLastPaymentCreatedOn() uint64 { // persistLastPaymentCreatedOn saves the last payment created on time to the db. func (pm *PaymentMgr) persistLastPaymentCreatedOn(tx *bolt.Tx) error { + funcName := "persistLastPaymentCreatedOn" pbkt, err := fetchPoolBucket(tx) if err != nil { return err } - return pbkt.Put(lastPaymentCreatedOn, + err = pbkt.Put(lastPaymentCreatedOn, nanoToBigEndianBytes(int64(pm.lastPaymentCreatedOn))) + if err != nil { + desc := fmt.Sprintf("%s: unable to persist last payment "+ + "paid-on time: %v", funcName, err) + return dbError(ErrPersistEntry, desc) + } + return nil } // loadLastPaymentCreaedOn fetches the last payment created on time from the db. func (pm *PaymentMgr) loadLastPaymentCreatedOn(tx *bolt.Tx) error { + funcName := "loadLastPaymentCreatedOn" pbkt, err := fetchPoolBucket(tx) if err != nil { return err @@ -283,7 +325,13 @@ func (pm *PaymentMgr) loadLastPaymentCreatedOn(tx *bolt.Tx) error { pm.setLastPaymentCreatedOn(0) b := make([]byte, 8) binary.LittleEndian.PutUint64(b, 0) - return pbkt.Put(lastPaymentCreatedOn, b) + err := pbkt.Put(lastPaymentCreatedOn, b) + if err != nil { + desc := fmt.Sprintf("%s: unable to load last payment "+ + "created-on time: %v", funcName, err) + return dbError(ErrPersistEntry, desc) + } + return nil } pm.setLastPaymentCreatedOn(bigEndianBytesToNano(lastPaymentCreatedOnB)) return nil @@ -321,6 +369,7 @@ func (pm *PaymentMgr) sharePercentages(shares []*Share) (map[string]*big.Rat, er // PPSEligibleShares fetches all shares created before or at the provided // time. func (pm *PaymentMgr) PPSEligibleShares(max []byte) ([]*Share, error) { + funcName := "PPSEligibleShares" eligibleShares := make([]*Share, 0) err := pm.cfg.DB.View(func(tx *bolt.Tx) error { bkt, err := fetchShareBucket(tx) @@ -332,19 +381,22 @@ func (pm *PaymentMgr) PPSEligibleShares(max []byte) ([]*Share, error) { for k, v := c.First(); k != nil; k, v = c.Next() { _, err := hex.Decode(createdOnB, k[:16]) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to decode share "+ + "created-on bytes: %v", funcName, err) + return dbError(ErrDecode, desc) } if bytes.Compare(createdOnB, max) <= 0 { var share Share err := json.Unmarshal(v, &share) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to unmarshal share: %v", + funcName, err) + return dbError(ErrParse, desc) } eligibleShares = append(eligibleShares, &share) } } - return nil }) if err != nil { @@ -375,6 +427,7 @@ func (pm *PaymentMgr) PPSSharePercentages(workCreatedOn int64) (map[string]*big. // PPLNSEligibleShares fetches all shares keyed greater than the provided // minimum. func (pm *PaymentMgr) PPLNSEligibleShares(min []byte) ([]*Share, error) { + funcName := "PPLNSEligibleShares" eligibleShares := make([]*Share, 0) err := pm.cfg.DB.View(func(tx *bolt.Tx) error { bkt, err := fetchShareBucket(tx) @@ -386,14 +439,18 @@ func (pm *PaymentMgr) PPLNSEligibleShares(min []byte) ([]*Share, error) { for k, v := c.Last(); k != nil; k, v = c.Prev() { _, err := hex.Decode(createdOnB, k[:16]) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to decode share "+ + "created-on bytes: %v", funcName, err) + return dbError(ErrDecode, desc) } if bytes.Compare(createdOnB, min) > 0 { var share Share err := json.Unmarshal(v, &share) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to unmarshal "+ + "share: %v", funcName, err) + return dbError(ErrParse, desc) } eligibleShares = append(eligibleShares, &share) } @@ -555,6 +612,7 @@ func (pm *PaymentMgr) generatePayments(height uint32, source *PaymentSource, amt // pendingPayments fetches all unpaid payments. func (pm *PaymentMgr) pendingPayments() ([]*Payment, error) { + funcName := "pendingPayments" payments := make([]*Payment, 0) err := pm.cfg.DB.View(func(tx *bolt.Tx) error { bkt, err := fetchPaymentBucket(tx) @@ -566,9 +624,10 @@ func (pm *PaymentMgr) pendingPayments() ([]*Payment, error) { var payment Payment err := json.Unmarshal(v, &payment) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to unmarshal "+ + "payment: %v", funcName, err) + return dbError(ErrParse, desc) } - if payment.PaidOnHeight == 0 { payments = append(payments, &payment) } @@ -583,6 +642,7 @@ func (pm *PaymentMgr) pendingPayments() ([]*Payment, error) { // pendingPaymentsAtHeight fetches all pending payments at the provided height. func (pm *PaymentMgr) pendingPaymentsAtHeight(height uint32) ([]*Payment, error) { + funcName := "pendingPaymentsAtHeight" payments := make([]*Payment, 0) err := pm.cfg.DB.View(func(tx *bolt.Tx) error { bkt, err := fetchPaymentBucket(tx) @@ -596,16 +656,19 @@ func (pm *PaymentMgr) pendingPaymentsAtHeight(height uint32) ([]*Payment, error) for k, v := cursor.First(); k != nil; k, v = cursor.Next() { _, err := hex.Decode(paymentHeightB, k[:8]) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to decode payment "+ + "height: %v", funcName, err) + return dbError(ErrDecode, desc) } if bytes.Compare(heightBE, paymentHeightB) > 0 { var payment Payment err := json.Unmarshal(v, &payment) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to unmarshal payment: %v", + funcName, err) + return dbError(ErrParse, desc) } - if payment.PaidOnHeight == 0 { payments = append(payments, &payment) } @@ -622,6 +685,7 @@ func (pm *PaymentMgr) pendingPaymentsAtHeight(height uint32) ([]*Payment, error) // pendingPaymentsForBlockHash returns the number of pending payments with // the provided block hash as their source. func (pm *PaymentMgr) pendingPaymentsForBlockHash(blockHash string) (uint32, error) { + funcName := "pendingPaymentsForBlockHash" var count uint32 err := pm.cfg.DB.View(func(tx *bolt.Tx) error { bkt, err := fetchPaymentBucket(tx) @@ -634,9 +698,10 @@ func (pm *PaymentMgr) pendingPaymentsForBlockHash(blockHash string) (uint32, err var payment Payment err := json.Unmarshal(v, &payment) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to unmarshal payment: %v", + funcName, err) + return dbError(ErrParse, desc) } - if payment.PaidOnHeight == 0 { if payment.Source.BlockHash == blockHash { count++ @@ -654,6 +719,7 @@ func (pm *PaymentMgr) pendingPaymentsForBlockHash(blockHash string) (uint32, err // archivedPayments fetches all archived payments. List is ordered, most // recent comes first. func (pm *PaymentMgr) archivedPayments() ([]*Payment, error) { + funcName := "archivedPayments" pmts := make([]*Payment, 0) err := pm.cfg.DB.View(func(tx *bolt.Tx) error { abkt, err := fetchPaymentArchiveBucket(tx) @@ -666,7 +732,9 @@ func (pm *PaymentMgr) archivedPayments() ([]*Payment, error) { var payment Payment err := json.Unmarshal(v, &payment) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to unmarshal payment: %v", + funcName, err) + return dbError(ErrParse, desc) } pmts = append(pmts, &payment) } @@ -681,6 +749,7 @@ func (pm *PaymentMgr) archivedPayments() ([]*Payment, error) { // maturePendingPayments fetches all mature pending payments at the // provided height. func (pm *PaymentMgr) maturePendingPayments(height uint32) (map[string][]*Payment, error) { + funcName := "maturePendingPayments" payments := make([]*Payment, 0) err := pm.cfg.DB.View(func(tx *bolt.Tx) error { bkt, err := fetchPaymentBucket(tx) @@ -693,9 +762,10 @@ func (pm *PaymentMgr) maturePendingPayments(height uint32) (map[string][]*Paymen var payment Payment err := json.Unmarshal(v, &payment) if err != nil { - return err + desc := fmt.Sprintf("%s: unable to unmarshal payment: %v", + funcName, err) + return dbError(ErrParse, desc) } - spendableHeight := payment.EstimatedMaturity + 1 if payment.PaidOnHeight == 0 && spendableHeight <= height { payments = append(payments, &payment) @@ -713,10 +783,11 @@ func (pm *PaymentMgr) maturePendingPayments(height uint32) (map[string][]*Paymen if !ok { set = make([]*Payment, 0) } - set = append(set, pmt) pmts[pmt.Source.BlockHash] = set } + return pmts, nil +} return pmts, nil } From 8aee5606f541ac147a9e375c5a19dab986318d80 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Mon, 21 Sep 2020 22:59:54 +0000 Subject: [PATCH 2/8] pool: refactor payDividends. This refactors payDividends into smaller scoped helper functions that are easier to test. --- pool/error.go | 30 +++ pool/error_test.go | 10 + pool/paymentmgr.go | 450 ++++++++++++++++++++++++++++----------------- 3 files changed, 319 insertions(+), 171 deletions(-) diff --git a/pool/error.go b/pool/error.go index f8da7235..ba218903 100644 --- a/pool/error.go +++ b/pool/error.go @@ -96,6 +96,36 @@ const ( // ErrPaymentSource indicates a payment source error. ErrPaymentSource = ErrorKind("ErrPaymentSource") + + // ErrShareRatio indicates a share ratio error. + ErrShareRatio = ErrorKind("ErrShareRatio") + + // ErrCreateHash indicates a hash creation error. + ErrCreateHash = ErrorKind("ErrCreateHash") + + // ErrCoinbase indicates a coinbase related error. + ErrCoinbase = ErrorKind("ErrCoinbase") + + // ErrCreateTx indicates a transaction creation error. + ErrCreateTx = ErrorKind("ErrCreateTx") + + // ErrSignTx indicates a transaction signing error. + ErrSignTx = ErrorKind("ErrSignTx") + + // ErrPublishTx indicates a transaction pubishing error. + ErrPublishTx = ErrorKind("ErrPublishTx") + + // ErrTxOut indicates a transaction output related error. + ErrTxOut = ErrorKind("ErrTxOut") + + // ErrTxIn indicates a transaction input related error. + ErrTxIn = ErrorKind("ErrTxIn") + + // ErrContextCancelled indicates a context cancellation related error. + ErrContextCancelled = ErrorKind("ErrContextCancelled") + + // ErrCreateAmount indicates an amount creation error. + ErrCreateAmount = ErrorKind("ErrCreateAmount") ) // Error satisfies the error interface and prints human-readable errors. diff --git a/pool/error_test.go b/pool/error_test.go index 21ce5a72..f119abc3 100644 --- a/pool/error_test.go +++ b/pool/error_test.go @@ -42,6 +42,16 @@ func TestErrorKindStringer(t *testing.T) { {ErrDifficulty, "ErrDifficulty"}, {ErrWorkRejected, "ErrWorkRejected"}, {ErrPaymentSource, "ErrPaymentSource"}, + {ErrShareRatio, "ErrShareRatio"}, + {ErrCreateHash, "ErrCreateHash"}, + {ErrCoinbase, "ErrCoinbase"}, + {ErrCreateTx, "ErrCreateTx"}, + {ErrSignTx, "ErrSignTx"}, + {ErrPublishTx, "ErrPublishTx"}, + {ErrTxOut, "ErrTxOut"}, + {ErrTxIn, "ErrTxIn"}, + {ErrContextCancelled, "ErrContextCancelled"}, + {ErrCreateAmount, "ErrCreateAmount"}, } for i, test := range tests { diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index b6e6a517..8dc13b88 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "encoding/hex" "encoding/json" + "errors" "fmt" "math/big" "math/rand" @@ -92,6 +93,9 @@ type PaymentMgrConfig struct { // FetchTxBroadcaster returns a transaction broadcaster that allows signing // and publishing of transactions. FetchTxBroadcaster func() TxBroadcaster + // CoinbaseConfTimeout is the duration to wait for coinbase confirmations + // when generating a payout transaction. + CoinbaseConfTimeout time.Duration } // PaymentMgr handles generating shares and paying out dividends to @@ -488,9 +492,11 @@ func (pm *PaymentMgr) PPLNSSharePercentages() (map[string]*big.Rat, error) { // calculatePayments creates the payments due participating accounts. func (pm *PaymentMgr) calculatePayments(ratios map[string]*big.Rat, source *PaymentSource, total dcrutil.Amount, poolFee float64, height uint32, estMaturity uint32) ([]*Payment, int64, error) { + funcName := "calculatePayments" if len(ratios) == 0 { - return nil, 0, fmt.Errorf("valid share ratios required to" + - " generate payments") + desc := fmt.Sprintf("%s: valid share ratios required to "+ + "generate payments", funcName) + return nil, 0, poolError(ErrShareRatio, desc) } // Deduct pool fee from the amount to be shared. @@ -519,11 +525,16 @@ func (pm *PaymentMgr) calculatePayments(ratios map[string]*big.Rat, source *Paym if amtSansFees < paymentTotal { diff := paymentTotal - amtSansFees - return nil, 0, fmt.Errorf("total payments (%s) is greater than "+ + desc := fmt.Sprintf("%s: total payments (%s) is greater than "+ "the remaining coinbase amount after fees (%s). Difference is %s", - paymentTotal, amtSansFees, diff) + funcName, paymentTotal, amtSansFees, diff) + return nil, 0, poolError(ErrPaymentSource, desc) } + // TODO: Need to check each payment to ensure its not dust if it is it + // should be added to the pool fee. Will be resolving this in a seperate + // PR. + // Add a payout entry for pool fees. feePayment := NewPayment(PoolFeesK, source, fee, height, estMaturity) payments = append(payments, feePayment) @@ -789,56 +800,23 @@ func (pm *PaymentMgr) maturePendingPayments(height uint32) (map[string][]*Paymen return pmts, nil } - return pmts, nil -} - -// PayDividends pays mature mining rewards to participating accounts. -// -// TODO: need to break this down into smaller funcs to make it easier to -// test. -func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryActive bool) error { - pmts, err := pm.maturePendingPayments(height) - if err != nil { - return err - } - - if len(pmts) == 0 { - return nil - } - - // The fee address is being picked at random from the set of pool fee - // addresses to make it difficult for third-parties wanting to track - // pool fees collected by the pool and ultimately determine the - // cumulative value accrued by pool operators. - feeAddr := pm.cfg.PoolFeeAddrs[rand.Intn(len(pm.cfg.PoolFeeAddrs))] - - // Create the payout transaction. - inputs := make([]chainjson.TransactionInput, 0) - txHashes := make(map[string]*chainhash.Hash) - outputs := make(map[string]dcrutil.Amount) - var tIn dcrutil.Amount - var tOut dcrutil.Amount - var maxSpendableHeight uint32 - - txCreator := pm.cfg.FetchTxCreator() - if txCreator == nil { - return fmt.Errorf("tx creator unset") - } - - toDelete := make([]string, 0) +// pruneOrphanedPayments removes all orphaned payments from the provided payments. +func (pm *PaymentMgr) pruneOrphanedPayments(ctx context.Context, pmts map[string][]*Payment) (map[string][]*Payment, error) { + toDelete := make([]string, 0, len(pmts)) for key := range pmts { blockHash, err := chainhash.NewHashFromStr(key) if err != nil { - return err + desc := fmt.Sprintf("unable to generate hash: %v", err) + return nil, poolError(ErrCreateHash, desc) } confs, err := pm.cfg.GetBlockConfirmations(ctx, blockHash) if err != nil { - return err + return nil, err } - // If the block has no confirmations at the current height, - // it is an orphan. Remove payments associated with it. + // If the block has no confirmations for the current chain + // state it is an orphan. Remove payments associated with it. if confs <= 0 { toDelete = append(toDelete, key) } @@ -848,67 +826,193 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA for _, k := range toDelete { delete(pmts, k) } + return pmts, nil +} - for _, set := range pmts { - // The coinbase output prior to - // [DCP0006](https://github.com/decred/dcps/pull/17) - // activation is at the third index position and at - // the second index position once DCP0006 is activated. - index := uint32(1) - if !treasuryActive { - index = 2 +// applyTxFees determines the trasaction fees needed for the payout transaction +// and deducts portions of the fee from outputs of participating accounts +// being paid to. +// +// The deducted portions are calculated as the percentage of fees based on +// the ratio of the amount being paid to the total transaction output minus +// pool fees. +func (pm *PaymentMgr) applyTxFees(inputs []chainjson.TransactionInput, outputs map[string]dcrutil.Amount, + tIn dcrutil.Amount, tOut dcrutil.Amount, feeAddr dcrutil.Address) (dcrutil.Amount, dcrutil.Amount, error) { + funcName := "applyTxFees" + if len(inputs) == 0 { + desc := fmt.Sprint("%s: cannot create a payout transaction "+ + "without a tx input", funcName) + return 0, 0, poolError(ErrTxIn, desc) + } + if len(outputs) == 0 { + desc := fmt.Sprint("%s:cannot create a payout transaction "+ + "without a tx output", funcName) + return 0, 0, poolError(ErrTxOut, desc) + } + inSizes := make([]int, len(inputs)) + for range inputs { + inSizes = append(inSizes, txsizes.RedeemP2PKHSigScriptSize) + } + outSizes := make([]int, len(outputs)) + for range outputs { + outSizes = append(outSizes, txsizes.P2PKHOutputSize) + } + changeScriptSize := 0 + estSize := txsizes.EstimateSerializeSizeFromScriptSizes(inSizes, outSizes, + changeScriptSize) + estFee := txrules.FeeForSerializeSize(txrules.DefaultRelayFeePerKb, estSize) + sansFees := tOut - estFee + + for addr, v := range outputs { + // Pool fee payments are excluded from tx fee deductions. + if addr == feeAddr.String() { + continue } - txHash, err := chainhash.NewHashFromStr(set[0].Source.Coinbase) - if err != nil { - return err + ratio := float64(int64(sansFees)) / float64(int64(v)) + outFee := estFee.MulF64(ratio) + outputs[addr] -= outFee + } + + return sansFees, estFee, nil +} + +// confirmCoinbases ensures the coinbases referenced by the provided +// transaction hashes are spendable by the expected maximum spendable height. +// +// The context passed to this function must have a corresponding +// cancellation to allow for a clean shutdown process +func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string]*chainhash.Hash, spendableHeight uint32) error { + funcName := "confirmCoinbases" + hashes := make([]*chainhash.Hash, 0, len(txHashes)) + for _, hash := range txHashes { + hashes = append(hashes, hash) + } + + notifSource, err := pm.cfg.GetTxConfNotifications(hashes, + int32(spendableHeight)) + if err != nil { + return err + } + + // Wait for coinbase tx confirmations from the wallet. + maxSpendableConfs := int32(pm.cfg.ActiveNet.CoinbaseMaturity) + 1 + +txConfs: + for { + select { + case <-ctx.Done(): + log.Debugf("existing txConfs") + break txConfs + + default: + // Non-blocking receive fallthrough. } - txOutResult, err := txCreator.GetTxOut(ctx, txHash, index, false) + resp, err := notifSource() if err != nil { - return fmt.Errorf("unable to find tx output: %v", err) + desc := fmt.Sprintf("%s: unable to fetch tx confirmations: %v", + funcName, err) + return poolError(ErrTxConf, desc) } - if txOutResult == nil { - return fmt.Errorf("no transaction output found for hash %s "+ - "at index %d", txHash, index) + // Ensure all coinbases being spent are spendable. + for _, coinbase := range resp.Confirmations { + if coinbase.Confirmations >= maxSpendableConfs { + hash, err := chainhash.NewHash(coinbase.TxHash) + if err != nil { + desc := fmt.Sprintf("%s: unable to create block hash: %v", + funcName, err) + return poolError(ErrCreateHash, desc) + } + + // Remove spendable coinbase from the tx hash set. All + // coinbases are spendable when the tx hash set is empty. + delete(txHashes, hash.String()) + } + } + + if len(txHashes) == 0 { + break + } + } + + if len(txHashes) != 0 { + log.Debugf("txHashes are %d", len(txHashes)) + desc := fmt.Sprintf("%s: unable to confirm %d coinbase "+ + "trasaction(s)", funcName, len(txHashes)) + return poolError(ErrContextCancelled, desc) + } + + return nil +} + +// generatePayoutTxDetails creates the payout transaction inputs and outputs +// from the provided payments +func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator, payments map[string][]*Payment) ([]chainjson.TransactionInput, + map[string]*chainhash.Hash, map[string]dcrutil.Amount, dcrutil.Address, dcrutil.Amount, dcrutil.Amount, error) { + funcName := "generatePayoutTxDetails" + + // The fee address is being picked at random from the set of pool fee + // addresses to make it difficult for third-parties wanting to track + // pool fees collected by the pool and ultimately determine the + // cumulative value accrued by pool operators. + feeAddr := pm.cfg.PoolFeeAddrs[rand.Intn(len(pm.cfg.PoolFeeAddrs))] + + var tIn, tOut dcrutil.Amount + coinbaseIndex := uint32(2) + inputs := make([]chainjson.TransactionInput, 0) + inputTxHashes := make(map[string]*chainhash.Hash) + outputs := make(map[string]dcrutil.Amount) + for _, pmtSet := range payments { + coinbaseTx := pmtSet[0].Source.Coinbase + txHash, err := chainhash.NewHashFromStr(coinbaseTx) + if err != nil { + desc := fmt.Sprintf("%s: unable to create tx hash: %v", + funcName, err) + return nil, nil, nil, nil, 0, 0, poolError(ErrCreateHash, desc) } // Ensure the referenced prevout to be spent is a coinbase and // spendable at the current height. + txOutResult, err := txC.GetTxOut(ctx, txHash, coinbaseIndex, false) + if err != nil { + desc := fmt.Sprintf("%s: unable to find tx output: %v", + funcName, err) + return nil, nil, nil, nil, 0, 0, poolError(ErrTxOut, desc) + } if !txOutResult.Coinbase { - return fmt.Errorf("expected the referenced output at index %d "+ - "for tx %v to be a coinbase", index, txHash.String()) + desc := fmt.Sprintf("%s: referenced output at index %d "+ + "for tx %v is not a coinbase", + funcName, coinbaseIndex, txHash.String()) + return nil, nil, nil, nil, 0, 0, poolError(ErrCoinbase, desc) } - if txOutResult.Confirmations < int64(pm.cfg.ActiveNet.CoinbaseMaturity+1) { - return fmt.Errorf("expected the referenced coinbase at index %d "+ - "for tx %v to be spendable", index, txHash.String()) + desc := fmt.Sprintf("%s: referenced coinbase at "+ + "index %d for tx %v is not spendable", funcName, + coinbaseIndex, txHash.String()) + return nil, nil, nil, nil, 0, 0, poolError(ErrCoinbase, desc) } + // Create the transaction input using the provided prevOut. in := chainjson.TransactionInput{ Amount: txOutResult.Value, Txid: txHash.String(), - Vout: index, + Vout: coinbaseIndex, Tree: wire.TxTreeRegular, } inputs = append(inputs, in) + inputTxHashes[txHash.String()] = txHash - spendableHeight := set[0].EstimatedMaturity + 1 - if maxSpendableHeight < spendableHeight { - maxSpendableHeight = spendableHeight - } - - txHashes[txHash.String()] = txHash - - outV, err := dcrutil.NewAmount(txOutResult.Value) + prevOutV, err := dcrutil.NewAmount(in.Amount) if err != nil { - return err + desc := fmt.Sprintf("unable create the input amount: %v", err) + return nil, nil, nil, nil, 0, 0, poolError(ErrCreateAmount, desc) } - tIn += outV + tIn += prevOutV - // Generate the outputs paying dividends and fees. - for _, pmt := range set { + // Generate the outputs paying dividends to as well as pool fees. + for _, pmt := range pmtSet { if pmt.Account == PoolFeesK { _, ok := outputs[feeAddr.String()] if !ok { @@ -923,9 +1027,8 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA acc, err := FetchAccount(pm.cfg.DB, []byte(pmt.Account)) if err != nil { - return err + return nil, nil, nil, nil, 0, 0, err } - _, ok := outputs[acc.Address] if !ok { outputs[acc.Address] = pmt.Amount @@ -937,147 +1040,151 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA } } + // Ensure the transaction outputs do not source more value than possible + // from the provided inputs and also are consuming all of the input + // value after rounding errors. if tOut > tIn { - return fmt.Errorf("total output values for the transaction (%s) "+ - "is greater than the provided inputs %s", tIn, tOut) + desc := fmt.Sprintf("%s: total output values for the "+ + "transaction (%s) is greater than the provided inputs (%s)", + funcName, tOut, tIn) + return nil, nil, nil, nil, 0, 0, poolError(ErrCreateTx, desc) } diff := tIn - tOut if diff > maxRoundingDiff { - return fmt.Errorf("difference between total output values and "+ - "the provided inputs (%s) exceeds the maximum allowed "+ - "for rounding errors (%s)", diff, maxRoundingDiff) + desc := fmt.Sprintf("%s: difference between total output "+ + "values and the provided inputs (%s) exceeds the maximum "+ + "allowed for rounding errors (%s)", funcName, diff, maxRoundingDiff) + return nil, nil, nil, nil, 0, 0, poolError(ErrCreateTx, desc) } - inSizes := make([]int, len(inputs)) - for range inputs { - inSizes = append(inSizes, txsizes.RedeemP2PKHSigScriptSize) + return inputs, inputTxHashes, outputs, feeAddr, tIn, tOut, nil +} + +// PayDividends pays mature mining rewards to participating accounts. +func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32) error { + funcName := "payDividends" + mPmts, err := pm.maturePendingPayments(height) + if err != nil { + return err } - outSizes := make([]int, len(outputs)) - for range outputs { - outSizes = append(outSizes, txsizes.P2PKHOutputSize) + // Nothing to do if there are no mature payments to process. + if len(mPmts) == 0 { + return nil } - estSize := txsizes.EstimateSerializeSizeFromScriptSizes(inSizes, outSizes, 0) - estFee := txrules.FeeForSerializeSize(txrules.DefaultRelayFeePerKb, estSize) - sansFees := tOut - estFee + txC := pm.cfg.FetchTxCreator() + if txC == nil { + desc := fmt.Sprintf("%s: tx creator cannot be nil", funcName) + return poolError(ErrDisconnected, desc) + } - // Deduct the portion of transaction fees being paid for by - // participating accounts from outputs being paid to them. - // - // It is calculated as the percentage of fees based on the - // ratio of the amount being paid to the total transaction - // output minus pool fees. - for addr, v := range outputs { - if addr == feeAddr.String() { - continue - } + // remove all matured orphaned payments. Since the associated blocks + // to these payments are not part of the main chain they will not be + // paid out. + pmts, err := pm.pruneOrphanedPayments(ctx, mPmts) + if err != nil { + return err + } - ratio := float64(int64(sansFees)) / float64(int64(v)) - outFee := estFee.MulF64(ratio) - outputs[addr] -= outFee + inputs, inputTxHashes, outputs, feeAddr, tIn, tOut, err := + pm.generatePayoutTxDetails(ctx, txC, pmts) + if err != nil { + return err } - // Generate the output set with decoded addresses. + _, estFee, err := pm.applyTxFees(inputs, outputs, tIn, tOut, feeAddr) + if err != nil { + return err + } + + // Generate the transaction output set. outs := make(map[dcrutil.Address]dcrutil.Amount, len(outputs)) for sAddr, amt := range outputs { addr, err := dcrutil.DecodeAddress(sAddr, pm.cfg.ActiveNet) if err != nil { - return fmt.Errorf("unable to decode address: %v", err) + desc := fmt.Sprintf("%s: unable to decode payout address: %v", + funcName, err) + return poolError(ErrDecode, desc) } - outs[addr] = amt } - // Ensure the wallet is aware of all the outputs to be spent by the payout - // transaction. + // Ensure the wallet is aware of all the coinbase outputs being + // spent by the payout transaction. + var maxSpendableHeight uint32 + for _, pmtSet := range pmts { + spendableHeight := pmtSet[0].EstimatedMaturity + 1 + if maxSpendableHeight < spendableHeight { + maxSpendableHeight = spendableHeight + } + } if maxSpendableHeight < height { maxSpendableHeight = height } - hashes := make([]*chainhash.Hash, 0, len(txHashes)) - for _, hash := range txHashes { - hashes = append(hashes, hash) - } - - notifSource, err := pm.cfg.GetTxConfNotifications(hashes, - int32(maxSpendableHeight)) + tCtx, tCancel := context.WithTimeout(ctx, pm.cfg.CoinbaseConfTimeout) + defer tCancel() + err = pm.confirmCoinbases(tCtx, inputTxHashes, maxSpendableHeight) if err != nil { - return fmt.Errorf("unable to stream tx confirmations: %v", err) - } - - // Wait for coinbase tx confirmations from the wallet. - maxSpendableConfs := int32(pm.cfg.ActiveNet.CoinbaseMaturity) + 1 - for { - resp, err := notifSource() - if err != nil { - return fmt.Errorf("tx confirmations notification error: %v", err) - } - - // Ensure all coinbases being spent are spendable before proceeding - // with creating and publishing the transaction. - for _, coinbase := range resp.Confirmations { - if coinbase.Confirmations >= maxSpendableConfs { - cbHash, err := chainhash.NewHash(coinbase.TxHash) - if err != nil { - return fmt.Errorf("unable to create block hash: %v", err) - } - - // Remove spendable coinbases from the tx hash set. All - // coinbases are spendable when the tx hash set is empty. - delete(txHashes, cbHash.String()) - } + // Do not error if coinbase spendable confirmatiom requests are + // terminated by the context cancellation. + if !errors.Is(err, ErrContextCancelled) { + return err } - if len(txHashes) == 0 { - break - } + return nil } - tx, err := txCreator.CreateRawTransaction(ctx, inputs, outs, nil, nil) + // Create, sign and publish the payout transaction. + tx, err := txC.CreateRawTransaction(ctx, inputs, outs, nil, nil) if err != nil { - return fmt.Errorf("unable to create raw transaction: %v", err) + desc := fmt.Sprintf("%s: unable to create transaction: %v", + funcName, err) + return poolError(ErrCreateTx, desc) } - - txB, err := tx.Bytes() + txBytes, err := tx.Bytes() if err != nil { return err } - txBroadcaster := pm.cfg.FetchTxBroadcaster() - if txBroadcaster == nil { - return fmt.Errorf("tx broadcaster unset") + txB := pm.cfg.FetchTxBroadcaster() + if txB == nil { + desc := fmt.Sprintf("%s: tx broadcaster cannot be nil", funcName) + return poolError(ErrDisconnected, desc) } - - // Sign the transaction. signTxReq := &walletrpc.SignTransactionRequest{ - SerializedTransaction: txB, + SerializedTransaction: txBytes, Passphrase: []byte(pm.cfg.WalletPass), } - signedTxResp, err := txBroadcaster.SignTransaction(ctx, signTxReq) + signedTxResp, err := txB.SignTransaction(ctx, signTxReq) if err != nil { - return fmt.Errorf("unable to sign transaction: %v", err) + desc := fmt.Sprintf("%s: unable to sign transaction: %v", + funcName, err) + return poolError(ErrSignTx, desc) + } - // Publish the transaction. pubTxReq := &walletrpc.PublishTransactionRequest{ SignedTransaction: signedTxResp.Transaction, } - pubTxResp, err := txBroadcaster.PublishTransaction(ctx, pubTxReq) + pubTxResp, err := txB.PublishTransaction(ctx, pubTxReq) if err != nil { - return fmt.Errorf("unable to publish transaction: %v", err) + desc := fmt.Sprintf("%s: unable to publish transaction: %v", + funcName, err) + return poolError(ErrPublishTx, desc) } txid, err := chainhash.NewHash(pubTxResp.TransactionHash) if err != nil { - return err + desc := fmt.Sprintf("unable to create transaction hash: %v", err) + return poolError(ErrCreateHash, desc) } - fees := outputs[feeAddr.String()] - log.Infof("paid a total of %v in tx %s, including %v in pool fees.", - tOut, txid.String(), fees) + log.Infof("paid a total of %v in tx %s, including %v in pool fees. "+ + "Tx fee: %v", tOut, txid.String(), fees, estFee) // Update all associated payments as paid and archive them. for _, set := range pmts { @@ -1086,12 +1193,15 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA pmt.TransactionID = txid.String() err := pmt.Update(pm.cfg.DB) if err != nil { - return fmt.Errorf("unable to update payment: %v", err) + desc := fmt.Sprintf("%s: unable to update payment: %v", + funcName, err) + return poolError(ErrPersistEntry, desc) } - err = pmt.Archive(pm.cfg.DB) if err != nil { - return fmt.Errorf("unable to archive payment: %v", err) + desc := fmt.Sprintf("%s: unable to archive payment: %v", + funcName, err) + return poolError(ErrPersistEntry, desc) } } } @@ -1103,13 +1213,11 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryA if err != nil { return err } - pm.setLastPaymentPaidOn(uint64(time.Now().UnixNano())) return pm.persistLastPaymentPaidOn(tx) }) if err != nil { return err } - return nil } From 35c277707a46818b8650830e00bb4713d90f21a1 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Mon, 21 Sep 2020 23:27:46 +0000 Subject: [PATCH 3/8] pool: test payDividends and its helpers. This adds tests for payDividends and its helper functions. --- pool/paymentmgr_test.go | 768 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 763 insertions(+), 5 deletions(-) diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index 85a52520..d244bfdb 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -2,6 +2,8 @@ package pool import ( "context" + "crypto/rand" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -9,12 +11,53 @@ import ( "testing" "time" + "decred.org/dcrwallet/rpc/walletrpc" "github.com/decred/dcrd/chaincfg/chainhash" "github.com/decred/dcrd/chaincfg/v3" "github.com/decred/dcrd/dcrutil/v3" + chainjson "github.com/decred/dcrd/rpc/jsonrpc/types/v2" + "github.com/decred/dcrd/wire" bolt "go.etcd.io/bbolt" + "google.golang.org/grpc" ) +type txCreatorImpl struct { + getBlock func(ctx context.Context, blockHash *chainhash.Hash) (*wire.MsgBlock, error) + getTxOut func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) + createRawTransaction func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) +} + +// GetBlock fetches the block associated with the provided block hash. +func (txC *txCreatorImpl) GetBlock(ctx context.Context, blockHash *chainhash.Hash) (*wire.MsgBlock, error) { + return txC.getBlock(ctx, blockHash) +} + +// GetTxOut fetches the output referenced by the provided txHash and index. +func (txC *txCreatorImpl) GetTxOut(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return txC.getTxOut(ctx, txHash, index, mempool) +} + +// CreateRawTransaction generates a transaction from the provided inputs and payouts. +func (txC *txCreatorImpl) CreateRawTransaction(ctx context.Context, inputs []chainjson.TransactionInput, + amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return txC.createRawTransaction(ctx, inputs, amounts, lockTime, expiry) +} + +type txBroadcasterImpl struct { + signTransaction func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) + publishTransaction func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) +} + +// SignTransaction signs transaction inputs, unlocking them for use. +func (txB *txBroadcasterImpl) SignTransaction(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return txB.signTransaction(ctx, req, options...) +} + +// PublishTransaction broadcasts the transaction unto the network. +func (txB *txBroadcasterImpl) PublishTransaction(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return txB.publishTransaction(ctx, req, options...) +} + // fetchShare fetches the share referenced by the provided id. func fetchShare(db *bolt.DB, id []byte) (*Share, error) { var share Share @@ -356,9 +399,10 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { // pendingPaymentsForBlockHash. height := uint32(10) estMaturity := uint32(26) + zeroHash := chainhash.Hash{0} zeroSource := &PaymentSource{ - BlockHash: chainhash.Hash{0}.String(), - Coinbase: chainhash.Hash{0}.String(), + BlockHash: zeroHash.String(), + Coinbase: zeroHash.String(), } amt, _ := dcrutil.NewAmount(5) _, err = persistPayment(db, xID, zeroSource, amt, height+1, estMaturity+1) @@ -376,7 +420,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { t.Fatal(err) } pmtC.PaidOnHeight = estMaturity + 1 - pmtC.TransactionID = chainhash.Hash{0}.String() + pmtC.TransactionID = zeroHash.String() err = pmtC.Update(db) if err != nil { t.Fatal(err) @@ -911,8 +955,6 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { expectedFeeAmt, ft) } - // TODO: Add tests for payDividend. - // Empty the share bucket. err = emptyBucket(db, shareBkt) if err != nil { @@ -947,4 +989,720 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { if err != nil { t.Fatal(err) } + + // pruneOrphanedPayments tests. + var randBytes [chainhash.HashSize + 1]byte + _, err = rand.Read(randBytes[:]) + if err != nil { + t.Fatalf("unable to generate random bytes: %v", err) + } + + randHash := chainhash.HashH(randBytes[:]) + randSource := &PaymentSource{ + BlockHash: randHash.String(), + Coinbase: randHash.String(), + } + amt, _ = dcrutil.NewAmount(5) + mPmts := make(map[string][]*Payment) + pmtA := NewPayment(xID, zeroSource, amt, height, estMaturity) + mPmts[zeroSource.Coinbase] = []*Payment{pmtA} + pmtB := NewPayment(yID, randSource, amt, height, estMaturity) + mPmts[randSource.Coinbase] = []*Payment{pmtB} + + ctx, cancel := context.WithCancel(context.Background()) + + // Ensure orphaned payments pruning returns an error if it cannot + // confirm a block. + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return 0, fmt.Errorf("unable to confirm block") + } + _, err = mgr.pruneOrphanedPayments(ctx, mPmts) + if err == nil { + cancel() + t.Fatal("expected a block confirmation error") + } + + // Create an invalid block hash / payment set entry. + invalidBlockHash := "0123456789012345678901234567890123456789" + + "0123456789012345678912345" + mPmts[invalidBlockHash] = []*Payment{pmtB} + + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + if bh.String() != zeroSource.BlockHash { + return -1, nil + } + return 16, nil + } + + // Ensure orphaned payments pruning returns an errors if it encounters + // an invalid block hash as a key. + _, err = mgr.pruneOrphanedPayments(ctx, mPmts) + if !errors.Is(err, ErrCreateHash) { + cancel() + t.Fatalf("expected a hash error, got %v", err) + } + + // remove the invalid block hash key pair. + delete(mPmts, invalidBlockHash) + + // Ensure orphaned payments pruning accurately prunes payments + // sourcing from orphaned blocks. + pmtSet, err = mgr.pruneOrphanedPayments(ctx, mPmts) + if err != nil { + cancel() + t.Fatalf("unexpected pruneOrphanPayments error: %v", err) + } + if len(pmtSet) != 1 { + cancel() + t.Fatalf("expected a single valid mature payment after "+ + "pruning, got %v", len(pmtSet)) + } + + // applyTxFee tests. + tIn, _ := dcrutil.NewAmount(100) + in := chainjson.TransactionInput{ + Amount: float64(tIn), + Txid: chainhash.Hash{1}.String(), + Vout: 2, + Tree: wire.TxTreeRegular, + } + + poolFeeValue := amt.MulF64(0.1) + xValue := amt.MulF64(0.6) + yValue := amt.MulF64(0.3) + + feeAddr := poolFeeAddrs.String() + out := make(map[string]dcrutil.Amount) + out[xAddr] = xValue + out[yAddr] = yValue + out[feeAddr] = poolFeeValue + + tOut := tIn + + _, txFee, err := mgr.applyTxFees([]chainjson.TransactionInput{in}, + out, tIn, tOut, poolFeeAddrs) + if err != nil { + t.Fatalf("unexpected applyTxFees error: %v", err) + } + + // Ensure the pool fee payment was exempted from tx fee deductions. + if out[feeAddr] != amt.MulF64(0.1) { + t.Fatalf("expected pool fee payment to be %v, got %v", + txFee, out[feeAddr]) + } + + // Ensure the difference between initial account payments and updated + // account payments plus the transaction fee is not more than the + // maximum rounding difference. + initialAccountPayments := xValue + yValue + updatedAccountPaymentsPlusTxFee := out[xAddr] + out[yAddr] + txFee + if initialAccountPayments-updatedAccountPaymentsPlusTxFee <= maxRoundingDiff { + t.Fatalf("initial account payment total %v to be equal to updated "+ + "values plus the transaction fee %v", initialAccountPayments, + updatedAccountPaymentsPlusTxFee) + } + + // Ensure providing no tx inputs triggers an error. + _, _, err = mgr.applyTxFees([]chainjson.TransactionInput{}, + out, tIn, tOut, poolFeeAddrs) + if !errors.Is(err, ErrTxIn) { + t.Fatalf("expected a tx input error: %v", err) + } + + // Ensure providing no tx outputs triggers an error. + _, _, err = mgr.applyTxFees([]chainjson.TransactionInput{in}, + make(map[string]dcrutil.Amount), tIn, tOut, poolFeeAddrs) + if !errors.Is(err, ErrTxOut) { + cancel() + t.Fatalf("expected a tx output error: %v", err) + } + + // confirmCoinbases tests. + txHashes := make(map[string]*chainhash.Hash) + hashA := chainhash.Hash{'a'} + txHashes[hashA.String()] = &hashA + hashB := chainhash.Hash{'b'} + txHashes[hashB.String()] = &hashB + hashC := chainhash.Hash{'c'} + txHashes[hashC.String()] = &hashC + spendableHeight := uint32(10) + + mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return nil, fmt.Errorf("unable to fetch tx conf notification source") + } + + // Ensure confirming coinbases returns an error if transaction + // confirmation notifications cannot be fetched. + err = mgr.confirmCoinbases(ctx, txHashes, spendableHeight) + if err == nil { + cancel() + t.Fatalf("expected tx conf notification source error") + } + + mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return func() (*walletrpc.ConfirmationNotificationsResponse, error) { + return &walletrpc.ConfirmationNotificationsResponse{}, nil + }, nil + } + + go func() { + time.Sleep(time.Microsecond * 200) + cancel() + }() + + // Ensure confirming coinbases returns an error if the provided context + // is cancelled. + err = mgr.confirmCoinbases(ctx, txHashes, spendableHeight) + if !errors.Is(err, ErrContextCancelled) { + t.Fatalf("expected a context cancellation error") + } + + // The context here needs to be recreated after the previous test. + ctx, cancel = context.WithCancel(context.Background()) + mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return func() (*walletrpc.ConfirmationNotificationsResponse, error) { + return nil, fmt.Errorf("unable to confirm transactions") + }, nil + } + + // Ensure confirming coinbases returns an error if notification source + // cannot confirm transactions. + err = mgr.confirmCoinbases(ctx, txHashes, spendableHeight) + if !errors.Is(err, ErrTxConf) { + cancel() + t.Fatalf("expected tx confirmation error, got %v", err) + } + + txConfs := make([]*walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations, 0) + confA := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: hashA[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confA) + confB := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: hashB[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confB) + confC := walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: hashC[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confC) + + mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return func() (*walletrpc.ConfirmationNotificationsResponse, error) { + return &walletrpc.ConfirmationNotificationsResponse{ + Confirmations: txConfs, + }, nil + }, nil + } + + // Ensure confirming coinbases returns without error if all expected + // tx confirmations are returned. + err = mgr.confirmCoinbases(ctx, txHashes, spendableHeight) + if err != nil { + cancel() + t.Fatalf("expected no tx confirmation errors, got %v", err) + } + + // generatePayoutTxDetails tests. + amt, _ = dcrutil.NewAmount(5) + mPmts = make(map[string][]*Payment) + pmtA = NewPayment(xID, zeroSource, amt, height, estMaturity) + mPmts[zeroSource.Coinbase] = []*Payment{pmtA} + pmtB = NewPayment(yID, randSource, amt, height, estMaturity) + mPmts[randSource.Coinbase] = []*Payment{pmtB} + + // Ensure generating payout tx details returns an error if fetching txOut + // information fails. + txC := &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return nil, fmt.Errorf("unable to fetch txOut") + }, + } + _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + if err == nil { + cancel() + t.Fatalf("expected a fetch txOut error") + } + + // Ensure generating payout tx details returns an error if the returned + // output is not a coinbase. + txC = &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: 50, + Value: 5, + Coinbase: false, + }, nil + }, + } + + _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + if !errors.Is(err, ErrCoinbase) { + cancel() + t.Fatalf("expected a coinbase txOut error") + } + + // Ensure generating payout tx details returns an error if the returned + // output is not spendable. + txC = &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: 0, + Value: 5, + Coinbase: true, + }, nil + }, + } + + _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + if !errors.Is(err, ErrCoinbase) { + cancel() + t.Fatalf("expected a spendable error") + } + + // Ensure generating payout tx details returns an error if an account + // referenced by a payment cannot be found. + unknownID := "abcd" + unknownIDCoinbase := chainhash.Hash{'u'} + pmtD = NewPayment(unknownID, randSource, amt, height, estMaturity) + mPmts[unknownIDCoinbase.String()] = []*Payment{pmtD} + txC = &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: 50, + Value: 5, + Coinbase: true, + }, nil + }, + } + + _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + if !errors.Is(err, ErrValueNotFound) { + cancel() + t.Fatalf("expected an account not found error") + } + + // Ensure generating payout tx details returns an error if the + // total input value is less than the total output value. + delete(mPmts, unknownIDCoinbase.String()) + txC = &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: 50, + Value: 1, + Coinbase: true, + }, nil + }, + } + + _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + if !errors.Is(err, ErrCreateTx) { + cancel() + t.Fatalf("expected an input output mismatch error") + } + + // Ensure generating payout tx details returns an error if the outputs of + // the transaction do not exhaust all remaining input value after rounding + // errors. + txC = &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: 50, + Value: 100, + Coinbase: true, + }, nil + }, + } + + _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + if !errors.Is(err, ErrCreateTx) { + cancel() + t.Fatalf("expected an unclaimed input value error, got %v", err) + } + + // Ensure generating payout tx details does not error with valid parameters. + txC = &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: 50, + Value: 5, + Coinbase: true, + }, nil + }, + } + + inputs, inputTxHashes, outputs, _, _, _, err := mgr.generatePayoutTxDetails(ctx, txC, mPmts) + if err != nil { + cancel() + t.Fatalf("unexpected payout tx details error, got %v", err) + } + + expectedTxHashes := 2 + if len(inputTxHashes) != expectedTxHashes { + cancel() + t.Fatalf("expected %d input tx hashes, got %d", + expectedTxHashes, len(inputTxHashes)) + } + + expectedInputs := 2 + if len(inputs) != expectedInputs { + cancel() + t.Fatalf("expected %d inputs, got %d", expectedInputs, len(inputs)) + } + + for _, hash := range inputTxHashes { + txHash := hash.String() + var match bool + for _, in := range inputs { + if in.Txid == txHash { + match = true + } + } + if !match { + cancel() + t.Fatalf("no input found for tx hash: %s", txHash) + } + } + + expectedOutputs := 2 + if len(outputs) != expectedOutputs { + cancel() + t.Fatalf("expected %d inputs, got %d", expectedOutputs, len(outputs)) + } + + for addr := range outputs { + var match bool + if addr == feeAddr || addr == xAddr || addr == yAddr { + match = true + } + if !match { + cancel() + t.Fatalf("no payment found for output destination: %s", addr) + } + } + + // payDividends tests. + height = uint32(10) + estMaturity = uint32(26) + amt, _ = dcrutil.NewAmount(5) + _, err = persistPayment(db, xID, zeroSource, amt, height, estMaturity) + if err != nil { + cancel() + t.Fatal(err) + } + _, err = persistPayment(db, yID, randSource, amt, height, estMaturity) + if err != nil { + cancel() + t.Fatal(err) + } + + // Ensure dividend payments returns no error if there are no mature + // payments to work with. + err = mgr.payDividends(ctx, estMaturity-1) + if err != nil { + cancel() + t.Fatal("expected no error since there are no mature payments") + } + + // Ensure dividend payment returns an error if the tx creator cannot be + // fetched. + mgr.cfg.FetchTxCreator = func() TxCreator { + return nil + } + + err = mgr.payDividends(ctx, estMaturity+1) + if !errors.Is(err, ErrDisconnected) { + cancel() + t.Fatalf("expected a nil tx creator error, got %v", err) + } + + // Ensure dividend payment returns an error if pruning orphaned payments + // fails. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{} + } + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return -1, fmt.Errorf("unable to confirm blocks") + } + + err = mgr.payDividends(ctx, estMaturity+1) + if err == nil { + cancel() + t.Fatalf("expected a prune orphan payments error") + } + + // Ensure dividend payment returns an error if generating payout tx details + // fails. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return nil, fmt.Errorf("unable to fetch txOut") + }, + } + } + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return 16, nil + } + + err = mgr.payDividends(ctx, estMaturity+1) + if !errors.Is(err, ErrTxOut) { + cancel() + t.Fatalf("expected a generate payout tx details error, got %v", err) + } + + // Ensure dividend payment returns an error if applying tx fees fails. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{} + } + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return -1, nil + } + + err = mgr.payDividends(ctx, estMaturity+1) + if !errors.Is(err, ErrTxIn) { + cancel() + t.Fatalf("expected an apply tx fee error, got %v", err) + } + + // Ensure dividend payment returns an error if confirming a coinbase fails. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + } + } + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return int64(estMaturity) + 1, nil + } + mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return nil, fmt.Errorf("unable to fetch tx conf notification source") + } + + err = mgr.payDividends(ctx, estMaturity+1) + if err == nil { + cancel() + t.Fatalf("expected a coinbase confirmation error, got %v", err) + } + + // Ensure dividend payment returns an error if the payout transaction cannot + // be created. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + createRawTransaction: func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return nil, fmt.Errorf("unable to create raw transactions") + }, + } + } + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return int64(estMaturity) + 1, nil + } + + txConfs = make([]*walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations, 0) + confA = walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: zeroHash[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confA) + confB = walletrpc.ConfirmationNotificationsResponse_TransactionConfirmations{ + TxHash: randHash[:], + Confirmations: 50, + BlockHash: []byte(zeroSource.BlockHash), + BlockHeight: 60, + } + txConfs = append(txConfs, &confB) + + mgr.cfg.CoinbaseConfTimeout = time.Millisecond * 500 + mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return func() (*walletrpc.ConfirmationNotificationsResponse, error) { + return &walletrpc.ConfirmationNotificationsResponse{ + Confirmations: txConfs, + }, nil + }, nil + } + + err = mgr.payDividends(ctx, estMaturity+1) + if err == nil { + cancel() + t.Fatal("expected a create transaction error") + } + + // Ensure dividend payment returns an error if the tx broadcaster cannot be + // fetched. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + createRawTransaction: func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return &wire.MsgTx{}, nil + }, + } + } + mgr.cfg.GetBlockConfirmations = func(ctx context.Context, bh *chainhash.Hash) (int64, error) { + return int64(estMaturity) + 1, nil + } + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return nil + } + mgr.cfg.WalletPass = "123" + mgr.cfg.GetTxConfNotifications = func([]*chainhash.Hash, int32) (func() (*walletrpc.ConfirmationNotificationsResponse, error), error) { + return func() (*walletrpc.ConfirmationNotificationsResponse, error) { + return &walletrpc.ConfirmationNotificationsResponse{ + Confirmations: txConfs, + }, nil + }, nil + } + + err = mgr.payDividends(ctx, estMaturity+1) + if !errors.Is(err, ErrDisconnected) { + cancel() + t.Fatalf("expected a fetch tx broadcaster error, got %v", err) + } + + // Ensure dividend payment returns an error if the payout transaction cannot + // be signed. + mgr.cfg.FetchTxCreator = func() TxCreator { + return &txCreatorImpl{ + getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { + return &chainjson.GetTxOutResult{ + BestBlock: chainhash.Hash{0}.String(), + Confirmations: int64(estMaturity) + 1, + Value: 5, + Coinbase: true, + }, nil + }, + createRawTransaction: func(ctx context.Context, inputs []chainjson.TransactionInput, amounts map[dcrutil.Address]dcrutil.Amount, lockTime *int64, expiry *int64) (*wire.MsgTx, error) { + return &wire.MsgTx{}, nil + }, + } + } + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return nil, fmt.Errorf("unable to sign transaction") + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1) + if !errors.Is(err, ErrSignTx) { + cancel() + t.Fatalf("expected a signing error, got %v", err) + } + + // Ensure dividend payment returns an error if the payout transaction + // cannot be published. + txBytes := []byte("01000000018e17619f0d627c2769ee3f957582691aea59c2" + + "e79cc45b8ba1f08485dd88d75c0300000001ffffffff017a64e43703000000" + + "00001976a914978fa305bd66f63f0de847338bb56ff65fa8e27288ac000000" + + "000000000001f46ce43703000000846c0700030000006b483045022100d668" + + "5812801db991b72e80863eba7058466dfebb4aba0af75ab47bade177325102" + + "205f466fc47435c1a177482e527ff0e76f3c2c613940b358e57f0f0d78d5f2" + + "ffcb012102d040a4c34ae65a2b87ea8e9df7413e6504e5f27c6bde019a78ee" + + "96145b27c517") + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return nil, fmt.Errorf("unable to publish transaction") + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1) + if !errors.Is(err, ErrPublishTx) { + cancel() + t.Fatalf("expected a publish error, got %v", err) + } + + // Ensure paying dividend payment succeeds with valid inputs. + txHash, _ := hex.DecodeString("013264da8cc53f70022dc2b5654ebefc9ecfed24ea18dfcfc9adca5642d4fe66") + mgr.cfg.FetchTxBroadcaster = func() TxBroadcaster { + return &txBroadcasterImpl{ + signTransaction: func(ctx context.Context, req *walletrpc.SignTransactionRequest, options ...grpc.CallOption) (*walletrpc.SignTransactionResponse, error) { + return &walletrpc.SignTransactionResponse{ + Transaction: txBytes, + }, nil + }, + publishTransaction: func(ctx context.Context, req *walletrpc.PublishTransactionRequest, options ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) { + return &walletrpc.PublishTransactionResponse{ + TransactionHash: txHash, + }, nil + }, + } + } + + err = mgr.payDividends(ctx, estMaturity+1) + if err != nil { + cancel() + t.Fatalf("unexpected dividend payment error, got %v", err) + } + + // Reset backed up values to their defaults. + mgr.setLastPaymentHeight(0) + mgr.setLastPaymentPaidOn(0) + mgr.setLastPaymentCreatedOn(0) + err = db.Update(func(tx *bolt.Tx) error { + err := mgr.persistLastPaymentHeight(tx) + if err != nil { + return fmt.Errorf("unable to persist default last "+ + "payment height: %v", err) + } + err = mgr.persistLastPaymentPaidOn(tx) + if err != nil { + return fmt.Errorf("unable to persist default last "+ + "payment paid on: %v", err) + } + err = mgr.persistLastPaymentCreatedOn(tx) + if err != nil { + return fmt.Errorf("unable to persist default last "+ + "payment created on: %v", err) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + + cancel() } From b4e9a8a1e832bcaeff9480c33cd2acf14eb8b6c9 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Tue, 6 Oct 2020 12:10:43 +0000 Subject: [PATCH 4/8] multi: add --conftimeout config option. This adds the --conftimeout config option to allow for custom tx confirmation timeouts. --- config.go | 3 +++ dcrpool.go | 1 + pool/hub.go | 2 ++ 3 files changed, 6 insertions(+) diff --git a/config.go b/config.go index 66f9cea2..33495d35 100644 --- a/config.go +++ b/config.go @@ -55,6 +55,7 @@ const ( defaultDesignation = "YourPoolNameHere" defaultMaxConnectionsPerHost = 100 // 100 connected clients per host defaultWalletAccount = 0 + defaultCoinbaseConfTimeout = time.Minute * 5 // one block time ) var ( @@ -114,6 +115,7 @@ type config struct { DR5Port uint32 `long:"dr5port" ini-name:"dr5port" description:"Antminer DR5 connection port."` D1Port uint32 `long:"d1port" ini-name:"d1port" description:"Whatsminer D1 connection port."` DCR1Port uint32 `long:"dcr1port" ini-name:"dcr1port" description:"Obelisk DCR1 connection port."` + CoinbaseConfTimeout time.Duration `long:"conftimeout" ini-name:"conftimeout" description:"The duration to wait for coinbase confirmations."` poolFeeAddrs []dcrutil.Address dcrdRPCCerts []byte net *params @@ -345,6 +347,7 @@ func loadConfig() (*config, []string, error) { D1Port: defaultD1Port, DCR1Port: defaultDCR1Port, WalletAccount: defaultWalletAccount, + CoinbaseConfTimeout: defaultCoinbaseConfTimeout, } // Service options which are only added on Windows. diff --git a/dcrpool.go b/dcrpool.go index 21c19cf4..747ab766 100644 --- a/dcrpool.go +++ b/dcrpool.go @@ -107,6 +107,7 @@ func newPool(cfg *config) (*miningPool, error) { MinerPorts: minerPorts, MaxConnectionsPerHost: cfg.MaxConnectionsPerHost, WalletAccount: cfg.WalletAccount, + CoinbaseConfTimeout: cfg.CoinbaseConfTimeout, } p.hub, err = pool.NewHub(p.cancel, hcfg) if err != nil { diff --git a/pool/hub.go b/pool/hub.go index 3c79b4bb..256cb6c3 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -135,6 +135,7 @@ type HubConfig struct { MinerPorts map[string]uint32 MaxConnectionsPerHost uint32 WalletAccount uint32 + CoinbaseConfTimeout time.Duration } // Hub maintains the set of active clients and facilitates message broadcasting @@ -248,6 +249,7 @@ func NewHub(cancel context.CancelFunc, hcfg *HubConfig) (*Hub, error) { GetTxConfNotifications: h.getTxConfNotifications, FetchTxCreator: func() TxCreator { return h.nodeConn }, FetchTxBroadcaster: func() TxBroadcaster { return h.walletConn }, + CoinbaseConfTimeout: h.cfg.CoinbaseConfTimeout, } var err error From b310c0d95f8fb7b079beb19c407b20c6acba92d5 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Tue, 6 Oct 2020 17:17:29 +0000 Subject: [PATCH 5/8] pool: resolve review issues (1 of x). --- pool/error.go | 3 + pool/error_test.go | 1 + pool/paymentmgr.go | 226 ++++++++++++++++++++++------------------ pool/paymentmgr_test.go | 77 +++++++------- 4 files changed, 162 insertions(+), 145 deletions(-) diff --git a/pool/error.go b/pool/error.go index ba218903..6c587179 100644 --- a/pool/error.go +++ b/pool/error.go @@ -36,6 +36,9 @@ const ( // ErrDeleteEntry indicates a database entry delete error. ErrDeleteEntry = ErrorKind("ErrDeleteEntry") + // ErrFetchEntry indicates a database entry fetching error. + ErrFetchEntry = ErrorKind("ErrFetchEntry") + // ErrBackup indicates database backup error. ErrBackup = ErrorKind("ErrBackup") diff --git a/pool/error_test.go b/pool/error_test.go index f119abc3..cf2f676e 100644 --- a/pool/error_test.go +++ b/pool/error_test.go @@ -22,6 +22,7 @@ func TestErrorKindStringer(t *testing.T) { {ErrDBUpgrade, "ErrDBUpgrade"}, {ErrPersistEntry, "ErrPersistEntry"}, {ErrDeleteEntry, "ErrDeleteEntry"}, + {ErrFetchEntry, "ErrFetchEntry"}, {ErrBackup, "ErrBackup"}, {ErrParse, "ErrParse"}, {ErrDecode, "ErrDecode"}, diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 8dc13b88..189ff014 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -110,20 +110,68 @@ type PaymentMgr struct { // NewPaymentMgr creates a new payment manager. func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { + funcName := "newPaymentManager" + pm := &PaymentMgr{ cfg: pCfg, } rand.Seed(time.Now().UnixNano()) + err := pm.cfg.DB.Update(func(tx *bolt.Tx) error { - err := pm.loadLastPaymentHeight(tx) + pbkt, err := fetchPoolBucket(tx) if err != nil { return err } - err = pm.loadLastPaymentPaidOn(tx) - if err != nil { - return err + + // Initialize the last payment paid-on time. + lastPaymentPaidOnB := pbkt.Get(lastPaymentPaidOn) + if lastPaymentPaidOnB == nil { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, 0) + err := pbkt.Put(lastPaymentPaidOn, b) + if err != nil { + desc := fmt.Sprintf("%s: unable to load last payment "+ + "paid-on time: %v", funcName, err) + return dbError(ErrPersistEntry, desc) + } + pm.setLastPaymentPaidOn(0) + } else { + pm.setLastPaymentPaidOn(bigEndianBytesToNano(lastPaymentPaidOnB)) + } + + // Initialize the last payment height. + lastPaymentHeightB := pbkt.Get(lastPaymentHeight) + if lastPaymentHeightB == nil { + b := make([]byte, 4) + binary.LittleEndian.PutUint32(b, 0) + err := pbkt.Put(lastPaymentHeight, b) + if err != nil { + desc := fmt.Sprintf("%s: unable to load last payment "+ + "height: %v", funcName, err) + return dbError(ErrPersistEntry, desc) + } + pm.setLastPaymentHeight(0) + } else { + pm.setLastPaymentHeight(binary.LittleEndian.Uint32(lastPaymentHeightB)) + } + + // Initialize the last payment created-on time. + lastPaymentCreatedOnB := pbkt.Get(lastPaymentCreatedOn) + if lastPaymentCreatedOnB == nil { + b := make([]byte, 8) + binary.LittleEndian.PutUint64(b, 0) + err := pbkt.Put(lastPaymentCreatedOn, b) + if err != nil { + desc := fmt.Sprintf("%s: unable to load last payment "+ + "created-on time: %v", funcName, err) + return dbError(ErrPersistEntry, desc) + } + pm.setLastPaymentCreatedOn(0) + } else { + pm.setLastPaymentCreatedOn(bigEndianBytesToNano(lastPaymentCreatedOnB)) } - return pm.loadLastPaymentCreatedOn(tx) + + return nil }) if err != nil { return nil, err @@ -131,6 +179,18 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { return pm, nil } +// fetchPoolBucket is a helper function for getting the pool bucket. +func fetchPoolBucket(tx *bolt.Tx) (*bolt.Bucket, error) { + funcName := "fetchPoolBucket" + pbkt := tx.Bucket(poolBkt) + if pbkt == nil { + desc := fmt.Sprintf("%s: bucket %s not found", funcName, + string(poolBkt)) + return nil, dbError(ErrBucketNotFound, desc) + } + return pbkt, nil +} + // setLastPaymentHeight updates the last payment height. func (pm *PaymentMgr) setLastPaymentHeight(height uint32) { atomic.StoreUint32(&pm.lastPaymentHeight, height) @@ -144,16 +204,14 @@ func (pm *PaymentMgr) fetchLastPaymentHeight() uint32 { // persistLastPaymentHeight saves the last payment height to the db. func (pm *PaymentMgr) persistLastPaymentHeight(tx *bolt.Tx) error { funcName := "persistLastPaymentHeight" - pbkt := tx.Bucket(poolBkt) - if pbkt == nil { - desc := fmt.Sprintf("%s: bucket %s not found", funcName, - string(poolBkt)) - return dbError(ErrBucketNotFound, desc) + pbkt, err := fetchPoolBucket(tx) + if err != nil { + return err } height := atomic.LoadUint32(&pm.lastPaymentHeight) b := make([]byte, 4) binary.LittleEndian.PutUint32(b, height) - err := pbkt.Put(lastPaymentHeight, b) + err = pbkt.Put(lastPaymentHeight, b) if err != nil { desc := fmt.Sprintf("%s: unable to persist last payment height: %v", funcName, err) @@ -165,24 +223,14 @@ func (pm *PaymentMgr) persistLastPaymentHeight(tx *bolt.Tx) error { // loadLastPaymentHeight fetches the last payment height from the db. func (pm *PaymentMgr) loadLastPaymentHeight(tx *bolt.Tx) error { funcName := "loadLastPaymentHeight" - pbkt := tx.Bucket(poolBkt) - if pbkt == nil { - desc := fmt.Sprintf("%s: bucket %s not found", funcName, - string(poolBkt)) - return dbError(ErrBucketNotFound, desc) + pbkt, err := fetchPoolBucket(tx) + if err != nil { + return err } lastPaymentHeightB := pbkt.Get(lastPaymentHeight) if lastPaymentHeightB == nil { - pm.setLastPaymentHeight(0) - b := make([]byte, 4) - binary.LittleEndian.PutUint32(b, 0) - err := pbkt.Put(lastPaymentHeight, b) - if err != nil { - desc := fmt.Sprintf("%s: unable to load last payment "+ - "height: %v", funcName, err) - return dbError(ErrPersistEntry, desc) - } - return nil + desc := fmt.Sprintf("%s: last payment height not initialized", funcName) + return dbError(ErrFetchEntry, desc) } pm.setLastPaymentHeight(binary.LittleEndian.Uint32(lastPaymentHeightB)) return nil @@ -201,13 +249,11 @@ func (pm *PaymentMgr) fetchLastPaymentPaidOn() uint64 { // persistLastPaymentPaidOn saves the last payment paid on time to the db. func (pm *PaymentMgr) persistLastPaymentPaidOn(tx *bolt.Tx) error { funcName := "persistLastPaymentPaidOn" - pbkt := tx.Bucket(poolBkt) - if pbkt == nil { - desc := fmt.Sprintf("%s: bucket %s not found", funcName, - string(poolBkt)) - return dbError(ErrBucketNotFound, desc) + pbkt, err := fetchPoolBucket(tx) + if err != nil { + return err } - err := pbkt.Put(lastPaymentPaidOn, + err = pbkt.Put(lastPaymentPaidOn, nanoToBigEndianBytes(int64(pm.lastPaymentPaidOn))) if err != nil { desc := fmt.Sprintf("%s: unable to persist last payment "+ @@ -248,18 +294,6 @@ func (pm *PaymentMgr) pruneShares(tx *bolt.Tx, minNano int64) error { return nil } -// fetchPoolBucket is a helper function for getting the pool bucket. -func fetchPoolBucket(tx *bolt.Tx) (*bolt.Bucket, error) { - funcName := "fetchPoolBucket" - pbkt := tx.Bucket(poolBkt) - if pbkt == nil { - desc := fmt.Sprintf("%s: bucket %s not found", funcName, - string(poolBkt)) - return nil, dbError(ErrBucketNotFound, desc) - } - return pbkt, nil -} - // bigEndianBytesToNano returns nanosecond time from the provided // big endian bytes. func bigEndianBytesToNano(b []byte) uint64 { @@ -275,16 +309,8 @@ func (pm *PaymentMgr) loadLastPaymentPaidOn(tx *bolt.Tx) error { } lastPaymentPaidOnB := pbkt.Get(lastPaymentPaidOn) if lastPaymentPaidOnB == nil { - pm.setLastPaymentPaidOn(0) - b := make([]byte, 8) - binary.LittleEndian.PutUint64(b, 0) - err := pbkt.Put(lastPaymentPaidOn, b) - if err != nil { - desc := fmt.Sprintf("%s: unable to load last payment "+ - "paid-on time: %v", funcName, err) - return dbError(ErrPersistEntry, desc) - } - return nil + desc := fmt.Sprintf("%s: last payment paid-on not initialized", funcName) + return dbError(ErrFetchEntry, desc) } pm.setLastPaymentPaidOn(bigEndianBytesToNano(lastPaymentPaidOnB)) return nil @@ -317,7 +343,7 @@ func (pm *PaymentMgr) persistLastPaymentCreatedOn(tx *bolt.Tx) error { return nil } -// loadLastPaymentCreaedOn fetches the last payment created on time from the db. +// loadLastPaymentCreatedOn fetches the last payment created on time from the db. func (pm *PaymentMgr) loadLastPaymentCreatedOn(tx *bolt.Tx) error { funcName := "loadLastPaymentCreatedOn" pbkt, err := fetchPoolBucket(tx) @@ -326,16 +352,9 @@ func (pm *PaymentMgr) loadLastPaymentCreatedOn(tx *bolt.Tx) error { } lastPaymentCreatedOnB := pbkt.Get(lastPaymentCreatedOn) if lastPaymentCreatedOnB == nil { - pm.setLastPaymentCreatedOn(0) - b := make([]byte, 8) - binary.LittleEndian.PutUint64(b, 0) - err := pbkt.Put(lastPaymentCreatedOn, b) - if err != nil { - desc := fmt.Sprintf("%s: unable to load last payment "+ - "created-on time: %v", funcName, err) - return dbError(ErrPersistEntry, desc) - } - return nil + desc := fmt.Sprintf("%s: last payment created-on not initialized", + funcName) + return dbError(ErrFetchEntry, desc) } pm.setLastPaymentCreatedOn(bigEndianBytesToNano(lastPaymentCreatedOnB)) return nil @@ -829,7 +848,7 @@ func (pm *PaymentMgr) pruneOrphanedPayments(ctx context.Context, pmts map[string return pmts, nil } -// applyTxFees determines the trasaction fees needed for the payout transaction +// applyTxFees determines the transaction fees needed for the payout transaction // and deducts portions of the fee from outputs of participating accounts // being paid to. // @@ -837,7 +856,7 @@ func (pm *PaymentMgr) pruneOrphanedPayments(ctx context.Context, pmts map[string // the ratio of the amount being paid to the total transaction output minus // pool fees. func (pm *PaymentMgr) applyTxFees(inputs []chainjson.TransactionInput, outputs map[string]dcrutil.Amount, - tIn dcrutil.Amount, tOut dcrutil.Amount, feeAddr dcrutil.Address) (dcrutil.Amount, dcrutil.Amount, error) { + tOut dcrutil.Amount, feeAddr dcrutil.Address) (dcrutil.Amount, dcrutil.Amount, error) { funcName := "applyTxFees" if len(inputs) == 0 { desc := fmt.Sprint("%s: cannot create a payout transaction "+ @@ -902,7 +921,6 @@ txConfs: for { select { case <-ctx.Done(): - log.Debugf("existing txConfs") break txConfs default: @@ -921,7 +939,7 @@ txConfs: if coinbase.Confirmations >= maxSpendableConfs { hash, err := chainhash.NewHash(coinbase.TxHash) if err != nil { - desc := fmt.Sprintf("%s: unable to create block hash: %v", + desc := fmt.Sprintf("%s: unable to create tx hash: %v", funcName, err) return poolError(ErrCreateHash, desc) } @@ -938,9 +956,8 @@ txConfs: } if len(txHashes) != 0 { - log.Debugf("txHashes are %d", len(txHashes)) - desc := fmt.Sprintf("%s: unable to confirm %d coinbase "+ - "trasaction(s)", funcName, len(txHashes)) + desc := fmt.Sprintf("%s: cancelled confirming %d coinbase "+ + "transaction(s)", funcName, len(txHashes)) return poolError(ErrContextCancelled, desc) } @@ -949,18 +966,20 @@ txConfs: // generatePayoutTxDetails creates the payout transaction inputs and outputs // from the provided payments -func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator, payments map[string][]*Payment) ([]chainjson.TransactionInput, - map[string]*chainhash.Hash, map[string]dcrutil.Amount, dcrutil.Address, dcrutil.Amount, dcrutil.Amount, error) { +func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator, feeAddr dcrutil.Address, payments map[string][]*Payment, treasuryActive bool) ([]chainjson.TransactionInput, + map[string]*chainhash.Hash, map[string]dcrutil.Amount, dcrutil.Amount, error) { funcName := "generatePayoutTxDetails" - // The fee address is being picked at random from the set of pool fee - // addresses to make it difficult for third-parties wanting to track - // pool fees collected by the pool and ultimately determine the - // cumulative value accrued by pool operators. - feeAddr := pm.cfg.PoolFeeAddrs[rand.Intn(len(pm.cfg.PoolFeeAddrs))] + // The coinbase output prior to + // [DCP0006](https://github.com/decred/dcps/pull/17) + // activation is at the third index position and at + // the second index position once DCP0006 is activated. + coinbaseIndex := uint32(1) + if !treasuryActive { + coinbaseIndex = 2 + } var tIn, tOut dcrutil.Amount - coinbaseIndex := uint32(2) inputs := make([]chainjson.TransactionInput, 0) inputTxHashes := make(map[string]*chainhash.Hash) outputs := make(map[string]dcrutil.Amount) @@ -970,28 +989,22 @@ func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator if err != nil { desc := fmt.Sprintf("%s: unable to create tx hash: %v", funcName, err) - return nil, nil, nil, nil, 0, 0, poolError(ErrCreateHash, desc) + return nil, nil, nil, 0, poolError(ErrCreateHash, desc) } - // Ensure the referenced prevout to be spent is a coinbase and - // spendable at the current height. + // Ensure the referenced prevout to be spent is spendable at + // the current height. txOutResult, err := txC.GetTxOut(ctx, txHash, coinbaseIndex, false) if err != nil { desc := fmt.Sprintf("%s: unable to find tx output: %v", funcName, err) - return nil, nil, nil, nil, 0, 0, poolError(ErrTxOut, desc) - } - if !txOutResult.Coinbase { - desc := fmt.Sprintf("%s: referenced output at index %d "+ - "for tx %v is not a coinbase", - funcName, coinbaseIndex, txHash.String()) - return nil, nil, nil, nil, 0, 0, poolError(ErrCoinbase, desc) + return nil, nil, nil, 0, poolError(ErrTxOut, desc) } if txOutResult.Confirmations < int64(pm.cfg.ActiveNet.CoinbaseMaturity+1) { desc := fmt.Sprintf("%s: referenced coinbase at "+ "index %d for tx %v is not spendable", funcName, coinbaseIndex, txHash.String()) - return nil, nil, nil, nil, 0, 0, poolError(ErrCoinbase, desc) + return nil, nil, nil, 0, poolError(ErrCoinbase, desc) } // Create the transaction input using the provided prevOut. @@ -1006,12 +1019,13 @@ func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator prevOutV, err := dcrutil.NewAmount(in.Amount) if err != nil { - desc := fmt.Sprintf("unable create the input amount: %v", err) - return nil, nil, nil, nil, 0, 0, poolError(ErrCreateAmount, desc) + desc := fmt.Sprintf("%s: unable create the input amount: %v", + funcName, err) + return nil, nil, nil, 0, poolError(ErrCreateAmount, desc) } tIn += prevOutV - // Generate the outputs paying dividends to as well as pool fees. + // Generate the outputs paying dividends as well as pool fees. for _, pmt := range pmtSet { if pmt.Account == PoolFeesK { _, ok := outputs[feeAddr.String()] @@ -1027,7 +1041,7 @@ func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator acc, err := FetchAccount(pm.cfg.DB, []byte(pmt.Account)) if err != nil { - return nil, nil, nil, nil, 0, 0, err + return nil, nil, nil, 0, err } _, ok := outputs[acc.Address] if !ok { @@ -1047,7 +1061,7 @@ func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator desc := fmt.Sprintf("%s: total output values for the "+ "transaction (%s) is greater than the provided inputs (%s)", funcName, tOut, tIn) - return nil, nil, nil, nil, 0, 0, poolError(ErrCreateTx, desc) + return nil, nil, nil, 0, poolError(ErrCreateTx, desc) } diff := tIn - tOut @@ -1055,14 +1069,14 @@ func (pm *PaymentMgr) generatePayoutTxDetails(ctx context.Context, txC TxCreator desc := fmt.Sprintf("%s: difference between total output "+ "values and the provided inputs (%s) exceeds the maximum "+ "allowed for rounding errors (%s)", funcName, diff, maxRoundingDiff) - return nil, nil, nil, nil, 0, 0, poolError(ErrCreateTx, desc) + return nil, nil, nil, 0, poolError(ErrCreateTx, desc) } - return inputs, inputTxHashes, outputs, feeAddr, tIn, tOut, nil + return inputs, inputTxHashes, outputs, tOut, nil } // PayDividends pays mature mining rewards to participating accounts. -func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32) error { +func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32, treasuryActive bool) error { funcName := "payDividends" mPmts, err := pm.maturePendingPayments(height) if err != nil { @@ -1088,13 +1102,19 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32) error { return err } - inputs, inputTxHashes, outputs, feeAddr, tIn, tOut, err := - pm.generatePayoutTxDetails(ctx, txC, pmts) + // The fee address is being picked at random from the set of pool fee + // addresses to make it difficult for third-parties wanting to track + // pool fees collected by the pool and ultimately determine the + // cumulative value accrued by pool operators. + feeAddr := pm.cfg.PoolFeeAddrs[rand.Intn(len(pm.cfg.PoolFeeAddrs))] + + inputs, inputTxHashes, outputs, tOut, err := + pm.generatePayoutTxDetails(ctx, txC, feeAddr, pmts, treasuryActive) if err != nil { return err } - _, estFee, err := pm.applyTxFees(inputs, outputs, tIn, tOut, feeAddr) + _, estFee, err := pm.applyTxFees(inputs, outputs, tOut, feeAddr) if err != nil { return err } @@ -1128,7 +1148,7 @@ func (pm *PaymentMgr) payDividends(ctx context.Context, height uint32) error { defer tCancel() err = pm.confirmCoinbases(tCtx, inputTxHashes, maxSpendableHeight) if err != nil { - // Do not error if coinbase spendable confirmatiom requests are + // Do not error if coinbase spendable confirmation requests are // terminated by the context cancellation. if !errors.Is(err, ErrContextCancelled) { return err diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index d244bfdb..c4471099 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -537,8 +537,8 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { t.Fatal(err) } - // Ensure backed up values to the database persist and load as expected. - err = db.Update(func(tx *bolt.Tx) error { + // Ensure backed up values to the database load as expected. + err = db.View(func(tx *bolt.Tx) error { err = mgr.loadLastPaymentHeight(tx) if err != nil { return fmt.Errorf("unable to load last payment height: %v", err) @@ -547,7 +547,11 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { if err != nil { return fmt.Errorf("unable to load last payment created on: %v", err) } - return mgr.loadLastPaymentPaidOn(tx) + err = mgr.loadLastPaymentPaidOn(tx) + if err != nil { + return fmt.Errorf("unable to load last payment paid on: %v", err) + } + return nil }) if err != nil { t.Fatal(err) @@ -1080,7 +1084,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { tOut := tIn _, txFee, err := mgr.applyTxFees([]chainjson.TransactionInput{in}, - out, tIn, tOut, poolFeeAddrs) + out, tOut, poolFeeAddrs) if err != nil { t.Fatalf("unexpected applyTxFees error: %v", err) } @@ -1104,14 +1108,14 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { // Ensure providing no tx inputs triggers an error. _, _, err = mgr.applyTxFees([]chainjson.TransactionInput{}, - out, tIn, tOut, poolFeeAddrs) + out, tOut, poolFeeAddrs) if !errors.Is(err, ErrTxIn) { t.Fatalf("expected a tx input error: %v", err) } // Ensure providing no tx outputs triggers an error. _, _, err = mgr.applyTxFees([]chainjson.TransactionInput{in}, - make(map[string]dcrutil.Amount), tIn, tOut, poolFeeAddrs) + make(map[string]dcrutil.Amount), tOut, poolFeeAddrs) if !errors.Is(err, ErrTxOut) { cancel() t.Fatalf("expected a tx output error: %v", err) @@ -1219,6 +1223,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { mPmts[zeroSource.Coinbase] = []*Payment{pmtA} pmtB = NewPayment(yID, randSource, amt, height, estMaturity) mPmts[randSource.Coinbase] = []*Payment{pmtB} + treasuryActive := true // Ensure generating payout tx details returns an error if fetching txOut // information fails. @@ -1227,31 +1232,13 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { return nil, fmt.Errorf("unable to fetch txOut") }, } - _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, poolFeeAddrs, + mPmts, treasuryActive) if err == nil { cancel() t.Fatalf("expected a fetch txOut error") } - // Ensure generating payout tx details returns an error if the returned - // output is not a coinbase. - txC = &txCreatorImpl{ - getTxOut: func(ctx context.Context, txHash *chainhash.Hash, index uint32, mempool bool) (*chainjson.GetTxOutResult, error) { - return &chainjson.GetTxOutResult{ - BestBlock: chainhash.Hash{0}.String(), - Confirmations: 50, - Value: 5, - Coinbase: false, - }, nil - }, - } - - _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) - if !errors.Is(err, ErrCoinbase) { - cancel() - t.Fatalf("expected a coinbase txOut error") - } - // Ensure generating payout tx details returns an error if the returned // output is not spendable. txC = &txCreatorImpl{ @@ -1265,7 +1252,8 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { }, } - _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, poolFeeAddrs, + mPmts, treasuryActive) if !errors.Is(err, ErrCoinbase) { cancel() t.Fatalf("expected a spendable error") @@ -1288,7 +1276,8 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { }, } - _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, poolFeeAddrs, + mPmts, treasuryActive) if !errors.Is(err, ErrValueNotFound) { cancel() t.Fatalf("expected an account not found error") @@ -1308,7 +1297,8 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { }, } - _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, poolFeeAddrs, + mPmts, treasuryActive) if !errors.Is(err, ErrCreateTx) { cancel() t.Fatalf("expected an input output mismatch error") @@ -1328,7 +1318,8 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { }, } - _, _, _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, mPmts) + _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, poolFeeAddrs, + mPmts, treasuryActive) if !errors.Is(err, ErrCreateTx) { cancel() t.Fatalf("expected an unclaimed input value error, got %v", err) @@ -1346,7 +1337,9 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { }, } - inputs, inputTxHashes, outputs, _, _, _, err := mgr.generatePayoutTxDetails(ctx, txC, mPmts) + inputs, inputTxHashes, outputs, _, err := mgr.generatePayoutTxDetails(ctx, + txC, poolFeeAddrs, + mPmts, treasuryActive) if err != nil { cancel() t.Fatalf("unexpected payout tx details error, got %v", err) @@ -1413,7 +1406,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { // Ensure dividend payments returns no error if there are no mature // payments to work with. - err = mgr.payDividends(ctx, estMaturity-1) + err = mgr.payDividends(ctx, estMaturity-1, treasuryActive) if err != nil { cancel() t.Fatal("expected no error since there are no mature payments") @@ -1425,7 +1418,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { return nil } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if !errors.Is(err, ErrDisconnected) { cancel() t.Fatalf("expected a nil tx creator error, got %v", err) @@ -1440,7 +1433,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { return -1, fmt.Errorf("unable to confirm blocks") } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if err == nil { cancel() t.Fatalf("expected a prune orphan payments error") @@ -1459,7 +1452,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { return 16, nil } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if !errors.Is(err, ErrTxOut) { cancel() t.Fatalf("expected a generate payout tx details error, got %v", err) @@ -1473,7 +1466,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { return -1, nil } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if !errors.Is(err, ErrTxIn) { cancel() t.Fatalf("expected an apply tx fee error, got %v", err) @@ -1499,7 +1492,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { return nil, fmt.Errorf("unable to fetch tx conf notification source") } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if err == nil { cancel() t.Fatalf("expected a coinbase confirmation error, got %v", err) @@ -1551,7 +1544,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { }, nil } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if err == nil { cancel() t.Fatal("expected a create transaction error") @@ -1589,7 +1582,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { }, nil } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if !errors.Is(err, ErrDisconnected) { cancel() t.Fatalf("expected a fetch tx broadcaster error, got %v", err) @@ -1620,7 +1613,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { } } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if !errors.Is(err, ErrSignTx) { cancel() t.Fatalf("expected a signing error, got %v", err) @@ -1649,7 +1642,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { } } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if !errors.Is(err, ErrPublishTx) { cancel() t.Fatalf("expected a publish error, got %v", err) @@ -1672,7 +1665,7 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { } } - err = mgr.payDividends(ctx, estMaturity+1) + err = mgr.payDividends(ctx, estMaturity+1, treasuryActive) if err != nil { cancel() t.Fatalf("unexpected dividend payment error, got %v", err) From 9828e9963713fa7005b067d6790c55f1784205e0 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Tue, 6 Oct 2020 17:18:05 +0000 Subject: [PATCH 6/8] pool: cleanly terminate tx confirmation streams. This upates the tx confirmation streaming process to be non-blocking in order to terminate cleanly on context cancellation. --- pool/paymentmgr.go | 44 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index 189ff014..b268bb09 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -59,6 +59,12 @@ type TxBroadcaster interface { PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) } +// confNotifMsg represents a tx confirmation notification messege. +type confNotifMsg struct { + resp *walletrpc.ConfirmationNotificationsResponse + err error +} + // PaymentMgrConfig contains all of the configuration values which should be // provided when creating a new instance of PaymentMgr. type PaymentMgrConfig struct { @@ -896,6 +902,34 @@ func (pm *PaymentMgr) applyTxFees(inputs []chainjson.TransactionInput, outputs m return sansFees, estFee, nil } +// fetchTxConfNotifications is a helper function for fetching tx confirmation +// notifications without blocking. +func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrpc.ConfirmationNotificationsResponse, error)) (*walletrpc.ConfirmationNotificationsResponse, error) { + funcName := "fetchTxConfNotifications" + notifCh := make(chan confNotifMsg) + go func(ch chan confNotifMsg) { + resp, err := notifSource() + ch <- confNotifMsg{ + resp: resp, + err: err, + } + }(notifCh) + + select { + case <-ctx.Done(): + log.Tracef("%s: unable to fx tx confirmation notifications", funcName) + return nil, ErrContextCancelled + case notif := <-notifCh: + close(notifCh) + if notif.err != nil { + desc := fmt.Sprintf("%s: unable to fetch tx confirmation "+ + "notifications, %s", funcName, notif.err) + return nil, poolError(ErrTxConf, desc) + } + return notif.resp, nil + } +} + // confirmCoinbases ensures the coinbases referenced by the provided // transaction hashes are spendable by the expected maximum spendable height. // @@ -927,11 +961,13 @@ txConfs: // Non-blocking receive fallthrough. } - resp, err := notifSource() + resp, err := fetchTxConfNotifications(ctx, notifSource) if err != nil { - desc := fmt.Sprintf("%s: unable to fetch tx confirmations: %v", - funcName, err) - return poolError(ErrTxConf, desc) + if errors.Is(err, ErrContextCancelled) { + // Terminate the tx confirmation process. + continue + } + return err } // Ensure all coinbases being spent are spendable. From b193e246d433fb001cd5e87dd88a2825f9210b25 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Wed, 7 Oct 2020 13:47:40 +0000 Subject: [PATCH 7/8] pool: resolve review issues (2 of x). --- pool/paymentmgr.go | 39 +++++++++++++-------------------------- 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/pool/paymentmgr.go b/pool/paymentmgr.go index b268bb09..b11dba56 100644 --- a/pool/paymentmgr.go +++ b/pool/paymentmgr.go @@ -59,7 +59,7 @@ type TxBroadcaster interface { PublishTransaction(context.Context, *walletrpc.PublishTransactionRequest, ...grpc.CallOption) (*walletrpc.PublishTransactionResponse, error) } -// confNotifMsg represents a tx confirmation notification messege. +// confNotifMsg represents a tx confirmation notification message. type confNotifMsg struct { resp *walletrpc.ConfirmationNotificationsResponse err error @@ -136,7 +136,7 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { binary.LittleEndian.PutUint64(b, 0) err := pbkt.Put(lastPaymentPaidOn, b) if err != nil { - desc := fmt.Sprintf("%s: unable to load last payment "+ + desc := fmt.Sprintf("%s: unable to persist last payment "+ "paid-on time: %v", funcName, err) return dbError(ErrPersistEntry, desc) } @@ -152,7 +152,7 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { binary.LittleEndian.PutUint32(b, 0) err := pbkt.Put(lastPaymentHeight, b) if err != nil { - desc := fmt.Sprintf("%s: unable to load last payment "+ + desc := fmt.Sprintf("%s: unable to persist last payment "+ "height: %v", funcName, err) return dbError(ErrPersistEntry, desc) } @@ -168,7 +168,7 @@ func NewPaymentMgr(pCfg *PaymentMgrConfig) (*PaymentMgr, error) { binary.LittleEndian.PutUint64(b, 0) err := pbkt.Put(lastPaymentCreatedOn, b) if err != nil { - desc := fmt.Sprintf("%s: unable to load last payment "+ + desc := fmt.Sprintf("%s: unable to persist last payment "+ "created-on time: %v", funcName, err) return dbError(ErrPersistEntry, desc) } @@ -903,7 +903,9 @@ func (pm *PaymentMgr) applyTxFees(inputs []chainjson.TransactionInput, outputs m } // fetchTxConfNotifications is a helper function for fetching tx confirmation -// notifications without blocking. +// notifications. It will return when either a notification or error is +// received from the provided notification source, or when the provided +// context is cancelled. func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrpc.ConfirmationNotificationsResponse, error)) (*walletrpc.ConfirmationNotificationsResponse, error) { funcName := "fetchTxConfNotifications" notifCh := make(chan confNotifMsg) @@ -917,7 +919,8 @@ func fetchTxConfNotifications(ctx context.Context, notifSource func() (*walletrp select { case <-ctx.Done(): - log.Tracef("%s: unable to fx tx confirmation notifications", funcName) + log.Tracef("%s: unable to fetch tx confirmation notifications", + funcName) return nil, ErrContextCancelled case notif := <-notifCh: close(notifCh) @@ -951,21 +954,13 @@ func (pm *PaymentMgr) confirmCoinbases(ctx context.Context, txHashes map[string] // Wait for coinbase tx confirmations from the wallet. maxSpendableConfs := int32(pm.cfg.ActiveNet.CoinbaseMaturity) + 1 -txConfs: for { - select { - case <-ctx.Done(): - break txConfs - - default: - // Non-blocking receive fallthrough. - } - resp, err := fetchTxConfNotifications(ctx, notifSource) if err != nil { if errors.Is(err, ErrContextCancelled) { - // Terminate the tx confirmation process. - continue + desc := fmt.Sprintf("%s: cancelled confirming %d coinbase "+ + "transaction(s)", funcName, len(txHashes)) + return poolError(ErrContextCancelled, desc) } return err } @@ -987,17 +982,9 @@ txConfs: } if len(txHashes) == 0 { - break + return nil } } - - if len(txHashes) != 0 { - desc := fmt.Sprintf("%s: cancelled confirming %d coinbase "+ - "transaction(s)", funcName, len(txHashes)) - return poolError(ErrContextCancelled, desc) - } - - return nil } // generatePayoutTxDetails creates the payout transaction inputs and outputs From a9a04d7649c262fc685c0cc0a1802bcd9f3d97ef Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Fri, 9 Oct 2020 10:20:23 +0000 Subject: [PATCH 8/8] pool: resolve review issues (3 of x). --- pool/paymentmgr_test.go | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index c4471099..3fe81319 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -1063,9 +1063,9 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { } // applyTxFee tests. - tIn, _ := dcrutil.NewAmount(100) + outV, _ := dcrutil.NewAmount(100) in := chainjson.TransactionInput{ - Amount: float64(tIn), + Amount: float64(outV), Txid: chainhash.Hash{1}.String(), Vout: 2, Tree: wire.TxTreeRegular, @@ -1081,18 +1081,16 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { out[yAddr] = yValue out[feeAddr] = poolFeeValue - tOut := tIn - _, txFee, err := mgr.applyTxFees([]chainjson.TransactionInput{in}, - out, tOut, poolFeeAddrs) + out, outV, poolFeeAddrs) if err != nil { t.Fatalf("unexpected applyTxFees error: %v", err) } // Ensure the pool fee payment was exempted from tx fee deductions. - if out[feeAddr] != amt.MulF64(0.1) { + if out[feeAddr] != poolFeeValue { t.Fatalf("expected pool fee payment to be %v, got %v", - txFee, out[feeAddr]) + poolFeeValue, out[feeAddr]) } // Ensure the difference between initial account payments and updated @@ -1108,17 +1106,16 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { // Ensure providing no tx inputs triggers an error. _, _, err = mgr.applyTxFees([]chainjson.TransactionInput{}, - out, tOut, poolFeeAddrs) + out, outV, poolFeeAddrs) if !errors.Is(err, ErrTxIn) { - t.Fatalf("expected a tx input error: %v", err) + t.Fatalf("expected a tx input error, got %v", err) } // Ensure providing no tx outputs triggers an error. _, _, err = mgr.applyTxFees([]chainjson.TransactionInput{in}, - make(map[string]dcrutil.Amount), tOut, poolFeeAddrs) + make(map[string]dcrutil.Amount), outV, poolFeeAddrs) if !errors.Is(err, ErrTxOut) { - cancel() - t.Fatalf("expected a tx output error: %v", err) + t.Fatalf("expected a tx output error, got %v", err) } // confirmCoinbases tests. @@ -1234,9 +1231,9 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { } _, _, _, _, err = mgr.generatePayoutTxDetails(ctx, txC, poolFeeAddrs, mPmts, treasuryActive) - if err == nil { + if !errors.Is(err, ErrTxOut) { cancel() - t.Fatalf("expected a fetch txOut error") + t.Fatalf("expected a fetch txOut error, got %v", err) } // Ensure generating payout tx details returns an error if the returned @@ -1671,6 +1668,8 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { t.Fatalf("unexpected dividend payment error, got %v", err) } + cancel() + // Reset backed up values to their defaults. mgr.setLastPaymentHeight(0) mgr.setLastPaymentPaidOn(0) @@ -1696,6 +1695,4 @@ func testPaymentMgr(t *testing.T, db *bolt.DB) { if err != nil { t.Fatal(err) } - - cancel() }