diff --git a/api/stream.go b/api/stream.go index 9fa1b6756..64923224d 100644 --- a/api/stream.go +++ b/api/stream.go @@ -99,23 +99,16 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (* s.transactionsPublisher, func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error { return func(data any) error { - tx, ok := data.(models.Transaction) + tx, ok := data.(*gethTypes.Transaction) if !ok { return fmt.Errorf("invalid data sent to pending transaction subscription") } - var res any if fullTx != nil && *fullTx { - if r, err := NewTransaction(tx, s.config.EVMNetworkID); err != nil { - return err - } else { - res = r - } - } else { - res = tx.Hash() + return notifier.Notify(sub.ID, tx) } - return notifier.Notify(sub.ID, res) + return notifier.Notify(sub.ID, tx.Hash()) } }, ) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index ddc3a0e11..980f6180f 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -223,7 +223,6 @@ func startIngestion( transactions, accounts, blocksPublisher, - transactionsPublisher, logsPublisher, logger, ) @@ -286,12 +285,16 @@ func startServer( return fmt.Errorf("failed to create a COA signer: %w", err) } + // create transaction pool + txPool := requester.NewTxPool(client, transactionsPublisher, logger) + evm, err := requester.NewEVM( client, cfg, signer, logger, blocks, + txPool, ) if err != nil { return fmt.Errorf("failed to create EVM requester: %w", err) diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 87659bdcf..8395c9218 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -17,18 +17,17 @@ import ( var _ models.Engine = &Engine{} type Engine struct { - subscriber EventSubscriber - store *pebble.Storage - blocks storage.BlockIndexer - receipts storage.ReceiptIndexer - transactions storage.TransactionIndexer - accounts storage.AccountIndexer - log zerolog.Logger - evmLastHeight *models.SequentialHeight - status *models.EngineStatus - blocksPublisher *models.Publisher - transactionsPublisher *models.Publisher - logsPublisher *models.Publisher + subscriber EventSubscriber + store *pebble.Storage + blocks storage.BlockIndexer + receipts storage.ReceiptIndexer + transactions storage.TransactionIndexer + accounts storage.AccountIndexer + log zerolog.Logger + evmLastHeight *models.SequentialHeight + status *models.EngineStatus + blocksPublisher *models.Publisher + logsPublisher *models.Publisher } func NewEventIngestionEngine( @@ -39,24 +38,22 @@ func NewEventIngestionEngine( transactions storage.TransactionIndexer, accounts storage.AccountIndexer, blocksPublisher *models.Publisher, - transactionsPublisher *models.Publisher, logsPublisher *models.Publisher, log zerolog.Logger, ) *Engine { log = log.With().Str("component", "ingestion").Logger() return &Engine{ - subscriber: subscriber, - store: store, - blocks: blocks, - receipts: receipts, - transactions: transactions, - accounts: accounts, - log: log, - status: models.NewEngineStatus(), - blocksPublisher: blocksPublisher, - transactionsPublisher: transactionsPublisher, - logsPublisher: logsPublisher, + subscriber: subscriber, + store: store, + blocks: blocks, + receipts: receipts, + transactions: transactions, + accounts: accounts, + log: log, + status: models.NewEngineStatus(), + blocksPublisher: blocksPublisher, + logsPublisher: logsPublisher, } } @@ -181,9 +178,7 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { e.blocksPublisher.Publish(b) } - for i, r := range receipts { - e.transactionsPublisher.Publish(txs[i]) - + for _, r := range receipts { if len(r.Logs) > 0 { e.logsPublisher.Publish(r.Logs) } diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index 799d16771..15a4e4c5b 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -68,7 +68,6 @@ func TestSerialBlockIngestion(t *testing.T) { accounts, models.NewPublisher(), models.NewPublisher(), - models.NewPublisher(), zerolog.Nop(), ) @@ -148,7 +147,6 @@ func TestSerialBlockIngestion(t *testing.T) { accounts, models.NewPublisher(), models.NewPublisher(), - models.NewPublisher(), zerolog.Nop(), ) @@ -258,7 +256,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) { accounts, models.NewPublisher(), models.NewPublisher(), - models.NewPublisher(), zerolog.Nop(), ) @@ -357,7 +354,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) { accounts, models.NewPublisher(), models.NewPublisher(), - models.NewPublisher(), zerolog.Nop(), ) @@ -452,7 +448,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) { accounts, models.NewPublisher(), models.NewPublisher(), - models.NewPublisher(), zerolog.Nop(), ) diff --git a/services/requester/pool.go b/services/requester/pool.go index 8c08fb570..f62f1735a 100644 --- a/services/requester/pool.go +++ b/services/requester/pool.go @@ -25,18 +25,23 @@ const ( // and after submitted based on different strategies. type TxPool struct { - logger zerolog.Logger - client *CrossSporkClient - pool *sync.Map - // todo add a broadcaster for pending transaction streaming + logger zerolog.Logger + client *CrossSporkClient + pool *sync.Map + txPublisher *models.Publisher // todo add methods to inspect transaction pool state } -func NewTxPool(client *CrossSporkClient, logger zerolog.Logger) *TxPool { +func NewTxPool( + client *CrossSporkClient, + transactionsPublisher *models.Publisher, + logger zerolog.Logger, +) *TxPool { return &TxPool{ - logger: logger.With().Str("component", "tx-pool").Logger(), - client: client, - pool: &sync.Map{}, + logger: logger.With().Str("component", "tx-pool").Logger(), + client: client, + txPublisher: transactionsPublisher, + pool: &sync.Map{}, } } @@ -49,6 +54,8 @@ func (t *TxPool) Send( flowTx *flow.Transaction, evmTx *gethTypes.Transaction, ) error { + t.txPublisher.Publish(evmTx) // publish pending transaction event + if err := t.client.SendTransaction(ctx, *flowTx); err != nil { return err } diff --git a/services/requester/requester.go b/services/requester/requester.go index 5dc6a3ba8..ac6a7f986 100644 --- a/services/requester/requester.go +++ b/services/requester/requester.go @@ -113,6 +113,7 @@ func NewEVM( signer crypto.Signer, logger zerolog.Logger, blocks storage.BlockIndexer, + txPool *TxPool, ) (*EVM, error) { logger = logger.With().Str("component", "requester").Logger() // check that the address stores already created COA resource in the "evm" storage path. @@ -164,7 +165,7 @@ func NewEVM( signer: signer, logger: logger, blocks: blocks, - txPool: NewTxPool(client, logger), + txPool: txPool, head: head, evmSigner: evmSigner, validationOptions: validationOptions,