From ec0118634ba01bded795f377a935f4128363dc31 Mon Sep 17 00:00:00 2001 From: yash1io Date: Thu, 11 Jul 2024 01:03:09 +0530 Subject: [PATCH] cpfp batcher --- btc/batcher.go | 189 ++++++++++++++++++++----------------- btc/cpfp.go | 247 +++++++++++++++++++++++++++++++++++++++++++++++++ btc/wallet.go | 3 + 3 files changed, 354 insertions(+), 85 deletions(-) create mode 100644 btc/cpfp.go diff --git a/btc/batcher.go b/btc/batcher.go index 2118e56..a600a6e 100644 --- a/btc/batcher.go +++ b/btc/batcher.go @@ -8,7 +8,6 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcwallet/wallet/txsizes" "github.com/decred/dcrd/dcrec/secp256k1/v4" @@ -20,39 +19,44 @@ var ( SegwitSpendWeight int = txsizes.RedeemP2WPKHInputWitnessWeight - 10 // removes 10vb of overhead for segwit ) var ( - ErrBatchNotFound = errors.New("batch not found") - ErrBatcherStillRunning = errors.New("batcher is still running") - ErrBatcherNotRunning = errors.New("batcher is not running") - ErrBatchParametersNotMet = errors.New("batch parameters not met") - ErrHighFeeEstimate = errors.New("estimated fee too high") - ErrFeeDeltaHigh = errors.New("fee delta too high") - ErrFeeUpdateNotNeeded = errors.New("fee update not needed") - ErrMaxBatchLimitReached = errors.New("max batch limit reached") + ErrBatchNotFound = errors.New("batch not found") + ErrBatcherStillRunning = errors.New("batcher is still running") + ErrBatcherNotRunning = errors.New("batcher is not running") + ErrBatchParametersNotMet = errors.New("batch parameters not met") + ErrHighFeeEstimate = errors.New("estimated fee too high") + ErrFeeDeltaHigh = errors.New("fee delta too high") + ErrFeeUpdateNotNeeded = errors.New("fee update not needed") + ErrMaxBatchLimitReached = errors.New("max batch limit reached") + ErrCPFPFeeUpdateParamsNotMet = errors.New("CPFP fee update parameters not met") + ErrCPFPBatchingParamsNotMet = errors.New("CPFP batching parameters not met") + ErrCPFPBatchingCorrupted = errors.New("CPFP batching corrupted") + ErrSavingBatch = errors.New("failed to save batch") ) -type SpendUTXOs struct { - Witness [][]byte - Script []byte - ScriptAddress btcutil.Address - Utxo UTXO - HashType txscript.SigHashType -} - +// Batcher is a wallet that runs as a service and batches requests +// into transactions based on the strategy provided +// It is responsible for creating, signing and submitting transactions +// to the network. type BatcherWallet interface { Wallet Lifecycle } +// Lifecycle interface defines the lifecycle of a BatcherWallet +// It provides methods to start, stop and restart the wallet service type Lifecycle interface { Start(ctx context.Context) error Stop() error Restart(ctx context.Context) error } +// Cache interface defines the methods that a BatcherWallet's state +// should implement example implementations include in-memory cache and +// rdbs cache type Cache interface { ReadBatch(ctx context.Context, id string) (Batch, error) ReadBatchByReqId(ctx context.Context, id string) (Batch, error) - ReadPendingBatches(ctx context.Context) ([]Batch, error) + ReadPendingBatches(ctx context.Context, strategy Strategy) ([]Batch, error) SaveBatch(ctx context.Context, batch Batch) error ReadRequest(ctx context.Context, id string) (BatcherRequest, error) @@ -60,6 +64,8 @@ type Cache interface { SaveRequest(ctx context.Context, id string, req BatcherRequest) error } +// Batcher store spend and send requests in a batched request +// and returns a tracking id type BatcherRequest struct { ID string Spends []SpendRequest @@ -68,11 +74,16 @@ type BatcherRequest struct { } type BatcherOptions struct { - PTI time.Duration + PTI time.Duration // Periodic Time Interval for batching TxOptions TxOptions Strategy Strategy } +// Strategy defines the batching strategy to be used by the BatcherWallet +// It can be one of RBF, CPFP, RBF_CPFP, Multi_CPFP +// RBF - Replace By Fee +// CPFP - Child Pays For Parent +// Multi_CPFP - Multiple CPFP threads are maintained across multiple addresses type Strategy string var ( @@ -89,12 +100,12 @@ type TxOptions struct { MaxUnconfirmedAge int MaxBatches int - MaxBatchSize float64 + MaxBatchSize int FeeLevel FeeLevel - MaxFeeRate float64 - MinFeeDelta float64 - MaxFeeDelta float64 + MaxFeeRate int + MinFeeDelta int + MaxFeeDelta int } type batcherWallet struct { @@ -108,14 +119,14 @@ type batcherWallet struct { cache Cache } -type Inputs map[btcutil.Address][]RawInputs +type Inputs map[string][]RawInputs type Output struct { wire.OutPoint Recipient } -type Outputs map[btcutil.Address][]Output +type Outputs map[string][]Output type FeeUTXOS struct { Utxos []UTXO @@ -123,35 +134,23 @@ type FeeUTXOS struct { } type FeeData struct { - Fee int64 - FeeRate float64 - NetFeeRate float64 - BaseSize float64 - WitnessSize float64 - FeeUtxos FeeUTXOS + Fee int64 + Size int } type FeeStats struct { - MaxFeeRate float64 - TotalSize float64 - FeeDelta float64 + MaxFeeRate int + TotalSize int + FeeDelta int } type Batch struct { - TxId string - Inputs Inputs - Outputs Outputs - TotalIn int64 - TotalOut int64 - FeeData FeeData + Tx Transaction RequestIds map[string]bool IsStable bool IsConfirmed bool - Transaction Transaction -} - -func verifyOptions(opts BatcherOptions) error { - return nil + Strategy Strategy + ChangeUtxo UTXO } func NewBatcherWallet(indexer IndexerClient, address btcutil.Address, privateKey *secp256k1.PrivateKey, cache Cache, opts ...func(*batcherWallet) error) (BatcherWallet, error) { @@ -175,6 +174,7 @@ func (w *batcherWallet) Address() btcutil.Address { return w.address } +// Send creates a batch request , saves it in the cache and returns a tracking id func (w *batcherWallet) Send(ctx context.Context, sends []SendRequest, spends []SpendRequest) (string, error) { id := chainhash.HashH([]byte(fmt.Sprintf("%v_%v", spends, sends))).String() req := BatcherRequest{ @@ -186,6 +186,7 @@ func (w *batcherWallet) Send(ctx context.Context, sends []SendRequest, spends [] return id, w.cache.SaveRequest(ctx, id, req) } +// Status returns the status of a transaction based on the tracking id func (w *batcherWallet) Status(ctx context.Context, id string) (Transaction, bool, error) { request, err := w.cache.ReadRequest(ctx, id) if err != nil { @@ -199,13 +200,14 @@ func (w *batcherWallet) Status(ctx context.Context, id string) (Transaction, boo return Transaction{}, false, err } - tx, err := w.indexer.GetTx(ctx, batch.TxId) + tx, err := w.indexer.GetTx(ctx, batch.Tx.TxID) if err != nil { return Transaction{}, false, err } return tx, true, nil } +// Start starts the batcher wallet service func (w *batcherWallet) Start(ctx context.Context) error { if w.quit != nil { return ErrBatcherStillRunning @@ -214,6 +216,7 @@ func (w *batcherWallet) Start(ctx context.Context) error { return nil } +// Stop gracefully stops the batcher wallet service func (w *batcherWallet) Stop() error { if w.quit == nil { return ErrBatcherNotRunning @@ -223,6 +226,7 @@ func (w *batcherWallet) Stop() error { return nil } +// Restart restarts the batcher wallet service func (w *batcherWallet) Restart(ctx context.Context) error { if err := w.Stop(); err != nil { return err @@ -230,6 +234,11 @@ func (w *batcherWallet) Restart(ctx context.Context) error { return w.Start(ctx) } +// starts the batcher based on the strategy +// There are two types of batching triggers +// 1. Periodic Time Interval (PTI) - Batches are created at regular intervals +// 2. Pending Request - Batches are created when a certain number of requests are pending +// 3. Exponential Time Interval (ETI) - Batches are created at exponential intervals but the interval is custom func (w *batcherWallet) run(ctx context.Context) { switch w.opts.Strategy { case CPFP: @@ -242,6 +251,10 @@ func (w *batcherWallet) run(ctx context.Context) { } // PTI stands for Periodic time interval +// 1. It creates a batch at regular intervals +// 2. It also updates the fee rate at regular intervals +// if fee rate increases more than threshold and there are +// no batches to create func (w *batcherWallet) runPTIBatcher(ctx context.Context) { go func() { ticker := time.NewTicker(w.opts.PTI) @@ -268,8 +281,17 @@ func (w *batcherWallet) runPTIBatcher(ctx context.Context) { }() } +// updateFeeRate updates the fee rate based on the strategy func (w *batcherWallet) updateFeeRate() error { - requiredFeeRate, feeStats, err := w.getFee() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + feeRate, err := w.indexer.FeeEstimate(ctx) + if err != nil { + return err + } + requiredFeeRate := selectFee(feeRate, w.opts.TxOptions.FeeLevel) + + feeStats, err := w.getFeeStats(requiredFeeRate) if err != nil { return err } @@ -277,51 +299,45 @@ func (w *batcherWallet) updateFeeRate() error { switch w.opts.Strategy { case CPFP: return w.updateCPFP(feeStats, requiredFeeRate) - case RBF: - return w.updateRBF(feeStats, requiredFeeRate) + // case RBF: + // return w.updateRBF(feeStats, requiredFeeRate) default: panic("fee update for strategy not implemented") } } +// createBatch creates a batch based on the strategy func (w *batcherWallet) createBatch() error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - pendingRequests, err := w.cache.ReadPendingRequests(ctx) - if err != nil { - return err - } switch w.opts.Strategy { case CPFP: - return w.createCPFPBatch(ctx, pendingRequests) - case RBF: - return w.createRBFBatch(ctx, pendingRequests) + return w.createCPFPBatch() + // case RBF: + // return w.createRBFBatch() default: panic("batch creation for strategy not implemented") } } -func (w *batcherWallet) getFee() (float64, FeeStats, error) { +// Generate fee stats based on the required fee rate +// Fee stats are used to determine how much fee is required +// to bump existing batches +func (w *batcherWallet) getFeeStats(requiredFeeRate int) (FeeStats, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - feeRate, err := w.indexer.FeeEstimate(ctx) - if err != nil { - return 0, FeeStats{}, err - } - requiredFeeRate := selectFee(feeRate, w.opts.TxOptions.FeeLevel) - pendingBatches, err := w.cache.ReadPendingBatches(ctx) + pendingBatches, err := w.cache.ReadPendingBatches(ctx, w.opts.Strategy) if err != nil { - return 0, FeeStats{}, err + return FeeStats{}, err } - feeStats := getFeeStats(requiredFeeRate, pendingBatches) + feeStats := calculateFeeStats(requiredFeeRate, pendingBatches) if err := w.validateUpdate(feeStats.MaxFeeRate, requiredFeeRate); err != nil { - return 0, FeeStats{}, err + return FeeStats{}, err } - return requiredFeeRate, feeStats, nil + return feeStats, nil } -func (w *batcherWallet) validateUpdate(currentFeeRate, requiredFeeRate float64) error { +// verifies if the fee rate delta is within the threshold +func (w *batcherWallet) validateUpdate(currentFeeRate, requiredFeeRate int) error { if currentFeeRate > requiredFeeRate { return ErrFeeUpdateNotNeeded } @@ -337,20 +353,22 @@ func (w *batcherWallet) validateUpdate(currentFeeRate, requiredFeeRate float64) return nil } -func getFeeStats(feeRate float64, batches []Batch) FeeStats { - maxFeeRate := float64(0) - totalSize := float64(0) - feeDelta := float64(0) +// calculates the fee stats based on the required fee rate +func calculateFeeStats(reqFeeRate int, batches []Batch) FeeStats { + maxFeeRate := int(0) + totalSize := int(0) + feeDelta := int(0) for _, batch := range batches { - if batch.FeeData.FeeRate > maxFeeRate { - maxFeeRate = batch.FeeData.FeeRate + size := batch.Tx.Weight / 4 + feeRate := int(batch.Tx.Fee) / size + if feeRate > maxFeeRate { + maxFeeRate = feeRate } - batchSize := batch.FeeData.BaseSize + batch.FeeData.WitnessSize - if batch.FeeData.FeeRate > feeRate { - feeDelta += (batch.FeeData.FeeRate - feeRate) * batchSize + if reqFeeRate > feeRate { + feeDelta += (reqFeeRate - feeRate) * size } - totalSize += batchSize + totalSize += size } return FeeStats{ MaxFeeRate: maxFeeRate, @@ -359,15 +377,16 @@ func getFeeStats(feeRate float64, batches []Batch) FeeStats { } } -func selectFee(feeRate FeeSuggestion, feeLevel FeeLevel) float64 { +// selects the fee rate based on the fee level option +func selectFee(feeRate FeeSuggestion, feeLevel FeeLevel) int { switch feeLevel { case MediumFee: - return float64(feeRate.Medium) + return feeRate.Medium case HighFee: - return float64(feeRate.High) + return feeRate.High case LowFee: - return float64(feeRate.Low) + return feeRate.Low default: - return float64(feeRate.High) + return feeRate.High } } diff --git a/btc/cpfp.go b/btc/cpfp.go new file mode 100644 index 0000000..c49f829 --- /dev/null +++ b/btc/cpfp.go @@ -0,0 +1,247 @@ +package btc + +import ( + "context" + "fmt" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/mempool" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/ethereum/go-ethereum/log" +) + +// create a CPFP batch using the pending requests +// stores the batch in the cache +func (w *batcherWallet) createCPFPBatch() error { + ctx1, cancel1 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel1() + requests, err := w.cache.ReadPendingRequests(context.Background()) + if err != nil { + return err + } + sendRequests := []SendRequest{} + spendRequests := []SpendRequest{} + reqIds := make(map[string]bool) + + for _, req := range requests { + sendRequests = append(sendRequests, req.Sends...) + spendRequests = append(spendRequests, req.Spends...) + reqIds[req.ID] = true + } + + err = validateSpendRequest(spendRequests) + if err != nil { + return err + } + + utxos, err := w.indexer.GetUTXOs(ctx1, w.address) + if err != nil { + return err + } + + ctx2, cancel2 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel2() + feeRate, err := w.indexer.FeeEstimate(ctx2) + if err != nil { + return err + } + requiredFeeRate := selectFee(feeRate, w.opts.TxOptions.FeeLevel) + + feeStats, err := w.getFeeStats(requiredFeeRate) + if err != nil { + return err + } + + tx, err := w.buildCPFPTx( + utxos, + spendRequests, + sendRequests, + (len(utxos)+len(spendRequests)*SegwitSpendWeight)+10, + feeStats.FeeDelta, + requiredFeeRate, + ) + if err != nil { + return err + } + + ctx3, cancel3 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel3() + if err := w.indexer.SubmitTx(ctx3, tx); err != nil { + return err + } + + transaction, err := getTransaction(w.indexer, tx.TxHash().String()) + + batch := Batch{ + Tx: transaction, + RequestIds: reqIds, + IsStable: true, + IsConfirmed: false, + Strategy: CPFP, + ChangeUtxo: UTXO{ + TxID: tx.TxHash().String(), + Vout: uint32(len(tx.TxOut) - 1), + Amount: tx.TxOut[len(tx.TxOut)-1].Value, + }, + } + + ctx4, cancel4 := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel4() + if err := w.cache.SaveBatch(ctx4, batch); err != nil { + return ErrSavingBatch + } + + log.Info("submitted CPFP batch", "txid", tx.TxHash().String()) + return nil + +} + +func (w *batcherWallet) updateCPFP(feeStats FeeStats, requiredFeeRate int) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + utxos, err := w.indexer.GetUTXOs(ctx, w.address) + if err != nil { + return err + } + + batches, err := w.cache.ReadPendingBatches(ctx, w.opts.Strategy) + if err != nil { + return err + } + + if err := verifyCPFPConditions(utxos, batches, w.address); err != nil { + return fmt.Errorf("failed to verify CPFP conditions: %w", err) + } + + tx, err := w.buildCPFPTx( + utxos, + []SpendRequest{}, + []SendRequest{{Amount: -1, To: w.address}}, + (len(utxos)*SegwitSpendWeight)+10, + feeStats.FeeDelta, + requiredFeeRate, + ) + if err != nil { + return err + } + + if err := w.indexer.SubmitTx(ctx, tx); err != nil { + return err + } + + log.Info("submitted CPFP transaction", "txid", tx.TxHash().String()) + return nil +} + +func (w *batcherWallet) buildCPFPTx(utxos []UTXO, spendRequests []SpendRequest, sendRequests []SendRequest, fee, feeOverhead, feeRate int) (*wire.MsgTx, error) { + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + spendReqUTXOs, spendReqUTXOMap, balanceOfScripts, err := getUTXOsForSpendRequest(ctx, w.indexer, spendRequests) + if err != nil { + return nil, err + } + + if balanceOfScripts == 0 && len(spendRequests) > 0 { + return nil, fmt.Errorf("scripts have no funds to spend") + } + + // build the transaction + tx, err := buildTransaction(append(spendReqUTXOs, utxos...), sendRequests, w.address, feeOverhead+fee) + if err != nil { + return nil, err + } + + // Sign the spend inputs + err = signSpendTx(tx, spendRequests, spendReqUTXOMap, w.privateKey) + + // get the send signing script + script, err := txscript.PayToAddrScript(w.address) + if err != nil { + return tx, err + } + + // Sign the cover inputs + // This is a no op if there are no cover utxos + err = signSendTx(tx, utxos, len(spendReqUTXOs), script, w.privateKey) + if err != nil { + return tx, err + } + + txb := btcutil.NewTx(tx) + trueSize := mempool.GetTxVirtualSize(txb) + + newFeeEstimate := (int(trueSize) * (feeRate)) + feeOverhead + + if newFeeEstimate != fee+feeOverhead { + return w.buildCPFPTx(utxos, spendRequests, sendRequests, newFeeEstimate, 0, feeRate) + } + + return tx, nil +} + +func verifyCPFPConditions(utxos []UTXO, batches []Batch, walletAddr btcutil.Address) error { + ucUtxos := getUnconfirmedUtxos(utxos) + if len(ucUtxos) == 0 { + return ErrCPFPFeeUpdateParamsNotMet + } + trailingBatches, err := getTrailingBatches(batches, ucUtxos) + if err != nil { + return err + } + + if len(trailingBatches) == 0 || len(trailingBatches) > 1 { + return ErrCPFPBatchingCorrupted + } + + return reconstructCPFPBatches(batches, trailingBatches[0], walletAddr) +} + +func getUnconfirmedUtxos(utxos []UTXO) []UTXO { + var ucUtxos []UTXO + for _, utxo := range utxos { + if !utxo.Status.Confirmed { + ucUtxos = append(ucUtxos, utxo) + } + } + return ucUtxos +} + +func getTrailingBatches(batches []Batch, utxos []UTXO) ([]Batch, error) { + utxomap := make(map[string]bool) + for _, utxo := range utxos { + utxomap[utxo.TxID] = true + } + + batches = []Batch{} + + for _, batch := range batches { + if _, ok := utxomap[batch.ChangeUtxo.TxID]; ok { + batches = append(batches, batch) + } + } + + return batches, nil +} + +func reconstructCPFPBatches(batches []Batch, trailingBatch Batch, walletAddr btcutil.Address) error { + // todo : verify that the trailing batch can trace back to the funding utxos from wallet address + return nil +} + +func getTransaction(indexer IndexerClient, txid string) (Transaction, error) { + for i := 1; i < 5; i++ { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + tx, err := indexer.GetTx(ctx, txid) + if err != nil { + time.Sleep(time.Duration(i) * time.Second) + continue + } + return tx, nil + } + return Transaction{}, ErrTxNotFound +} diff --git a/btc/wallet.go b/btc/wallet.go index 8fbf695..b503e1a 100644 --- a/btc/wallet.go +++ b/btc/wallet.go @@ -249,6 +249,9 @@ func buildTransaction(utxos UTXOs, recipients []SendRequest, changeAddr btcutil. if err != nil { return nil, err } + if r.Amount < 0 { + r.Amount = totalUTXOAmount + } tx.AddTxOut(wire.NewTxOut(r.Amount, script)) totalSendAmount += r.Amount }