Skip to content

Commit

Permalink
Avoid long heavy tasks in the GraphQL service
Browse files Browse the repository at this point in the history
  • Loading branch information
xgreenx committed Oct 13, 2024
1 parent feeb816 commit 4237feb
Show file tree
Hide file tree
Showing 21 changed files with 281 additions and 107 deletions.
1 change: 1 addition & 0 deletions benches/benches/transaction_throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ where
test_builder.trigger = Trigger::Never;
test_builder.utxo_validation = true;
test_builder.gas_limit = Some(10_000_000_000);
test_builder.block_size_limit = Some(1_000_000_000_000);

// spin up node
let transactions: Vec<Transaction> =
Expand Down
1 change: 1 addition & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ impl Command {
let config = Config {
graphql_config: GraphQLConfig {
addr,
database_butch_size: graphql.database_butch_size,
max_queries_depth: graphql.graphql_max_depth,
max_queries_complexity: graphql.graphql_max_complexity,
max_queries_recursive_depth: graphql.graphql_max_recursive_depth,
Expand Down
4 changes: 4 additions & 0 deletions bin/fuel-core/src/cli/run/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ pub struct GraphQLArgs {
#[clap(long = "port", default_value = "4000", env)]
pub port: u16,

/// The size of the batch fetched from the database by the GraphQL service.
#[clap(long = "graphql-database-butch-size", default_value = "100", env)]
pub database_butch_size: usize,

/// The max depth of GraphQL queries.
#[clap(long = "graphql-max-depth", default_value = "16", env)]
pub graphql_max_depth: usize,
Expand Down
13 changes: 4 additions & 9 deletions crates/fuel-core/src/coins_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,7 @@ mod tests {
fuel_asm::Word,
fuel_tx::*,
};
use futures::{
StreamExt,
TryStreamExt,
};
use futures::TryStreamExt;
use itertools::Itertools;
use rand::{
rngs::StdRng,
Expand Down Expand Up @@ -987,7 +984,7 @@ mod tests {
/// Builds a `ServiceDatabase` over fresh clones of this harness's
/// on-chain and off-chain database views.
///
/// The batch size of 100 presumably mirrors the CLI default
/// (`--graphql-database-butch-size`) — TODO confirm; the genesis
/// height is pinned to zero for tests.
fn service_database(&self) -> ServiceDatabase {
    ServiceDatabase::new(
        100,
        0u32.into(),
        self.database.on_chain().clone(),
        self.database.off_chain().clone(),
    )
}
}

Expand Down Expand Up @@ -1044,8 +1041,7 @@ mod tests {
let query = self.service_database();
let query = query.test_view();
query
.owned_coins_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| query.coin(id).unwrap()))
.owned_coins(owner, None, IterDirection::Forward)
.try_collect()
.await
.unwrap()
Expand All @@ -1055,8 +1051,7 @@ mod tests {
let query = self.service_database();
let query = query.test_view();
query
.owned_message_ids(owner, None, IterDirection::Forward)
.map(|res| res.map(|id| query.message(&id).unwrap()))
.owned_messages(owner, None, IterDirection::Forward)
.try_collect()
.await
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion crates/fuel-core/src/database/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl OnChainIterableKeyValueView {
let db_block = self.storage::<FuelBlocks>().get(height)?;
if let Some(block) = db_block {
// fetch all the transactions
// TODO: optimize with multi-key get
// TODO: Use multiget when it's implemented.
let txs = block
.transactions()
.iter()
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod worker_service;
#[derive(Clone, Debug)]
pub struct ServiceConfig {
pub addr: SocketAddr,
pub database_butch_size: usize,
pub max_queries_depth: usize,
pub max_queries_complexity: usize,
pub max_queries_recursive_depth: usize,
Expand Down
8 changes: 6 additions & 2 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,12 @@ where
OffChain::LatestView: OffChainDatabase,
{
let network_addr = config.config.addr;
let combined_read_database =
ReadDatabase::new(genesis_block_height, on_database, off_database);
let combined_read_database = ReadDatabase::new(
config.config.database_butch_size,
genesis_block_height,
on_database,
off_database,
);
let request_timeout = config.config.api_request_timeout;
let concurrency_limit = config.config.max_concurrent_queries;
let body_limit = config.config.request_body_bytes_limit;
Expand Down
60 changes: 36 additions & 24 deletions crates/fuel-core/src/graphql_api/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::fuel_core_graphql_api::{
OnChainDatabase,
},
};
use async_graphql::futures_util::StreamExt;
use fuel_core_storage::{
iter::{
BoxedIter,
Expand Down Expand Up @@ -77,6 +78,8 @@ pub type OffChainView = Arc<dyn OffChainDatabase>;
/// The container of the on-chain and off-chain database view provides.
/// It is used only by `ViewExtension` to create a [`ReadView`].
pub struct ReadDatabase {
/// The size of the batch used when fetching from the database.
butch_size: usize,
/// The height of the genesis block.
genesis_height: BlockHeight,
/// The on-chain database view provider.
Expand All @@ -88,6 +91,7 @@ pub struct ReadDatabase {
impl ReadDatabase {
/// Creates a new [`ReadDatabase`] with the given on-chain and off-chain database view providers.
pub fn new<OnChain, OffChain>(
butch_size: usize,
genesis_height: BlockHeight,
on_chain: OnChain,
off_chain: OffChain,
Expand All @@ -99,6 +103,7 @@ impl ReadDatabase {
OffChain::LatestView: OffChainDatabase,
{
Self {
butch_size,
genesis_height,
on_chain: Box::new(ArcWrapper::new(on_chain)),
off_chain: Box::new(ArcWrapper::new(off_chain)),
Expand All @@ -111,6 +116,7 @@ impl ReadDatabase {
// It is not possible to implement until `view_at` is implemented for the `AtomicView`.
// https://github.com/FuelLabs/fuel-core/issues/1582
Ok(ReadView {
butch_size: self.butch_size,
genesis_height: self.genesis_height,
on_chain: self.on_chain.latest_view()?,
off_chain: self.off_chain.latest_view()?,
Expand All @@ -125,6 +131,7 @@ impl ReadDatabase {

#[derive(Clone)]
pub struct ReadView {
pub(crate) butch_size: usize,
pub(crate) genesis_height: BlockHeight,
pub(crate) on_chain: OnChainView,
pub(crate) off_chain: OffChainView,
Expand All @@ -134,7 +141,7 @@ impl ReadView {
pub fn transaction(&self, tx_id: &TxId) -> StorageResult<Transaction> {
let result = self.on_chain.transaction(tx_id);
if result.is_not_found() {
if let Some(tx) = self.old_transaction(tx_id)? {
if let Some(tx) = self.off_chain.old_transaction(tx_id)? {
Ok(tx)
} else {
Err(not_found!(Transactions))
Expand All @@ -144,6 +151,20 @@ impl ReadView {
}
}

/// Looks up every transaction in `tx_ids`, returning one
/// `StorageResult` per id in the same order as the input.
pub async fn transactions(
    &self,
    tx_ids: Vec<TxId>,
) -> Vec<StorageResult<Transaction>> {
    // TODO: Use multiget when it's implemented.
    //
    // Fetch each transaction individually; the output vector is
    // index-aligned with `tx_ids`.
    let mut looked_up = Vec::with_capacity(tx_ids.len());
    for tx_id in tx_ids.iter() {
        looked_up.push(self.transaction(tx_id));
    }
    // Yield to the scheduler so a long batch of synchronous lookups
    // doesn't starve other tasks on this executor.
    tokio::task::yield_now().await;
    looked_up
}

pub fn block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock> {
if *height >= self.genesis_height {
self.on_chain.block(height)
Expand Down Expand Up @@ -252,6 +273,13 @@ impl ReadView {
direction: IterDirection,
) -> impl Stream<Item = StorageResult<Message>> + '_ {
futures::stream::iter(self.on_chain.all_messages(start_message_id, direction))
.chunks(self.butch_size)
.filter_map(|chunk| async move {
// Give a chance to other tasks to run.
tokio::task::yield_now().await;
Some(futures::stream::iter(chunk))
})
.flatten()
}

pub fn message_exists(&self, nonce: &Nonce) -> StorageResult<bool> {
Expand All @@ -276,6 +304,13 @@ impl ReadView {
start_asset,
direction,
))
.chunks(self.butch_size)
.filter_map(|chunk| async move {
// Give a chance to other tasks to run.
tokio::task::yield_now().await;
Some(futures::stream::iter(chunk))
})
.flatten()
}

pub fn da_height(&self) -> StorageResult<DaBlockHeight> {
Expand Down Expand Up @@ -345,29 +380,6 @@ impl ReadView {
self.off_chain.contract_salt(contract_id)
}

/// Fetches a historical block by height from the off-chain database.
/// NOTE(review): "old" presumably means pre-regenesis history kept
/// off-chain — confirm against the `OffChainDatabase` trait docs.
pub fn old_block(&self, height: &BlockHeight) -> StorageResult<CompressedBlock> {
    self.off_chain.old_block(height)
}

/// Iterates over historical blocks in the off-chain database,
/// optionally starting at `height`, in the given `direction`.
/// Delegates directly to the off-chain view; each item may fail
/// independently with a `StorageResult` error.
pub fn old_blocks(
    &self,
    height: Option<BlockHeight>,
    direction: IterDirection,
) -> BoxedIter<'_, StorageResult<CompressedBlock>> {
    self.off_chain.old_blocks(height, direction)
}

/// Fetches the consensus data for a historical block at `height`
/// from the off-chain database.
pub fn old_block_consensus(&self, height: &BlockHeight) -> StorageResult<Consensus> {
    self.off_chain.old_block_consensus(height)
}

/// Looks up a historical transaction by id in the off-chain database.
/// Returns `Ok(None)` when the transaction is not present there
/// (callers such as `ReadView::transaction` fall back on this after
/// an on-chain miss).
pub fn old_transaction(
    &self,
    id: &TxId,
) -> StorageResult<Option<fuel_core_types::fuel_tx::Transaction>> {
    self.off_chain.old_transaction(id)
}

pub fn relayed_tx_status(
&self,
id: Bytes32,
Expand Down
7 changes: 7 additions & 0 deletions crates/fuel-core/src/query/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,12 @@ impl ReadView {
})
.map_ok(|stream| stream.map(Ok))
.try_flatten()
.chunks(self.butch_size)
.filter_map(|chunk| async move {
// Give a chance to other tasks to run.
tokio::task::yield_now().await;
Some(futures::stream::iter(chunk))
})
.flatten()
}
}
46 changes: 37 additions & 9 deletions crates/fuel-core/src/query/balance/asset_query.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::graphql_api::database::ReadView;
use async_graphql::futures_util::TryStreamExt;
use fuel_core_services::stream::IntoBoxStream;
use fuel_core_storage::{
iter::IterDirection,
Expand Down Expand Up @@ -78,7 +79,9 @@ impl<'a> AssetsQuery<'a> {

fn coins_iter(mut self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
let assets = self.assets.take();
self.database
let database = self.database;
let stream = self
.database
.owned_coins_ids(self.owner, None, IterDirection::Forward)
.map(|id| id.map(CoinId::from))
.filter(move |result| {
Expand All @@ -99,12 +102,25 @@ impl<'a> AssetsQuery<'a> {
} else {
return Err(anyhow::anyhow!("The coin is not UTXO").into());
};
// TODO: Fetch coin in a separate thread
let coin = self.database.coin(id)?;

Ok(CoinType::Coin(coin))
Ok(id)
})
});

futures::stream::StreamExt::chunks(stream, database.butch_size)
.map(|chunk| {
use itertools::Itertools;

let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
Ok::<_, StorageError>(chunk)
})
.try_filter_map(move |chunk| async move {
let chunk = database
.coins(chunk)
.await
.map(|result| result.map(CoinType::Coin));
Ok(Some(futures::stream::iter(chunk)))
})
.try_flatten()
.filter(move |result| {
if let Ok(CoinType::Coin(coin)) = result {
has_asset(&assets, &coin.asset_id)
Expand All @@ -117,7 +133,8 @@ impl<'a> AssetsQuery<'a> {
fn messages_iter(&self) -> impl Stream<Item = StorageResult<CoinType>> + 'a {
let exclude = self.exclude;
let database = self.database;
self.database
let stream = self
.database
.owned_message_ids(self.owner, None, IterDirection::Forward)
.map(|id| id.map(CoinId::from))
.filter(move |result| {
Expand All @@ -138,11 +155,22 @@ impl<'a> AssetsQuery<'a> {
} else {
return Err(anyhow::anyhow!("The coin is not a message").into());
};
// TODO: Fetch message in a separate thread
let message = database.message(&id)?;
Ok(message)
Ok(id)
})
});

futures::stream::StreamExt::chunks(stream, database.butch_size)
.map(|chunk| {
use itertools::Itertools;

let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
Ok(chunk)
})
.try_filter_map(move |chunk| async move {
let chunk = database.messages(chunk).await;
Ok::<_, StorageError>(Some(futures::stream::iter(chunk)))
})
.try_flatten()
.filter(|result| {
if let Ok(message) = result {
message.data().is_empty()
Expand Down
12 changes: 11 additions & 1 deletion crates/fuel-core/src/query/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use fuel_core_types::{
blockchain::block::CompressedBlock,
fuel_types::BlockHeight,
};
use futures::Stream;
use futures::{
Stream,
StreamExt,
};

impl ReadView {
pub fn latest_block_height(&self) -> StorageResult<BlockHeight> {
Expand All @@ -24,5 +27,12 @@ impl ReadView {
direction: IterDirection,
) -> impl Stream<Item = StorageResult<CompressedBlock>> + '_ {
futures::stream::iter(self.blocks(height, direction))
.chunks(self.butch_size)
.filter_map(|chunk| async move {
// Give a chance to other tasks to run.
tokio::task::yield_now().await;
Some(futures::stream::iter(chunk))
})
.flatten()
}
}
Loading

0 comments on commit 4237feb

Please sign in to comment.