From d80449bf0f51ee5e0959b7bb192123d1daecc46b Mon Sep 17 00:00:00 2001 From: Xun Li Date: Mon, 13 Jan 2025 08:41:56 -0800 Subject: [PATCH] [indexer-alt] Add tests for balance bucket pipeline and pruner (#20843) ## Description Describe the changes or additions included in this PR. ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] gRPC: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: --- crates/sui-indexer-alt-schema/src/objects.rs | 2 +- .../src/handlers/coin_balance_buckets.rs | 218 ++++++++++++++++-- 2 files changed, 197 insertions(+), 23 deletions(-) diff --git a/crates/sui-indexer-alt-schema/src/objects.rs b/crates/sui-indexer-alt-schema/src/objects.rs index 8e050cda256c9..a2472ffe6522d 100644 --- a/crates/sui-indexer-alt-schema/src/objects.rs +++ b/crates/sui-indexer-alt-schema/src/objects.rs @@ -62,7 +62,7 @@ pub struct StoredObjInfo { pub instantiation: Option>, } -#[derive(Insertable, Debug, Clone, FieldCount)] +#[derive(Insertable, Queryable, Debug, Clone, FieldCount, Eq, PartialEq)] #[diesel(table_name = coin_balance_buckets, primary_key(object_id, cp_sequence_number))] #[diesel(treat_none_as_default_value = false)] pub struct StoredCoinBalanceBucket { diff --git a/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs index 75a091955945a..f52f5a8bebc35 100644 --- a/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs +++ b/crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs @@ -285,6 +285,9 @@ mod tests { use std::str::FromStr; use super::*; + use diesel::QueryDsl; + use sui_indexer_alt_framework::Indexer; + use sui_indexer_alt_schema::MIGRATIONS; use sui_protocol_config::ProtocolConfig; use sui_types::base_types::{dbg_addr, MoveObjectType, ObjectID, SequenceNumber, SuiAddress}; use sui_types::digests::TransactionDigest; @@ -292,6 +295,20 @@ mod tests { use sui_types::object::{Authenticator, MoveObject, Object}; use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder; + // Get all balance buckets from the database, sorted by object_id and cp_sequence_number. + async fn get_all_balance_buckets( + conn: &mut db::Connection<'_>, + ) -> Vec { + coin_balance_buckets::table + .order_by(( + coin_balance_buckets::object_id, + coin_balance_buckets::cp_sequence_number, + )) + .load(conn) + .await + .unwrap() + } + #[test] fn test_get_coin_balance_bucket() { let id = ObjectID::random(); @@ -366,10 +383,12 @@ mod tests { ); } - #[test] - fn test_process_coin_balance_buckets_new_sui_coin() { + #[tokio::test] + async fn test_process_coin_balance_buckets_new_sui_coin() { + let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await; + let mut conn = indexer.db().connect().await.unwrap(); let handler = CoinBalanceBuckets::default(); - let mut builder = TestCheckpointDataBuilder::new(1); + let mut builder = TestCheckpointDataBuilder::new(0); builder = builder .start_transaction(0) .create_sui_object(0, 0) @@ -394,12 +413,22 @@ mod tests { .. } ))); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 2); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 2); + let rows_pruned = handler.prune(0, 1, &mut conn).await.unwrap(); + assert_eq!(rows_pruned, 0); } - #[test] - fn test_process_coin_balance_buckets_new_other_coin() { + #[tokio::test] + async fn test_process_coin_balance_buckets_new_other_coin() { + let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await; + let mut conn = indexer.db().connect().await.unwrap(); let handler = CoinBalanceBuckets::default(); - let mut builder = TestCheckpointDataBuilder::new(1); + let mut builder = TestCheckpointDataBuilder::new(0); let coin_type = TypeTag::from_str("0x0::a::b").unwrap(); builder = builder .start_transaction(0) @@ -417,12 +446,22 @@ mod tests { owner_id: TestCheckpointDataBuilder::derive_address(0), } ); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 1); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 1); + let rows_pruned = handler.prune(0, 1, &mut conn).await.unwrap(); + assert_eq!(rows_pruned, 0); } - #[test] - fn test_process_coin_balance_buckets_balance_change() { + #[tokio::test] + async fn test_process_coin_balance_buckets_balance_change() { + let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await; + let mut conn = indexer.db().connect().await.unwrap(); let handler = CoinBalanceBuckets::default(); - let mut builder = TestCheckpointDataBuilder::new(1); + let mut builder = TestCheckpointDataBuilder::new(0); builder = builder .start_transaction(0) .create_sui_object(0, 10010) @@ -430,6 +469,7 @@ mod tests { let checkpoint = builder.build_checkpoint(); let values = handler.process(&Arc::new(checkpoint)).unwrap(); assert_eq!(values.len(), 1); + // Checkpoint 0 creates coin object 0. assert_eq!( values[0].change, CoinBalanceBucketChangeKind::Insert { @@ -439,6 +479,13 @@ mod tests { owner_id: TestCheckpointDataBuilder::derive_address(0), } ); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 1); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 1); + // Transfer 10 MIST, balance goes from 10010 to 10000. // The balance bucket for the original coin does not change. // We should only see the creation of the new coin in the processed results. @@ -449,6 +496,7 @@ mod tests { let checkpoint = builder.build_checkpoint(); let values = handler.process(&Arc::new(checkpoint)).unwrap(); assert_eq!(values.len(), 1); + // Checkpoint 1 creates coin object 1. assert_eq!( values[0].change, CoinBalanceBucketChangeKind::Insert { @@ -458,6 +506,16 @@ mod tests { owner_id: TestCheckpointDataBuilder::derive_address(1), } ); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 1); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 2); + + // Nothing to prune because the two coins in the table have not been updated since creation. + let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap(); + assert_eq!(rows_pruned, 0); // Transfer 1 MIST, balance goes from 10000 to 9999. // The balance bucket changes, we should see a change, both for the old owner and the new owner. @@ -468,6 +526,7 @@ mod tests { let checkpoint = builder.build_checkpoint(); let values = handler.process(&Arc::new(checkpoint)).unwrap(); assert_eq!(values.len(), 2); + // Checkpoint 2 creates coin object 2, and mutates coin object 0. assert!(values.iter().any(|v| v.change == CoinBalanceBucketChangeKind::Insert { owner_kind: StoredCoinOwnerKind::Fastpath, @@ -482,17 +541,68 @@ mod tests { coin_type: GAS::type_tag(), owner_id: TestCheckpointDataBuilder::derive_address(1), })); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 2); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 4); + + let rows_pruned = handler.prune(2, 3, &mut conn).await.unwrap(); + assert_eq!(rows_pruned, 1); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 3); + assert_eq!( + all_balance_buckets[0], + StoredCoinBalanceBucket { + object_id: TestCheckpointDataBuilder::derive_object_id(0).to_vec(), + cp_sequence_number: 2, + owner_kind: Some(StoredCoinOwnerKind::Fastpath), + owner_id: Some(TestCheckpointDataBuilder::derive_address(0).to_vec()), + coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()), + coin_balance_bucket: Some(3), + } + ); + assert_eq!( + all_balance_buckets[1], + StoredCoinBalanceBucket { + object_id: TestCheckpointDataBuilder::derive_object_id(1).to_vec(), + cp_sequence_number: 1, + owner_kind: Some(StoredCoinOwnerKind::Fastpath), + owner_id: Some(TestCheckpointDataBuilder::derive_address(1).to_vec()), + coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()), + coin_balance_bucket: Some(1), + } + ); + assert_eq!( + all_balance_buckets[2], + StoredCoinBalanceBucket { + object_id: TestCheckpointDataBuilder::derive_object_id(2).to_vec(), + cp_sequence_number: 2, + owner_kind: Some(StoredCoinOwnerKind::Fastpath), + owner_id: Some(TestCheckpointDataBuilder::derive_address(1).to_vec()), + coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()), + coin_balance_bucket: Some(0), + } + ); } - #[test] - fn test_process_coin_balance_buckets_coin_deleted() { + #[tokio::test] + async fn test_process_coin_balance_buckets_coin_deleted() { + let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await; + let mut conn = indexer.db().connect().await.unwrap(); let handler = CoinBalanceBuckets::default(); - let mut builder = TestCheckpointDataBuilder::new(1); + let mut builder = TestCheckpointDataBuilder::new(0); builder = builder .start_transaction(0) .create_owned_object(0) .finish_transaction(); - builder.build_checkpoint(); + let checkpoint = builder.build_checkpoint(); + let values = handler.process(&Arc::new(checkpoint)).unwrap(); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 1); builder = builder .start_transaction(0) @@ -502,17 +612,46 @@ mod tests { let values = handler.process(&Arc::new(checkpoint)).unwrap(); assert_eq!(values.len(), 1); assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 1); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 2); + assert_eq!( + all_balance_buckets[1], + StoredCoinBalanceBucket { + object_id: TestCheckpointDataBuilder::derive_object_id(0).to_vec(), + cp_sequence_number: 1, + owner_kind: None, + owner_id: None, + coin_type: None, + coin_balance_bucket: None, + } + ); + + let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap(); + assert_eq!(rows_pruned, 2); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 0); } - #[test] - fn test_process_coin_balance_buckets_owner_change() { + #[tokio::test] + async fn test_process_coin_balance_buckets_owner_change() { + let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await; + let mut conn = indexer.db().connect().await.unwrap(); let handler = CoinBalanceBuckets::default(); - let mut builder = TestCheckpointDataBuilder::new(1); + let mut builder = TestCheckpointDataBuilder::new(0); builder = builder .start_transaction(0) .create_sui_object(0, 100) .finish_transaction(); - builder.build_checkpoint(); + let checkpoint = builder.build_checkpoint(); + let values = handler.process(&Arc::new(checkpoint)).unwrap(); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 1); builder = builder .start_transaction(0) @@ -530,19 +669,45 @@ mod tests { owner_id: TestCheckpointDataBuilder::derive_address(1), } ); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 1); + + let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap(); + assert_eq!(rows_pruned, 1); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 1); + assert_eq!( + all_balance_buckets[0], + StoredCoinBalanceBucket { + object_id: TestCheckpointDataBuilder::derive_object_id(0).to_vec(), + cp_sequence_number: 1, + owner_kind: Some(StoredCoinOwnerKind::Fastpath), + owner_id: Some(TestCheckpointDataBuilder::derive_address(1).to_vec()), + coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()), + coin_balance_bucket: Some(2), + } + ); } - #[test] - fn test_process_coin_balance_buckets_object_owned() { + #[tokio::test] + async fn test_process_coin_balance_buckets_object_owned() { + let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await; + let mut conn = indexer.db().connect().await.unwrap(); let handler = CoinBalanceBuckets::default(); - let mut builder = TestCheckpointDataBuilder::new(1); + let mut builder = TestCheckpointDataBuilder::new(0); builder = builder .start_transaction(0) .create_owned_object(0) .finish_transaction(); - builder.build_checkpoint(); + let checkpoint = builder.build_checkpoint(); + let values = handler.process(&Arc::new(checkpoint)).unwrap(); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 1); - // We do not track balance buckets for object owners. // So this is considered as a delete. builder = builder .start_transaction(0) @@ -552,5 +717,14 @@ mod tests { let values = handler.process(&Arc::new(checkpoint)).unwrap(); assert_eq!(values.len(), 1); assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete); + let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn) + .await + .unwrap(); + assert_eq!(rows_inserted, 1); + + let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap(); + assert_eq!(rows_pruned, 2); + let all_balance_buckets = get_all_balance_buckets(&mut conn).await; + assert_eq!(all_balance_buckets.len(), 0); } }