Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[indexer-alt] pruning tests for kv_epoch_ends and pipelines that don't need CpSeqeuenceNumbers #20925

Merged
merged 8 commits into from
Jan 22, 2025
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-schema/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sui_types::digests::{ChainIdentifier, CheckpointDigest};

use crate::schema::{kv_checkpoints, kv_genesis};

#[derive(Insertable, Debug, Clone, FieldCount)]
#[derive(Insertable, Debug, Clone, FieldCount, Queryable)]
#[diesel(table_name = kv_checkpoints)]
pub struct StoredCheckpoint {
pub sequence_number: i64,
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer-alt-schema/src/epochs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use sui_field_count::FieldCount;

use crate::schema::{kv_epoch_ends, kv_epoch_starts, kv_feature_flags, kv_protocol_configs};

#[derive(Insertable, Debug, Clone, FieldCount)]
#[derive(Insertable, Debug, Clone, FieldCount, Queryable)]
#[diesel(table_name = kv_epoch_ends)]
#[diesel(treat_none_as_default_value = false)]
pub struct StoredEpochEnd {
Expand Down
49 changes: 49 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,52 @@ impl Handler for KvCheckpoints {
Ok(diesel::delete(filter).execute(conn).await?)
}
}

#[cfg(test)]
mod tests {
use super::*;
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::Indexer;
use sui_indexer_alt_schema::MIGRATIONS;
use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;

async fn get_all_kv_checkpoints(
conn: &mut db::Connection<'_>,
) -> Result<Vec<StoredCheckpoint>> {
let query = kv_checkpoints::table.load(conn).await?;
Ok(query)
}

/// The kv_checkpoints pruner does not require cp_sequence_numbers, it can prune directly with the
/// checkpoint sequence number range.
#[tokio::test]
async fn test_kv_checkpoints_pruning() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

// Create 3 checkpoints
let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder.start_transaction(0).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvCheckpoints.process(&checkpoint).unwrap();
KvCheckpoints::commit(&values, &mut conn).await.unwrap();

builder = builder.start_transaction(0).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvCheckpoints.process(&checkpoint).unwrap();
KvCheckpoints::commit(&values, &mut conn).await.unwrap();

builder = builder.start_transaction(0).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvCheckpoints.process(&checkpoint).unwrap();
KvCheckpoints::commit(&values, &mut conn).await.unwrap();

// Prune checkpoints from `[0, 2)`
let rows_pruned = KvCheckpoints.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 2);

// Checkpoint 2 remains
let remaining_checkpoints = get_all_kv_checkpoints(&mut conn).await.unwrap();
assert_eq!(remaining_checkpoints.len(), 1);
}
}
137 changes: 137 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,140 @@ impl Handler for KvEpochEnds {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use sui_indexer_alt_framework::{handlers::cp_sequence_numbers::CpSequenceNumbers, Indexer};
use sui_indexer_alt_schema::MIGRATIONS;
use sui_pg_db::Connection;
use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;

async fn get_all_kv_epoch_ends(conn: &mut Connection<'_>) -> Result<Vec<StoredEpochEnd>> {
let result = kv_epoch_ends::table
.order_by(kv_epoch_ends::epoch.asc())
.load(conn)
.await?;
Ok(result)
}

async fn get_epoch_num_of_all_kv_epoch_ends(conn: &mut Connection<'_>) -> Result<Vec<i64>> {
let epochs = get_all_kv_epoch_ends(conn).await?;
Ok(epochs.iter().map(|e| e.epoch).collect())
}

#[tokio::test]
pub async fn test_kv_epoch_ends_safe_mode() -> () {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

let mut builder = TestCheckpointDataBuilder::new(0);
let checkpoint = Arc::new(builder.advance_epoch(true));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();

let epochs = get_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs.len(), 1);
assert_eq!(epochs[0].safe_mode, true);
assert_eq!(epochs[0].total_gas_fees, None);

let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();

let epochs = get_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs.len(), 2);
assert_eq!(epochs[1].safe_mode, false);
assert_eq!(epochs[1].total_gas_fees, Some(0));
}

#[tokio::test]
pub async fn test_kv_epoch_ends_same_epoch() -> () {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

// Test that there is nothing to commit while we haven't reached epoch end.
let mut builder = TestCheckpointDataBuilder::new(0);
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 0);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 0);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

// When the advance epoch tx is detected, there should be an entry to commit.
let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 1);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

// Afterwards, kv_epoch_ends should not have anything to commit until the next advance epoch
// tx.
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 0);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
assert_eq!(values.len(), 0);
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let epochs = get_epoch_num_of_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs, vec![0]);

let rows_pruned = KvEpochEnds.prune(0, 4, &mut conn).await.unwrap();
let epochs = get_epoch_num_of_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs.len(), 0);
assert_eq!(rows_pruned, 1);
}

#[tokio::test]
pub async fn test_kv_epoch_ends_advance_multiple_epochs() -> () {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

// Advance epoch three times, 0, 1, 2
let mut builder = TestCheckpointDataBuilder::new(0);
let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let checkpoint = Arc::new(builder.advance_epoch(false));
let values = KvEpochEnds.process(&checkpoint).unwrap();
KvEpochEnds::commit(&values, &mut conn).await.unwrap();
let values = CpSequenceNumbers.process(&checkpoint).unwrap();
CpSequenceNumbers::commit(&values, &mut conn).await.unwrap();

let epochs = get_epoch_num_of_all_kv_epoch_ends(&mut conn).await.unwrap();
assert_eq!(epochs, vec![0, 1, 2]);

let rows_pruned = KvEpochEnds.prune(0, 2, &mut conn).await.unwrap();
let epochs = get_epoch_num_of_all_kv_epoch_ends(&mut conn).await.unwrap();
// Only epoch 2 remains, after pruning 0 and 1.
assert_eq!(epochs, vec![2]);
assert_eq!(rows_pruned, 2);
}
}
55 changes: 53 additions & 2 deletions crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,63 @@ impl Handler for KvTransactions {
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
// TODO: use tx_interval. `tx_sequence_number` needs to be added to this table, and an index
// created as its primary key is on `tx_digest`.
let filter = kv_transactions::table.filter(
kv_transactions::cp_sequence_number.between(from as i64, to_exclusive as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}

#[cfg(test)]
mod tests {
use super::*;
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::Indexer;
use sui_indexer_alt_schema::MIGRATIONS;
use sui_types::test_checkpoint_data_builder::TestCheckpointDataBuilder;

async fn get_all_kv_transactions(
conn: &mut db::Connection<'_>,
) -> Result<Vec<StoredTransaction>> {
Ok(kv_transactions::table.load(conn).await?)
}

/// The kv_checkpoints pruner does not require cp_sequence_numbers, it can prune directly with the
/// checkpoint sequence number range.
#[tokio::test]
async fn test_kv_transactions_pruning() {
let (indexer, _db) = Indexer::new_for_testing(&MIGRATIONS).await;
let mut conn = indexer.db().connect().await.unwrap();

let mut builder = TestCheckpointDataBuilder::new(0);
builder = builder.start_transaction(0).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvTransactions.process(&checkpoint).unwrap();
KvTransactions::commit(&values, &mut conn).await.unwrap();

builder = builder.start_transaction(0).finish_transaction();
builder = builder.start_transaction(1).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvTransactions.process(&checkpoint).unwrap();
KvTransactions::commit(&values, &mut conn).await.unwrap();

builder = builder.start_transaction(0).finish_transaction();
builder = builder.start_transaction(1).finish_transaction();
builder = builder.start_transaction(2).finish_transaction();
builder = builder.start_transaction(3).finish_transaction();
let checkpoint = Arc::new(builder.build_checkpoint());
let values = KvTransactions.process(&checkpoint).unwrap();
KvTransactions::commit(&values, &mut conn).await.unwrap();

let transactions = get_all_kv_transactions(&mut conn).await.unwrap();
assert_eq!(transactions.len(), 7);

// Prune checkpoints from `[0, 2)`
let rows_pruned = KvTransactions.prune(0, 2, &mut conn).await.unwrap();
assert_eq!(rows_pruned, 3);

let remaining_transactions = get_all_kv_transactions(&mut conn).await.unwrap();
assert_eq!(remaining_transactions.len(), 4);
}
}
2 changes: 1 addition & 1 deletion crates/sui-types/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl Event {
}

// Event emitted in move code `fun advance_epoch`
#[derive(Deserialize)]
#[derive(Serialize, Deserialize, Default)]
pub struct SystemEpochInfoEvent {
pub epoch: u64,
pub protocol_version: u64,
Expand Down
Loading
Loading