Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[indexer-alt] Add tests for balance bucket pipeline and pruner #20843

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-schema/src/objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub struct StoredObjInfo {
pub instantiation: Option<Vec<u8>>,
}

#[derive(Insertable, Debug, Clone, FieldCount)]
#[derive(Insertable, Queryable, Debug, Clone, FieldCount, Eq, PartialEq)]
#[diesel(table_name = coin_balance_buckets, primary_key(object_id, cp_sequence_number))]
#[diesel(treat_none_as_default_value = false)]
pub struct StoredCoinBalanceBucket {
Expand Down
218 changes: 196 additions & 22 deletions crates/sui-indexer-alt/src/handlers/coin_balance_buckets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,30 @@ mod tests {
use std::str::FromStr;

use super::*;
use diesel::QueryDsl;
use sui_indexer_alt_framework::Indexer;
use sui_indexer_alt_schema::MIGRATIONS;
use sui_protocol_config::ProtocolConfig;
use sui_types::base_types::{dbg_addr, MoveObjectType, ObjectID, SequenceNumber, SuiAddress};
use sui_types::digests::TransactionDigest;
use sui_types::gas_coin::GAS;
use sui_types::object::{Authenticator, MoveObject, Object};
use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;

// Get all balance buckets from the database, sorted by object_id and cp_sequence_number.
async fn get_all_balance_buckets(
conn: &mut db::Connection<'_>,
) -> Vec<StoredCoinBalanceBucket> {
coin_balance_buckets::table
.order_by((
coin_balance_buckets::object_id,
coin_balance_buckets::cp_sequence_number,
))
.load(conn)
.await
.unwrap()
}

#[test]
fn test_get_coin_balance_bucket() {
let id = ObjectID::random();
Expand Down Expand Up @@ -366,10 +383,12 @@ mod tests {
);
}

#[test]
fn test_process_coin_balance_buckets_new_sui_coin() {
#[tokio::test]
async fn test_process_coin_balance_buckets_new_sui_coin() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_sui_object(0, 0)
Expand All @@ -394,12 +413,22 @@ mod tests {
..
}
)));
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 2);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 2);
let rows_pruned = handler.prune(0, 1, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 0);
}

#[test]
fn test_process_coin_balance_buckets_new_other_coin() {
#[tokio::test]
async fn test_process_coin_balance_buckets_new_other_coin() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
let coin_type = TypeTag::from_str("0x0::a::b").unwrap();
builder = builder
.start_transaction(0)
Expand All @@ -417,19 +446,30 @@ mod tests {
owner_id: TestCheckpointDataBuilder::derive_address(0),
}
);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 1);
let rows_pruned = handler.prune(0, 1, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 0);
}

#[test]
fn test_process_coin_balance_buckets_balance_change() {
#[tokio::test]
async fn test_process_coin_balance_buckets_balance_change() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_sui_object(0, 10010)
.finish_transaction();
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 1);
// Checkpoint 0 creates coin object 0.
assert_eq!(
values[0].change,
CoinBalanceBucketChangeKind::Insert {
Expand All @@ -439,6 +479,13 @@ mod tests {
owner_id: TestCheckpointDataBuilder::derive_address(0),
}
);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 1);

// Transfer 10 MIST, balance goes from 10010 to 10000.
// The balance bucket for the original coin does not change.
// We should only see the creation of the new coin in the processed results.
Expand All @@ -449,6 +496,7 @@ mod tests {
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 1);
// Checkpoint 1 creates coin object 1.
assert_eq!(
values[0].change,
CoinBalanceBucketChangeKind::Insert {
Expand All @@ -458,6 +506,16 @@ mod tests {
owner_id: TestCheckpointDataBuilder::derive_address(1),
}
);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 2);

// Nothing to prune because the two coins in the table have not been updated since creation.
let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 0);

// Transfer 1 MIST, balance goes from 10000 to 9999.
// The balance bucket changes, we should see a change, both for the old owner and the new owner.
Expand All @@ -468,6 +526,7 @@ mod tests {
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 2);
// Checkpoint 2 creates coin object 2, and mutates coin object 0.
assert!(values.iter().any(|v| v.change
== CoinBalanceBucketChangeKind::Insert {
owner_kind: StoredCoinOwnerKind::Fastpath,
Expand All @@ -482,17 +541,68 @@ mod tests {
coin_type: GAS::type_tag(),
owner_id: TestCheckpointDataBuilder::derive_address(1),
}));
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 2);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 4);

let rows_pruned = handler.prune(2, 3, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 3);
assert_eq!(
all_balance_buckets[0],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(0).to_vec(),
cp_sequence_number: 2,
owner_kind: Some(StoredCoinOwnerKind::Fastpath),
owner_id: Some(TestCheckpointDataBuilder::derive_address(0).to_vec()),
coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
coin_balance_bucket: Some(3),
}
);
assert_eq!(
all_balance_buckets[1],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(1).to_vec(),
cp_sequence_number: 1,
owner_kind: Some(StoredCoinOwnerKind::Fastpath),
owner_id: Some(TestCheckpointDataBuilder::derive_address(1).to_vec()),
coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
coin_balance_bucket: Some(1),
}
);
assert_eq!(
all_balance_buckets[2],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(2).to_vec(),
cp_sequence_number: 2,
owner_kind: Some(StoredCoinOwnerKind::Fastpath),
owner_id: Some(TestCheckpointDataBuilder::derive_address(1).to_vec()),
coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
coin_balance_bucket: Some(0),
}
);
}

#[test]
fn test_process_coin_balance_buckets_coin_deleted() {
#[tokio::test]
async fn test_process_coin_balance_buckets_coin_deleted() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_owned_object(0)
.finish_transaction();
builder.build_checkpoint();
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

builder = builder
.start_transaction(0)
Expand All @@ -502,17 +612,46 @@ mod tests {
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 1);
assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 2);
assert_eq!(
all_balance_buckets[1],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(0).to_vec(),
cp_sequence_number: 1,
owner_kind: None,
owner_id: None,
coin_type: None,
coin_balance_bucket: None,
}
);

let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 2);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 0);
}

#[test]
fn test_process_coin_balance_buckets_owner_change() {
#[tokio::test]
async fn test_process_coin_balance_buckets_owner_change() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_sui_object(0, 100)
.finish_transaction();
builder.build_checkpoint();
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

builder = builder
.start_transaction(0)
Expand All @@ -530,19 +669,45 @@ mod tests {
owner_id: TestCheckpointDataBuilder::derive_address(1),
}
);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 1);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 1);
assert_eq!(
all_balance_buckets[0],
StoredCoinBalanceBucket {
object_id: TestCheckpointDataBuilder::derive_object_id(0).to_vec(),
cp_sequence_number: 1,
owner_kind: Some(StoredCoinOwnerKind::Fastpath),
owner_id: Some(TestCheckpointDataBuilder::derive_address(1).to_vec()),
coin_type: Some(bcs::to_bytes(&GAS::type_tag()).unwrap()),
coin_balance_bucket: Some(2),
}
);
}

#[test]
fn test_process_coin_balance_buckets_object_owned() {
#[tokio::test]
async fn test_process_coin_balance_buckets_object_owned() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();
let handler = CoinBalanceBuckets::default();
let mut builder = TestCheckpointDataBuilder::new(1);
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder
.start_transaction(0)
.create_owned_object(0)
.finish_transaction();
builder.build_checkpoint();
let checkpoint = builder.build_checkpoint();
let values = handler.process(&Arc::new(checkpoint)).unwrap();
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

// We do not track balance buckets for object owners.
// So this is considered as a delete.
builder = builder
.start_transaction(0)
Expand All @@ -552,5 +717,14 @@ mod tests {
let values = handler.process(&Arc::new(checkpoint)).unwrap();
assert_eq!(values.len(), 1);
assert_eq!(values[0].change, CoinBalanceBucketChangeKind::Delete);
let rows_inserted = CoinBalanceBuckets::commit(&values, &mut conn)
.await
.unwrap();
assert_eq!(rows_inserted, 1);

let rows_pruned = handler.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 2);
let all_balance_buckets = get_all_balance_buckets(&mut conn).await;
assert_eq!(all_balance_buckets.len(), 0);
}
}
6 changes: 5 additions & 1 deletion crates/sui-types/src/test_checkpoint_data_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,12 @@ impl TestCheckpointDataBuilder {
}

/// Derive an object ID from an index. This is used to conveniently represent an object's ID.
/// We ensure that the bytes of object IDs have a stable order that is the same as object_idx.
pub fn derive_object_id(object_idx: u64) -> ObjectID {
ObjectID::derive_id(TransactionDigest::ZERO, object_idx)
// We achieve this by setting the first 8 bytes of the object ID to the object_idx.
let mut bytes = [0; ObjectID::LENGTH];
bytes[0..8].copy_from_slice(&object_idx.to_le_bytes());
ObjectID::from_bytes(bytes).unwrap()
}

/// Derive an address from an index.
Expand Down
Loading