Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support backward iteration in the RocksDB #1492

Merged
merged 11 commits into from
Nov 20, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Added

- [#1492](https://github.com/FuelLabs/fuel-core/pull/1492): Support backward iteration in the RocksDB. It allows backward queries that were not allowed before.
- [#1490](https://github.com/FuelLabs/fuel-core/pull/1490): Add push and pop benchmarks.
- [#1485](https://github.com/FuelLabs/fuel-core/pull/1485): Prepare rc release of fuel core v0.21
- [#1476](https://github.com/FuelLabs/fuel-core/pull/1453): Add the majority of the "other" benchmarks for contract opcodes.
Expand Down
6 changes: 0 additions & 6 deletions crates/fuel-core/src/schema/coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::{
U64,
},
};
use anyhow::anyhow;
use async_graphql::{
connection::{
Connection,
Expand Down Expand Up @@ -167,11 +166,6 @@ impl CoinQuery {
last: Option<i32>,
before: Option<String>,
) -> async_graphql::Result<Connection<UtxoId, Coin, EmptyFields, EmptyFields>> {
// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(anyhow!("reverse pagination isn't supported for this coins").into())
}

let query: &Database = ctx.data_unchecked();
crate::schema::query_pagination(after, before, first, last, |start, direction| {
let owner: fuel_tx::Address = filter.owner.into();
Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-core/src/schema/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::{
U64,
},
};
use anyhow::anyhow;
use async_graphql::{
connection::{
Connection,
Expand Down Expand Up @@ -138,13 +137,6 @@ impl ContractBalanceQuery {
> {
let query: &Database = ctx.data_unchecked();

// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(
anyhow!("reverse pagination isn't supported for this resource").into(),
)
}

crate::schema::query_pagination(after, before, first, last, |start, direction| {
let balances = query
.contract_balances(
Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-core/src/schema/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ impl MessageQuery {
};

let messages = if let Some(owner) = owner {
// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(anyhow!(
"reverse pagination isn't supported for this resource"
)
.into())
}

query.owned_messages(&owner.0, start, direction)
} else {
query.all_messages(start, direction)
Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::{
TxPointer,
},
};
use anyhow::anyhow;
use async_graphql::{
connection::{
Connection,
Expand Down Expand Up @@ -168,13 +167,6 @@ impl TxQuery {
before: Option<String>,
) -> async_graphql::Result<Connection<TxPointer, Transaction, EmptyFields, EmptyFields>>
{
// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(
anyhow!("reverse pagination isn't supported for this resource").into(),
)
}

let query: &Database = ctx.data_unchecked();
let config = ctx.data_unchecked::<Config>();
let owner = fuel_types::Address::from(owner);
Expand Down
78 changes: 72 additions & 6 deletions crates/fuel-core/src/state/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,13 +321,68 @@ impl KeyValueStore for RocksDb {
.into_boxed()
}
(Some(prefix), None) => {
// start iterating in a certain direction within the keyspace
let iter_mode =
IteratorMode::From(prefix, convert_to_rocksdb_direction(direction));
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);
// RocksDB prefix iteration doesn't support reverse order,
// but seeking the start key and iterating in reverse order works.
// So we can create a workaround. We need to find the next available
// element and use it as an anchor for reverse iteration,
// but skip the first element to jump on the previous prefix.
// If we can't find the next element, we are at the end of the list,
// so we can use `IteratorMode::End` to start reverse iteration.
if direction == IterDirection::Reverse {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this section could be a little easier to read.

Something like:

match direction {
    IterDirection::Reverse => self.reverse_prefix_iter(prefix, ...),
    _ => self.prefix_iter(prefix, ...),
}

Then inside the new reverse_prefix_iter, instead of returning early, just do something like:

let maybe_next_item = next_prefix(prefix.to_vec())
    .and_then(|next_prefix| self
                            .iter_all(
                                column,
                                Some(next_prefix.as_slice()),
                                None,
                                IterDirection::Forward,
                            )
                            .next())
    .and_then(|res| res.ok());
if let Some(next_item) = maybe_next_item {
    ...
} else {
    ...
}

Of course, breaking those down into into smaller functions wouldn't hurt either, but those are the changes that would improve readability the most.

if let Some(next_prefix) = next_prefix(prefix.to_vec()) {
let next_item = self
.iter_all(
column,
Some(next_prefix.as_slice()),
None,
IterDirection::Forward,
)
.next();

if let Some(Ok(next_item)) = next_item {
let (next_start_key, _) = next_item;
let iter_mode = IteratorMode::From(
next_start_key.as_slice(),
rocksdb::Direction::Reverse,
);
let prefix = prefix.to_vec();
return self
._iter_all(column, ReadOptions::default(), iter_mode)
// Skip the element under the `next_start_key` key.
.skip(1)
.take_while(move |item| {
if let Ok((key, _)) = item {
key.starts_with(prefix.as_slice())
} else {
true
}
})
.into_boxed()
}
}

// No next item, so we can start backward iteration from the end.
let prefix = prefix.to_vec();
self._iter_all(column, ReadOptions::default(), IteratorMode::End)
.take_while(move |item| {
if let Ok((key, _)) = item {
key.starts_with(prefix.as_slice())
} else {
true
}
})
.into_boxed()
} else {
// start iterating in a certain direction within the keyspace
let iter_mode = IteratorMode::From(
prefix,
convert_to_rocksdb_direction(direction),
);
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);

self._iter_all(column, opts, iter_mode).into_boxed()
self._iter_all(column, opts, iter_mode).into_boxed()
}
}
(None, Some(start)) => {
// start iterating in a certain direction from the start key
Expand Down Expand Up @@ -503,6 +558,17 @@ impl TransactableStorage for RocksDb {
}
}

/// The `None` means overflow, so there is not following prefix.
fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
for byte in prefix.iter_mut().rev() {
if byte != &u8::MAX {
*byte = byte.checked_add(1).expect("We've just checked it above");
return Some(prefix)
}
xgreenx marked this conversation as resolved.
Show resolved Hide resolved
}
None
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
7 changes: 1 addition & 6 deletions tests/tests/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ async fn test_contract_balance(
#[rstest]
#[tokio::test]
async fn test_5_contract_balances(
#[values(PageDirection::Forward)] direction: PageDirection,
// #[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
// Rocksdb doesn't support reverse seeks using a prefix, we'd need to implement a custom
// comparator to support this usecase.
// > One common bug of using prefix iterating is to use prefix mode to iterate in reverse order. But it is not yet supported.
// https://github.com/facebook/rocksdb/wiki/Prefix-Seek#limitation
#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
) {
let mut test_builder = TestSetupBuilder::new(SEED);
let (_, contract_id) = test_builder.setup_contract(
Expand Down
4 changes: 1 addition & 3 deletions tests/tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,7 @@ async fn messages_by_owner_returns_messages_for_the_given_owner() {
#[rstest]
#[tokio::test]
async fn messages_empty_results_for_owner_with_no_messages(
#[values(PageDirection::Forward)] direction: PageDirection,
//#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
// reverse iteration with prefix not supported by rocksdb
#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
#[values(Address::new([16; 32]), Address::new([0; 32]))] owner: Address,
) {
let srv = FuelService::new_node(Config::local_node()).await.unwrap();
Expand Down
106 changes: 94 additions & 12 deletions tests/tests/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ async fn get_transactions() {
}

#[tokio::test]
async fn get_transactions_by_owner_forward_and_backward_iterations() {
async fn get_transactions_by_owner_forward() {
let alice = Address::from([1; 32]);
let bob = Address::from([2; 32]);

Expand Down Expand Up @@ -413,17 +413,6 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() {
.collect_vec();
assert_eq!(transactions_forward.len(), 5);

let all_transactions_backward = PaginationRequest {
cursor: None,
results: 10,
direction: PageDirection::Backward,
};
let response = client
.transactions_by_owner(&bob, all_transactions_backward)
.await;
// Backward request is not supported right now.
assert!(response.is_err());

///////////////// Iteration

let forward_iter_three = PaginationRequest {
Expand Down Expand Up @@ -475,6 +464,99 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() {
);
}

#[tokio::test]
async fn get_transactions_by_owner_backward_iterations() {
let alice = Address::from([1; 32]);
let bob = Address::from([2; 32]);

let mut context = TestContext::new(100).await;
let _ = context.transfer(alice, bob, 1).await.unwrap();
let _ = context.transfer(alice, bob, 2).await.unwrap();
let _ = context.transfer(alice, bob, 3).await.unwrap();
let _ = context.transfer(alice, bob, 4).await.unwrap();
let _ = context.transfer(alice, bob, 5).await.unwrap();

let client = context.client;

let all_transactions_backward = PaginationRequest {
cursor: None,
results: 10,
direction: PageDirection::Backward,
};
let response = client
.transactions_by_owner(&bob, all_transactions_backward)
.await
.unwrap();
let transactions_backward = response
.results
.into_iter()
.map(|tx| {
assert!(matches!(tx.status, TransactionStatus::Success { .. }));
tx.transaction
})
.collect_vec();
assert_eq!(transactions_backward.len(), 5);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like you're testing that the request returns paginated data with the correct length. After this, you test that the request returns paginated data with the expected results. Lastly, you test that the request returns paginated data with the expected results when using a cursor. Therefore, I think this test could be broken up into two or three separate tests, since these are three separate concerns.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Splitting this text into three doesn't make much sense because the third test will be almost the same except for two asserts. It increases the test file size and makes it harder to navigate by adding a very slight improvement to the one test readability.

I agree that we need to try to test different things with different tests, but when one test is the copy of another + one new line, it is not very nice.

Copy link
Contributor

@bvrooman bvrooman Nov 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is my opinion on testing, with a bias from working in TDD:

Each unit test should illustrate the conformance of a single, functional behaviour (a unit). The behaviour being tested should be described succinctly by the test name. The test suite creates a specification that outlines the complete behaviour of the subject under test, and this specification acts as a promise to the developers using the code: The test names tell us what it will do and the test body proves that it does it. A well-written test suite can serve as living documentation for the code. It clearly demonstrates the intended use cases and the expected behaviour of the software. E.g., "backward_pagination_returns_correct_number_of_results", "backward_pagination_returns_results_in_reverse_order", "backward_pagination_supports_cursor", etc.

Therefore, the goal isn't simply readability, and the fact that test code is duplicated is not really that important. What's more important is that the test suite communicates the intended behaviour succinctly, and test names are the easiest way to achieve that. When I read the test mod, or run all the tests, I can simply look at the list of test names to have an overall sense of what the behaviours are.

If that resonates with you, feel free to update the test suite. If not, no worries - again, this is just my opinion.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


///////////////// Iteration

let backward_iter_three = PaginationRequest {
cursor: None,
results: 3,
direction: PageDirection::Backward,
};
let response_after_iter_three = client
.transactions_by_owner(&bob, backward_iter_three)
.await
.unwrap();
let transactions_backward_iter_three = response_after_iter_three
.results
.into_iter()
.map(|tx| {
assert!(matches!(tx.status, TransactionStatus::Success { .. }));
tx.transaction
})
.collect_vec();
assert_eq!(transactions_backward_iter_three.len(), 3);
assert_eq!(
transactions_backward_iter_three[0],
transactions_backward[0]
);
assert_eq!(
transactions_backward_iter_three[1],
transactions_backward[1]
);
assert_eq!(
transactions_backward_iter_three[2],
transactions_backward[2]
);

let backward_iter_next_two = PaginationRequest {
cursor: response_after_iter_three.cursor.clone(),
results: 2,
direction: PageDirection::Backward,
};
let response = client
.transactions_by_owner(&bob, backward_iter_next_two)
.await
.unwrap();
let transactions_backward_iter_next_two = response
.results
.into_iter()
.map(|tx| {
assert!(matches!(tx.status, TransactionStatus::Success { .. }));
tx.transaction
})
.collect_vec();
assert_eq!(
transactions_backward_iter_next_two[0],
transactions_backward[3]
);
assert_eq!(
transactions_backward_iter_next_two[1],
transactions_backward[4]
);
}

#[tokio::test]
async fn get_transactions_from_manual_blocks() {
let (executor, db) = get_executor_and_db();
Expand Down
Loading