Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…-to-transmit-state-height-for-each-completed-update-call' into 'master'

feat(execution): [NET-1709] Transmit message ID and height for finalised ingress messages.

This MR adds a [`tokio::sync::mpsc::Sender`](https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html) and an `Arc<dyn StateReader<State = ReplicatedState>>` to the `IngressHistoryWriterImpl` in `history.rs`. Each time `IngressHistoryWriterImpl::set_status()` is called for a message transitioning to a terminal state, it will send the message-id of the message and the height of the replicated state where the message has transitioned to a terminal state.

The receiver will be used in a `IngressWatcher` event loop [here](https://sourcegraph.com/github.com/dfinity/ic@c792b285485d8428437d9745558556e0775f1d5b/-/blob/rs/http_endpoints/public/src/call/ingress_watcher.rs?L232-235) to track at what height a message completes execution.

This change also enables the new `/v3/.../call` HTTPS endpoint for synchronous ingress messages.

Closes NET-1709 

Closes NET-1709

See merge request dfinity-lab/public/ic!19797
  • Loading branch information
DSharifi committed Jun 20, 2024
2 parents cbeffcf + d2ad64c commit 6a919d0
Show file tree
Hide file tree
Showing 21 changed files with 252 additions and 95 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions rs/determinism_test/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ pub(crate) fn setup() -> (
ic_types::malicious_flags::MaliciousFlags::default(),
));

let (completed_execution_messages_tx, _) = tokio::sync::mpsc::channel(1);

let execution_services = ExecutionServices::setup_execution(
log.clone().into(),
&metrics_registry,
Expand All @@ -141,6 +143,7 @@ pub(crate) fn setup() -> (
Arc::clone(&cycles_account_manager),
Arc::clone(&state_manager) as Arc<_>,
Arc::clone(&state_manager.get_fd_factory()),
completed_execution_messages_tx,
);

let message_routing = MessageRoutingImpl::new(
Expand Down
4 changes: 4 additions & 0 deletions rs/drun/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ pub async fn run_drun(uo: DrunOptions) -> Result<(), String> {
None,
ic_types::malicious_flags::MaliciousFlags::default(),
));

let (completed_execution_messages_tx, _) = tokio::sync::mpsc::channel(1);

let (_, ingress_history_writer, ingress_hist_reader, query_handler, scheduler) =
ExecutionServices::setup_execution(
log.clone().into(),
Expand All @@ -236,6 +239,7 @@ pub async fn run_drun(uo: DrunOptions) -> Result<(), String> {
Arc::clone(&cycles_account_manager),
Arc::clone(&state_manager) as Arc<_>,
state_manager.get_fd_factory(),
completed_execution_messages_tx,
)
.into_parts();

Expand Down
1 change: 1 addition & 0 deletions rs/execution_environment/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ BENCH_DEPENDENCIES = [
"@crate_index//:criterion",
"@crate_index//:lazy_static",
"@crate_index//:tempfile",
"@crate_index//:tokio",
"@crate_index//:wat",
]

Expand Down
2 changes: 2 additions & 0 deletions rs/execution_environment/benches/lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ ic-test-utilities-execution-environment = { path = "../../../test_utilities/exec
ic-test-utilities-state = { path = "../../../test_utilities/state" }
ic-test-utilities-time = { path = "../../../test_utilities/time" }
ic-test-utilities-types = { path = "../../../test_utilities/types" }
ic-test-utilities = { path = "../../../test_utilities" }
ic-types = { path = "../../../types/types" }
ic-wasm-types = { path = "../../../types/wasm_types" }
lazy_static = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true }
wat = "1.0.52"
15 changes: 12 additions & 3 deletions rs/execution_environment/benches/lib/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::page_map::TestPageAllocatorFileDescriptorImpl;
use ic_replicated_state::{CallOrigin, CanisterState, NetworkTopology, ReplicatedState};
use ic_system_api::{ExecutionParameters, InstructionLimits};
use ic_test_utilities::state_manager::FakeStateManager;
use ic_test_utilities_execution_environment::generate_network_topology;
use ic_test_utilities_state::canister_from_exec_state;
use ic_test_utilities_types::ids::{canister_test_id, subnet_test_id, user_test_id};
Expand Down Expand Up @@ -278,9 +279,17 @@ where
SchedulerConfig::application_subnet().dirty_page_overhead,
Arc::new(TestPageAllocatorFileDescriptorImpl::new()),
));
let ingress_history_writer: Arc<dyn IngressHistoryWriter<State = ReplicatedState>> = Arc::new(
IngressHistoryWriterImpl::new(config.clone(), log.clone(), &metrics_registry),
);

let (completed_execution_messages_tx, _) = tokio::sync::mpsc::channel(1);
let state_reader = Arc::new(FakeStateManager::new());
let ingress_history_writer: Arc<dyn IngressHistoryWriter<State = ReplicatedState>> =
Arc::new(IngressHistoryWriterImpl::new(
config.clone(),
log.clone(),
&metrics_registry,
completed_execution_messages_tx,
state_reader,
));
let exec_env = ExecutionEnvironment::new(
log,
hypervisor,
Expand Down
5 changes: 5 additions & 0 deletions rs/execution_environment/src/canister_manager/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use ic_state_machine_tests::{StateMachineBuilder, StateMachineConfig};
use ic_system_api::{ExecutionParameters, InstructionLimits};
use ic_test_utilities::{
cycles_account_manager::CyclesAccountManagerBuilder,
state_manager::FakeStateManager,
universal_canister::{call_args, wasm, UNIVERSAL_CANISTER_WASM},
};
use ic_test_utilities_execution_environment::{
Expand Down Expand Up @@ -227,10 +228,14 @@ impl CanisterManagerBuilder {
fn build(self) -> CanisterManager {
let subnet_type = SubnetType::Application;
let metrics_registry = MetricsRegistry::new();
let state_reader = Arc::new(FakeStateManager::new());
let (completed_execution_messages_tx, _) = tokio::sync::mpsc::channel(1);
let ingress_history_writer = Arc::new(IngressHistoryWriterImpl::new(
Config::default(),
no_op_logger(),
&metrics_registry,
completed_execution_messages_tx,
state_reader,
));
let cycles_account_manager = Arc::new(self.cycles_account_manager);
let hypervisor = Hypervisor::new(
Expand Down
33 changes: 31 additions & 2 deletions rs/execution_environment/src/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use prometheus::{Histogram, HistogramVec};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::time::Instant;
use tokio::sync::mpsc::Sender;

/// Struct that implements the ingress history reader trait. Consumers of this
/// trait can use this to inspect the ingress history.
Expand Down Expand Up @@ -85,10 +86,18 @@ pub struct IngressHistoryWriterImpl {
message_state_transition_completed_wall_clock_duration_seconds: Histogram,
message_state_transition_failed_ic_duration_seconds: HistogramVec,
message_state_transition_failed_wall_clock_duration_seconds: HistogramVec,
completed_execution_messages_tx: Sender<(MessageId, Height)>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
}

impl IngressHistoryWriterImpl {
pub fn new(config: Config, log: ReplicaLogger, metrics_registry: &MetricsRegistry) -> Self {
pub fn new(
config: Config,
log: ReplicaLogger,
metrics_registry: &MetricsRegistry,
completed_execution_messages_tx: Sender<(MessageId, Height)>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
) -> Self {
Self {
config,
log,
Expand Down Expand Up @@ -126,7 +135,9 @@ impl IngressHistoryWriterImpl {
// The `user_error_code` label is internal information that provides more
// detail about the reason for rejection.
&["reject_code", "user_error_code"],
)
),
completed_execution_messages_tx,
state_reader
}
}
}
Expand Down Expand Up @@ -199,6 +210,24 @@ impl IngressHistoryWriter for IngressHistoryWriterImpl {
_ => {}
};

if let IngressStatus::Known { state, .. } = &status {
if state.is_terminal() {
// We want to send the height of the replicated state where
// ingress message went into a terminal state.
//
// latest_state_height() will return the height of the last committed state, `H`.
// The ingress message will have completed execution AND be updated to a terminal state from the next state, `H+1`.
let last_committed_height = self.state_reader.latest_state_height();
let completed_execution_and_updated_to_terminal_state: Height =
last_committed_height + Height::from(1);

let _ = self.completed_execution_messages_tx.try_send((
message_id.clone(),
completed_execution_and_updated_to_terminal_state,
));
}
};

state.set_ingress_status(
message_id,
status,
Expand Down
9 changes: 8 additions & 1 deletion rs/execution_environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ use ic_query_stats::QueryStatsPayloadBuilderParams;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::page_map::PageAllocatorFileDescriptor;
use ic_replicated_state::{CallOrigin, NetworkTopology, ReplicatedState};
use ic_types::{messages::CallContextId, SubnetId};
use ic_types::{
messages::{CallContextId, MessageId},
Height, SubnetId,
};
pub use metrics::IngressFilterMetrics;
pub use query_handler::InternalHttpQueryHandler;
use query_handler::{HttpQueryHandler, QueryScheduler, QuerySchedulerFlag};
pub use scheduler::RoundSchedule;
use scheduler::SchedulerImpl;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;

/// When executing a wasm method of query type, this enum indicates if we are
/// running in an replicated or non-replicated context. This information is
Expand Down Expand Up @@ -98,6 +102,7 @@ impl ExecutionServices {
cycles_account_manager: Arc<CyclesAccountManager>,
state_reader: Arc<dyn StateReader<State = ReplicatedState>>,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
completed_execution_messages_tx: Sender<(MessageId, Height)>,
) -> ExecutionServices {
let hypervisor = Arc::new(Hypervisor::new(
config.clone(),
Expand All @@ -114,6 +119,8 @@ impl ExecutionServices {
config.clone(),
logger.clone(),
metrics_registry,
completed_execution_messages_tx,
Arc::clone(&state_reader),
));
let ingress_history_reader =
Box::new(IngressHistoryReaderImpl::new(Arc::clone(&state_reader)));
Expand Down
13 changes: 11 additions & 2 deletions rs/execution_environment/src/scheduler/test_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use ic_system_api::{
sandbox_safe_system_state::{SandboxSafeSystemState, SystemStateChanges},
ApiType, ExecutionParameters,
};
use ic_test_utilities::state_manager::FakeStateManager;
use ic_test_utilities_execution_environment::{generate_subnets, test_registry_settings};
use ic_test_utilities_state::CanisterStateBuilder;
use ic_test_utilities_types::{
Expand Down Expand Up @@ -862,10 +863,18 @@ impl SchedulerTestBuilder {
SchedulerConfig::application_subnet().dirty_page_overhead,
);
let hypervisor = Arc::new(hypervisor);
let ingress_history_writer =
IngressHistoryWriterImpl::new(config.clone(), self.log.clone(), &self.metrics_registry);
let (completed_execution_messages_tx, _) = tokio::sync::mpsc::channel(1);
let state_reader = Arc::new(FakeStateManager::new());
let ingress_history_writer = IngressHistoryWriterImpl::new(
config.clone(),
self.log.clone(),
&self.metrics_registry,
completed_execution_messages_tx,
state_reader,
);
let ingress_history_writer: Arc<dyn IngressHistoryWriter<State = ReplicatedState>> =
Arc::new(ingress_history_writer);

let exec_env = ExecutionEnvironment::new(
self.log.clone(),
hypervisor,
Expand Down
92 changes: 88 additions & 4 deletions rs/execution_environment/tests/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ic_interfaces_state_manager_mocks::MockStateManager;
use ic_metrics::MetricsRegistry;
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::ReplicatedState;
use ic_test_utilities::state_manager::FakeStateManager;
use ic_test_utilities_logger::with_test_replica_logger;
use ic_test_utilities_types::ids::{
canister_test_id, message_test_id, subnet_test_id, user_test_id,
Expand All @@ -16,6 +17,9 @@ use ic_types::{
time::UNIX_EPOCH,
Height,
};
use std::sync::Arc;
use tokio::sync::mpsc::{channel, error::TryRecvError};

use IngressStatus::*;

#[test]
Expand Down Expand Up @@ -89,12 +93,85 @@ fn valid_transitions() -> Vec<(IngressStatus, Vec<IngressStatus>)> {
]
}

/// Tests that [`IngressHistoryWriterImpl`] transmits the height of the last committed state + 1
/// for messages that complete execution.
#[test]
fn test_terminal_states_are_transmitted() {
with_test_replica_logger(|log| {
let mut state_manager = MockStateManager::new();

let last_committed_state_height = Height::new(10);
state_manager
.expect_latest_state_height()
.return_const(last_committed_state_height);

let mut state = ReplicatedState::new(subnet_test_id(1), SubnetType::Application);
let (completed_execution_messages_tx, mut completed_execution_messages_rx) = channel(100);
let ingress_history_writer = IngressHistoryWriterImpl::new(
Config::default(),
log,
&MetricsRegistry::new(),
completed_execution_messages_tx,
Arc::new(state_manager),
);
let message_id = message_test_id(1);

ingress_history_writer.set_status(&mut state, message_id.clone(), received());
assert_eq!(
completed_execution_messages_rx.try_recv(),
Err(TryRecvError::Empty),
"Non terminal state should not trigger a transmission."
);

ingress_history_writer.set_status(&mut state, message_id.clone(), processing());
assert_eq!(
completed_execution_messages_rx.try_recv(),
Err(TryRecvError::Empty),
"Non terminal state should not trigger a transmission."
);

{
let mut state = state.clone();
ingress_history_writer.set_status(&mut state, message_id.clone(), completed());
assert_eq!(
completed_execution_messages_rx.try_recv(),
Ok((
message_id.clone(),
last_committed_state_height + Height::from(1)
)),
"Terminal state, `Completed`, should trigger the height of state to be sent"
);
}

{
let mut state = state.clone();
ingress_history_writer.set_status(&mut state, message_id.clone(), failed());
assert_eq!(
completed_execution_messages_rx.try_recv(),
Ok((
message_id.clone(),
last_committed_state_height + Height::from(1)
)),
"Terminal state, `Failed`, should trigger the height of state to be sent"
);
}
})
}

#[test]
fn test_valid_transitions() {
with_test_replica_logger(|log| {
let state = ReplicatedState::new(subnet_test_id(1), SubnetType::Application);
let ingress_history_writer =
IngressHistoryWriterImpl::new(Config::default(), log, &MetricsRegistry::new());

let state_reader = Arc::new(FakeStateManager::new());
let (completed_execution_messages_tx, _) = channel(1);
let ingress_history_writer = IngressHistoryWriterImpl::new(
Config::default(),
log,
&MetricsRegistry::new(),
completed_execution_messages_tx,
state_reader,
);
let message_id = message_test_id(1);

for (origin_state, next_states) in valid_transitions().into_iter() {
Expand All @@ -117,8 +194,15 @@ fn test_valid_transitions() {
#[test]
fn test_invalid_transitions() {
with_test_replica_logger(|log| {
let ingress_history_writer =
IngressHistoryWriterImpl::new(Config::default(), log, &MetricsRegistry::new());
let state_reader = Arc::new(FakeStateManager::new());
let (completed_execution_messages_tx, _) = channel(1);
let ingress_history_writer = IngressHistoryWriterImpl::new(
Config::default(),
log,
&MetricsRegistry::new(),
completed_execution_messages_tx,
state_reader,
);
let message_id = message_test_id(1);

// creates a set of valid transitions
Expand Down
Loading

0 comments on commit 6a919d0

Please sign in to comment.