Skip to content

Commit

Permalink
[indexer-alt] Add tests for balance bucket pipeline and pruner (#20843)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
lxfind authored Jan 13, 2025
1 parent 4e98549 commit d80449b
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 23 deletions.
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-schema/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct StoredObjInfo {
pub instantiation: Option<Vec<u8>>,
}

#[derive(Insertable, Debug, Clone, FieldCount)]
#[derive(Insertable, Queryable, Debug, Clone, FieldCount, Eq, PartialEq)]
#[diesel(table_name = coin_balance_buckets, primary_key(object_id, cp_sequence_number))]
#[diesel(treat_none_as_default_value = false)]
pub struct StoredCoinBalanceBucket {
Expand Down
218 changes: 196 additions & 22 deletions crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,30 @@ mod tests {
use std::str::FromStr;

use super::*;
use diesel::QueryDsl;
use sui_indexer_alt_framework::Indexer;
use sui_indexer_alt_schema::MIGRATIONS;
use sui_protocol_config::ProtocolConfig;
use sui_types::base_types::{dbg_addr, MoveObjectType, ObjectID, SequenceNumber, SuiAddress};
use sui_types::digests::TransactionDigest;
use sui_types::gas_coin::GAS;
use sui_types::object::{Authenticator, MoveObject, Object};
use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;

// Get all balance buckets from the database, sorted by object_id and cp_sequence_number.
async fn get_all_balance_buckets(
conn: &mut db::Connection<'_>,
) -> Vec<StoredCoinBalanceBucket> {
coin_balance_buckets::table
.order_by((
coin_balance_buckets::object_id,
coin_balance_buckets::cp_sequence_number,
))
.load(conn)
.await
.unwrap()
}

#[test]
fn test_get_coin_balance_bucket() {
let id = ObjectID::random();
Expand Down Expand Up @@ -366,10 +383,12 @@ mod tests {
);
}

#[test]
fn test_process_coin_balance_buckets_new_sui_coin() {
#[tokio::test]
async fn test_process_coin_balance_buckets_new_sui_coin() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_sui_object(0, 0)
Expand All @@ -394,12 +413,22 @@ mod tests {
..
}
)));
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 2);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 2);
let rows_pruned = handler.prune(0, 1, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 0);
}

#[test]
fn test_process_coin_balance_buckets_new_other_coin() {
#[tokio::test]
async fn test_process_coin_balance_buckets_new_other_coin() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
let coin_type = TypeTag::from_str("0x0::a::b").unwrap();
builder = builder
.start_transaction(0)
Expand All @@ -417,19 +446,30 @@ mod tests {
owner_id: TestCheckpointDataBuilder::derive_address(0),
}
);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 1);
let rows_pruned = handler.prune(0, 1, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 0);
}

#[test]
fn test_process_coin_balance_buckets_balance_change() {
#[tokio::test]
async fn test_process_coin_balance_buckets_balance_change() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_sui_object(0, 10010)
.finish_transaction();
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 1);
// Checkpoint 0 creates coin object 0.
assert_eq!(
values[0].change,
CoinBalanceBucketChangeKind::Insert {
Expand All @@ -439,6 +479,13 @@ mod tests {
owner_id: TestCheckpointDataBuilder::derive_address(0),
}
);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 1);

// Transfer 10 MIST, balance goes from 10010 to 10000.
// The balance bucket for the original coin does not change.
// We should only see the creation of the new coin in the processed results.
Expand All @@ -449,6 +496,7 @@ mod tests {
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 1);
// Checkpoint 1 creates coin object 1.
assert_eq!(
values[0].change,
CoinBalanceBucketChangeKind::Insert {
Expand All @@ -458,6 +506,16 @@ mod tests {
owner_id: TestCheckpointDataBuilder::derive_address(1),
}
);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 2);

// Nothing to prune because the two coins in the table have not been updated since creation.
let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 0);

// Transfer 1 MIST, balance goes from 10000 to 9999.
// The balance bucket changes, we should see a change, both for the old owner and the new owner.
Expand All @@ -468,6 +526,7 @@ mod tests {
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 2);
// Checkpoint 2 creates coin object 2, and mutates coin object 0.
assert!(values.iter().any(|v| v.change
== CoinBalanceBucketChangeKind::Insert {
owner_kind: StoredCoinOwnerKind::Fastpath,
Expand All @@ -482,17 +541,68 @@ mod tests {
coin_type: GAS::type_tag(),
owner_id: TestCheckpointDataBuilder::derive_address(1),
}));
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 2);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 4);

let rows_pruned = handler.prune(2, 3, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 3);
assert_eq!(
all_balance_buckets[0],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(0).to_vec(),
cp_sequence_number: 2,
owner_kind: Some(StoredCoinOwnerKind::Fastpath),
owner_id: Some(TestCheckpointDataBuilder::derive_address(0).to_vec()),
coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
coin_balance_bucket: Some(3),
}
);
assert_eq!(
all_balance_buckets[1],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(1).to_vec(),
cp_sequence_number: 1,
owner_kind: Some(StoredCoinOwnerKind::Fastpath),
owner_id: Some(TestCheckpointDataBuilder::derive_address(1).to_vec()),
coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
coin_balance_bucket: Some(1),
}
);
assert_eq!(
all_balance_buckets[2],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(2).to_vec(),
cp_sequence_number: 2,
owner_kind: Some(StoredCoinOwnerKind::Fastpath),
owner_id: Some(TestCheckpointDataBuilder::derive_address(1).to_vec()),
coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
coin_balance_bucket: Some(0),
}
);
}

#[test]
fn test_process_coin_balance_buckets_coin_deleted() {
#[tokio::test]
async fn test_process_coin_balance_buckets_coin_deleted() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_owned_object(0)
.finish_transaction();
builder.build_checkpoint();
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

builder = builder
.start_transaction(0)
Expand All @@ -502,17 +612,46 @@ mod tests {
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 1);
assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 2);
assert_eq!(
all_balance_buckets[1],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(0).to_vec(),
cp_sequence_number: 1,
owner_kind: None,
owner_id: None,
coin_type: None,
coin_balance_bucket: None,
}
);

let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 2);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 0);
}

#[test]
fn test_process_coin_balance_buckets_owner_change() {
#[tokio::test]
async fn test_process_coin_balance_buckets_owner_change() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_sui_object(0, 100)
.finish_transaction();
builder.build_checkpoint();
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

builder = builder
.start_transaction(0)
Expand All @@ -530,19 +669,45 @@ mod tests {
owner_id: TestCheckpointDataBuilder::derive_address(1),
}
);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 1);
assert_eq!(
all_balance_buckets[0],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(0).to_vec(),
cp_sequence_number: 1,
owner_kind: Some(StoredCoinOwnerKind::Fastpath),
owner_id: Some(TestCheckpointDataBuilder::derive_address(1).to_vec()),
coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
coin_balance_bucket: Some(2),
}
);
}

#[test]
fn test_process_coin_balance_buckets_object_owned() {
#[tokio::test]
async fn test_process_coin_balance_buckets_object_owned() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_owned_object(0)
.finish_transaction();
builder.build_checkpoint();
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

// We do not track balance buckets for object owners.
// So this is considered as a delete.
builder = builder
.start_transaction(0)
Expand All @@ -552,5 +717,14 @@ mod tests {
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 1);
assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 2);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 0);
}
}

0 comments on commit d80449b

Please sign in to comment.