diff --git a/ethers-providers/src/rpc/pubsub.rs b/ethers-providers/src/rpc/pubsub.rs index a31c0dcbc6..8e79de2e71 100644 --- a/ethers-providers/src/rpc/pubsub.rs +++ b/ethers-providers/src/rpc/pubsub.rs @@ -1,6 +1,6 @@ -use crate::{JsonRpcClient, Middleware, Provider, TransactionStream}; +use crate::{JsonRpcClient, Middleware, Provider}; -use ethers_core::types::{TxHash, U256}; +use ethers_core::types::U256; use futures_util::stream::Stream; use pin_project::{pin_project, pinned_drop}; @@ -35,7 +35,7 @@ pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> { loaded_elements: VecDeque, - provider: &'a Provider

, + pub(crate) provider: &'a Provider

, #[pin] rx: P::NotificationStream, @@ -114,18 +114,3 @@ where let _ = (*self.provider).as_ref().unsubscribe(self.id); } } - -impl<'a, P> SubscriptionStream<'a, P, TxHash> -where - P: PubsubClient, -{ - /// Returns a stream that yields the `Transaction`s for the transaction hashes this stream - /// yields. - /// - /// This internally calls `Provider::get_transaction` with every new transaction. - /// No more than n futures will be buffered at any point in time, and less than n may also be - /// buffered depending on the state of each future. - pub fn transactions_unordered(self, n: usize) -> TransactionStream<'a, P, Self> { - TransactionStream::new(self.provider, self, n) - } -} diff --git a/ethers-providers/src/stream/tx_stream.rs b/ethers-providers/src/stream/tx_stream.rs index cd4f16815f..b4fb723e52 100644 --- a/ethers-providers/src/stream/tx_stream.rs +++ b/ethers-providers/src/stream/tx_stream.rs @@ -13,7 +13,10 @@ use futures_util::{ use ethers_core::types::{Transaction, TxHash}; -use crate::{FilterWatcher, JsonRpcClient, Middleware, Provider, ProviderError}; +use crate::{ + FilterWatcher, JsonRpcClient, Middleware, Provider, ProviderError, PubsubClient, + SubscriptionStream, +}; /// Errors `TransactionStream` can throw #[derive(Debug, thiserror::Error)] @@ -146,6 +149,21 @@ where } } +impl<'a, P> SubscriptionStream<'a, P, TxHash> +where + P: PubsubClient, +{ + /// Returns a stream that yields the `Transaction`s for the transaction hashes this stream + /// yields. + /// + /// This internally calls `Provider::get_transaction` with every new transaction. + /// No more than n futures will be buffered at any point in time, and less than n may also be + /// buffered depending on the state of each future. + pub fn transactions_unordered(self, n: usize) -> TransactionStream<'a, P, Self> { + TransactionStream::new(self.provider, self, n) + } +} + #[cfg(test)] #[cfg(not(target_arch = "wasm32"))] mod tests {