Skip to content

Commit

Permalink
[indexer-alt] add prune impls for each pipeline (#20635)
Browse files Browse the repository at this point in the history
## Description 

Added `prune` implementations for pipeline inside indexer alt schema,
built upon Will's cp mapping PR.

## Test plan 

Will add tests.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:

---------

Co-authored-by: Will Yang <[email protected]>
  • Loading branch information
emmazzz and wlmyng authored Jan 9, 2025
1 parent 101e4d8 commit a114f51
Show file tree
Hide file tree
Showing 12 changed files with 258 additions and 10 deletions.
20 changes: 20 additions & 0 deletions crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::{collections::BTreeSet, sync::Arc};

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::models::cp_sequence_numbers::tx_interval;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{events::StoredEvEmitMod, schema::ev_emit_mod};
use sui_pg_db as db;
Expand Down Expand Up @@ -57,4 +60,21 @@ impl Handler for EvEmitMod {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;

let filter = ev_emit_mod::table
.filter(ev_emit_mod::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
25 changes: 23 additions & 2 deletions crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeSet, sync::Arc};
use std::{collections::BTreeSet, ops::Range, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{events::StoredEvStructInst, schema::ev_struct_inst};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -60,4 +64,21 @@ impl Handler for EvStructInst {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;

let filter = ev_struct_inst::table
.filter(ev_struct_inst::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
13 changes: 13 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{checkpoints::StoredCheckpoint, schema::kv_checkpoints};
Expand Down Expand Up @@ -38,4 +39,16 @@ impl Handler for KvCheckpoints {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let filter = kv_checkpoints::table
.filter(kv_checkpoints::sequence_number.between(from as i64, to_exclusive as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
26 changes: 25 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_ends.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::epoch_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{epochs::StoredEpochEnd, schema::kv_epoch_ends};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -125,4 +130,23 @@ impl Handler for KvEpochEnds {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_epoch,
end: to_epoch,
} = epoch_interval(conn, from..to_exclusive).await?;
if from_epoch < to_epoch {
let filter = kv_epoch_ends::table
.filter(kv_epoch_ends::epoch.between(from_epoch as i64, to_epoch as i64 - 1));
Ok(diesel::delete(filter).execute(conn).await?)
} else {
Ok(0)
}
}
}
27 changes: 26 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_epoch_starts.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{bail, Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::epoch_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{epochs::StoredEpochStart, schema::kv_epoch_starts};
use sui_pg_db as db;
use sui_types::{
Expand Down Expand Up @@ -72,4 +77,24 @@ impl Handler for KvEpochStarts {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_epoch,
end: to_epoch,
} = epoch_interval(conn, from..to_exclusive).await?;
if from_epoch < to_epoch {
let filter = kv_epoch_starts::table
.filter(kv_epoch_starts::epoch.between(from_epoch as i64, to_epoch as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
} else {
Ok(0)
}
}
}
16 changes: 16 additions & 0 deletions crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_schema::{schema::kv_transactions, transactions::StoredTransaction};
Expand Down Expand Up @@ -66,4 +67,19 @@ impl Handler for KvTransactions {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
// TODO: use tx_interval. `tx_sequence_number` needs to be added to this table, and an index
// created as its primary key is on `tx_digest`.
let filter = kv_transactions::table.filter(
kv_transactions::cp_sequence_number.between(from as i64, to_exclusive as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
24 changes: 23 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_addresses.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use itertools::Itertools;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{
schema::tx_affected_addresses, transactions::StoredTxAffectedAddress,
};
Expand Down Expand Up @@ -69,4 +74,21 @@ impl Handler for TxAffectedAddresses {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_affected_addresses::table.filter(
tx_affected_addresses::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
24 changes: 23 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::Result;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{schema::tx_affected_objects, transactions::StoredTxAffectedObject};
use sui_pg_db as db;
use sui_types::{effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData};
Expand Down Expand Up @@ -59,4 +64,21 @@ impl Handler for TxAffectedObjects {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_affected_objects::table.filter(
tx_affected_objects::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}
24 changes: 23 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{Context, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{
schema::tx_balance_changes,
transactions::{BalanceChange, StoredTxBalanceChange},
Expand Down Expand Up @@ -65,6 +70,23 @@ impl Handler for TxBalanceChanges {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_balance_changes::table.filter(
tx_balance_changes::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1),
);

Ok(diesel::delete(filter).execute(conn).await?)
}
}

/// Calculate balance changes based on the object's input and output objects.
Expand Down
23 changes: 22 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_calls.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::ops::Range;
use std::sync::Arc;

use anyhow::{Ok, Result};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use sui_indexer_alt_framework::pipeline::{concurrent::Handler, Processor};
use sui_indexer_alt_framework::{
models::cp_sequence_numbers::tx_interval,
pipeline::{concurrent::Handler, Processor},
};
use sui_indexer_alt_schema::{schema::tx_calls, transactions::StoredTxCalls};
use sui_pg_db as db;
use sui_types::full_checkpoint_content::CheckpointData;
Expand Down Expand Up @@ -62,4 +67,20 @@ impl Handler for TxCalls {
.execute(conn)
.await?)
}

async fn prune(
&self,
from: u64,
to_exclusive: u64,
conn: &mut db::Connection<'_>,
) -> Result<usize> {
let Range {
start: from_tx,
end: to_tx,
} = tx_interval(conn, from..to_exclusive).await?;
let filter = tx_calls::table
.filter(tx_calls::tx_sequence_number.between(from_tx as i64, to_tx as i64 - 1));

Ok(diesel::delete(filter).execute(conn).await?)
}
}
Loading

0 comments on commit a114f51

Please sign in to comment.