From 13006ee4d9f1c234a38b2f7b1cc5d41e677a9247 Mon Sep 17 00:00:00 2001
From: Alin Sinpalean <58422065+alin-at-dfinity@users.noreply.github.com>
Date: Thu, 21 Nov 2024 19:00:05 +0100
Subject: [PATCH] feat: [MR-627] Time out messages in subnet queues (#2708)

Time out best-effort requests in subnet input queues and responses in
subnet output queues.

The management canister does not make outgoing calls as itself, so there
are no requests in subnet output queues or responses in subnet input
queues, meaning there are also no `SubnetCallContextManager` callbacks to
time out.

We already shed messages from subnet queues, so this is the last missing
piece of best-effort message shedding and expiration.
---
 rs/replicated_state/src/replicated_state.rs   | 18 +++++++
 rs/replicated_state/tests/replicated_state.rs | 50 +++++++++++++++----
 2 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/rs/replicated_state/src/replicated_state.rs b/rs/replicated_state/src/replicated_state.rs
index 459876596e9..ac4fbf91ba6 100644
--- a/rs/replicated_state/src/replicated_state.rs
+++ b/rs/replicated_state/src/replicated_state.rs
@@ -387,6 +387,16 @@ pub struct ReplicatedState {
     pub metadata: SystemMetadata,
 
     /// Queues for holding messages sent/received by the subnet.
+    ///
+    /// The Management Canister does not make outgoing calls as itself (it does so
+    /// on behalf of canisters, but those messages are enqueued in the canister's
+    /// output queue). Therefore, there's only a `push_subnet_output_response()`
+    /// method (no equivalent for requests) and an explicit check against inducting
+    /// responses into the subnet queues. This assumption is used in a number of
+    /// places (e.g. when shedding or timing out messages), so adding support for
+    /// outgoing calls in the future will likely require significant changes across
+    /// `ReplicatedState` and `SystemState`.
+    //
     // Must remain private.
     #[validate_eq(CompareWithValidateEq)]
     subnet_queues: CanisterQueues,
@@ -953,6 +963,14 @@ impl ReplicatedState {
             self.canister_states.insert(canister_id, canister);
         }
 
+        if self.subnet_queues.has_expired_deadlines(current_time) {
+            timed_out_messages_count += self.subnet_queues.time_out_messages(
+                current_time,
+                &self.metadata.own_subnet_id.into(),
+                &self.canister_states,
+            );
+        }
+
         timed_out_messages_count
     }
 
diff --git a/rs/replicated_state/tests/replicated_state.rs b/rs/replicated_state/tests/replicated_state.rs
index 9025b3a12de..2b47fa9d8e7 100644
--- a/rs/replicated_state/tests/replicated_state.rs
+++ b/rs/replicated_state/tests/replicated_state.rs
@@ -1,3 +1,4 @@
+use assert_matches::assert_matches;
 use ic_base_types::{CanisterId, NumBytes, NumSeconds, PrincipalId, SubnetId};
 use ic_btc_interface::Network;
 use ic_btc_replica_types::{
@@ -11,13 +12,18 @@ use ic_management_canister_types::{
 };
 use ic_registry_routing_table::{CanisterIdRange, RoutingTable};
 use ic_registry_subnet_type::SubnetType;
-use ic_replicated_state::metadata_state::subnet_call_context_manager::BitcoinSendTransactionInternalContext;
+use ic_replicated_state::canister_state::execution_state::{
+    CustomSection, CustomSectionType, WasmMetadata,
+};
+use ic_replicated_state::metadata_state::subnet_call_context_manager::{
+    BitcoinGetSuccessorsContext, BitcoinSendTransactionInternalContext, SubnetCallContext,
+};
 use ic_replicated_state::replicated_state::testing::ReplicatedStateTesting;
+use ic_replicated_state::replicated_state::{
+    MemoryTaken, PeekableOutputIterator, ReplicatedStateMessageRouting,
+};
 use ic_replicated_state::testing::{CanisterQueuesTesting, SystemStateTesting};
 use ic_replicated_state::{
-    canister_state::execution_state::{CustomSection, CustomSectionType, WasmMetadata},
-    metadata_state::subnet_call_context_manager::{BitcoinGetSuccessorsContext, SubnetCallContext},
-    replicated_state::{MemoryTaken, PeekableOutputIterator, ReplicatedStateMessageRouting},
     CanisterState, IngressHistoryState, InputSource, ReplicatedState, SchedulerState, StateError,
     SystemState,
 };
@@ -26,14 +32,12 @@ use ic_test_utilities_types::ids::{canister_test_id, message_test_id, user_test_
 use ic_test_utilities_types::messages::{RequestBuilder, ResponseBuilder};
 use ic_types::ingress::{IngressState, IngressStatus};
 use ic_types::messages::{CallbackId, RejectContext};
-use ic_types::time::CoarseTime;
-use ic_types::{
-    messages::{
-        CanisterMessage, Payload, Request, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES,
-    },
-    time::UNIX_EPOCH,
-    CountBytes, Cycles, MemoryAllocation, Time,
+use ic_types::messages::{
+    CanisterMessage, Payload, Request, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES,
+};
+use ic_types::time::CoarseTime;
+use ic_types::time::UNIX_EPOCH;
+use ic_types::{CountBytes, Cycles, MemoryAllocation, Time};
 use maplit::btreemap;
 use proptest::prelude::*;
 use std::collections::{BTreeMap, VecDeque};
@@ -713,6 +717,30 @@ fn time_out_messages_updates_subnet_input_schedules_correctly() {
     );
 }
 
+#[test]
+fn time_out_messages_in_subnet_queues() {
+    let mut fixture = ReplicatedStateFixture::new();
+
+    // Enqueue 2 incoming best-effort requests for `SUBNET_ID`.
+    for i in 0..2 {
+        let mut request = request_to(SUBNET_ID.into());
+        request.deadline = CoarseTime::from_secs_since_unix_epoch(1000 + i as u32);
+        fixture.push_input(request.into()).unwrap();
+    }
+
+    // Time out the first request.
+    let second_request_deadline = CoarseTime::from_secs_since_unix_epoch(1001);
+    fixture.state.metadata.batch_time = second_request_deadline.into();
+    assert_eq!(1, fixture.state.time_out_messages());
+
+    // Second request should still be in the queue.
+    assert_matches!(
+        fixture.state.pop_subnet_input(),
+        Some(CanisterMessage::Request(request)) if request.deadline == second_request_deadline
+    );
+    assert_eq!(None, fixture.state.pop_subnet_input());
+}
+
 #[test]
 fn enforce_best_effort_message_limit() {
     let mut fixture = ReplicatedStateFixture::with_canisters(&[CANISTER_ID, OTHER_CANISTER_ID]);
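
Note (not part of the patch): a minimal sketch of how code outside the test fixture might drive the expiration introduced above, assuming only the APIs exercised in this diff (`ReplicatedState::time_out_messages()` and the public `metadata.batch_time` field). The helper name `expire_best_effort_messages` is hypothetical and used for illustration only.

    use ic_replicated_state::ReplicatedState;
    use ic_types::Time;

    /// Hypothetical round-level helper: advances the batch time and then times
    /// out expired best-effort messages, which with this patch also covers the
    /// subnet input and output queues.
    fn expire_best_effort_messages(state: &mut ReplicatedState, batch_time: Time) {
        // Deadlines are compared against the current batch time, so it must be
        // updated before timing out messages.
        state.metadata.batch_time = batch_time;

        // Returns the number of messages timed out across canister queues and
        // the subnet queues.
        let timed_out = state.time_out_messages();
        if timed_out > 0 {
            // Placeholder for a metrics or logging hook.
            println!("timed out {} best-effort message(s)", timed_out);
        }
    }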