diff --git a/gateway/node.go b/gateway/node.go index a5cbf6aff9f..0bfe7e191b2 100644 --- a/gateway/node.go +++ b/gateway/node.go @@ -180,6 +180,7 @@ var ( _ full.GasModuleAPI = (*Node)(nil) _ full.MpoolModuleAPI = (*Node)(nil) _ full.StateModuleAPI = (*Node)(nil) + _ full.EthModuleAPI = (*Node)(nil) ) type options struct { diff --git a/node/builder_chain.go b/node/builder_chain.go index ffdcf3a64a2..cdf529cb743 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -141,6 +141,7 @@ var ChainNode = Options( Override(new(full.StateModuleAPI), From(new(api.Gateway))), Override(new(stmgr.StateManagerAPI), rpcstmgr.NewRPCStateManager), Override(new(full.EthModuleAPI), From(new(api.Gateway))), + Override(new(full.EthTxHashManager), &full.EthTxHashManagerDummy{}), Override(new(full.EthEventAPI), From(new(api.Gateway))), Override(new(full.ActorEventAPI), From(new(api.Gateway))), ), @@ -261,12 +262,14 @@ func ConfigFullNode(c interface{}) Option { If(cfg.Fevm.EnableEthRPC, Override(new(*full.EthEventHandler), modules.EthEventHandler(cfg.Events, cfg.Fevm.EnableEthRPC)), + Override(new(full.EthTxHashManager), modules.EthTxHashManager(cfg.Fevm)), Override(new(full.EthModuleAPI), modules.EthModuleAPI(cfg.Fevm)), Override(new(full.EthEventAPI), From(new(*full.EthEventHandler))), ), If(!cfg.Fevm.EnableEthRPC, Override(new(full.EthModuleAPI), &full.EthModuleDummy{}), Override(new(full.EthEventAPI), &full.EthModuleDummy{}), + Override(new(full.EthTxHashManager), &full.EthTxHashManagerDummy{}), ), If(cfg.Events.EnableActorEventsAPI, diff --git a/node/impl/full/dummy.go b/node/impl/full/dummy.go index ed953de112a..491fd136c31 100644 --- a/node/impl/full/dummy.go +++ b/node/impl/full/dummy.go @@ -139,10 +139,6 @@ func (e *EthModuleDummy) EthSendRawTransaction(ctx context.Context, rawTx ethtyp return ethtypes.EthHash{}, ErrModuleDisabled } -func (e *EthModuleDummy) EthSendRawTransactionUntrusted(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) { - return ethtypes.EthHash{}, ErrModuleDisabled -} - func (e *EthModuleDummy) Web3ClientVersion(ctx context.Context) (string, error) { return "", ErrModuleDisabled } diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 05fb8f7fec0..e2f25283b33 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -79,7 +79,6 @@ type EthModuleAPI interface { EthCall(ctx context.Context, tx ethtypes.EthCall, blkParam ethtypes.EthBlockNumberOrHash) (ethtypes.EthBytes, error) EthMaxPriorityFeePerGas(ctx context.Context) (ethtypes.EthBigInt, error) EthSendRawTransaction(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) - EthSendRawTransactionUntrusted(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) Web3ClientVersion(ctx context.Context) (string, error) EthTraceBlock(ctx context.Context, blkNum string) ([]*ethtypes.EthTraceBlock, error) EthTraceReplayBlockTransactions(ctx context.Context, blkNum string, traceTypes []string) ([]*ethtypes.EthTraceReplayBlockTransaction, error) @@ -102,6 +101,7 @@ type EthEventAPI interface { var ( _ EthModuleAPI = *new(api.FullNode) _ EthEventAPI = *new(api.FullNode) + _ EthModuleAPI = *new(api.Gateway) ) // EthModule provides the default implementation of the standard Ethereum JSON-RPC API. @@ -135,7 +135,7 @@ type EthModule struct { Chain *store.ChainStore Mpool *messagepool.MessagePool StateManager *stmgr.StateManager - EthTxHashManager *EthTxHashManager + EthTxHashManager EthTxHashManager EthTraceFilterMaxResults uint64 EthEventHandler *EthEventHandler @@ -166,8 +166,10 @@ var _ EthEventAPI = (*EthEventHandler)(nil) type EthAPI struct { fx.In - Chain *store.ChainStore - StateManager *stmgr.StateManager + Chain *store.ChainStore + StateManager *stmgr.StateManager + EthTxHashManager EthTxHashManager + MpoolAPI MpoolAPI EthModuleAPI EthEventAPI @@ -355,7 +357,7 @@ func (a *EthModule) EthGetTransactionByHashLimited(ctx context.Context, txHash * return nil, nil } - c, err := a.EthTxHashManager.TransactionHashLookup.GetCidFromHash(*txHash) + c, err := a.EthTxHashManager.GetCidFromHash(*txHash) if err != nil { log.Debug("could not find transaction hash %s in lookup table", txHash.String()) } @@ -414,7 +416,7 @@ func (a *EthModule) EthGetMessageCidByTransactionHash(ctx context.Context, txHas return nil, nil } - c, err := a.EthTxHashManager.TransactionHashLookup.GetCidFromHash(*txHash) + c, err := a.EthTxHashManager.GetCidFromHash(*txHash) // We fall out of the first condition and continue if errors.Is(err, ethhashlookup.ErrNotFound) { log.Debug("could not find transaction hash %s in lookup table", txHash.String()) @@ -498,7 +500,7 @@ func (a *EthModule) EthGetTransactionReceipt(ctx context.Context, txHash ethtype } func (a *EthModule) EthGetTransactionReceiptLimited(ctx context.Context, txHash ethtypes.EthHash, limit abi.ChainEpoch) (*api.EthTxReceipt, error) { - c, err := a.EthTxHashManager.TransactionHashLookup.GetCidFromHash(txHash) + c, err := a.EthTxHashManager.GetCidFromHash(txHash) if err != nil { log.Debug("could not find transaction hash %s in lookup table", txHash.String()) } @@ -917,14 +919,14 @@ func (a *EthModule) EthGasPrice(ctx context.Context) (ethtypes.EthBigInt, error) } func (a *EthModule) EthSendRawTransaction(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) { - return a.ethSendRawTransaction(ctx, rawTx, false) + return ethSendRawTransaction(ctx, a.MpoolAPI, a.EthTxHashManager, rawTx, false) } -func (a *EthModule) EthSendRawTransactionUntrusted(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) { - return a.ethSendRawTransaction(ctx, rawTx, true) +func (a *EthAPI) EthSendRawTransactionUntrusted(ctx context.Context, rawTx ethtypes.EthBytes) (ethtypes.EthHash, error) { + return ethSendRawTransaction(ctx, a.MpoolAPI, a.EthTxHashManager, rawTx, true) } -func (a *EthModule) ethSendRawTransaction(ctx context.Context, rawTx ethtypes.EthBytes, untrusted bool) (ethtypes.EthHash, error) { +func ethSendRawTransaction(ctx context.Context, mpool MpoolAPI, ethTxHashManager EthTxHashManager, rawTx ethtypes.EthBytes, untrusted bool) (ethtypes.EthHash, error) { txArgs, err := ethtypes.ParseEthTransaction(rawTx) if err != nil { return ethtypes.EmptyEthHash, err @@ -941,18 +943,18 @@ func (a *EthModule) ethSendRawTransaction(ctx context.Context, rawTx ethtypes.Et } if untrusted { - if _, err = a.MpoolAPI.MpoolPushUntrusted(ctx, smsg); err != nil { + if _, err = mpool.MpoolPushUntrusted(ctx, smsg); err != nil { return ethtypes.EmptyEthHash, err } } else { - if _, err = a.MpoolAPI.MpoolPush(ctx, smsg); err != nil { + if _, err = mpool.MpoolPush(ctx, smsg); err != nil { return ethtypes.EmptyEthHash, err } } // make it immediately available in the transaction hash lookup db, even though it will also // eventually get there via the mpool - if err := a.EthTxHashManager.TransactionHashLookup.UpsertHash(txHash, smsg.Cid()); err != nil { + if err := ethTxHashManager.UpsertHash(txHash, smsg.Cid()); err != nil { log.Errorf("error inserting tx mapping to db: %s", err) } diff --git a/node/impl/full/txhashmanager.go b/node/impl/full/txhashmanager.go index df31670b60a..00a5980a3fe 100644 --- a/node/impl/full/txhashmanager.go +++ b/node/impl/full/txhashmanager.go @@ -4,34 +4,59 @@ import ( "context" "time" + "github.com/ipfs/go-cid" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build/buildconstants" "github.com/filecoin-project/lotus/chain/ethhashlookup" + "github.com/filecoin-project/lotus/chain/events" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" ) -type EthTxHashManager struct { - StateAPI StateAPI - TransactionHashLookup *ethhashlookup.EthTxHashLookup +type EthTxHashManager interface { + events.TipSetObserver + + PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error + ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) + UpsertHash(txHash ethtypes.EthHash, c cid.Cid) error + GetCidFromHash(txHash ethtypes.EthHash) (cid.Cid, error) + DeleteEntriesOlderThan(days int) (int64, error) +} + +var ( + _ EthTxHashManager = (*ethTxHashManager)(nil) + _ EthTxHashManager = (*EthTxHashManagerDummy)(nil) +) + +type ethTxHashManager struct { + stateAPI StateAPI + transactionHashLookup *ethhashlookup.EthTxHashLookup +} + +func NewEthTxHashManager(stateAPI StateAPI, transactionHashLookup *ethhashlookup.EthTxHashLookup) EthTxHashManager { + return ðTxHashManager{ + stateAPI: stateAPI, + transactionHashLookup: transactionHashLookup, + } } -func (m *EthTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) error { +func (m *ethTxHashManager) Revert(ctx context.Context, from, to *types.TipSet) error { return nil } -func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error { +func (m *ethTxHashManager) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error { if minHeight < buildconstants.UpgradeHyggeHeight { minHeight = buildconstants.UpgradeHyggeHeight } - ts := m.StateAPI.Chain.GetHeaviestTipSet() + ts := m.stateAPI.Chain.GetHeaviestTipSet() for ts.Height() > minHeight { for _, block := range ts.Blocks() { - msgs, err := m.StateAPI.Chain.SecpkMessagesForBlock(ctx, block) + msgs, err := m.stateAPI.Chain.SecpkMessagesForBlock(ctx, block) if err != nil { // If we can't find the messages, we've either imported from snapshot or pruned the store log.Debug("exiting message mapping population at epoch ", ts.Height()) @@ -44,7 +69,7 @@ func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeig } var err error - ts, err = m.StateAPI.Chain.GetTipSetFromKey(ctx, ts.Parents()) + ts, err = m.stateAPI.Chain.GetTipSetFromKey(ctx, ts.Parents()) if err != nil { return err } @@ -53,9 +78,9 @@ func (m *EthTxHashManager) PopulateExistingMappings(ctx context.Context, minHeig return nil } -func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) error { +func (m *ethTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) error { for _, blk := range to.Blocks() { - _, smsgs, err := m.StateAPI.Chain.MessagesForBlock(ctx, blk) + _, smsgs, err := m.stateAPI.Chain.MessagesForBlock(ctx, blk) if err != nil { return err } @@ -70,7 +95,7 @@ func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) er return err } - err = m.TransactionHashLookup.UpsertHash(hash, smsg.Cid()) + err = m.transactionHashLookup.UpsertHash(hash, smsg.Cid()) if err != nil { return err } @@ -80,7 +105,19 @@ func (m *EthTxHashManager) Apply(ctx context.Context, from, to *types.TipSet) er return nil } -func (m *EthTxHashManager) ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) { +func (m *ethTxHashManager) UpsertHash(txHash ethtypes.EthHash, c cid.Cid) error { + return m.transactionHashLookup.UpsertHash(txHash, c) +} + +func (m *ethTxHashManager) GetCidFromHash(txHash ethtypes.EthHash) (cid.Cid, error) { + return m.transactionHashLookup.GetCidFromHash(txHash) +} + +func (m *ethTxHashManager) DeleteEntriesOlderThan(days int) (int64, error) { + return m.transactionHashLookup.DeleteEntriesOlderThan(days) +} + +func (m *ethTxHashManager) ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) { if msg.Signature.Type != crypto.SigTypeDelegated { return } @@ -97,14 +134,14 @@ func (m *EthTxHashManager) ProcessSignedMessage(ctx context.Context, msg *types. return } - err = m.TransactionHashLookup.UpsertHash(txHash, msg.Cid()) + err = m.UpsertHash(txHash, msg.Cid()) if err != nil { log.Errorf("error inserting tx mapping to db: %s", err) return } } -func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager *EthTxHashManager) { +func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager EthTxHashManager) { for { select { case <-ctx.Done(): @@ -119,14 +156,14 @@ func WaitForMpoolUpdates(ctx context.Context, ch <-chan api.MpoolUpdate, manager } } -func EthTxHashGC(ctx context.Context, retentionDays int, manager *EthTxHashManager) { +func EthTxHashGC(ctx context.Context, retentionDays int, manager EthTxHashManager) { if retentionDays == 0 { return } gcPeriod := 1 * time.Hour for { - entriesDeleted, err := manager.TransactionHashLookup.DeleteEntriesOlderThan(retentionDays) + entriesDeleted, err := manager.DeleteEntriesOlderThan(retentionDays) if err != nil { log.Errorf("error garbage collecting eth transaction hash database: %s", err) } @@ -134,3 +171,31 @@ func EthTxHashGC(ctx context.Context, retentionDays int, manager *EthTxHashManag time.Sleep(gcPeriod) } } + +type EthTxHashManagerDummy struct{} + +func (d *EthTxHashManagerDummy) PopulateExistingMappings(ctx context.Context, minHeight abi.ChainEpoch) error { + return nil +} + +func (d *EthTxHashManagerDummy) Revert(ctx context.Context, from, to *types.TipSet) error { + return nil +} + +func (d *EthTxHashManagerDummy) Apply(ctx context.Context, from, to *types.TipSet) error { + return nil +} + +func (d *EthTxHashManagerDummy) ProcessSignedMessage(ctx context.Context, msg *types.SignedMessage) {} + +func (d *EthTxHashManagerDummy) UpsertHash(txHash ethtypes.EthHash, c cid.Cid) error { + return nil +} + +func (d *EthTxHashManagerDummy) GetCidFromHash(txHash ethtypes.EthHash) (cid.Cid, error) { + return cid.Undef, nil +} + +func (d *EthTxHashManagerDummy) DeleteEntriesOlderThan(days int) (int64, error) { + return 0, nil +} diff --git a/node/modules/ethmodule.go b/node/modules/ethmodule.go index ff087036545..d701cfb0c14 100644 --- a/node/modules/ethmodule.go +++ b/node/modules/ethmodule.go @@ -25,8 +25,8 @@ import ( "github.com/filecoin-project/lotus/node/repo" ) -func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI, *full.EthEventHandler) (*full.EthModule, error) { - return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI, mpoolapi full.MpoolAPI, syncapi full.SyncAPI, ethEventHandler *full.EthEventHandler) (*full.EthModule, error) { +func EthTxHashManager(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, EventHelperAPI, *messagepool.MessagePool, full.StateAPI, full.SyncAPI) (full.EthTxHashManager, error) { + return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, evapi EventHelperAPI, mp *messagepool.MessagePool, stateapi full.StateAPI, syncapi full.SyncAPI) (full.EthTxHashManager, error) { ctx := helpers.LifecycleCtx(mctx, lc) sqlitePath, err := r.SqlitePath() @@ -51,10 +51,7 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep }, }) - ethTxHashManager := full.EthTxHashManager{ - StateAPI: stateapi, - TransactionHashLookup: transactionHashLookup, - } + ethTxHashManager := full.NewEthTxHashManager(stateapi, transactionHashLookup) if !dbAlreadyExists { err = ethTxHashManager.PopulateExistingMappings(mctx, 0) @@ -82,21 +79,29 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep } // Tipset listener - _ = ev.Observe(ðTxHashManager) + _ = ev.Observe(ethTxHashManager) ch, err := mp.Updates(ctx) if err != nil { return err } - go full.WaitForMpoolUpdates(ctx, ch, ðTxHashManager) - go full.EthTxHashGC(ctx, cfg.EthTxHashMappingLifetimeDays, ðTxHashManager) + go full.WaitForMpoolUpdates(ctx, ch, ethTxHashManager) + go full.EthTxHashGC(ctx, cfg.EthTxHashMappingLifetimeDays, ethTxHashManager) return nil }, }) + return ethTxHashManager, nil + } +} + +func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRepo, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, *messagepool.MessagePool, full.StateAPI, full.ChainAPI, full.MpoolAPI, full.SyncAPI, *full.EthEventHandler, full.EthTxHashManager) (*full.EthModule, error) { + return func(mctx helpers.MetricsCtx, r repo.LockedRepo, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, mp *messagepool.MessagePool, stateapi full.StateAPI, chainapi full.ChainAPI, mpoolapi full.MpoolAPI, syncapi full.SyncAPI, ethEventHandler *full.EthEventHandler, ethTxHashManager full.EthTxHashManager) (*full.EthModule, error) { + var blkCache *arc.ARCCache[cid.Cid, *ethtypes.EthBlock] var blkTxCache *arc.ARCCache[cid.Cid, *ethtypes.EthBlock] + var err error if cfg.EthBlkCacheSize > 0 { blkCache, err = arc.NewARC[cid.Cid, *ethtypes.EthBlock](cfg.EthBlkCacheSize) if err != nil { @@ -120,7 +125,7 @@ func EthModuleAPI(cfg config.FevmConfig) func(helpers.MetricsCtx, repo.LockedRep SyncAPI: syncapi, EthEventHandler: ethEventHandler, - EthTxHashManager: ðTxHashManager, + EthTxHashManager: ethTxHashManager, EthTraceFilterMaxResults: cfg.EthTraceFilterMaxResults, EthBlkCache: blkCache,