Skip to content

Commit

Permalink
Support address reset
Browse files Browse the repository at this point in the history
  • Loading branch information
dimriou committed Jan 10, 2025
1 parent df39b08 commit 9ce5870
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 51 deletions.
92 changes: 76 additions & 16 deletions core/chains/evm/txm/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 94 additions & 19 deletions core/chains/evm/txm/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math"
"math/big"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/google/uuid"
Expand All @@ -28,14 +29,17 @@ import (
)

type OrchestratorTxStore interface {
Abandon(context.Context, *big.Int, common.Address) error
Add(addresses ...common.Address) error
FetchUnconfirmedTransactionAtNonceWithCount(context.Context, uint64, common.Address) (*txmtypes.Transaction, int, error)
FindTxWithIdempotencyKey(context.Context, string) (*txmtypes.Transaction, error)
Remove(addresses ...common.Address) error
}

type OrchestratorKeystore interface {
CheckEnabled(ctx context.Context, address common.Address, chainID *big.Int) error
EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error)
SubscribeToKeyChanges(ctx context.Context) (ch chan struct{}, unsub func())
}

type OrchestratorAttemptBuilder[
Expand All @@ -52,14 +56,18 @@ type Orchestrator[
HEAD types.Head[BLOCK_HASH],
] struct {
services.StateMachine
lggr logger.SugaredLogger
chainID *big.Int
txm *Txm
txStore OrchestratorTxStore
fwdMgr *forwarders.FwdMgr
keystore OrchestratorKeystore
attemptBuilder OrchestratorAttemptBuilder[BLOCK_HASH, HEAD]
resumeCallback txmgr.ResumeCallback
lggr logger.SugaredLogger
chainID *big.Int
txm *Txm
txStore OrchestratorTxStore
fwdMgr *forwarders.FwdMgr
keystore OrchestratorKeystore
attemptBuilder OrchestratorAttemptBuilder[BLOCK_HASH, HEAD]
resumeCallback txmgr.ResumeCallback
enabledAddresses map[common.Address]bool
chReset chan *common.Address
chStop services.StopChan
wg *sync.WaitGroup
}

func NewTxmOrchestrator[BLOCK_HASH types.Hashable, HEAD types.Head[BLOCK_HASH]](
Expand All @@ -72,13 +80,17 @@ func NewTxmOrchestrator[BLOCK_HASH types.Hashable, HEAD types.Head[BLOCK_HASH]](
attemptBuilder OrchestratorAttemptBuilder[BLOCK_HASH, HEAD],
) *Orchestrator[BLOCK_HASH, HEAD] {
return &Orchestrator[BLOCK_HASH, HEAD]{
lggr: logger.Sugared(logger.Named(lggr, "Orchestrator")),
chainID: chainID,
txm: txm,
txStore: txStore,
keystore: keystore,
attemptBuilder: attemptBuilder,
fwdMgr: fwdMgr,
lggr: logger.Sugared(logger.Named(lggr, "Orchestrator")),
chainID: chainID,
txm: txm,
txStore: txStore,
keystore: keystore,
attemptBuilder: attemptBuilder,
fwdMgr: fwdMgr,
enabledAddresses: make(map[common.Address]bool),
chReset: make(chan *common.Address),
chStop: make(chan struct{}),
wg: new(sync.WaitGroup),
}
}

Expand All @@ -93,6 +105,7 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) Start(ctx context.Context) error {
return err
}
for _, address := range addresses {
o.enabledAddresses[address] = true
err := o.txStore.Add(address)
if err != nil {
return err
Expand All @@ -106,12 +119,19 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) Start(ctx context.Context) error {
return fmt.Errorf("Orchestrator: ForwarderManager failed to start: %w", err)
}
}

o.wg.Add(1)
go o.runLoop()
return nil
})
}

func (o *Orchestrator[BLOCK_HASH, HEAD]) Close() (merr error) {
return o.StopOnce("Orchestrator", func() error {
close(o.chReset)
close(o.chStop)
o.wg.Done()

if o.fwdMgr != nil {
if err := o.fwdMgr.Close(); err != nil {
merr = errors.Join(merr, fmt.Errorf("Orchestrator failed to stop ForwarderManager: %w", err))
Expand All @@ -127,6 +147,59 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) Close() (merr error) {
})
}

func (o *Orchestrator[BLOCK_HASH, HEAD]) runLoop() {
defer o.wg.Done()
ctx, cancel := o.chStop.NewCtx()
defer cancel()

keysChanged, unsub := o.keystore.SubscribeToKeyChanges(ctx)
defer unsub()

for {
select {
case <-o.chStop:
return
case <-keysChanged:
updatedEnabledAddresses, err := o.keystore.EnabledAddressesForChain(ctx, o.chainID)
if err != nil {
o.lggr.Critical("Failed to reload key states after key change")
o.SvcErrBuffer.Append(err)
continue
}
o.lggr.Debugw("Keys changed, reloading", "enabledAddresses", updatedEnabledAddresses)

// this will help with lookup
updatedEnabledAddressesMap := make(map[common.Address]bool)
for _, updatedAddress := range updatedEnabledAddresses {
updatedEnabledAddressesMap[updatedAddress] = true
if _, exists := o.enabledAddresses[updatedAddress]; !exists {
if err := o.txStore.Add(updatedAddress); err != nil {
o.lggr.Errorw("Failed to add address to InMemoryStore", "address", updatedAddress, "err", err)
continue
}
}
}

for oldEnabledAddress := range o.enabledAddresses {
if !updatedEnabledAddressesMap[oldEnabledAddress] {
if err := o.txStore.Remove(oldEnabledAddress); err != nil {
o.lggr.Errorw("Failed to remove address from InMemoryStore", "address", oldEnabledAddress, "err", err)
continue
}
}
}
o.enabledAddresses = updatedEnabledAddressesMap
if err := o.txm.Reset(ctx, nil); err != nil {
o.lggr.Errorw("Failed to Reset TXM", "err", err)
}
case abandonAddress := <-o.chReset:
if err := o.txm.Reset(ctx, abandonAddress); err != nil {
o.lggr.Errorw("Failed to Reset TXM", "err", err)
}
}
}
}

func (o *Orchestrator[BLOCK_HASH, HEAD]) Trigger(addr common.Address) {
o.txm.Trigger(addr)
}
Expand All @@ -143,16 +216,18 @@ func (o *Orchestrator[BLOCK_HASH, HEAD]) RegisterResumeCallback(fn txmgr.ResumeC
o.resumeCallback = fn
}

func (o *Orchestrator[BLOCK_HASH, HEAD]) Reset(addr common.Address, abandon bool) error {
func (o *Orchestrator[BLOCK_HASH, HEAD]) Reset(addr common.Address, abandon bool) (err error) {
ok := o.IfStarted(func() {
if err := o.txm.Abandon(addr); err != nil {
o.lggr.Error(err)
if abandon {
o.chReset <- &addr
} else {
o.chReset <- nil
}
})
if !ok {
return errors.New("Orchestrator not started yet")
}
return nil
return err
}

func (o *Orchestrator[BLOCK_HASH, HEAD]) OnNewLongestChain(ctx context.Context, head HEAD) {
Expand Down
20 changes: 19 additions & 1 deletion core/chains/evm/txm/storage/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func NewInMemoryStore(lggr logger.Logger, address common.Address, chainID *big.I
}
}

func (m *InMemoryStore) AbandonPendingTransactions() {
func (m *InMemoryStore) Abandon() {
// TODO: append existing fatal transactions and cap the size
m.Lock()
defer m.Unlock()
Expand Down Expand Up @@ -182,6 +182,24 @@ func (m *InMemoryStore) FetchUnconfirmedTransactionAtNonceWithCount(latestNonce
return
}

func (m *InMemoryStore) FindLatestNonce() (maxNonce uint64) {
m.RLock()
defer m.RUnlock()

for _, tx := range m.UnconfirmedTransactions {
if tx.Nonce != nil {
maxNonce = max(*tx.Nonce, maxNonce)
}
}

for _, tx := range m.ConfirmedTransactions {
if tx.Nonce != nil {
maxNonce = max(*tx.Nonce, maxNonce)
}
}
return
}

func (m *InMemoryStore) MarkConfirmedAndReorgedTransactions(latestNonce uint64) ([]*types.Transaction, []uint64, error) {
m.Lock()
defer m.Unlock()
Expand Down
Loading

0 comments on commit 9ce5870

Please sign in to comment.