Skip to content

Commit

Permalink
cpfp batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
yash1io committed Jul 10, 2024
1 parent a962199 commit ec01186
Show file tree
Hide file tree
Showing 3 changed files with 354 additions and 85 deletions.
189 changes: 104 additions & 85 deletions btc/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/wallet/txsizes"
"github.com/decred/dcrd/dcrec/secp256k1/v4"
Expand All @@ -20,46 +19,53 @@ var (
SegwitSpendWeight int = txsizes.RedeemP2WPKHInputWitnessWeight - 10 // removes 10vb of overhead for segwit
)
var (
ErrBatchNotFound = errors.New("batch not found")
ErrBatcherStillRunning = errors.New("batcher is still running")
ErrBatcherNotRunning = errors.New("batcher is not running")
ErrBatchParametersNotMet = errors.New("batch parameters not met")
ErrHighFeeEstimate = errors.New("estimated fee too high")
ErrFeeDeltaHigh = errors.New("fee delta too high")
ErrFeeUpdateNotNeeded = errors.New("fee update not needed")
ErrMaxBatchLimitReached = errors.New("max batch limit reached")
ErrBatchNotFound = errors.New("batch not found")
ErrBatcherStillRunning = errors.New("batcher is still running")
ErrBatcherNotRunning = errors.New("batcher is not running")
ErrBatchParametersNotMet = errors.New("batch parameters not met")
ErrHighFeeEstimate = errors.New("estimated fee too high")
ErrFeeDeltaHigh = errors.New("fee delta too high")
ErrFeeUpdateNotNeeded = errors.New("fee update not needed")
ErrMaxBatchLimitReached = errors.New("max batch limit reached")
ErrCPFPFeeUpdateParamsNotMet = errors.New("CPFP fee update parameters not met")
ErrCPFPBatchingParamsNotMet = errors.New("CPFP batching parameters not met")
ErrCPFPBatchingCorrupted = errors.New("CPFP batching corrupted")
ErrSavingBatch = errors.New("failed to save batch")
)

type SpendUTXOs struct {
Witness [][]byte
Script []byte
ScriptAddress btcutil.Address
Utxo UTXO
HashType txscript.SigHashType
}

// Batcher is a wallet that runs as a service and batches requests
// into transactions based on the strategy provided
// It is responsible for creating, signing and submitting transactions
// to the network.
type BatcherWallet interface {
Wallet
Lifecycle
}

// Lifecycle interface defines the lifecycle of a BatcherWallet
// It provides methods to start, stop and restart the wallet service
type Lifecycle interface {
Start(ctx context.Context) error
Stop() error
Restart(ctx context.Context) error
}

// Cache interface defines the methods that a BatcherWallet's state
// should implement example implementations include in-memory cache and
// rdbs cache
type Cache interface {
ReadBatch(ctx context.Context, id string) (Batch, error)
ReadBatchByReqId(ctx context.Context, id string) (Batch, error)
ReadPendingBatches(ctx context.Context) ([]Batch, error)
ReadPendingBatches(ctx context.Context, strategy Strategy) ([]Batch, error)
SaveBatch(ctx context.Context, batch Batch) error

ReadRequest(ctx context.Context, id string) (BatcherRequest, error)
ReadPendingRequests(ctx context.Context) ([]BatcherRequest, error)
SaveRequest(ctx context.Context, id string, req BatcherRequest) error
}

// Batcher store spend and send requests in a batched request
// and returns a tracking id
type BatcherRequest struct {
ID string
Spends []SpendRequest
Expand All @@ -68,11 +74,16 @@ type BatcherRequest struct {
}

type BatcherOptions struct {
PTI time.Duration
PTI time.Duration // Periodic Time Interval for batching
TxOptions TxOptions
Strategy Strategy
}

// Strategy defines the batching strategy to be used by the BatcherWallet
// It can be one of RBF, CPFP, RBF_CPFP, Multi_CPFP
// RBF - Replace By Fee
// CPFP - Child Pays For Parent
// Multi_CPFP - Multiple CPFP threads are maintained across multiple addresses
type Strategy string

var (
Expand All @@ -89,12 +100,12 @@ type TxOptions struct {
MaxUnconfirmedAge int

MaxBatches int
MaxBatchSize float64
MaxBatchSize int

FeeLevel FeeLevel
MaxFeeRate float64
MinFeeDelta float64
MaxFeeDelta float64
MaxFeeRate int
MinFeeDelta int
MaxFeeDelta int
}

type batcherWallet struct {
Expand All @@ -108,50 +119,38 @@ type batcherWallet struct {
cache Cache
}

type Inputs map[btcutil.Address][]RawInputs
type Inputs map[string][]RawInputs

type Output struct {
wire.OutPoint
Recipient
}

type Outputs map[btcutil.Address][]Output
type Outputs map[string][]Output

type FeeUTXOS struct {
Utxos []UTXO
Used map[string]bool
}

type FeeData struct {
Fee int64
FeeRate float64
NetFeeRate float64
BaseSize float64
WitnessSize float64
FeeUtxos FeeUTXOS
Fee int64
Size int
}

type FeeStats struct {
MaxFeeRate float64
TotalSize float64
FeeDelta float64
MaxFeeRate int
TotalSize int
FeeDelta int
}

type Batch struct {
TxId string
Inputs Inputs
Outputs Outputs
TotalIn int64
TotalOut int64
FeeData FeeData
Tx Transaction
RequestIds map[string]bool
IsStable bool
IsConfirmed bool
Transaction Transaction
}

func verifyOptions(opts BatcherOptions) error {
return nil
Strategy Strategy
ChangeUtxo UTXO
}

func NewBatcherWallet(indexer IndexerClient, address btcutil.Address, privateKey *secp256k1.PrivateKey, cache Cache, opts ...func(*batcherWallet) error) (BatcherWallet, error) {
Expand All @@ -175,6 +174,7 @@ func (w *batcherWallet) Address() btcutil.Address {
return w.address
}

// Send creates a batch request , saves it in the cache and returns a tracking id
func (w *batcherWallet) Send(ctx context.Context, sends []SendRequest, spends []SpendRequest) (string, error) {
id := chainhash.HashH([]byte(fmt.Sprintf("%v_%v", spends, sends))).String()
req := BatcherRequest{
Expand All @@ -186,6 +186,7 @@ func (w *batcherWallet) Send(ctx context.Context, sends []SendRequest, spends []
return id, w.cache.SaveRequest(ctx, id, req)
}

// Status returns the status of a transaction based on the tracking id
func (w *batcherWallet) Status(ctx context.Context, id string) (Transaction, bool, error) {
request, err := w.cache.ReadRequest(ctx, id)
if err != nil {
Expand All @@ -199,13 +200,14 @@ func (w *batcherWallet) Status(ctx context.Context, id string) (Transaction, boo
return Transaction{}, false, err
}

tx, err := w.indexer.GetTx(ctx, batch.TxId)
tx, err := w.indexer.GetTx(ctx, batch.Tx.TxID)
if err != nil {
return Transaction{}, false, err
}
return tx, true, nil
}

// Start starts the batcher wallet service
func (w *batcherWallet) Start(ctx context.Context) error {
if w.quit != nil {
return ErrBatcherStillRunning
Expand All @@ -214,6 +216,7 @@ func (w *batcherWallet) Start(ctx context.Context) error {
return nil
}

// Stop gracefully stops the batcher wallet service
func (w *batcherWallet) Stop() error {
if w.quit == nil {
return ErrBatcherNotRunning
Expand All @@ -223,13 +226,19 @@ func (w *batcherWallet) Stop() error {
return nil
}

// Restart restarts the batcher wallet service
func (w *batcherWallet) Restart(ctx context.Context) error {
if err := w.Stop(); err != nil {
return err
}
return w.Start(ctx)
}

// starts the batcher based on the strategy
// There are two types of batching triggers
// 1. Periodic Time Interval (PTI) - Batches are created at regular intervals
// 2. Pending Request - Batches are created when a certain number of requests are pending
// 3. Exponential Time Interval (ETI) - Batches are created at exponential intervals but the interval is custom
func (w *batcherWallet) run(ctx context.Context) {
switch w.opts.Strategy {
case CPFP:
Expand All @@ -242,6 +251,10 @@ func (w *batcherWallet) run(ctx context.Context) {
}

// PTI stands for Periodic time interval
// 1. It creates a batch at regular intervals
// 2. It also updates the fee rate at regular intervals
// if fee rate increases more than threshold and there are
// no batches to create
func (w *batcherWallet) runPTIBatcher(ctx context.Context) {
go func() {
ticker := time.NewTicker(w.opts.PTI)
Expand All @@ -268,60 +281,63 @@ func (w *batcherWallet) runPTIBatcher(ctx context.Context) {
}()
}

// updateFeeRate updates the fee rate based on the strategy
func (w *batcherWallet) updateFeeRate() error {
requiredFeeRate, feeStats, err := w.getFee()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
feeRate, err := w.indexer.FeeEstimate(ctx)
if err != nil {
return err
}
requiredFeeRate := selectFee(feeRate, w.opts.TxOptions.FeeLevel)

feeStats, err := w.getFeeStats(requiredFeeRate)
if err != nil {
return err
}

switch w.opts.Strategy {
case CPFP:
return w.updateCPFP(feeStats, requiredFeeRate)
case RBF:
return w.updateRBF(feeStats, requiredFeeRate)
// case RBF:
// return w.updateRBF(feeStats, requiredFeeRate)
default:
panic("fee update for strategy not implemented")
}
}

// createBatch creates a batch based on the strategy
func (w *batcherWallet) createBatch() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
pendingRequests, err := w.cache.ReadPendingRequests(ctx)
if err != nil {
return err
}
switch w.opts.Strategy {
case CPFP:
return w.createCPFPBatch(ctx, pendingRequests)
case RBF:
return w.createRBFBatch(ctx, pendingRequests)
return w.createCPFPBatch()
// case RBF:
// return w.createRBFBatch()
default:
panic("batch creation for strategy not implemented")
}
}

func (w *batcherWallet) getFee() (float64, FeeStats, error) {
// Generate fee stats based on the required fee rate
// Fee stats are used to determine how much fee is required
// to bump existing batches
func (w *batcherWallet) getFeeStats(requiredFeeRate int) (FeeStats, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
feeRate, err := w.indexer.FeeEstimate(ctx)
if err != nil {
return 0, FeeStats{}, err
}
requiredFeeRate := selectFee(feeRate, w.opts.TxOptions.FeeLevel)
pendingBatches, err := w.cache.ReadPendingBatches(ctx)
pendingBatches, err := w.cache.ReadPendingBatches(ctx, w.opts.Strategy)
if err != nil {
return 0, FeeStats{}, err
return FeeStats{}, err
}

feeStats := getFeeStats(requiredFeeRate, pendingBatches)
feeStats := calculateFeeStats(requiredFeeRate, pendingBatches)
if err := w.validateUpdate(feeStats.MaxFeeRate, requiredFeeRate); err != nil {
return 0, FeeStats{}, err
return FeeStats{}, err
}
return requiredFeeRate, feeStats, nil
return feeStats, nil
}

func (w *batcherWallet) validateUpdate(currentFeeRate, requiredFeeRate float64) error {
// verifies if the fee rate delta is within the threshold
func (w *batcherWallet) validateUpdate(currentFeeRate, requiredFeeRate int) error {
if currentFeeRate > requiredFeeRate {
return ErrFeeUpdateNotNeeded
}
Expand All @@ -337,20 +353,22 @@ func (w *batcherWallet) validateUpdate(currentFeeRate, requiredFeeRate float64)
return nil
}

func getFeeStats(feeRate float64, batches []Batch) FeeStats {
maxFeeRate := float64(0)
totalSize := float64(0)
feeDelta := float64(0)
// calculates the fee stats based on the required fee rate
func calculateFeeStats(reqFeeRate int, batches []Batch) FeeStats {
maxFeeRate := int(0)
totalSize := int(0)
feeDelta := int(0)

for _, batch := range batches {
if batch.FeeData.FeeRate > maxFeeRate {
maxFeeRate = batch.FeeData.FeeRate
size := batch.Tx.Weight / 4
feeRate := int(batch.Tx.Fee) / size
if feeRate > maxFeeRate {
maxFeeRate = feeRate
}
batchSize := batch.FeeData.BaseSize + batch.FeeData.WitnessSize
if batch.FeeData.FeeRate > feeRate {
feeDelta += (batch.FeeData.FeeRate - feeRate) * batchSize
if reqFeeRate > feeRate {
feeDelta += (reqFeeRate - feeRate) * size
}
totalSize += batchSize
totalSize += size
}
return FeeStats{
MaxFeeRate: maxFeeRate,
Expand All @@ -359,15 +377,16 @@ func getFeeStats(feeRate float64, batches []Batch) FeeStats {
}
}

func selectFee(feeRate FeeSuggestion, feeLevel FeeLevel) float64 {
// selects the fee rate based on the fee level option
func selectFee(feeRate FeeSuggestion, feeLevel FeeLevel) int {
switch feeLevel {
case MediumFee:
return float64(feeRate.Medium)
return feeRate.Medium
case HighFee:
return float64(feeRate.High)
return feeRate.High
case LowFee:
return float64(feeRate.Low)
return feeRate.Low
default:
return float64(feeRate.High)
return feeRate.High
}
}
Loading

0 comments on commit ec01186

Please sign in to comment.