From a114f5194a3c8ff1a537f1e6192b6f683d59c66f Mon Sep 17 00:00:00 2001 From: Emma Zhong Date: Thu, 9 Jan 2025 11:17:51 -0800 Subject: [PATCH] [indexer-alt] add prune impls for each pipeline (#20635) ## Description Added `prune` implementations for pipeline inside indexer alt schema, built upon Will's cp mapping PR. ## Test plan Will add tests. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --------- Co-authored-by: Will Yang --- .../src/handlers/ev_emit_mod.rs | 20 ++++++++++++++ .../src/handlers/ev_struct_inst.rs | 25 +++++++++++++++-- .../src/handlers/kv_checkpoints.rs | 13 +++++++++ .../src/handlers/kv_epoch_ends.rs | 26 +++++++++++++++++- .../src/handlers/kv_epoch_starts.rs | 27 ++++++++++++++++++- .../src/handlers/kv_transactions.rs | 16 +++++++++++ .../src/handlers/tx_affected_addresses.rs | 24 ++++++++++++++++- .../src/handlers/tx_affected_objects.rs | 24 ++++++++++++++++- .../src/handlers/tx_balance_changes.rs | 24 ++++++++++++++++- .../sui-indexer-alt/src/handlers/tx_calls.rs | 23 +++++++++++++++- .../src/handlers/tx_digests.rs | 23 +++++++++++++++- .../sui-indexer-alt/src/handlers/tx_kinds.rs | 23 +++++++++++++++- 12 files changed, 258 insertions(+), 10 deletions(-) diff --git a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs index b5f6f0a4b7ea3..6828c21fe5899 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs @@ -1,10 +1,13 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::{collections::BTreeSet, sync::Arc}; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; +use sui_indexer_alt_framework::models::cp_sequence_numbers::tx_interval; use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod}; use sui_pg_db as db; @@ -57,4 +60,21 @@ impl Handler for EvEmitMod { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to_exclusive).await?; + + let filter = ev_emit_mod::table + .filter(ev_emit_mod::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs index c66d5592fe57e..10c67c17f3953 100644 --- a/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs +++ b/crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs @@ -1,11 +1,15 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::BTreeSet, sync::Arc}; +use std::{collections::BTreeSet, ops::Range, sync::Arc}; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -60,4 +64,21 @@ impl Handler for EvStructInst { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to_exclusive).await?; + + let filter = ev_struct_inst::table + .filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs index a9bc26a7e90f4..f45ad7f22ffee 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints}; @@ -38,4 +39,16 @@ impl Handler for KvCheckpoints { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let filter = kv_checkpoints::table + .filter(kv_checkpoints::sequence_number.between(from as i64, to_exclusive as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs index 926d9325f442e..7d5d7b2409164 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::{bail, Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::epoch_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends}; use sui_pg_db as db; use sui_types::{ @@ -125,4 +130,23 @@ impl Handler for KvEpochEnds { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_epoch, + end: to_epoch, + } = epoch_interval(conn, from..to_exclusive).await?; + if from_epoch < to_epoch { + let filter = kv_epoch_ends::table + .filter(kv_epoch_ends::epoch.between(from_epoch as i64, to_epoch as i64 - 1)); + Ok(diesel::delete(filter).execute(conn).await?) + } else { + Ok(0) + } + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs index bd5efcdf61463..f6d44d93fdd0a 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::{bail, Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::epoch_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts}; use sui_pg_db as db; use sui_types::{ @@ -72,4 +77,24 @@ impl Handler for KvEpochStarts { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_epoch, + end: to_epoch, + } = epoch_interval(conn, from..to_exclusive).await?; + if from_epoch < to_epoch { + let filter = kv_epoch_starts::table + .filter(kv_epoch_starts::epoch.between(from_epoch as i64, to_epoch as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } else { + Ok(0) + } + } } diff --git a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs index 7bef2130d8177..4d12bb4f86867 100644 --- a/crates/sui-indexer-alt/src/handlers/kv_transactions.rs +++ b/crates/sui-indexer-alt/src/handlers/kv_transactions.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction}; @@ -66,4 +67,19 @@ impl Handler for KvTransactions { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + // TODO: use tx_interval. `tx_sequence_number` needs to be added to this table, and an index + // created as its primary key is on `tx_digest`. + let filter = kv_transactions::table.filter( + kv_transactions::cp_sequence_number.between(from as i64, to_exclusive as i64 - 1), + ); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs index 51fb7e6917b8f..f6c456c527b5d 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs @@ -1,12 +1,17 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; use itertools::Itertools; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{ schema::tx_affected_addresses, transactions::StoredTxAffectedAddress, }; @@ -69,4 +74,21 @@ impl Handler for TxAffectedAddresses { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to_exclusive).await?; + let filter = tx_affected_addresses::table.filter( + tx_affected_addresses::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1), + ); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs index c99f8dd56a49b..6d79959a8fdec 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject}; use sui_pg_db as db; use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData}; @@ -59,4 +64,21 @@ impl Handler for TxAffectedObjects { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to_exclusive).await?; + let filter = tx_affected_objects::table.filter( + tx_affected_objects::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1), + ); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs index 31a49d33943cc..1f353b525b40e 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::{collections::BTreeMap, sync::Arc}; use anyhow::{Context, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{ schema::tx_balance_changes, transactions::{BalanceChange, StoredTxBalanceChange}, @@ -65,6 +70,23 @@ impl Handler for TxBalanceChanges { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to_exclusive).await?; + let filter = tx_balance_changes::table.filter( + tx_balance_changes::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1), + ); + + Ok(diesel::delete(filter).execute(conn).await?) + } } /// Calculate balance changes based on the object's input and output objects. diff --git a/crates/sui-indexer-alt/src/handlers/tx_calls.rs b/crates/sui-indexer-alt/src/handlers/tx_calls.rs index e189bdd9acd2d..726af48d79f3e 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_calls.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_calls.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::{Ok, Result}; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -62,4 +67,20 @@ impl Handler for TxCalls { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to_exclusive).await?; + let filter = tx_calls::table + .filter(tx_calls::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_digests.rs b/crates/sui-indexer-alt/src/handlers/tx_digests.rs index 579ec32429240..5de82ee45a974 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_digests.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_digests.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{schema::tx_digests, transactions::StoredTxDigest}; use sui_pg_db as db; use sui_types::full_checkpoint_content::CheckpointData; @@ -49,4 +54,20 @@ impl Handler for TxDigests { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to_exclusive).await?; + let filter = tx_digests::table + .filter(tx_digests::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } } diff --git a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs index 5f61e66be360f..c616a085d42b4 100644 --- a/crates/sui-indexer-alt/src/handlers/tx_kinds.rs +++ b/crates/sui-indexer-alt/src/handlers/tx_kinds.rs @@ -1,11 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::ops::Range; use std::sync::Arc; use anyhow::Result; +use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor}; +use sui_indexer_alt_framework::{ + models::cp_sequence_numbers::tx_interval, + pipeline::{concurrent::Handler, Processor}, +}; use sui_indexer_alt_schema::{ schema::tx_kinds, transactions::{StoredKind, StoredTxKind}, @@ -60,4 +65,20 @@ impl Handler for TxKinds { .execute(conn) .await?) } + + async fn prune( + &self, + from: u64, + to_exclusive: u64, + conn: &mut db::Connection<'_>, + ) -> Result { + let Range { + start: from_tx, + end: to_tx, + } = tx_interval(conn, from..to_exclusive).await?; + let filter = tx_kinds::table + .filter(tx_kinds::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1)); + + Ok(diesel::delete(filter).execute(conn).await?) + } }