Skip to content

Commit

Permalink
[indexer-alt] pruning tests for kv_epoch_ends and pipelines that don'…
Browse files Browse the repository at this point in the history
…t need `CpSeqeuenceNumbers` (#20925)
  • Loading branch information
wlmyng authored Jan 22, 2025
1 parent 3a9a114 commit c769fa5
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 12 deletions.
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-schema/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sui_types::digests::{ChainIdentifier, CheckpointDigest};

use crate::schema::{kv_checkpoints, kv_genesis};

#[derive(Insertable, Debug, Clone, FieldCount)]
#[derive(Insertable, Debug, Clone, FieldCount, Queryable)]
#[diesel(table_name = kv_checkpoints)]
pub struct StoredCheckpoint {
pub sequence_number: i64,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-schema/src/epochs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sui_field_count::FieldCount;

use crate::schema::{kv_epoch_ends, kv_epoch_starts, kv_feature_flags, kv_protocol_configs};

#[derive(Insertable, Debug, Clone, FieldCount)]
#[derive(Insertable, Debug, Clone, FieldCount, Queryable)]
#[diesel(table_name = kv_epoch_ends)]
#[diesel(treat_none_as_default_value = false)]
pub struct StoredEpochEnd {
Expand Down
49 changes: 49 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,52 @@ impl Handler for KvCheckpoints {
Ok(diesel::delete(filter).execute(conn).await?)
}
}

#[cfg(test)]
mod tests {
use super::*;
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::Indexer;
use sui_indexer_alt_schema::MIGRATIONS;
use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;

async fn get_all_kv_checkpoints(
conn: &mut db::Connection<'_>,
) -> Result<Vec<StoredCheckpoint>> {
let query = kv_checkpoints::table.load(conn).await?;
Ok(query)
}

/// The kv_checkpoints pruner does not require cp_sequence_numbers, it can prune directly with the
/// checkpoint sequence number range.
#[tokio::test]
async fn test_kv_checkpoints_pruning() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

// Create 3 checkpoints
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder.start_transaction(0).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvCheckpoints.process(&checkpoint).unwrap();
KvCheckpoints::commit(&values, &mut conn).await.unwrap();

builder = builder.start_transaction(0).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvCheckpoints.process(&checkpoint).unwrap();
KvCheckpoints::commit(&values, &mut conn).await.unwrap();

builder = builder.start_transaction(0).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvCheckpoints.process(&checkpoint).unwrap();
KvCheckpoints::commit(&values, &mut conn).await.unwrap();

// Prune checkpoints from `[0, 2)`
let rows_pruned = KvCheckpoints.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 2);

// Checkpoint 2 remains
let remaining_checkpoints = get_all_kv_checkpoints(&mut conn).await.unwrap();
assert_eq!(remaining_checkpoints.len(), 1);
}
}
137 changes: 137 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,140 @@ impl Handler for KvEpochEnds {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use sui_indexer_alt_framework::{handlers::cp_sequence_numbers::CpSequenceNumbers, Indexer};
use sui_indexer_alt_schema::MIGRATIONS;
use sui_pg_db::Connection;
use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;

async fn get_all_kv_epoch_ends(conn: &mut Connection<'_>) -> Result<Vec<StoredEpochEnd>> {
let result = kv_epoch_ends::table
.order_by(kv_epoch_ends::epoch.asc())
.load(conn)
.await?;
Ok(result)
}

async fn get_epoch_num_of_all_kv_epoch_ends(conn: &mut Connection<'_>) -> Result<Vec<i64>> {
let epochs = get_all_kv_epoch_ends(conn).await?;
Ok(epochs.iter().map(|e| e.epoch).collect())
}

#[tokio::test]
pub async fn test_kv_epoch_ends_safe_mode() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

let mut builder = TestCheckpointDataBuilder::new(0);
let checkpoint = Arc::new(builder.advance_epoch(true));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();

let epochs = get_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs.len(), 1);
assert!(epochs[0].safe_mode);
assert_eq!(epochs[0].total_gas_fees, None);

let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();

let epochs = get_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs.len(), 2);
assert!(!epochs[1].safe_mode);
assert_eq!(epochs[1].total_gas_fees, Some(0));
}

#[tokio::test]
pub async fn test_kv_epoch_ends_same_epoch() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

// Test that there is nothing to commit while we haven't reached epoch end.
let mut builder = TestCheckpointDataBuilder::new(0);
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 0);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 0);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

// When the advance epoch tx is detected, there should be an entry to commit.
let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 1);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

// Afterwards, kv_epoch_ends should not have anything to commit until the next advance epoch
// tx.
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 0);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 0);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let epochs = get_epoch_num_of_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs, vec![0]);

let rows_pruned = KvEpochEnds.prune(0, 4, &mut conn).await.unwrap();
let epochs = get_epoch_num_of_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs.len(), 0);
assert_eq!(rows_pruned, 1);
}

#[tokio::test]
pub async fn test_kv_epoch_ends_advance_multiple_epochs() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

// Advance epoch three times, 0, 1, 2
let mut builder = TestCheckpointDataBuilder::new(0);
let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let epochs = get_epoch_num_of_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs, vec![0, 1, 2]);

let rows_pruned = KvEpochEnds.prune(0, 2, &mut conn).await.unwrap();
let epochs = get_epoch_num_of_all_kv_epoch_ends(&mut conn).await.unwrap();
// Only epoch 2 remains, after pruning 0 and 1.
assert_eq!(epochs, vec![2]);
assert_eq!(rows_pruned, 2);
}
}
55 changes: 53 additions & 2 deletions crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,63 @@ impl Handler for KvTransactions {
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
// TODO: use tx_interval. `tx_sequence_number` needs to be added to this table, and an index
// created as its primary key is on `tx_digest`.
let filter = kv_transactions::table.filter(
kv_transactions::cp_sequence_number.between(from as i64, to_exclusive as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}

#[cfg(test)]
mod tests {
use super::*;
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::Indexer;
use sui_indexer_alt_schema::MIGRATIONS;
use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;

async fn get_all_kv_transactions(
conn: &mut db::Connection<'_>,
) -> Result<Vec<StoredTransaction>> {
Ok(kv_transactions::table.load(conn).await?)
}

/// The kv_checkpoints pruner does not require cp_sequence_numbers, it can prune directly with the
/// checkpoint sequence number range.
#[tokio::test]
async fn test_kv_transactions_pruning() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder.start_transaction(0).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvTransactions.process(&checkpoint).unwrap();
KvTransactions::commit(&values, &mut conn).await.unwrap();

builder = builder.start_transaction(0).finish_transaction();
builder = builder.start_transaction(1).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvTransactions.process(&checkpoint).unwrap();
KvTransactions::commit(&values, &mut conn).await.unwrap();

builder = builder.start_transaction(0).finish_transaction();
builder = builder.start_transaction(1).finish_transaction();
builder = builder.start_transaction(2).finish_transaction();
builder = builder.start_transaction(3).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvTransactions.process(&checkpoint).unwrap();
KvTransactions::commit(&values, &mut conn).await.unwrap();

let transactions = get_all_kv_transactions(&mut conn).await.unwrap();
assert_eq!(transactions.len(), 7);

// Prune checkpoints from `[0, 2)`
let rows_pruned = KvTransactions.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 3);

let remaining_transactions = get_all_kv_transactions(&mut conn).await.unwrap();
assert_eq!(remaining_transactions.len(), 4);
}
}
2 changes: 1 addition & 1 deletion crates/sui-types/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Event {
}

// Event emitted in move code `fun advance_epoch`
#[derive(Deserialize)]
#[derive(Serialize, Deserialize, Default)]
pub struct SystemEpochInfoEvent {
pub epoch: u64,
pub protocol_version: u64,
Expand Down
Loading

0 comments on commit c769fa5

Please sign in to comment.