diff --git a/benches/benches/transaction_throughput.rs b/benches/benches/transaction_throughput.rs index 23454dda359..5d78818e8da 100644 --- a/benches/benches/transaction_throughput.rs +++ b/benches/benches/transaction_throughput.rs @@ -89,6 +89,7 @@ where test_builder.trigger = Trigger::Never; test_builder.utxo_validation = true; test_builder.gas_limit = Some(10_000_000_000); + test_builder.block_size_limit = Some(1_000_000_000_000); // spin up node let transactions: Vec = diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index ab3523dec1a..1a76fb18a68 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -488,6 +488,7 @@ impl Command { let config = Config { graphql_config: GraphQLConfig { addr, + database_butch_size: graphql.database_butch_size, max_queries_depth: graphql.graphql_max_depth, max_queries_complexity: graphql.graphql_max_complexity, max_queries_recursive_depth: graphql.graphql_max_recursive_depth, diff --git a/bin/fuel-core/src/cli/run/graphql.rs b/bin/fuel-core/src/cli/run/graphql.rs index 95cfb2852f5..73c682d0643 100644 --- a/bin/fuel-core/src/cli/run/graphql.rs +++ b/bin/fuel-core/src/cli/run/graphql.rs @@ -12,6 +12,10 @@ pub struct GraphQLArgs { #[clap(long = "port", default_value = "4000", env)] pub port: u16, + /// The size of the butch fetched from the database by GraphQL service. + #[clap(long = "graphql-database-butch-size", default_value = "100", env)] + pub database_butch_size: usize, + /// The max depth of GraphQL queries. #[clap(long = "graphql-max-depth", default_value = "16", env)] pub graphql_max_depth: usize, diff --git a/crates/fuel-core/src/coins_query.rs b/crates/fuel-core/src/coins_query.rs index e07630333f8..ea7c2b34c0d 100644 --- a/crates/fuel-core/src/coins_query.rs +++ b/crates/fuel-core/src/coins_query.rs @@ -279,10 +279,7 @@ mod tests { fuel_asm::Word, fuel_tx::*, }; - use futures::{ - StreamExt, - TryStreamExt, - }; + use futures::TryStreamExt; use itertools::Itertools; use rand::{ rngs::StdRng, @@ -987,7 +984,7 @@ mod tests { fn service_database(&self) -> ServiceDatabase { let on_chain = self.database.on_chain().clone(); let off_chain = self.database.off_chain().clone(); - ServiceDatabase::new(0u32.into(), on_chain, off_chain) + ServiceDatabase::new(100, 0u32.into(), on_chain, off_chain) } } @@ -1044,8 +1041,7 @@ mod tests { let query = self.service_database(); let query = query.test_view(); query - .owned_coins_ids(owner, None, IterDirection::Forward) - .map(|res| res.map(|id| query.coin(id).unwrap())) + .owned_coins(owner, None, IterDirection::Forward) .try_collect() .await .unwrap() @@ -1055,8 +1051,7 @@ mod tests { let query = self.service_database(); let query = query.test_view(); query - .owned_message_ids(owner, None, IterDirection::Forward) - .map(|res| res.map(|id| query.message(&id).unwrap())) + .owned_messages(owner, None, IterDirection::Forward) .try_collect() .await .unwrap() diff --git a/crates/fuel-core/src/database/block.rs b/crates/fuel-core/src/database/block.rs index c6295e62017..9202bd04043 100644 --- a/crates/fuel-core/src/database/block.rs +++ b/crates/fuel-core/src/database/block.rs @@ -67,7 +67,7 @@ impl OnChainIterableKeyValueView { let db_block = self.storage::().get(height)?; if let Some(block) = db_block { // fetch all the transactions - // TODO: optimize with multi-key get + // TODO: Use multiget when it's implemented. let txs = block .transactions() .iter() diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index 6d0d280fd3f..8b8706b75e5 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -20,6 +20,7 @@ pub mod worker_service; #[derive(Clone, Debug)] pub struct ServiceConfig { pub addr: SocketAddr, + pub database_butch_size: usize, pub max_queries_depth: usize, pub max_queries_complexity: usize, pub max_queries_recursive_depth: usize, diff --git a/crates/fuel-core/src/graphql_api/api_service.rs b/crates/fuel-core/src/graphql_api/api_service.rs index ec79918e09c..ce0fdbc9f6c 100644 --- a/crates/fuel-core/src/graphql_api/api_service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -221,8 +221,12 @@ where OffChain::LatestView: OffChainDatabase, { let network_addr = config.config.addr; - let combined_read_database = - ReadDatabase::new(genesis_block_height, on_database, off_database); + let combined_read_database = ReadDatabase::new( + config.config.database_butch_size, + genesis_block_height, + on_database, + off_database, + ); let request_timeout = config.config.api_request_timeout; let concurrency_limit = config.config.max_concurrent_queries; let body_limit = config.config.request_body_bytes_limit; diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index 3f254032b0c..bd08761870c 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -5,6 +5,7 @@ use crate::fuel_core_graphql_api::{ OnChainDatabase, }, }; +use async_graphql::futures_util::StreamExt; use fuel_core_storage::{ iter::{ BoxedIter, @@ -77,6 +78,8 @@ pub type OffChainView = Arc; /// The container of the on-chain and off-chain database view provides. /// It is used only by `ViewExtension` to create a [`ReadView`]. pub struct ReadDatabase { + /// The size of the butch during fetching from the database. + butch_size: usize, /// The height of the genesis block. genesis_height: BlockHeight, /// The on-chain database view provider. @@ -88,6 +91,7 @@ pub struct ReadDatabase { impl ReadDatabase { /// Creates a new [`ReadDatabase`] with the given on-chain and off-chain database view providers. pub fn new( + butch_size: usize, genesis_height: BlockHeight, on_chain: OnChain, off_chain: OffChain, @@ -99,6 +103,7 @@ impl ReadDatabase { OffChain::LatestView: OffChainDatabase, { Self { + butch_size, genesis_height, on_chain: Box::new(ArcWrapper::new(on_chain)), off_chain: Box::new(ArcWrapper::new(off_chain)), @@ -111,6 +116,7 @@ impl ReadDatabase { // It is not possible to implement until `view_at` is implemented for the `AtomicView`. // https://github.com/FuelLabs/fuel-core/issues/1582 Ok(ReadView { + butch_size: self.butch_size, genesis_height: self.genesis_height, on_chain: self.on_chain.latest_view()?, off_chain: self.off_chain.latest_view()?, @@ -125,6 +131,7 @@ impl ReadDatabase { #[derive(Clone)] pub struct ReadView { + pub(crate) butch_size: usize, pub(crate) genesis_height: BlockHeight, pub(crate) on_chain: OnChainView, pub(crate) off_chain: OffChainView, @@ -134,7 +141,7 @@ impl ReadView { pub fn transaction(&self, tx_id: &TxId) -> StorageResult { let result = self.on_chain.transaction(tx_id); if result.is_not_found() { - if let Some(tx) = self.old_transaction(tx_id)? { + if let Some(tx) = self.off_chain.old_transaction(tx_id)? { Ok(tx) } else { Err(not_found!(Transactions)) @@ -144,6 +151,20 @@ impl ReadView { } } + pub async fn transactions( + &self, + tx_ids: Vec, + ) -> Vec> { + // TODO: Use multiget when it's implemented. + let result = tx_ids + .iter() + .map(|tx_id| self.transaction(tx_id)) + .collect::>(); + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + result + } + pub fn block(&self, height: &BlockHeight) -> StorageResult { if *height >= self.genesis_height { self.on_chain.block(height) @@ -252,6 +273,13 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { futures::stream::iter(self.on_chain.all_messages(start_message_id, direction)) + .chunks(self.butch_size) + .filter_map(|chunk| async move { + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + Some(futures::stream::iter(chunk)) + }) + .flatten() } pub fn message_exists(&self, nonce: &Nonce) -> StorageResult { @@ -276,6 +304,13 @@ impl ReadView { start_asset, direction, )) + .chunks(self.butch_size) + .filter_map(|chunk| async move { + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + Some(futures::stream::iter(chunk)) + }) + .flatten() } pub fn da_height(&self) -> StorageResult { @@ -345,29 +380,6 @@ impl ReadView { self.off_chain.contract_salt(contract_id) } - pub fn old_block(&self, height: &BlockHeight) -> StorageResult { - self.off_chain.old_block(height) - } - - pub fn old_blocks( - &self, - height: Option, - direction: IterDirection, - ) -> BoxedIter<'_, StorageResult> { - self.off_chain.old_blocks(height, direction) - } - - pub fn old_block_consensus(&self, height: &BlockHeight) -> StorageResult { - self.off_chain.old_block_consensus(height) - } - - pub fn old_transaction( - &self, - id: &TxId, - ) -> StorageResult> { - self.off_chain.old_transaction(id) - } - pub fn relayed_tx_status( &self, id: Bytes32, diff --git a/crates/fuel-core/src/query/balance.rs b/crates/fuel-core/src/query/balance.rs index 1a715b74522..0638dc768b9 100644 --- a/crates/fuel-core/src/query/balance.rs +++ b/crates/fuel-core/src/query/balance.rs @@ -106,5 +106,12 @@ impl ReadView { }) .map_ok(|stream| stream.map(Ok)) .try_flatten() + .chunks(self.butch_size) + .filter_map(|chunk| async move { + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + Some(futures::stream::iter(chunk)) + }) + .flatten() } } diff --git a/crates/fuel-core/src/query/balance/asset_query.rs b/crates/fuel-core/src/query/balance/asset_query.rs index bbed21c5568..3c9ba49cee6 100644 --- a/crates/fuel-core/src/query/balance/asset_query.rs +++ b/crates/fuel-core/src/query/balance/asset_query.rs @@ -1,4 +1,5 @@ use crate::graphql_api::database::ReadView; +use async_graphql::futures_util::TryStreamExt; use fuel_core_services::stream::IntoBoxStream; use fuel_core_storage::{ iter::IterDirection, @@ -78,7 +79,9 @@ impl<'a> AssetsQuery<'a> { fn coins_iter(mut self) -> impl Stream> + 'a { let assets = self.assets.take(); - self.database + let database = self.database; + let stream = self + .database .owned_coins_ids(self.owner, None, IterDirection::Forward) .map(|id| id.map(CoinId::from)) .filter(move |result| { @@ -99,12 +102,25 @@ impl<'a> AssetsQuery<'a> { } else { return Err(anyhow::anyhow!("The coin is not UTXO").into()); }; - // TODO: Fetch coin in a separate thread - let coin = self.database.coin(id)?; - - Ok(CoinType::Coin(coin)) + Ok(id) }) + }); + + futures::stream::StreamExt::chunks(stream, database.butch_size) + .map(|chunk| { + use itertools::Itertools; + + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) }) + .try_filter_map(move |chunk| async move { + let chunk = database + .coins(chunk) + .await + .map(|result| result.map(CoinType::Coin)); + Ok(Some(futures::stream::iter(chunk))) + }) + .try_flatten() .filter(move |result| { if let Ok(CoinType::Coin(coin)) = result { has_asset(&assets, &coin.asset_id) @@ -117,7 +133,8 @@ impl<'a> AssetsQuery<'a> { fn messages_iter(&self) -> impl Stream> + 'a { let exclude = self.exclude; let database = self.database; - self.database + let stream = self + .database .owned_message_ids(self.owner, None, IterDirection::Forward) .map(|id| id.map(CoinId::from)) .filter(move |result| { @@ -138,11 +155,22 @@ impl<'a> AssetsQuery<'a> { } else { return Err(anyhow::anyhow!("The coin is not a message").into()); }; - // TODO: Fetch message in a separate thread - let message = database.message(&id)?; - Ok(message) + Ok(id) }) + }); + + futures::stream::StreamExt::chunks(stream, database.butch_size) + .map(|chunk| { + use itertools::Itertools; + + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok(chunk) + }) + .try_filter_map(move |chunk| async move { + let chunk = database.messages(chunk).await; + Ok::<_, StorageError>(Some(futures::stream::iter(chunk))) }) + .try_flatten() .filter(|result| { if let Ok(message) = result { message.data().is_empty() diff --git a/crates/fuel-core/src/query/block.rs b/crates/fuel-core/src/query/block.rs index 3b725ab8b49..60f98838879 100644 --- a/crates/fuel-core/src/query/block.rs +++ b/crates/fuel-core/src/query/block.rs @@ -7,7 +7,10 @@ use fuel_core_types::{ blockchain::block::CompressedBlock, fuel_types::BlockHeight, }; -use futures::Stream; +use futures::{ + Stream, + StreamExt, +}; impl ReadView { pub fn latest_block_height(&self) -> StorageResult { @@ -24,5 +27,12 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { futures::stream::iter(self.blocks(height, direction)) + .chunks(self.butch_size) + .filter_map(|chunk| async move { + // Give a chance to other tasks to run. + tokio::task::yield_now().await; + Some(futures::stream::iter(chunk)) + }) + .flatten() } } diff --git a/crates/fuel-core/src/query/coin.rs b/crates/fuel-core/src/query/coin.rs index c6d52001ddd..7de4a5211d6 100644 --- a/crates/fuel-core/src/query/coin.rs +++ b/crates/fuel-core/src/query/coin.rs @@ -1,8 +1,10 @@ use crate::fuel_core_graphql_api::database::ReadView; +use async_graphql::futures_util::TryStreamExt; use fuel_core_storage::{ iter::IterDirection, not_found, tables::Coins, + Error as StorageError, Result as StorageResult, StorageAsRef, }; @@ -29,6 +31,17 @@ impl ReadView { Ok(coin.uncompress(utxo_id)) } + pub async fn coins( + &self, + utxo_ids: Vec, + ) -> impl Iterator> + '_ { + // TODO: Use multiget when it's implemented. + let coins = utxo_ids.into_iter().map(|id| self.coin(id)); + // Yield to the runtime to allow other tasks to run. + tokio::task::yield_now().await; + coins + } + pub fn owned_coins( &self, owner: &Address, @@ -36,11 +49,17 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { self.owned_coins_ids(owner, start_coin, direction) - .map(|res| { - res.and_then(|id| { - // TODO: Move fetching of the coin to a separate thread - self.coin(id) - }) + .chunks(self.butch_size) + .map(|chunk| { + use itertools::Itertools; + + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) + }) + .try_filter_map(move |chunk| async move { + let chunk = self.coins(chunk).await; + Ok(Some(futures::stream::iter(chunk))) }) + .try_flatten() } } diff --git a/crates/fuel-core/src/query/message.rs b/crates/fuel-core/src/query/message.rs index 89cd21b8ae7..51550af3555 100644 --- a/crates/fuel-core/src/query/message.rs +++ b/crates/fuel-core/src/query/message.rs @@ -25,7 +25,6 @@ use fuel_core_types::{ fuel_tx::{ input::message::compute_message_id, Receipt, - Transaction, TxId, }, fuel_types::{ @@ -40,6 +39,7 @@ use fuel_core_types::{ use futures::{ Stream, StreamExt, + TryStreamExt, }; use itertools::Itertools; use std::borrow::Cow; @@ -81,6 +81,16 @@ impl ReadView { .map(Cow::into_owned) } + pub async fn messages( + &self, + ids: Vec, + ) -> impl Iterator> + '_ { + let messages = ids.into_iter().map(|id| self.message(&id)); + // Yield to the runtime to allow other tasks to run. + tokio::task::yield_now().await; + messages + } + pub fn owned_messages<'a>( &'a self, owner: &'a Address, @@ -88,12 +98,16 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + 'a { self.owned_message_ids(owner, start_message_id, direction) - .map(|result| { - result.and_then(|id| { - // TODO: Move `message` fetching to a separate thread - self.message(&id) - }) + .chunks(self.butch_size) + .map(|chunk| { + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok(chunk) + }) + .try_filter_map(move |chunk| async move { + let chunk = self.messages(chunk).await; + Ok::<_, StorageError>(Some(futures::stream::iter(chunk))) }) + .try_flatten() } } @@ -102,9 +116,6 @@ pub trait MessageProofData { /// Get the block. fn block(&self, id: &BlockHeight) -> StorageResult; - /// Get the transaction. - fn transaction(&self, transaction_id: &TxId) -> StorageResult; - /// Return all receipts in the given transaction. fn receipts(&self, transaction_id: &TxId) -> StorageResult>; @@ -128,10 +139,6 @@ impl MessageProofData for ReadView { self.block(id) } - fn transaction(&self, transaction_id: &TxId) -> StorageResult { - self.transaction(transaction_id) - } - fn receipts(&self, transaction_id: &TxId) -> StorageResult> { self.receipts(transaction_id) } @@ -140,7 +147,7 @@ impl MessageProofData for ReadView { &self, transaction_id: &TxId, ) -> StorageResult { - self.status(transaction_id) + self.tx_status(transaction_id) } fn block_history_proof( diff --git a/crates/fuel-core/src/query/message/test.rs b/crates/fuel-core/src/query/message/test.rs index d49f173370b..43d6ecbef1a 100644 --- a/crates/fuel-core/src/query/message/test.rs +++ b/crates/fuel-core/src/query/message/test.rs @@ -10,8 +10,6 @@ use fuel_core_types::{ fuel_tx::{ AssetId, ContractId, - Script, - Transaction, }, fuel_types::BlockHeight, tai64::Tai64, @@ -64,7 +62,6 @@ mockall::mock! { message_block_height: &BlockHeight, commit_block_height: &BlockHeight, ) -> StorageResult; - fn transaction(&self, transaction_id: &TxId) -> StorageResult; fn receipts(&self, transaction_id: &TxId) -> StorageResult>; fn transaction_status(&self, transaction_id: &TxId) -> StorageResult; } @@ -107,16 +104,6 @@ async fn can_build_message_proof() { } }); - data.expect_transaction().returning(move |txn_id| { - let tx = TXNS - .iter() - .find(|t| *t == txn_id) - .map(|_| Script::default().into()) - .ok_or(not_found!("Transaction in `TXNS`"))?; - - Ok(tx) - }); - let commit_block_header = PartialBlockHeader { application: ApplicationHeader { da_height: 0u64.into(), diff --git a/crates/fuel-core/src/query/tx.rs b/crates/fuel-core/src/query/tx.rs index 8989efc9bd2..cfeaf00f8d1 100644 --- a/crates/fuel-core/src/query/tx.rs +++ b/crates/fuel-core/src/query/tx.rs @@ -3,6 +3,7 @@ use fuel_core_storage::{ iter::IterDirection, not_found, tables::Transactions, + Error as StorageError, Result as StorageResult, }; use fuel_core_types::{ @@ -18,11 +19,12 @@ use fuel_core_types::{ use futures::{ Stream, StreamExt, + TryStreamExt, }; impl ReadView { pub fn receipts(&self, tx_id: &TxId) -> StorageResult> { - let status = self.status(tx_id)?; + let status = self.tx_status(tx_id)?; let receipts = match status { TransactionStatus::Success { receipts, .. } @@ -32,10 +34,6 @@ impl ReadView { receipts.ok_or(not_found!(Transactions)) } - pub fn status(&self, tx_id: &TxId) -> StorageResult { - self.tx_status(tx_id) - } - pub fn owned_transactions( &self, owner: Address, @@ -43,13 +41,22 @@ impl ReadView { direction: IterDirection, ) -> impl Stream> + '_ { self.owned_transactions_ids(owner, start, direction) - .map(|result| { - result.and_then(|(tx_pointer, tx_id)| { - // TODO: Fetch transactions in a separate thread - let tx = self.transaction(&tx_id)?; + .chunks(self.butch_size) + .map(|chunk| { + use itertools::Itertools; - Ok((tx_pointer, tx)) - }) + let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?; + Ok::<_, StorageError>(chunk) + }) + .try_filter_map(move |chunk| async move { + let tx_ids = chunk.iter().map(|(_, tx_id)| *tx_id).collect::>(); + let txs = self.transactions(tx_ids).await; + let txs = txs + .into_iter() + .zip(chunk) + .map(|(result, (tx_pointer, _))| result.map(|tx| (tx_pointer, tx))); + Ok(Some(futures::stream::iter(txs))) }) + .try_flatten() } } diff --git a/crates/fuel-core/src/schema/block.rs b/crates/fuel-core/src/schema/block.rs index d7bc8366282..0dd83103545 100644 --- a/crates/fuel-core/src/schema/block.rs +++ b/crates/fuel-core/src/schema/block.rs @@ -44,12 +44,14 @@ use fuel_core_types::{ block::CompressedBlock, header::BlockHeader, }, + fuel_tx::TxId, fuel_types, fuel_types::BlockHeight, }; use futures::{ Stream, StreamExt, + TryStreamExt, }; pub struct Block(pub(crate) CompressedBlock); @@ -135,14 +137,27 @@ impl Block { ctx: &Context<'_>, ) -> async_graphql::Result> { let query = ctx.read_view()?; - self.0 - .transactions() - .iter() - .map(|tx_id| { - let tx = query.transaction(tx_id)?; - Ok(Transaction::from_tx(*tx_id, tx)) + let tx_ids = futures::stream::iter(self.0.transactions().iter().copied()); + + let result = tx_ids + .chunks(query.butch_size) + .filter_map(move |tx_ids: Vec| { + let async_query = query.as_ref().clone(); + async move { + let txs = async_query.transactions(tx_ids.clone()).await; + let txs = txs + .into_iter() + .zip(tx_ids.into_iter()) + .map(|(r, tx_id)| r.map(|tx| Transaction::from_tx(tx_id, tx))); + + Some(futures::stream::iter(txs)) + } }) - .collect() + .flatten() + .try_collect() + .await?; + + Ok(result) } } diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index 877aaa9df91..e58ef7ea911 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -71,7 +71,6 @@ use std::{ borrow::Cow, iter, }; -use tokio_stream::StreamExt; use types::{ DryRunTransactionExecutionStatus, Transaction, @@ -123,7 +122,9 @@ impl TxQuery { ) -> async_graphql::Result< Connection, > { + use futures::stream::StreamExt; let query = ctx.read_view()?; + let query_ref = query.as_ref(); crate::schema::query_pagination( after, before, @@ -158,14 +159,36 @@ impl TxQuery { async move { Ok(skip) } }) - .map(|result: StorageResult| { - result.and_then(|sorted| { - // TODO: Request transactions in a separate thread - let tx = query.transaction(&sorted.tx_id.0)?; - - Ok((sorted, Transaction::from_tx(sorted.tx_id.0, tx))) - }) - }); + .chunks(query_ref.butch_size) + .filter_map(move |chunk: Vec>| { + let async_query = query_ref.clone(); + async move { + use itertools::Itertools; + let result = chunk.into_iter().try_collect::<_, Vec<_>, _>(); + + let chunk = match result { + Ok(chunk) => chunk, + Err(err) => { + return Some(Err(err)); + } + }; + + let tx_ids = chunk + .iter() + .map(|sorted| sorted.tx_id.0) + .collect::>(); + let txs = async_query.transactions(tx_ids).await; + let txs = txs.into_iter().zip(chunk.into_iter()).map( + |(result, sorted)| { + result.map(|tx| { + (sorted, Transaction::from_tx(sorted.tx_id.0, tx)) + }) + }, + ); + Some(Ok(futures::stream::iter(txs))) + } + }) + .try_flatten(); Ok(all_txs) }, @@ -188,6 +211,7 @@ impl TxQuery { before: Option, ) -> async_graphql::Result> { + use futures::stream::StreamExt; let query = ctx.read_view()?; let params = ctx .data_unchecked::() @@ -382,6 +406,7 @@ impl TxStatusSubscription { ) -> async_graphql::Result< impl Stream> + 'a, > { + use tokio_stream::StreamExt; let subscription = submit_and_await_status(ctx, tx).await?; Ok(subscription @@ -410,6 +435,7 @@ async fn submit_and_await_status<'a>( ) -> async_graphql::Result< impl Stream> + 'a, > { + use tokio_stream::StreamExt; let txpool = ctx.data_unchecked::(); let params = ctx .data_unchecked::() diff --git a/crates/fuel-core/src/schema/tx/types.rs b/crates/fuel-core/src/schema/tx/types.rs index 097d6f06fdd..48c78e3509e 100644 --- a/crates/fuel-core/src/schema/tx/types.rs +++ b/crates/fuel-core/src/schema/tx/types.rs @@ -987,7 +987,7 @@ pub(crate) async fn get_tx_status( txpool: &TxPool, ) -> Result, StorageError> { match query - .status(&id) + .tx_status(&id) .into_api_result::()? { Some(status) => { diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index e5473135d6d..ed75edae477 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -138,6 +138,7 @@ impl Config { std::net::Ipv4Addr::new(127, 0, 0, 1).into(), 0, ), + database_butch_size: 100, max_queries_depth: 16, max_queries_complexity: 80000, max_queries_recursive_depth: 16, diff --git a/tests/test-helpers/src/builder.rs b/tests/test-helpers/src/builder.rs index 91134ceb7c3..0d16bc48dc6 100644 --- a/tests/test-helpers/src/builder.rs +++ b/tests/test-helpers/src/builder.rs @@ -93,6 +93,7 @@ pub struct TestSetupBuilder { pub initial_coins: Vec, pub starting_gas_price: u64, pub gas_limit: Option, + pub block_size_limit: Option, pub starting_block: Option, pub utxo_validation: bool, pub privileged_address: Address, @@ -201,6 +202,13 @@ impl TestSetupBuilder { .set_block_gas_limit(gas_limit); } + if let Some(block_size_limit) = self.block_size_limit { + chain_conf + .consensus_parameters + .set_block_transaction_size_limit(block_size_limit) + .expect("Should set new block size limit"); + } + chain_conf .consensus_parameters .set_privileged_address(self.privileged_address); @@ -251,6 +259,7 @@ impl Default for TestSetupBuilder { initial_coins: vec![], starting_gas_price: 0, gas_limit: None, + block_size_limit: None, starting_block: None, utxo_validation: true, privileged_address: Default::default(), diff --git a/tests/tests/dos.rs b/tests/tests/dos.rs index b9067738a5d..52109c46c6e 100644 --- a/tests/tests/dos.rs +++ b/tests/tests/dos.rs @@ -1,6 +1,9 @@ #![allow(non_snake_case)] -use std::time::Instant; +use std::time::{ + Duration, + Instant, +}; use fuel_core::service::{ Config, @@ -666,3 +669,40 @@ async fn schema_is_retrievable() { let result = send_graph_ql_query(&url, query).await; assert!(result.contains("__schema"), "{:?}", result); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn heavy_tasks_doesnt_block_graphql() { + let mut config = Config::local_node(); + + const NUM_OF_BLOCKS: u32 = 4000; + config.graphql_config.max_queries_complexity = 10_000_000; + + let query = FULL_BLOCK_QUERY.to_string(); + let query = query.replace("$NUMBER_OF_BLOCKS", NUM_OF_BLOCKS.to_string().as_str()); + + let node = FuelService::new_node(config).await.unwrap(); + let url = format!("http://{}/v1/graphql", node.bound_address); + let client = FuelClient::new(url.clone()).unwrap(); + client.produce_blocks(NUM_OF_BLOCKS, None).await.unwrap(); + + // Given + for _ in 0..50 { + let url = url.clone(); + let query = query.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(20)).await; + let result = send_graph_ql_query(&url, &query).await; + assert!(result.contains("transactions")); + }); + } + // Wait for all queries to start be processed on the node. + tokio::time::sleep(Duration::from_secs(1)).await; + + // When + let result = tokio::time::timeout(Duration::from_secs(5), client.health()).await; + + // Then + let result = result.expect("Health check timed out"); + let health = result.expect("Health check failed"); + assert!(health); +}