Skip to content

Commit

Permalink
add batcher store and refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Revantark committed Jul 24, 2024
1 parent a4c663b commit 57f136d
Show file tree
Hide file tree
Showing 18 changed files with 1,149 additions and 218 deletions.
91 changes: 44 additions & 47 deletions btc/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ var (
DefaultAPITimeout = 5 * time.Second
)
var (
ErrBatchNotFound = errors.New("batch not found")
ErrBatcherStillRunning = errors.New("batcher is still running")
ErrBatcherNotRunning = errors.New("batcher is not running")
ErrBatchParametersNotMet = errors.New("batch parameters not met")
Expand All @@ -37,7 +36,7 @@ var (
ErrStrategyNotSupported = errors.New("strategy not supported")
ErrBuildCPFPDepthExceeded = errors.New("build CPFP depth exceeded")
ErrBuildRBFDepthExceeded = errors.New("build RBF depth exceeded")
ErrTxIdEmpty = errors.New("txid is empty")
ErrTxIDEmpty = errors.New("txid is empty")
ErrInsufficientFundsInRequest = func(have, need int64) error {
return fmt.Errorf("%v , have :%v, need at least : %v", ErrBatchParametersNotMet, have, need)
}
Expand Down Expand Up @@ -67,33 +66,40 @@ type Lifecycle interface {
// should implement example implementations include in-memory cache and
// rdbs cache
type Cache interface {
// ReadBatchByReqId reads a batch based on the request ID.
ReadBatchByReqId(ctx context.Context, reqId string) (Batch, error)
// ReadBatchByReqID reads a batch based on the request ID.
ReadBatchByReqID(ctx context.Context, reqID string) (Batch, error)
// ReadPendingBatches reads all pending batches for a given strategy.
ReadPendingBatches(ctx context.Context, strategy Strategy) ([]Batch, error)
ReadPendingBatches(ctx context.Context) ([]Batch, error)
// ReadLatestBatch reads the latest batch for a given strategy.
ReadLatestBatch(ctx context.Context, strategy Strategy) (Batch, error)
// ReadPendingChangeUtxos reads all pending change UTXOs for a given strategy.
ReadPendingChangeUtxos(ctx context.Context, strategy Strategy) ([]UTXO, error)
// ReadPendingFundingUtxos reads all pending funding UTXOs for a given strategy.
ReadPendingFundingUtxos(ctx context.Context, strategy Strategy) ([]UTXO, error)
// ConfirmBatchStatuses updates the status of multiple batches and delete pending batches based on confirmed transaction IDs.
ConfirmBatchStatuses(ctx context.Context, txIds []string, deletePending bool, strategy Strategy) error
// UpdateBatchFees updates the fees for multiple batches.
UpdateBatchFees(ctx context.Context, txId []string, fee int64) error
ReadLatestBatch(ctx context.Context) (Batch, error)

ReadBatch(ctx context.Context, id string) (Batch, error)

// UpdateBatches updates multiple batches.
// It completely overwrites the existing batches with the newer ones.
// If no batch exists, it will create a new one.
//
// Note: All the requests which are a part of this batch will be removed from pending requests.
UpdateBatches(ctx context.Context, updatedBatches ...Batch) error

// UpdateAndDeletePendingBatches overwrites the existing batches with newer ones and also deletes pending batches.
// If no batch exists, it will create a new one and delete pending batches.
//
// Even if updating batch is a pending batch, it will not be deleted.
UpdateAndDeletePendingBatches(ctx context.Context, updatedBatches ...Batch) error

// DeletePendingBatches deletes pending batches based on confirmed transaction IDs and strategy.
DeletePendingBatches(ctx context.Context) error

// SaveBatch saves a batch.
SaveBatch(ctx context.Context, batch Batch) error
// DeletePendingBatches deletes pending batches based on confirmed transaction IDs and strategy.
DeletePendingBatches(ctx context.Context, confirmedTxIds map[string]bool, strategy Strategy) error

// ReadRequest reads a request based on its ID.
ReadRequest(ctx context.Context, id string) (BatcherRequest, error)
// ReadRequests reads multiple requests based on their IDs.
ReadRequests(ctx context.Context, id []string) ([]BatcherRequest, error)
// ReadRequest reads a request based on its ID. // ReadRequests reads multiple requests based on their IDs.
ReadRequests(ctx context.Context, id ...string) ([]BatcherRequest, error)
// ReadPendingRequests reads all pending requests.
ReadPendingRequests(ctx context.Context) ([]BatcherRequest, error)
// SaveRequest saves a request.
SaveRequest(ctx context.Context, id string, req BatcherRequest) error
SaveRequest(ctx context.Context, req BatcherRequest) error
}

// Batcher store spend and send requests in a batched request
Expand Down Expand Up @@ -152,15 +158,13 @@ type batcherWallet struct {
feeEstimator FeeEstimator
cache Cache
}

type Batch struct {
Tx Transaction
RequestIds map[string]bool
// true indicates that the batch is finalized and will not be replaced by more fee.
isFinalized bool
IsConfirmed bool
Strategy Strategy
SelfUtxos UTXOs
FundingUtxos UTXOs
IsFinalized bool
Strategy Strategy
}

func NewBatcherWallet(privateKey *secp256k1.PrivateKey, indexer IndexerClient, feeEstimator FeeEstimator, chainParams *chaincfg.Params, cache Cache, logger *zap.Logger, opts ...func(*batcherWallet) error) (BatcherWallet, error) {
Expand Down Expand Up @@ -263,19 +267,19 @@ func (w *batcherWallet) Send(ctx context.Context, sends []SendRequest, spends []
SACPs: sacps,
Status: false,
}
return id, w.cache.SaveRequest(ctx, id, req)
return id, w.cache.SaveRequest(ctx, req)
}

// Status returns the status of a transaction based on the tracking id
func (w *batcherWallet) Status(ctx context.Context, id string) (Transaction, bool, error) {
request, err := w.cache.ReadRequest(ctx, id)
request, err := w.cache.ReadRequests(ctx, id)
if err != nil {
return Transaction{}, false, err
}
if !request.Status {
if !request[0].Status {
return Transaction{}, false, nil
}
batch, err := w.cache.ReadBatchByReqId(ctx, id)
batch, err := w.cache.ReadBatchByReqID(ctx, id)
if err != nil {
return Transaction{}, false, err
}
Expand Down Expand Up @@ -332,17 +336,16 @@ func (w *batcherWallet) Restart(ctx context.Context) error {
func (w *batcherWallet) run(ctx context.Context) error {
switch w.opts.Strategy {
case CPFP, RBF:
w.runPTIBatcher(ctx)
w.runPeriodicBatcher(ctx)
default:
return ErrStrategyNotSupported
}
return nil
}

// PTI stands for Periodic time interval
// runPTIBatcher is used by strategies that require
// runPeriodicBatcher is used by strategies that require
// triggering the batching process at regular intervals
func (w *batcherWallet) runPTIBatcher(ctx context.Context) {
func (w *batcherWallet) runPeriodicBatcher(ctx context.Context) {
ticker := time.NewTicker(w.opts.PTI)
w.wg.Add(1)
go func() {
Expand All @@ -356,9 +359,7 @@ func (w *batcherWallet) runPTIBatcher(ctx context.Context) {
case <-ticker.C:
w.processBatch()
}

}

}()
}

Expand All @@ -375,7 +376,7 @@ func (w *batcherWallet) processBatch() {
w.logger.Info("waiting for new batch")
}

if err := w.updateFeeRate(); err != nil {
if err := w.updateBatchFeeRate(); err != nil {
if !errors.Is(err, ErrFeeUpdateNotNeeded) {
w.logger.Error("failed to update fee rate", zap.Error(err))
} else {
Expand All @@ -389,8 +390,8 @@ func (w *batcherWallet) processBatch() {
}
}

// updateFeeRate updates the fee rate based on the strategy
func (w *batcherWallet) updateFeeRate() error {
// updateBatchFeeRate updates the fee rate based on the strategy
func (w *batcherWallet) updateBatchFeeRate() error {
feeRates, err := w.feeEstimator.FeeSuggestion()
if err != nil {
return err
Expand Down Expand Up @@ -520,25 +521,21 @@ func selectFee(feeRate FeeSuggestion, feeLevel FeeLevel) int {
}
}

func filterPendingBatches(batches []Batch, indexer IndexerClient) ([]Batch, []string, []string, error) {
pendingBatches := []Batch{}
confirmedTxs := []string{}
pendingTxs := []string{}
func filterPendingBatches(batches []Batch, indexer IndexerClient) (pendingBatches []Batch, confirmedBatches []Batch, err error) {
for _, batch := range batches {
ctx, cancel := context.WithTimeout(context.Background(), DefaultAPITimeout)
defer cancel()
tx, err := indexer.GetTx(ctx, batch.Tx.TxID)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
if tx.Status.Confirmed {
confirmedTxs = append(confirmedTxs, tx.TxID)
confirmedBatches = append(confirmedBatches, batch)
continue
}
pendingBatches = append(pendingBatches, batch)
pendingTxs = append(pendingTxs, tx.TxID)
}
return pendingBatches, confirmedTxs, pendingTxs, nil
return pendingBatches, confirmedBatches, nil
}

func withContextTimeout(parentContext context.Context, duration time.Duration, fn func(ctx context.Context) error) error {
Expand Down
103 changes: 42 additions & 61 deletions btc/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ type mockCache struct {
batchList []string
requests map[string]btc.BatcherRequest
requestList []string
mode btc.Strategy
}

func NewTestCache() btc.Cache {
func NewTestCache(mode btc.Strategy) btc.Cache {
return &mockCache{
batches: make(map[string]btc.Batch),
requests: make(map[string]btc.BatcherRequest),
mode: mode,
}
}

func (m *mockCache) ReadBatchByReqId(ctx context.Context, id string) (btc.Batch, error) {
func (m *mockCache) ReadBatchByReqID(ctx context.Context, id string) (btc.Batch, error) {
for _, batchId := range m.batchList {
batch, ok := m.batches[batchId]
if !ok {
Expand All @@ -42,7 +44,7 @@ func (m *mockCache) ReadBatch(ctx context.Context, txId string) (btc.Batch, erro
return batch, nil
}

func (m *mockCache) ReadPendingBatches(ctx context.Context, strategy btc.Strategy) ([]btc.Batch, error) {
func (m *mockCache) ReadPendingBatches(ctx context.Context) ([]btc.Batch, error) {
batches := []btc.Batch{}
for _, batch := range m.batches {
if batch.Tx.Status.Confirmed == false {
Expand All @@ -65,28 +67,6 @@ func (m *mockCache) SaveBatch(ctx context.Context, batch btc.Batch) error {
return nil
}

func (m *mockCache) ConfirmBatchStatuses(ctx context.Context, txIds []string, deletePending bool, strategy btc.Strategy) error {
if len(txIds) == 0 {
return nil
}
confirmedBatchIds := make(map[string]bool)
for _, id := range txIds {
batch, ok := m.batches[id]
if !ok {
return fmt.Errorf("UpdateBatchStatuses, batch not found")
}
batch.Tx.Status.Confirmed = true
m.batches[id] = batch

confirmedBatchIds[id] = true

}
if deletePending {
return m.DeletePendingBatches(ctx, confirmedBatchIds, strategy)
}
return nil
}

func (m *mockCache) ReadRequest(ctx context.Context, id string) (btc.BatcherRequest, error) {
request, ok := m.requests[id]
if !ok {
Expand All @@ -104,12 +84,12 @@ func (m *mockCache) ReadPendingRequests(ctx context.Context) ([]btc.BatcherReque
return requests, nil
}

func (m *mockCache) SaveRequest(ctx context.Context, id string, req btc.BatcherRequest) error {
if _, ok := m.requests[id]; ok {
func (m *mockCache) SaveRequest(ctx context.Context, req btc.BatcherRequest) error {
if _, ok := m.requests[req.ID]; ok {
return fmt.Errorf("request already exists")
}
m.requests[id] = req
m.requestList = append(m.requestList, id)
m.requests[req.ID] = req
m.requestList = append(m.requestList, req.ID)
return nil
}

Expand All @@ -126,22 +106,49 @@ func (m *mockCache) UpdateBatchFees(ctx context.Context, txId []string, feeRate
return nil
}

func (m *mockCache) ReadLatestBatch(ctx context.Context, strategy btc.Strategy) (btc.Batch, error) {
func (m *mockCache) UpdateAndDeletePendingBatches(ctx context.Context, updatedBatches ...btc.Batch) error {
for _, batch := range updatedBatches {
if _, ok := m.batches[batch.Tx.TxID]; !ok {
return fmt.Errorf("UpdateAndDeleteBatches, batch not found")
}
m.batches[batch.Tx.TxID] = batch
}

// delete pending batches
for _, id := range m.batchList {
if m.batches[id].Tx.Status.Confirmed == false {
delete(m.batches, id)
}
}
return nil
}

func (m *mockCache) UpdateBatches(ctx context.Context, updatedBatches ...btc.Batch) error {
for _, batch := range updatedBatches {
if _, ok := m.batches[batch.Tx.TxID]; !ok {
return fmt.Errorf("UpdateBatches, batch not found")
}
m.batches[batch.Tx.TxID] = batch
}
return nil
}

func (m *mockCache) ReadLatestBatch(ctx context.Context) (btc.Batch, error) {
if len(m.batchList) == 0 {
return btc.Batch{}, btc.ErrBatchNotFound
return btc.Batch{}, btc.ErrStoreNotFound
}
nbatches := len(m.batchList) - 1
for nbatches >= 0 {
batch, ok := m.batches[m.batchList[nbatches]]
if ok && batch.Strategy == strategy {
if ok && batch.Strategy == m.mode {
return batch, nil
}
nbatches--
}
return btc.Batch{}, fmt.Errorf("no batch found")
}

func (m *mockCache) ReadRequests(ctx context.Context, ids []string) ([]btc.BatcherRequest, error) {
func (m *mockCache) ReadRequests(ctx context.Context, ids ...string) ([]btc.BatcherRequest, error) {
requests := []btc.BatcherRequest{}
for _, id := range ids {
request, ok := m.requests[id]
Expand All @@ -153,17 +160,10 @@ func (m *mockCache) ReadRequests(ctx context.Context, ids []string) ([]btc.Batch
return requests, nil
}

func (m *mockCache) DeletePendingBatches(ctx context.Context, confirmedBatchIds map[string]bool, strategy btc.Strategy) error {
func (m *mockCache) DeletePendingBatches(ctx context.Context) error {
newList := m.batchList
for i, id := range m.batchList {
if m.batches[id].Strategy != strategy {
continue
}

if _, ok := confirmedBatchIds[id]; ok {
batch := m.batches[id]
batch.Tx.Status.Confirmed = true
m.batches[id] = batch
if m.batches[id].Strategy != m.mode {
continue
}

Expand All @@ -177,25 +177,6 @@ func (m *mockCache) DeletePendingBatches(ctx context.Context, confirmedBatchIds
return nil
}

func (m *mockCache) ReadPendingChangeUtxos(ctx context.Context, strategy btc.Strategy) ([]btc.UTXO, error) {
utxos := []btc.UTXO{}
for _, id := range m.batchList {
if m.batches[id].Strategy == strategy && m.batches[id].Tx.Status.Confirmed == false {
utxos = append(utxos, m.batches[id].SelfUtxos...)
}
}
return utxos, nil
}
func (m *mockCache) ReadPendingFundingUtxos(ctx context.Context, strategy btc.Strategy) ([]btc.UTXO, error) {
utxos := []btc.UTXO{}
for _, id := range m.batchList {
if m.batches[id].Strategy == strategy && (m.batches[id].Tx.Status.Confirmed == false) {
utxos = append(utxos, m.batches[id].FundingUtxos...)
}
}
return utxos, nil
}

type mockFeeEstimator struct {
fee int
}
Expand Down
2 changes: 1 addition & 1 deletion btc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (client *client) SubmitTx(ctx context.Context, tx *wire.MsgTx) error {
}
}
return err
case _ = <-results:
case <-results:
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion btc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ var _ = Describe("bitcoin client", func() {
Expect(errors.Is(err, btc.ErrTxInputsMissingOrSpent)).Should(BeTrue())
})

It("should return an error when the utxo has been spent", func(ctx context.Context) {
It("should return an error when the utxo has been spent", func() {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

Expand Down
Loading

0 comments on commit 57f136d

Please sign in to comment.