Skip to content

Commit

Permalink
Add new object_per_epoch_marker_table that includes consensus start versions (#20822)
Browse files Browse the repository at this point in the history

This introduces the concept of a "FullObjectID" and "FullObjectKey",
which for consensus objects includes the initial shared version/start
version.

Migrates to these new types across the codebase where relevant.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] gRPC:
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
aschran authored Jan 23, 2025
1 parent 59ce28f commit b82ada5
Show file tree
Hide file tree
Showing 42 changed files with 663 additions and 186 deletions.
2 changes: 2 additions & 0 deletions crates/simulacrum/src/store/in_mem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ impl ChildObjectResolver for InMemoryStore {
receiving_object_id: &ObjectID,
receive_object_at_version: SequenceNumber,
_epoch_id: EpochId,
// TODO: Delete this parameter once table migration is complete.
_use_object_per_epoch_marker_table_v2: bool,
) -> sui_types::error::SuiResult<Option<Object>> {
let recv_object = match crate::store::SimulatorStore::get_object(self, receiving_object_id)
{
Expand Down
25 changes: 24 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_objects_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

let (_gas_status, checked_input_objects) = sui_transaction_checks::check_transaction_input(
Expand Down Expand Up @@ -1557,7 +1561,14 @@ impl AuthorityState {
inner_temporary_store,
);
self.get_cache_writer()
.write_transaction_outputs(epoch_store.epoch(), transaction_outputs.into())
.write_transaction_outputs(
epoch_store.epoch(),
transaction_outputs.into(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;

if certificate.transaction_data().is_end_of_epoch_tx() {
Expand Down Expand Up @@ -1797,6 +1808,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

// make a gas object if one was not provided
Expand Down Expand Up @@ -1983,6 +1998,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

// make a gas object if one was not provided
Expand Down Expand Up @@ -2145,6 +2164,10 @@ impl AuthorityState {
&input_object_kinds,
&receiving_object_refs,
epoch_store.epoch(),
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)?;

// Create and use a dummy gas object if there is no gas object provided.
Expand Down
11 changes: 6 additions & 5 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use sui_storage::mutex_table::{MutexGuard, MutexTable};
use sui_types::accumulator::Accumulator;
use sui_types::authenticator_state::{get_authenticator_state, ActiveJwk};
use sui_types::base_types::{
AuthorityName, ConsensusObjectSequenceKey, EpochId, ObjectID, SequenceNumber, TransactionDigest,
AuthorityName, ConsensusObjectSequenceKey, EpochId, FullObjectID, ObjectID, SequenceNumber,
TransactionDigest,
};
use sui_types::base_types::{ConciseableName, ObjectRef};
use sui_types::committee::Committee;
Expand Down Expand Up @@ -1437,7 +1438,7 @@ impl AuthorityPerEpochStore {
error: "no assigned shared versions".to_string(),
})?;

let initial_shared_version =
let modified_initial_shared_version =
if self.epoch_start_config().use_version_assignment_tables_v3() {
*initial_shared_version
} else {
Expand All @@ -1447,21 +1448,21 @@ impl AuthorityPerEpochStore {
};
// If we found assigned versions, but they are missing the assignment for
// this object, it indicates a serious inconsistency!
let Some(version) = assigned_shared_versions.get(&(*id, initial_shared_version)) else {
let Some(version) = assigned_shared_versions.get(&(*id, modified_initial_shared_version)) else {
panic!(
"Shared object version should have been assigned. key: {key:?}, \
obj id: {id:?}, initial_shared_version: {initial_shared_version:?}, \
assigned_shared_versions: {assigned_shared_versions:?}",
)
};
InputKey::VersionedObject {
id: *id,
id: FullObjectID::new(*id, Some(*initial_shared_version)),
version: *version,
}
}
InputObjectKind::MovePackage(id) => InputKey::Package { id: *id },
InputObjectKind::ImmOrOwnedMoveObject(objref) => InputKey::VersionedObject {
id: objref.0,
id: FullObjectID::new(objref.0, None),
version: objref.1,
},
})
Expand Down
118 changes: 85 additions & 33 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use sui_types::error::UserInputError;
use sui_types::execution::TypeLayoutStore;
use sui_types::message_envelope::Message;
use sui_types::storage::{
get_module, BackingPackageStore, MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore,
get_module, BackingPackageStore, FullObjectKey, MarkerValue, ObjectKey, ObjectOrTombstone,
ObjectStore,
};
use sui_types::sui_system_state::get_sui_system_state;
use sui_types::{base_types::SequenceNumber, fp_bail, fp_ensure};
Expand Down Expand Up @@ -212,9 +213,12 @@ impl AuthorityStore {
// We can safely delete all entries in the per epoch marker table since this is only called
// at epoch boundaries (during reconfiguration). Therefore any entries that currently
// exist can be removed. Because of this we can use the `schedule_delete_all` method.
self.perpetual_tables
.object_per_epoch_marker_table
.schedule_delete_all()?;
Ok(self
.perpetual_tables
.object_per_epoch_marker_table
.object_per_epoch_marker_table_v2
.schedule_delete_all()?)
}

Expand Down Expand Up @@ -412,40 +416,70 @@ impl AuthorityStore {

pub fn get_marker_value(
&self,
object_id: &ObjectID,
version: &SequenceNumber,
object_key: FullObjectKey,
epoch_id: EpochId,
// TODO: Delete this parameter once table migration is complete.
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult<Option<MarkerValue>> {
let object_key = (epoch_id, ObjectKey(*object_id, *version));
Ok(self
.perpetual_tables
.object_per_epoch_marker_table
.get(&object_key)?)
if use_object_per_epoch_marker_table_v2 {
Ok(self
.perpetual_tables
.object_per_epoch_marker_table_v2
.get(&(epoch_id, object_key))?)
} else {
Ok(self
.perpetual_tables
.object_per_epoch_marker_table
.get(&(epoch_id, object_key.into_object_key()))?)
}
}

pub fn get_latest_marker(
&self,
object_id: &ObjectID,
object_id: FullObjectID,
epoch_id: EpochId,
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult<Option<(SequenceNumber, MarkerValue)>> {
let min_key = (epoch_id, ObjectKey::min_for_id(object_id));
let max_key = (epoch_id, ObjectKey::max_for_id(object_id));
if use_object_per_epoch_marker_table_v2 {
let min_key = (epoch_id, FullObjectKey::min_for_id(&object_id));
let max_key = (epoch_id, FullObjectKey::max_for_id(&object_id));

let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
// because of the iterator bounds these cannot fail
assert_eq!(epoch, epoch_id);
assert_eq!(key.0, *object_id);
Ok(Some((key.1, marker)))
let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table_v2
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
// because of the iterator bounds these cannot fail
assert_eq!(epoch, epoch_id);
assert_eq!(key.id(), object_id);
Ok(Some((key.version(), marker)))
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
} else {
let min_key = (epoch_id, ObjectKey::min_for_id(&object_id.id()));
let max_key = (epoch_id, ObjectKey::max_for_id(&object_id.id()));

let marker_entry = self
.perpetual_tables
.object_per_epoch_marker_table
.safe_iter_with_bounds(Some(min_key), Some(max_key))
.skip_prior_to(&max_key)?
.next();
match marker_entry {
Some(Ok(((epoch, key), marker))) => {
// because of the iterator bounds these cannot fail
assert_eq!(epoch, epoch_id);
assert_eq!(key.0, object_id.id());
Ok(Some((key.1, marker)))
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
Some(Err(e)) => Err(e.into()),
None => Ok(None),
}
}

Expand Down Expand Up @@ -824,6 +858,8 @@ impl AuthorityStore {
&self,
epoch_id: EpochId,
tx_outputs: &[Arc<TransactionOutputs>],
// TODO: Delete this parameter once table migration is complete.
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult {
let mut written = Vec::with_capacity(tx_outputs.len());
for outputs in tx_outputs {
Expand All @@ -834,7 +870,12 @@ impl AuthorityStore {

let mut write_batch = self.perpetual_tables.transactions.batch();
for outputs in tx_outputs {
self.write_one_transaction_outputs(&mut write_batch, epoch_id, outputs)?;
self.write_one_transaction_outputs(
&mut write_batch,
epoch_id,
outputs,
use_object_per_epoch_marker_table_v2,
)?;
}
// test crashing before writing the batch
fail_point_async!("crash");
Expand All @@ -859,6 +900,8 @@ impl AuthorityStore {
write_batch: &mut DBBatch,
epoch_id: EpochId,
tx_outputs: &TransactionOutputs,
// TODO: Delete this parameter once table migration is complete.
use_object_per_epoch_marker_table_v2: bool,
) -> SuiResult {
let TransactionOutputs {
transaction,
Expand All @@ -883,12 +926,21 @@ impl AuthorityStore {
// Add batched writes for objects and locks.
let effects_digest = effects.digest();

write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
)?;
if use_object_per_epoch_marker_table_v2 {
write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table_v2,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, *key), *marker_value)),
)?;
} else {
write_batch.insert_batch(
&self.perpetual_tables.object_per_epoch_marker_table,
markers
.iter()
.map(|(key, marker_value)| ((epoch_id, key.into_object_key()), *marker_value)),
)?;
}

write_batch.insert_batch(
&self.perpetual_tables.objects,
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sui_types::accumulator::Accumulator;
use sui_types::base_types::SequenceNumber;
use sui_types::digests::TransactionEventsDigest;
use sui_types::effects::TransactionEffects;
use sui_types::storage::MarkerValue;
use sui_types::storage::{FullObjectKey, MarkerValue};
use typed_store::metrics::SamplingInterval;
use typed_store::rocks::util::{empty_compaction_filter, reference_count_merge_operator};
use typed_store::rocks::{
Expand Down Expand Up @@ -136,6 +136,7 @@ pub struct AuthorityPerpetualTables {
/// objects that have been deleted. This table is meant to be pruned per-epoch, and all
/// previous epochs other than the current epoch may be pruned safely.
pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
pub(crate) object_per_epoch_marker_table_v2: DBMap<(EpochId, FullObjectKey), MarkerValue>,
}

impl AuthorityPerpetualTables {
Expand Down Expand Up @@ -459,6 +460,7 @@ impl AuthorityPerpetualTables {
self.expected_network_sui_amount.unsafe_clear()?;
self.expected_storage_fund_imbalance.unsafe_clear()?;
self.object_per_epoch_marker_table.unsafe_clear()?;
self.object_per_epoch_marker_table_v2.unsafe_clear()?;
self.objects.rocksdb.flush()?;
Ok(())
}
Expand Down
9 changes: 8 additions & 1 deletion crates/sui-core/src/authority/test_authority_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,14 @@ impl<'a> TestAuthorityBuilder<'a> {

state
.get_cache_commit()
.commit_transaction_outputs(epoch_store.epoch(), &[*genesis.transaction().digest()])
.commit_transaction_outputs(
epoch_store.epoch(),
&[*genesis.transaction().digest()],
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;

// We want to insert these objects directly instead of relying on genesis because
Expand Down
18 changes: 16 additions & 2 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,14 @@ impl CheckpointExecutor {
let cache_commit = self.state.get_cache_commit();
debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
cache_commit
.commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
.commit_transaction_outputs(
epoch_store.epoch(),
all_tx_digests,
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;

epoch_store
Expand Down Expand Up @@ -657,7 +664,14 @@ impl CheckpointExecutor {

let cache_commit = self.state.get_cache_commit();
cache_commit
.commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
.commit_transaction_outputs(
cur_epoch,
&[change_epoch_tx_digest],
epoch_store
.protocol_config()
.use_object_per_epoch_marker_table_v2_as_option()
.unwrap_or(false),
)
.await;
fail_point_async!("prune-and-compact");

Expand Down
Loading

0 comments on commit b82ada5

Please sign in to comment.