Skip to content

Commit

Permalink
Updated all pagination queries to work with the Stream instead of `…
Browse files Browse the repository at this point in the history
…Iterator`
  • Loading branch information
xgreenx committed Oct 12, 2024
1 parent ab5e940 commit 1782684
Show file tree
Hide file tree
Showing 19 changed files with 464 additions and 346 deletions.
323 changes: 182 additions & 141 deletions crates/fuel-core/src/coins_query.rs

Large diffs are not rendered by default.

43 changes: 27 additions & 16 deletions crates/fuel-core/src/graphql_api/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ use fuel_core_types::{
txpool::TransactionStatus,
},
};
use futures::Stream;
use std::{
borrow::Cow,
sync::Arc,
Expand Down Expand Up @@ -249,8 +250,8 @@ impl ReadView {
&self,
start_message_id: Option<Nonce>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<Message>> {
self.on_chain.all_messages(start_message_id, direction)
) -> impl Stream<Item = StorageResult<Message>> + '_ {
futures::stream::iter(self.on_chain.all_messages(start_message_id, direction))
}

pub fn message_exists(&self, nonce: &Nonce) -> StorageResult<bool> {
Expand All @@ -269,9 +270,12 @@ impl ReadView {
contract: ContractId,
start_asset: Option<AssetId>,
direction: IterDirection,
) -> BoxedIter<StorageResult<ContractBalance>> {
self.on_chain
.contract_balances(contract, start_asset, direction)
) -> impl Stream<Item = StorageResult<ContractBalance>> + '_ {
futures::stream::iter(self.on_chain.contract_balances(
contract,
start_asset,
direction,
))
}

pub fn da_height(&self) -> StorageResult<DaBlockHeight> {
Expand Down Expand Up @@ -306,28 +310,35 @@ impl ReadView {
owner: &Address,
start_coin: Option<UtxoId>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<UtxoId>> {
self.off_chain.owned_coins_ids(owner, start_coin, direction)
) -> impl Stream<Item = StorageResult<UtxoId>> + '_ {
let iter = self.off_chain.owned_coins_ids(owner, start_coin, direction);

futures::stream::iter(iter)
}

pub fn owned_message_ids(
&self,
owner: &Address,
pub fn owned_message_ids<'a>(
&'a self,
owner: &'a Address,
start_message_id: Option<Nonce>,
direction: IterDirection,
) -> BoxedIter<'_, StorageResult<Nonce>> {
self.off_chain
.owned_message_ids(owner, start_message_id, direction)
) -> impl Stream<Item = StorageResult<Nonce>> + 'a {
futures::stream::iter(self.off_chain.owned_message_ids(
owner,
start_message_id,
direction,
))
}

pub fn owned_transactions_ids(
&self,
owner: Address,
start: Option<TxPointer>,
direction: IterDirection,
) -> BoxedIter<StorageResult<(TxPointer, TxId)>> {
self.off_chain
.owned_transactions_ids(owner, start, direction)
) -> impl Stream<Item = StorageResult<(TxPointer, TxId)>> + '_ {
futures::stream::iter(
self.off_chain
.owned_transactions_ids(owner, start, direction),
)
}

pub fn contract_salt(&self, contract_id: &ContractId) -> StorageResult<Salt> {
Expand Down
112 changes: 55 additions & 57 deletions crates/fuel-core/src/query/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@ use asset_query::{
AssetsQuery,
};
use fuel_core_storage::{
iter::{
BoxedIter,
IntoBoxedIter,
IterDirection,
},
iter::IterDirection,
Result as StorageResult,
};
use fuel_core_types::{
Expand All @@ -19,7 +15,12 @@ use fuel_core_types::{
},
services::graphql_api::AddressBalance,
};
use itertools::Itertools;
use futures::{
FutureExt,
Stream,
StreamExt,
TryStreamExt,
};
use std::{
cmp::Ordering,
collections::HashMap,
Expand All @@ -28,29 +29,28 @@ use std::{
pub mod asset_query;

impl ReadView {
pub fn balance(
pub async fn balance(
&self,
owner: Address,
asset_id: AssetId,
base_asset_id: AssetId,
) -> StorageResult<AddressBalance> {
let amount = AssetQuery::new(
&owner,
&AssetSpendTarget::new(asset_id, u64::MAX, usize::MAX),
&AssetSpendTarget::new(asset_id, u64::MAX, u16::MAX),
&base_asset_id,
None,
self,
)
.coins()
.map(|res| res.map(|coins| coins.amount()))
.try_fold(0u64, |mut balance, res| -> StorageResult<_> {
let amount = res?;

// Increase the balance
balance = balance.saturating_add(amount);

Ok(balance)
})?;
.try_fold(0u64, |balance, amount| {
async move {
// Increase the balance
Ok(balance.saturating_add(amount))
}
})
.await?;

Ok(AddressBalance {
owner,
Expand All @@ -59,54 +59,52 @@ impl ReadView {
})
}

pub fn balances(
&self,
owner: Address,
pub fn balances<'a>(
&'a self,
owner: &'a Address,
direction: IterDirection,
base_asset_id: AssetId,
) -> BoxedIter<StorageResult<AddressBalance>> {
let mut amounts_per_asset = HashMap::new();
let mut errors = vec![];
base_asset_id: &'a AssetId,
) -> impl Stream<Item = StorageResult<AddressBalance>> + 'a {
let query = AssetsQuery::new(owner, None, None, self, base_asset_id);
let stream = query.coins();

for coin in AssetsQuery::new(&owner, None, None, self, &base_asset_id).coins() {
match coin {
Ok(coin) => {
stream
.try_fold(
HashMap::new(),
move |mut amounts_per_asset, coin| async move {
let amount: &mut u64 = amounts_per_asset
.entry(*coin.asset_id(&base_asset_id))
.entry(*coin.asset_id(base_asset_id))
.or_default();
*amount = amount.saturating_add(coin.amount());
}
Err(err) => {
errors.push(err);
}
}
}

let mut balances = amounts_per_asset
.into_iter()
.map(|(asset_id, amount)| AddressBalance {
owner,
amount,
asset_id,
})
.collect_vec();
Ok(amounts_per_asset)
},
)
.into_stream()
.try_filter_map(move |amounts_per_asset| async move {
let mut balances = amounts_per_asset
.into_iter()
.map(|(asset_id, amount)| AddressBalance {
owner: *owner,
amount,
asset_id,
})
.collect::<Vec<_>>();

balances.sort_by(|l, r| {
if l.asset_id < r.asset_id {
Ordering::Less
} else {
Ordering::Greater
}
});
balances.sort_by(|l, r| {
if l.asset_id < r.asset_id {
Ordering::Less
} else {
Ordering::Greater
}
});

if direction == IterDirection::Reverse {
balances.reverse();
}
if direction == IterDirection::Reverse {
balances.reverse();
}

balances
.into_iter()
.map(Ok)
.chain(errors.into_iter().map(Err))
.into_boxed()
Ok(Some(futures::stream::iter(balances)))
})
.map_ok(|stream| stream.map(Ok))
.try_flatten()
}
}
Loading

0 comments on commit 1782684

Please sign in to comment.