Skip to content

Commit

Permalink
feat: [MR-627] Time out messages in subnet queues (#2708)
Browse files Browse the repository at this point in the history
Time out best-effort requests in subnet input queues and responses in
subnet output queues.

The management canister does not make outgoing calls as itself, so there
are no requests in subnet output queues or responses in subnet input
queues. This also means that there are no `SubnetCallContextManager`
callbacks to time out.

We already shed messages from subnet queues, so this is the last missing
piece of the puzzle in terms of best-effort message shedding and
expiration.
  • Loading branch information
alin-at-dfinity authored Nov 21, 2024
1 parent 3d01d63 commit 13006ee
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
18 changes: 18 additions & 0 deletions rs/replicated_state/src/replicated_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,16 @@ pub struct ReplicatedState {
pub metadata: SystemMetadata,

/// Queues for holding messages sent/received by the subnet.
///
/// The Management Canister does not make outgoing calls as itself (it does so
/// on behalf of canisters, but those messages are enqueued in the canister's
/// output queue). Therefore, there's only a `push_subnet_output_response()`
/// method (no equivalent for requests) and an explicit check against inducting
/// responses into the subnet queues. This assumption is used in a number of
/// places (e.g. when shedding or timing out messages), so adding support for
/// outgoing calls in the future will likely require significant changes across
/// `ReplicatedState` and `SystemState`.
//
// Must remain private.
#[validate_eq(CompareWithValidateEq)]
subnet_queues: CanisterQueues,
Expand Down Expand Up @@ -953,6 +963,14 @@ impl ReplicatedState {
self.canister_states.insert(canister_id, canister);
}

if self.subnet_queues.has_expired_deadlines(current_time) {
timed_out_messages_count += self.subnet_queues.time_out_messages(
current_time,
&self.metadata.own_subnet_id.into(),
&self.canister_states,
);
}

timed_out_messages_count
}

Expand Down
50 changes: 39 additions & 11 deletions rs/replicated_state/tests/replicated_state.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use assert_matches::assert_matches;
use ic_base_types::{CanisterId, NumBytes, NumSeconds, PrincipalId, SubnetId};
use ic_btc_interface::Network;
use ic_btc_replica_types::{
Expand All @@ -11,13 +12,18 @@ use ic_management_canister_types::{
};
use ic_registry_routing_table::{CanisterIdRange, RoutingTable};
use ic_registry_subnet_type::SubnetType;
use ic_replicated_state::metadata_state::subnet_call_context_manager::BitcoinSendTransactionInternalContext;
use ic_replicated_state::canister_state::execution_state::{
CustomSection, CustomSectionType, WasmMetadata,
};
use ic_replicated_state::metadata_state::subnet_call_context_manager::{
BitcoinGetSuccessorsContext, BitcoinSendTransactionInternalContext, SubnetCallContext,
};
use ic_replicated_state::replicated_state::testing::ReplicatedStateTesting;
use ic_replicated_state::replicated_state::{
MemoryTaken, PeekableOutputIterator, ReplicatedStateMessageRouting,
};
use ic_replicated_state::testing::{CanisterQueuesTesting, SystemStateTesting};
use ic_replicated_state::{
canister_state::execution_state::{CustomSection, CustomSectionType, WasmMetadata},
metadata_state::subnet_call_context_manager::{BitcoinGetSuccessorsContext, SubnetCallContext},
replicated_state::{MemoryTaken, PeekableOutputIterator, ReplicatedStateMessageRouting},
CanisterState, IngressHistoryState, InputSource, ReplicatedState, SchedulerState, StateError,
SystemState,
};
Expand All @@ -26,14 +32,12 @@ use ic_test_utilities_types::ids::{canister_test_id, message_test_id, user_test_
use ic_test_utilities_types::messages::{RequestBuilder, ResponseBuilder};
use ic_types::ingress::{IngressState, IngressStatus};
use ic_types::messages::{CallbackId, RejectContext};
use ic_types::time::CoarseTime;
use ic_types::{
messages::{
CanisterMessage, Payload, Request, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES,
},
time::UNIX_EPOCH,
CountBytes, Cycles, MemoryAllocation, Time,
use ic_types::messages::{
CanisterMessage, Payload, Request, RequestOrResponse, Response, MAX_RESPONSE_COUNT_BYTES,
};
use ic_types::time::CoarseTime;
use ic_types::time::UNIX_EPOCH;
use ic_types::{CountBytes, Cycles, MemoryAllocation, Time};
use maplit::btreemap;
use proptest::prelude::*;
use std::collections::{BTreeMap, VecDeque};
Expand Down Expand Up @@ -713,6 +717,30 @@ fn time_out_messages_updates_subnet_input_schedules_correctly() {
);
}

#[test]
fn time_out_messages_in_subnet_queues() {
    let mut fixture = ReplicatedStateFixture::new();

    // Deadlines of the two best-effort requests, one second apart.
    let first_request_deadline = CoarseTime::from_secs_since_unix_epoch(1000);
    let second_request_deadline = CoarseTime::from_secs_since_unix_epoch(1001);

    // Push one incoming best-effort request addressed to `SUBNET_ID` per
    // deadline, earliest deadline first.
    for deadline in [first_request_deadline, second_request_deadline] {
        let mut request = request_to(SUBNET_ID.into());
        request.deadline = deadline;
        fixture.push_input(request.into()).unwrap();
    }

    // Set the batch time to the second request's deadline and time out
    // messages: exactly one message (the first request) is expected to be
    // reported as timed out.
    fixture.state.metadata.batch_time = second_request_deadline.into();
    let timed_out_messages_count = fixture.state.time_out_messages();
    assert_eq!(1, timed_out_messages_count);

    // The request with the later deadline must be the only message left in
    // the subnet input queue.
    assert_matches!(
        fixture.state.pop_subnet_input(),
        Some(CanisterMessage::Request(request)) if request.deadline == second_request_deadline
    );
    assert_eq!(None, fixture.state.pop_subnet_input());
}

#[test]
fn enforce_best_effort_message_limit() {
let mut fixture = ReplicatedStateFixture::with_canisters(&[CANISTER_ID, OTHER_CANISTER_ID]);
Expand Down

0 comments on commit 13006ee

Please sign in to comment.