diff --git a/.github/buildomat/jobs/test-ds.sh b/.github/buildomat/jobs/test-ds.sh index ff217f781..eee478c73 100755 --- a/.github/buildomat/jobs/test-ds.sh +++ b/.github/buildomat/jobs/test-ds.sh @@ -39,6 +39,7 @@ for t in "$input/bins/"*.gz; do done export BINDIR=/var/tmp/bins +export RUST_BACKTRACE=1 banner test_ds ptime -m bash "$input/scripts/test_ds.sh" diff --git a/.github/buildomat/jobs/test-live-repair.sh b/.github/buildomat/jobs/test-live-repair.sh index 61d702dc6..5dfcf9694 100644 --- a/.github/buildomat/jobs/test-live-repair.sh +++ b/.github/buildomat/jobs/test-live-repair.sh @@ -78,6 +78,7 @@ for t in "$input/bins/"*.gz; do done export BINDIR=/var/tmp/bins +export RUST_BACKTRACE=1 echo "BINDIR is $BINDIR" echo "bindir contains:" diff --git a/.github/buildomat/jobs/test-memory.sh b/.github/buildomat/jobs/test-memory.sh index 6c5659d98..fb83051d5 100755 --- a/.github/buildomat/jobs/test-memory.sh +++ b/.github/buildomat/jobs/test-memory.sh @@ -39,6 +39,7 @@ for t in "$input/rbins/"*.gz; do done export BINDIR=/var/tmp/bins +export RUST_BACKTRACE=1 banner setup pfexec plimit -n 9123456 $$ diff --git a/.github/buildomat/jobs/test-region-create.sh b/.github/buildomat/jobs/test-region-create.sh index c63f134f0..c82feebbc 100755 --- a/.github/buildomat/jobs/test-region-create.sh +++ b/.github/buildomat/jobs/test-region-create.sh @@ -39,6 +39,7 @@ for t in "$input/rbins/"*.gz; do done export BINDIR=/var/tmp/bins +export RUST_BACKTRACE=1 banner region pfexec plimit -n 9123456 $$ diff --git a/.github/buildomat/jobs/test-repair.sh b/.github/buildomat/jobs/test-repair.sh index 48b1e6354..f66cd8721 100644 --- a/.github/buildomat/jobs/test-repair.sh +++ b/.github/buildomat/jobs/test-repair.sh @@ -45,6 +45,7 @@ for t in "$input/bins/"*.gz; do done export BINDIR=/var/tmp/bins +export RUST_BACKTRACE=1 echo "Setup self timeout" # Give this test two hours to finish diff --git a/.github/buildomat/jobs/test-replay.sh b/.github/buildomat/jobs/test-replay.sh index a4bdfe9af..d87f05b93 100644 --- a/.github/buildomat/jobs/test-replay.sh +++ b/.github/buildomat/jobs/test-replay.sh @@ -43,6 +43,7 @@ for t in "$input/bins/"*.gz; do done export BINDIR=/var/tmp/bins +export RUST_BACKTRACE=1 banner setup echo "Setup self timeout" diff --git a/.github/buildomat/jobs/test-up-encrypted.sh b/.github/buildomat/jobs/test-up-encrypted.sh index 8ff024c9e..b8a1b3f96 100644 --- a/.github/buildomat/jobs/test-up-encrypted.sh +++ b/.github/buildomat/jobs/test-up-encrypted.sh @@ -41,6 +41,7 @@ for t in "$input/bins/"*.gz; do done export BINDIR=/var/tmp/bins +export RUST_BACKTRACE=1 # Give this test one hour to finish jobpid=$$; (sleep $(( 60 * 60 )); banner fail-timeout; ps -ef; zfs list;kill $jobpid) & diff --git a/.github/buildomat/jobs/test-up-unencrypted.sh b/.github/buildomat/jobs/test-up-unencrypted.sh index ea4f74049..55ace65f9 100644 --- a/.github/buildomat/jobs/test-up-unencrypted.sh +++ b/.github/buildomat/jobs/test-up-unencrypted.sh @@ -41,6 +41,7 @@ for t in "$input/bins/"*.gz; do done export BINDIR=/var/tmp/bins +export RUST_BACKTRACE=1 # Give this test two hours to finish jobpid=$$; (sleep $(( 120 * 60 )); banner fail-timeout; ps -ef; zfs list;kill $jobpid) & diff --git a/cmon/src/main.rs b/cmon/src/main.rs index bc1e2371e..21627b2b5 100644 --- a/cmon/src/main.rs +++ b/cmon/src/main.rs @@ -8,7 +8,9 @@ use strum::IntoEnumIterator; use strum_macros::EnumIter; use tokio::time::{sleep, Duration}; -use crucible::{Arg, ClientStopReason, DsState}; +use crucible::{ + Arg, ClientStopReason, ConnectionMode, DsState, 
NegotiationState, +}; /// Connect to crucible control server #[derive(Parser, Debug)] @@ -87,24 +89,46 @@ enum Action { // Translate a DsState into a three letter string for printing. fn short_state(dss: DsState) -> String { match dss { - DsState::New - | DsState::Stopping(ClientStopReason::NegotiationFailed(..)) => { - "NEW".to_string() - } - DsState::WaitActive => "WAC".to_string(), - DsState::WaitQuorum => "WAQ".to_string(), - DsState::Reconcile => "REC".to_string(), + DsState::Connecting { + state: NegotiationState::WaitActive, + .. + } => "WAC".to_string(), + + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } => "WAQ".to_string(), + DsState::Connecting { + state: NegotiationState::Reconcile, + .. + } => "REC".to_string(), DsState::Active => "ACT".to_string(), - DsState::Faulted | DsState::Stopping(ClientStopReason::Fault(..)) => { - "FLT".to_string() + DsState::Connecting { + state: NegotiationState::LiveRepairReady, + .. + } => "LRR".to_string(), + DsState::Stopping(ClientStopReason::NegotiationFailed(..)) + | DsState::Connecting { + mode: ConnectionMode::New, + .. + } => "NEW".to_string(), + DsState::Connecting { + mode: ConnectionMode::Faulted, + .. } - DsState::LiveRepairReady => "LRR".to_string(), + | DsState::Stopping(ClientStopReason::Fault(..)) => "FLT".to_string(), DsState::LiveRepair => "LR".to_string(), - DsState::Offline => "OFF".to_string(), + DsState::Connecting { + mode: ConnectionMode::Offline, + .. + } => "OFF".to_string(), DsState::Stopping(ClientStopReason::Deactivated) => "DAV".to_string(), DsState::Stopping(ClientStopReason::Disabled) => "DIS".to_string(), - DsState::Stopping(ClientStopReason::Replacing) => "RPC".to_string(), - DsState::Replaced => "RPD".to_string(), + DsState::Stopping(ClientStopReason::Replacing) + | DsState::Connecting { + mode: ConnectionMode::Replaced, + .. 
+ } => "RPD".to_string(), } } diff --git a/common/src/lib.rs b/common/src/lib.rs index cf91c1ab6..094fb5fcb 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -133,9 +133,6 @@ pub enum CrucibleError { #[error("Repair stream error {0}")] RepairStreamError(String), - #[error("Generation number is too low: {0}")] - GenerationNumberTooLow(String), - #[error("Active with different generation number")] GenerationNumberInvalid, @@ -165,6 +162,9 @@ pub enum CrucibleError { #[error("Incompatible RegionDefinition {0}")] RegionIncompatible(String), + + #[error("Negotiation error: {0}")] + NegotiationError(NegotiationError), } impl From for CrucibleError { @@ -197,6 +197,61 @@ impl From> for CrucibleError { } } +impl From for CrucibleError { + fn from(value: NegotiationError) -> Self { + CrucibleError::NegotiationError(value) + } +} + +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive( + thiserror::Error, + Debug, + PartialEq, + Clone, + Copy, + Serialize, + Deserialize, + JsonSchema, +)] +pub enum NegotiationError { + #[error("Message received out of order")] + OutOfOrder, + + #[error("Generation 0 is illegal")] + GenerationZeroIsIllegal, + + #[error( + "Generation number is too low: requested {requested}, found {actual}" + )] + GenerationNumberTooLow { requested: u64, actual: u64 }, + + #[error("Incompatible message version: wanted {expected}, got {actual}")] + IncompatibleVersion { expected: u32, actual: u32 }, + + #[error( + "Incompatible encryption settings: wanted {expected}, got {actual}" + )] + EncryptionMismatch { expected: bool, actual: bool }, + + #[error( + "Incompatible read-only settings: wanted {expected}, got {actual}" + )] + ReadOnlyMismatch { expected: bool, actual: bool }, + + #[error("Incompatible upstairs ID: wanted {expected}, got {actual}")] + UpstairsIdMismatch { + expected: uuid::Uuid, + actual: uuid::Uuid, + }, + + #[error("Incompatible session ID: wanted {expected}, got {actual}")] + SessionIdMismatch { + expected: uuid::Uuid, + actual: uuid::Uuid, + }, +} + #[macro_export] macro_rules! crucible_bail { ($i:ident) => { return Err(CrucibleError::$i) }; @@ -394,7 +449,6 @@ impl From for dropshot::HttpError { | CrucibleError::DecryptionError | CrucibleError::Disconnect | CrucibleError::EncryptionError(_) - | CrucibleError::GenerationNumberTooLow(_) | CrucibleError::GenerationNumberInvalid | CrucibleError::GenericError(_) | CrucibleError::HashMismatch @@ -417,7 +471,8 @@ impl From for dropshot::HttpError { | CrucibleError::MissingContextSlot(..) | CrucibleError::BadMetadata(..) | CrucibleError::BadContextSlot(..) - | CrucibleError::MissingBlockContext => { + | CrucibleError::MissingBlockContext + | CrucibleError::NegotiationError(..) 
=> { dropshot::HttpError::for_internal_error(e.to_string()) } } diff --git a/openapi/crucible-control.json b/openapi/crucible-control.json index 4ff8f0e10..4dd61e982 100644 --- a/openapi/crucible-control.json +++ b/openapi/crucible-control.json @@ -136,10 +136,24 @@ ] }, { - "description": "Negotiation says that we are incompatible", + "description": "Negotiation says that our message versions are incompatible", "type": "string", "enum": [ - "incompatible" + "incompatible_version" + ] + }, + { + "description": "Negotiation says that our session IDs are incompatible", + "type": "string", + "enum": [ + "incompatible_session" + ] + }, + { + "description": "Negotiation says that region settings are incompatible", + "type": "string", + "enum": [ + "incompatible_settings" ] } ] @@ -232,6 +246,38 @@ } ] }, + "ConnectionMode": { + "oneOf": [ + { + "description": "Connect through reconciliation once a quorum has come online", + "type": "string", + "enum": [ + "new" + ] + }, + { + "description": "Replay cached jobs when reconnecting", + "type": "string", + "enum": [ + "offline" + ] + }, + { + "description": "Reconnect through live-repair", + "type": "string", + "enum": [ + "faulted" + ] + }, + { + "description": "Reconnect through live-repair; the address is allowed to change", + "type": "string", + "enum": [ + "replaced" + ] + } + ] + }, "DownstairsWork": { "description": "`DownstairsWork` holds the information gathered from the downstairs", "type": "object", @@ -255,28 +301,47 @@ ] }, "DsState": { + "description": "High-level states for a Downstairs\n\nThe state machine for a Downstairs is relatively simple:\n\n```text ┌────────────┐ ┌────► LiveRepair ├─────┐ start ┌─────────┴┐ └─────┬──────┘ ┌─▼──────┐ ────►│Connecting│ │ │Stopping│ └─▲───────┬┘ ┌─────▼──────┐ └─▲────┬─┘ │ └────► Active ├─────┘ │ │ └─────┬──────┘ │ │ │ │ └─────────────────◄┴─────────────────┘ ```\n\nComplexity is hidden in the `Connecting` state, which wraps a [`NegotiationState`] implementing the negotiation state machine.", "oneOf": [ { + "description": "New connection", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "new" + "connecting" + ] + }, + "value": { + "type": "object", + "properties": { + "mode": { + "$ref": "#/components/schemas/ConnectionMode" + }, + "state": { + "$ref": "#/components/schemas/NegotiationState" + } + }, + "required": [ + "mode", + "state" ] } }, "required": [ - "type" + "type", + "value" ] }, { + "description": "Ready for and/or currently receiving IO", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "wait_active" + "active" ] } }, @@ -285,12 +350,13 @@ ] }, { + "description": "This downstairs is undergoing LiveRepair", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "wait_quorum" + "live_repair" ] } }, @@ -299,26 +365,83 @@ ] }, { + "description": "The IO task for the client is being stopped", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "reconcile" + "stopping" ] + }, + "value": { + "$ref": "#/components/schemas/ClientStopReason" } }, "required": [ - "type" + "type", + "value" + ] + } + ] + }, + "Error": { + "description": "Error information from a response.", + "type": "object", + "properties": { + "error_code": { + "type": "string" + }, + "message": { + "type": "string" + }, + "request_id": { + "type": "string" + } + }, + "required": [ + "message", + "request_id" + ] + }, + "NegotiationState": { + "description": "Tracks client negotiation progress\n\nThe exact path through negotiation 
depends on the [`ConnectionMode`].\n\nThere are three main paths, shown below:\n\n```text ┌───────┐ │ Start ├────────┐ └───┬───┘ │ │ │ ┌─────▼──────┐ │ │ WaitActive │ │ auto-promote └─────┬──────┘ │ │ │ ┌───────▼────────┐ │ │ WaitForPromote ◄───┘ └───────┬────────┘ │ ┌────────▼──────────┐ │ WaitForRegionInfo │ └──┬──────────────┬─┘ Offline │ │ New / Faulted / Replaced ┌──────▼─────┐ ┌────▼────────────┐ │GetLastFlush│ │GetExtentVersions│ └──────┬─────┘ └─┬─────────────┬─┘ │ │ New │ Faulted / Replaced │ ┌──────▼───┐ ┌────▼──────────┐ │ │WaitQuorum│ │LiveRepairReady│ │ └────┬─────┘ └────┬──────────┘ │ │ │ │ ┌────▼────┐ │ │ │Reconcile│ │ │ └────┬────┘ │ │ │ │ │ ┌───▼──┐ │ └─────► Done ◄────────────┘ └──────┘ ```\n\n`Done` isn't actually present in the state machine; it's indicated by returning a [`NegotiationResult`] other than [`NegotiationResult::NotDone`].", + "oneOf": [ + { + "description": "Initial state, waiting to hear `YesItsMe` from the client\n\nOnce this message is heard, transitions to either `WaitActive` (if `auto_promote` is `false`) or `WaitQuorum` (if `auto_promote` is `true`)", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "start" + ] + }, + "value": { + "type": "object", + "properties": { + "auto_promote": { + "type": "boolean" + } + }, + "required": [ + "auto_promote" + ] + } + }, + "required": [ + "type", + "value" ] }, { + "description": "Waiting for activation by the guest", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "active" + "wait_active" ] } }, @@ -327,12 +450,13 @@ ] }, { + "description": "Waiting to hear `YouAreNowActive` from the client", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "faulted" + "wait_for_promote" ] } }, @@ -341,12 +465,13 @@ ] }, { + "description": "Waiting to hear `RegionInfo` from the client", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "live_repair_ready" + "wait_for_region_info" ] } }, @@ -355,12 +480,13 @@ ] }, { + "description": "Waiting to hear `LastFlushAck` from the client", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "live_repair" + "get_last_flush" ] } }, @@ -369,12 +495,13 @@ ] }, { + "description": "Waiting to hear `ExtentVersions` from the client", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "offline" + "get_extent_versions" ] } }, @@ -383,12 +510,13 @@ ] }, { + "description": "Waiting for the minimum number of downstairs to be present.", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "replaced" + "wait_quorum" ] } }, @@ -397,43 +525,35 @@ ] }, { - "description": "The IO task for the client is being stopped", + "description": "Initial startup, downstairs are repairing from each other.", "type": "object", "properties": { "type": { "type": "string", "enum": [ - "stopping" + "reconcile" ] - }, - "value": { - "$ref": "#/components/schemas/ClientStopReason" } }, "required": [ - "type", - "value" + "type" ] - } - ] - }, - "Error": { - "description": "Error information from a response.", - "type": "object", - "properties": { - "error_code": { - "type": "string" - }, - "message": { - "type": "string" }, - "request_id": { - "type": "string" + { + "description": "Waiting for live-repair to begin", + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "live_repair_ready" + ] + } + }, + "required": [ + "type" + ] } - }, - "required": [ - "message", - "request_id" ] }, "UpState": { diff --git 
a/upstairs/src/client.rs b/upstairs/src/client.rs index 4b2f9b3a0..416c185c9 100644 --- a/upstairs/src/client.rs +++ b/upstairs/src/client.rs @@ -2,11 +2,13 @@ use crate::{ cdt, integrity_hash, io_limits::ClientIOLimits, live_repair::ExtentInfo, upstairs::UpstairsConfig, upstairs::UpstairsState, ClientIOStateCount, - ClientId, CrucibleDecoder, CrucibleError, DownstairsIO, DsState, - EncryptionContext, IOState, IOop, JobId, Message, RawReadResponse, + ClientId, ConnectionMode, CrucibleDecoder, CrucibleError, DownstairsIO, + DsState, EncryptionContext, IOState, IOop, JobId, Message, RawReadResponse, ReconcileIO, ReconcileIOState, RegionDefinitionStatus, RegionMetadata, }; -use crucible_common::{x509::TLSContext, ExtentId, VerboseTimeout}; +use crucible_common::{ + x509::TLSContext, ExtentId, NegotiationError, VerboseTimeout, +}; use crucible_protocol::{ MessageWriter, ReconciliationId, CRUCIBLE_MESSAGE_VERSION, }; @@ -26,7 +28,10 @@ use serde::{Deserialize, Serialize}; use slog::{debug, error, info, o, warn, Logger}; use tokio::{ net::{TcpSocket, TcpStream}, - sync::{mpsc, oneshot}, + sync::{ + mpsc, + oneshot::{self, error::RecvError}, + }, time::{sleep, sleep_until, Duration, Instant}, }; use tokio_util::codec::FramedRead; @@ -142,9 +147,6 @@ pub(crate) struct DownstairsClient { /// This is set to `None` during initialization pub(crate) repair_addr: Option, - /// Flag indicating that the Upstairs should replay jobs to this client - needs_replay: bool, - /// TLS context (if present) /// /// This is passed as a pointer to minimize copies @@ -181,12 +183,6 @@ pub(crate) struct DownstairsClient { /// Accumulated statistics pub(crate) stats: DownstairsStats, - /// State for the "promote to active" action - promote_state: Option, - - /// State for startup negotiation - negotiation_state: NegotiationState, - /// Session ID for a clients connection to a downstairs. connection_id: ConnectionId, @@ -218,57 +214,16 @@ impl DownstairsClient { client_id, io_limits, region_uuid: None, - needs_replay: false, - negotiation_state: NegotiationState::Start, tls_context, - promote_state: None, log, target_addr, repair_addr: None, - state: DsState::New, - last_flush: JobId(0), - stats: DownstairsStats::default(), - skipped_jobs: BTreeSet::new(), - region_metadata: None, - repair_info: None, - io_state_job_count: ClientIOStateCount::default(), - io_state_byte_count: ClientIOStateCount::default(), - connection_id: ConnectionId(0), - client_delay_us, - } - } - - /// Builds a minimal `DownstairsClient` for testing - /// - /// The resulting client has no target address; any packets sent by the - /// client will disappear into the void. 
- #[cfg(test)] - fn test_default() -> Self { - let client_delay_us = Arc::new(AtomicU64::new(0)); - let cfg = Arc::new(UpstairsConfig { - encryption_context: None, - upstairs_id: Uuid::new_v4(), - session_id: Uuid::new_v4(), - generation: std::sync::atomic::AtomicU64::new(1), - read_only: false, - }); - Self { - cfg, - client_task: Self::new_dummy_task(false), - client_id: ClientId::new(0), - io_limits: ClientIOLimits::new( - crate::IO_OUTSTANDING_MAX_JOBS * 3 / 2, - crate::IO_OUTSTANDING_MAX_BYTES as usize * 3 / 2, - ), - region_uuid: None, - needs_replay: false, - negotiation_state: NegotiationState::Start, - tls_context: None, - promote_state: None, - log: crucible_common::build_logger(), - target_addr: None, - repair_addr: None, - state: DsState::New, + state: DsState::Connecting { + mode: ConnectionMode::New, + state: NegotiationState::Start { + auto_promote: false, + }, + }, last_flush: JobId(0), stats: DownstairsStats::default(), skipped_jobs: BTreeSet::new(), @@ -481,11 +436,18 @@ impl DownstairsClient { /// Sets our state to `DsState::Reconcile` /// /// # Panics - /// If the previous state is not `DsState::WaitQuorum` + /// If the current state is invalid pub(crate) fn begin_reconcile(&mut self) { - info!(self.log, "Transition from {} to Reconcile", self.state); - assert_eq!(self.state, DsState::WaitQuorum); - self.state = DsState::Reconcile; + info!(self.log, "Transition from {:?} to Reconcile", self.state); + let DsState::Connecting { state, mode } = &mut self.state else { + panic!( + "invalid state {:?} for client {}", + self.state, self.client_id + ); + }; + assert_eq!(*state, NegotiationState::WaitQuorum); + assert!(matches!(mode, ConnectionMode::New)); + *state = NegotiationState::Reconcile; } /// Go through the list of dependencies and remove any jobs that this @@ -528,12 +490,13 @@ impl DownstairsClient { } /// Checks whether this Downstairs is ready for the upstairs to deactivate - /// - /// # Panics - /// If the downstairs is offline pub(crate) fn ready_to_deactivate(&self) -> bool { match &self.state { - DsState::New | DsState::WaitActive => { + DsState::Connecting { + mode: ConnectionMode::New, + state: + NegotiationState::Start { .. } | NegotiationState::WaitActive, + } => { info!( self.log, "ready to deactivate from state {:?}", self.state @@ -566,75 +529,88 @@ impl DownstairsClient { // entirely; the repair address could have changed in any of these // cases. self.repair_addr = None; - self.needs_replay = false; // If the upstairs is already active (or trying to go active), then the // downstairs should automatically call PromoteToActive when it reaches // the relevant state. - self.promote_state = match up_state { - UpstairsState::Active | UpstairsState::GoActive(..) => { - Some(PromoteState::Waiting) - } + let auto_promote = match up_state { + UpstairsState::Active | UpstairsState::GoActive(..) => !matches!( + self.state, + DsState::Stopping(ClientStopReason::Disabled) + ), UpstairsState::Initializing - | UpstairsState::Deactivating { .. } => None, + | UpstairsState::Deactivating { .. } => false, }; - self.negotiation_state = NegotiationState::Start; - let current = &self.state; - let new_state = match current { - DsState::Active | DsState::Offline if !can_replay => { - Some(DsState::Faulted) - } + let new_mode = match current { + // If we can't replay jobs, then reconnection must happen through + // live-repair (rather than replay). + DsState::Active + | DsState::Connecting { + mode: ConnectionMode::Offline, + .. 
+ } if !can_replay => ConnectionMode::Faulted, + + // If the Downstairs has spontaneously stopped, we will attempt to + // replay jobs when reconnecting + DsState::Active => ConnectionMode::Offline, + + // Other failures during connection preserve the previous mode + DsState::Connecting { mode, .. } => *mode, + + // Faults or failures during live-repair must go through live-repair DsState::LiveRepair - | DsState::LiveRepairReady | DsState::Stopping(ClientStopReason::Fault(..)) => { - Some(DsState::Faulted) + ConnectionMode::Faulted } - DsState::Active => Some(DsState::Offline), - - DsState::Reconcile - | DsState::WaitQuorum - | DsState::WaitActive - | DsState::Stopping(ClientStopReason::NegotiationFailed(..)) - | DsState::Stopping(ClientStopReason::Disabled) + // Failures during deactivation restart from the very beginning + DsState::Stopping(ClientStopReason::Disabled) | DsState::Stopping(ClientStopReason::Deactivated) => { - Some(DsState::New) + ConnectionMode::New } - // If we have replaced a downstairs, don't forget that. - DsState::Stopping(ClientStopReason::Replacing) => { - Some(DsState::Replaced) + // Failures during negotiation either restart from the beginning, or + // go through the live-repair path. + DsState::Stopping(ClientStopReason::NegotiationFailed(..)) => { + match up_state { + // If we haven't activated yet (or we're deactivating) then + // start from New + UpstairsState::GoActive(..) + | UpstairsState::Initializing + | UpstairsState::Deactivating { .. } => ConnectionMode::New, + + // Otherwise, use live-repair + UpstairsState::Active => ConnectionMode::Faulted, + } } - // We stay in these states through the task restart - DsState::Offline - | DsState::Faulted - | DsState::New - | DsState::Replaced => None, + DsState::Stopping(ClientStopReason::Replacing) => match up_state { + // If we haven't activated yet (or we're deactivating), then + // start from New + UpstairsState::GoActive(..) + | UpstairsState::Initializing + | UpstairsState::Deactivating { .. } => ConnectionMode::New, + + // Otherwise, use live-repair; `ConnectionMode::Replaced` + // indicates that the address is allowed to change. + UpstairsState::Active => ConnectionMode::Replaced, + }, + }; + let new_state = DsState::Connecting { + mode: new_mode, + state: NegotiationState::Start { auto_promote }, }; - // Jobs are skipped and replayed in `Downstairs::reinitialize`, which is - // (probably) the caller of this function. - if let Some(new_state) = new_state { - self.checked_state_transition(up_state, new_state); - } + // Note that jobs are skipped / replayed in `Downstairs::reinitialize`, + // which is (probably) the caller of this function! 
+ self.checked_state_transition(up_state, new_state); self.connection_id.update(); // Restart with a short delay, connecting if we're auto-promoting - self.start_task(true, self.promote_state.is_some()); - } - - /// Sets the `needs_replay` flag - pub(crate) fn needs_replay(&mut self) { - self.needs_replay = true; - } - - /// Returns and clears the `needs_replay` flag - pub(crate) fn check_replay(&mut self) -> bool { - std::mem::take(&mut self.needs_replay) + self.start_task(true, auto_promote); } /// Returns the last flush ID handled by this client @@ -796,58 +772,41 @@ impl DownstairsClient { ); } } - match self.promote_state { - Some(PromoteState::Waiting) => { - panic!("called set_active_request while already waiting") - } - Some(PromoteState::Sent) => { - panic!("called set_active_request after it was sent") - } - None => (), - } // If we're already in the point of negotiation where we're waiting to // go active, then immediately go active! - match self.state { - DsState::New => { + match &mut self.state { + DsState::Connecting { + state: NegotiationState::Start { auto_promote }, + mode: ConnectionMode::New, + } => { + if *auto_promote { + panic!("called set_active_request while already waiting") + } + *auto_promote = true; info!( self.log, "client set_active_request while in {:?}; waiting...", self.state, ); - self.promote_state = Some(PromoteState::Waiting); - } - DsState::Replaced => { - info!( - self.log, - "client set_active_request while Replaced; waiting..." - ); - self.promote_state = Some(PromoteState::Waiting); } - DsState::WaitActive => { + DsState::Connecting { + state: NegotiationState::WaitActive, + mode: ConnectionMode::New, + } => { info!( self.log, - "client set_active_request while in WaitActive \ - -> WaitForPromote" + "client set_active_request while in {:?} -> WaitForPromote", + self.state, ); - // If the client task has stopped, then print a warning but - // otherwise continue (because we'll be cleaned up by the - // JoinHandle watcher). self.send(Message::PromoteToActive { upstairs_id: self.cfg.upstairs_id, session_id: self.cfg.session_id, gen: self.cfg.generation(), }); - - self.promote_state = Some(PromoteState::Sent); - // TODO: negotiation / promotion state is spread across - // DsState, PromoteState, and NegotiationState. We should - // consolidate into a single place - assert!( - self.negotiation_state == NegotiationState::Start - || self.negotiation_state - == NegotiationState::WaitForPromote - ); - self.negotiation_state = NegotiationState::WaitForPromote; + self.state = DsState::Connecting { + state: NegotiationState::WaitForPromote, + mode: ConnectionMode::New, + }; } s => panic!("invalid state for set_active_request: {s:?}"), } @@ -920,9 +879,10 @@ impl DownstairsClient { ) -> EnqueueResult { match self.state { // We never send jobs if we're in certain inactive states - DsState::Faulted - | DsState::Replaced - | DsState::LiveRepairReady + DsState::Connecting { + mode: ConnectionMode::Faulted | ConnectionMode::Replaced, + .. + } | DsState::Stopping( ClientStopReason::Fault(..) | ClientStopReason::Disabled @@ -952,13 +912,16 @@ impl DownstairsClient { // cleared out by a subsequent flush (so we'll be able to bring that // client back into compliance by replaying jobs). DsState::Active => EnqueueResult::Send, - DsState::Offline => EnqueueResult::Hold, + DsState::Connecting { + mode: ConnectionMode::Offline, + .. 
+ } => EnqueueResult::Hold, - DsState::New - | DsState::WaitActive - | DsState::WaitQuorum - | DsState::Reconcile - | DsState::Stopping(ClientStopReason::Deactivated) => panic!( + DsState::Stopping(ClientStopReason::Deactivated) + | DsState::Connecting { + mode: ConnectionMode::New, + .. + } => panic!( "enqueue should not be called from state {:?}", self.state ), @@ -986,12 +949,6 @@ impl DownstairsClient { /// will panic if there is not a valid state transition edge between the /// current `self.state` and the requested `new_state`. /// - /// For example, transitioning to a `new_state` of [DsState::Replacing] is - /// *always* possible, so this will never panic for that state transition. - /// On the other hand, [DsState::Replaced] can *only* follow - /// [DsState::Replacing], so if the current state is *anything else*, that - /// indicates a logic error happened in some other part of the code. - /// /// If the state transition is valid, this function simply sets `self.state` /// to the newly requested state. There's no magic here beyond that; this /// function does not change anything about the state or any other internal @@ -1004,190 +961,143 @@ impl DownstairsClient { up_state: &UpstairsState, new_state: DsState, ) { - // TODO this should probably be private! - info!(self.log, "ds_transition from {} to {new_state}", self.state); + if !Self::is_state_transition_valid(up_state, self.state, new_state) { + panic!( + "invalid state transition for client {} from {:?} -> {:?} \ + (with up_state: {:?})", + self.client_id, self.state, new_state, up_state + ); + } + self.state = new_state; + } - let old_state = self.state; + /// Check if a state transition is valid, returning `true` or `false` + fn is_state_transition_valid( + up_state: &UpstairsState, + prev_state: DsState, + next_state: DsState, + ) -> bool { + match (prev_state, next_state) { + // Restarting negotiation is always allowed + ( + DsState::Connecting { .. }, + DsState::Connecting { + state: NegotiationState::Start { .. }, + .. 
+ }, + ) => true, - /* - * Check that this is a valid transition - */ - let panic_invalid = || { - panic!( - "[{}] {} Invalid transition: {:?} -> {:?}", - self.client_id, self.cfg.upstairs_id, old_state, new_state - ) - }; - match new_state { - DsState::Replaced => { - assert_eq!( - old_state, - DsState::Stopping(ClientStopReason::Replacing) - ); - } - DsState::WaitActive => { - if old_state == DsState::Offline { - if matches!(up_state, UpstairsState::Active) { - panic!( - "[{}] {} Bad up active state change {} -> {}", - self.client_id, - self.cfg.upstairs_id, - old_state, - new_state, - ); - } - } else if old_state != DsState::New - && old_state != DsState::Faulted - && old_state != DsState::Replaced + // Check normal negotiation path + ( + DsState::Connecting { + state: prev_state, + mode: prev_mode, + }, + DsState::Connecting { + state: next_state, + mode: next_mode, + }, + ) => { + if next_mode == ConnectionMode::New + && matches!(up_state, UpstairsState::Active) { - panic!( - "[{}] {} Negotiation failed, {:?} -> {:?}", - self.client_id, - self.cfg.upstairs_id, - old_state, - new_state, - ); - } - } - DsState::WaitQuorum => { - assert_eq!(old_state, DsState::WaitActive); - } - DsState::Faulted => { - match old_state { - DsState::Active - | DsState::Faulted - | DsState::Reconcile - | DsState::LiveRepair - | DsState::LiveRepairReady - | DsState::Offline - | DsState::Stopping(ClientStopReason::Fault(..)) => {} // Okay - _ => { - panic_invalid(); - } + return false; } + next_mode == prev_mode + && NegotiationState::is_transition_valid( + prev_mode, prev_state, next_state, + ) } - DsState::Reconcile => { - assert!(!matches!(up_state, UpstairsState::Active)); - assert_eq!(old_state, DsState::WaitQuorum); - } - DsState::Active => { - match old_state { - DsState::WaitQuorum - | DsState::Reconcile - | DsState::LiveRepair - | DsState::Offline => {} // Okay - DsState::LiveRepairReady if self.cfg.read_only => {} // Okay - - _ => { - panic_invalid(); - } - } - /* - * Make sure reconcile happened when the upstairs is inactive. - */ - if old_state == DsState::Reconcile { - assert!(!matches!(up_state, UpstairsState::Active)); - } - } - DsState::LiveRepair => { - assert_eq!(old_state, DsState::LiveRepairReady); - } - DsState::LiveRepairReady => { - match old_state { - DsState::Faulted | DsState::Replaced => {} // Okay - _ => { - panic_invalid(); - } - } + // We can go to Active either through reconciliation or replay; + // in other cases, we must use live-repair + (DsState::Connecting { state, mode }, DsState::Active) => { + matches!( + (state, mode), + (NegotiationState::GetLastFlush, ConnectionMode::Offline) + | (NegotiationState::Reconcile, ConnectionMode::New) + | ( + NegotiationState::LiveRepairReady, + ConnectionMode::Faulted | ConnectionMode::Replaced + ) + ) } - DsState::New => { - // Before new, we must have been in - // on of these states. - match old_state { - DsState::Active - | DsState::Faulted - | DsState::Reconcile - | DsState::Stopping( - ClientStopReason::Deactivated - | ClientStopReason::Disabled - | ClientStopReason::Replacing - | ClientStopReason::NegotiationFailed(..), - ) => {} // Okay - _ => { - panic_invalid(); - } - } + (DsState::Connecting { state, mode }, DsState::LiveRepair) => { + matches!( + (state, mode), + ( + NegotiationState::LiveRepairReady, + ConnectionMode::Faulted | ConnectionMode::Replaced + ) + ) } - DsState::Offline => { - match old_state { - DsState::Active => {} // Okay - _ => { - panic_invalid(); - } - } + + // When can we stop the IO task ourselves? 
+ (DsState::LiveRepair, DsState::Active) => true, + ( + DsState::Connecting { .. }, + DsState::Stopping( + ClientStopReason::NegotiationFailed(..) + | ClientStopReason::Replacing + | ClientStopReason::Disabled + | ClientStopReason::Fault(..), + ), + ) => true, + ( + DsState::Active | DsState::LiveRepair, + DsState::Stopping( + ClientStopReason::Fault(..) + | ClientStopReason::Replacing + | ClientStopReason::Disabled, + ), + ) => true, + (_, DsState::Stopping(ClientStopReason::Deactivated)) => { + matches!(up_state, UpstairsState::Deactivating(..)) } - // We only go deactivated if we were actually active, or - // somewhere past active. - // if deactivate is requested before active, the downstairs - // state should just go back to NEW and re-require an - // activation. - DsState::Stopping(ClientStopReason::Deactivated) => { - match old_state { - DsState::Active - | DsState::LiveRepair - | DsState::LiveRepairReady - | DsState::Offline - | DsState::Reconcile => {} // Okay - DsState::Faulted => { - if matches!(up_state, UpstairsState::Active) { - // Can't transition like this when active - panic_invalid(); + (DsState::Stopping(r), DsState::Connecting { mode, state }) => { + use ClientStopReason as R; + matches!( + (r, mode, state), + ( + R::Fault(..), + ConnectionMode::Faulted, + NegotiationState::Start { .. } + ) | ( + R::Deactivated | R::Disabled, + ConnectionMode::New, + NegotiationState::Start { + auto_promote: false } - } - _ => { - panic_invalid(); - } - } + ) | ( + R::Replacing, + ConnectionMode::Replaced, + NegotiationState::Start { auto_promote: true } + ) | ( + R::Replacing, + ConnectionMode::New, + NegotiationState::Start { .. } + ) | ( + R::NegotiationFailed(..), + ConnectionMode::New, + NegotiationState::Start { .. } + ) + ) } - // Some stop reasons may occur at any time - DsState::Stopping( - ClientStopReason::Fault(..) - | ClientStopReason::Replacing - | ClientStopReason::Disabled, - ) => {} + // When the upstairs is active, we can always spontaneously + // disconnect, which brings us to either Offline or Faulted + // depending on whether replay is valid + ( + _, + DsState::Connecting { + mode: ConnectionMode::Offline | ConnectionMode::Faulted, + state: NegotiationState::Start { auto_promote: true }, + }, + ) => matches!(up_state, UpstairsState::Active), - // The client may undergo negotiation for many reasons - DsState::Stopping(ClientStopReason::NegotiationFailed(..)) => { - match old_state { - DsState::New - | DsState::WaitActive - | DsState::WaitQuorum - | DsState::Reconcile - | DsState::Offline - | DsState::Faulted - | DsState::LiveRepairReady => {} - _ => panic_invalid(), - } - } - } - - if old_state != new_state { - info!( - self.log, - "[{}] Transition from {} to {}", - self.client_id, - old_state, - new_state, - ); - self.state = new_state; - } else { - warn!( - self.log, - "[{}] transition to same state: {}", self.client_id, new_state - ); + // Anything not allowed is prohibited + _ => false, } } @@ -1382,8 +1292,12 @@ impl DownstairsClient { /// # Panics /// If this downstairs is not read-only pub(crate) fn skip_live_repair(&mut self, up_state: &UpstairsState) { - if self.state == DsState::LiveRepairReady { + let DsState::Connecting { state, .. } = self.state else { + return; + }; + if state == NegotiationState::LiveRepairReady { assert!(self.cfg.read_only); + // TODO: could we do this transition early, by automatically // skipping LiveRepairReady if read-only? 
self.checked_state_transition(up_state, DsState::Active); @@ -1391,9 +1305,12 @@ impl DownstairsClient { } } - /// Moves from `LiveRepairReady` to `LiveRepair`; a no-op otherwise + /// Moves from `LiveRepairReady` to `LiveRepair`, a no-op otherwise pub(crate) fn start_live_repair(&mut self, up_state: &UpstairsState) { - if self.state == DsState::LiveRepairReady { + let DsState::Connecting { state, .. } = self.state else { + return; + }; + if state == NegotiationState::LiveRepairReady { self.checked_state_transition(up_state, DsState::LiveRepair); } } @@ -1403,13 +1320,13 @@ impl DownstairsClient { /// Returns an error if the upstairs should go inactive, which occurs if the /// error is at or after `Message::YouAreNowActive`. /// - /// Returns `true` if negotiation for this downstairs is complete + /// Returns a flag indicating how to proceed pub(crate) fn continue_negotiation( &mut self, m: Message, up_state: &UpstairsState, ddef: &mut RegionDefinitionStatus, - ) -> Result { + ) -> Result { /* * Either we get all the way through the negotiation, or we hit the * timeout and exit to retry. @@ -1501,19 +1418,23 @@ impl DownstairsClient { * upstairs. We set the downstairs to DsState::Active and the while * loop is exited. */ + let DsState::Connecting { state, mode } = &mut self.state else { + error!( + self.log, + "tried to continue negotiation while not connecting" + ); + return Err(NegotiationError::OutOfOrder); + }; + let mode = *mode; // mode is immutable here match m { Message::YesItsMe { version, repair_addr, } => { - if self.negotiation_state != NegotiationState::Start { + let NegotiationState::Start { auto_promote } = *state else { error!(self.log, "got version already"); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::BadNegotiationOrder, - ); - return Ok(false); - } + return Err(NegotiationError::OutOfOrder); + }; if version != CRUCIBLE_MESSAGE_VERSION { error!( self.log, @@ -1521,57 +1442,28 @@ impl DownstairsClient { CRUCIBLE_MESSAGE_VERSION, version ); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Incompatible, - ); - return Ok(false); + return Err(NegotiationError::IncompatibleVersion { + expected: CRUCIBLE_MESSAGE_VERSION, + actual: version, + }); } - self.negotiation_state = NegotiationState::WaitForPromote; self.repair_addr = Some(repair_addr); - match self.promote_state { - Some(PromoteState::Waiting) => { - self.send(Message::PromoteToActive { - upstairs_id: self.cfg.upstairs_id, - session_id: self.cfg.session_id, - gen: self.cfg.generation(), - }); - self.promote_state = Some(PromoteState::Sent); - self.negotiation_state = - NegotiationState::WaitForPromote; - // TODO This is an unfortunate corner of the state - // machine, where we have to be in WaitActive despite - // _already_ having gone active. - // If we are Replaced and we have not yet gone active - // then it is valid for us to transition to WA. - if self.state == DsState::New - || (self.state == DsState::Replaced - && !matches!(up_state, UpstairsState::Active)) - { - self.checked_state_transition( - up_state, - DsState::WaitActive, - ); - } else { - warn!( - self.log, - "version negotiation from state {:?}", - self.state - ); - } - } - Some(PromoteState::Sent) => { - // We shouldn't be able to get here. 
- panic!("got YesItsMe with promote_state == Sent"); - } - None => { - // Nothing to do here, wait for set_active_request - self.checked_state_transition( - up_state, - DsState::WaitActive, - ); - } + if auto_promote { + *state = NegotiationState::WaitForPromote; + self.send(Message::PromoteToActive { + upstairs_id: self.cfg.upstairs_id, + session_id: self.cfg.session_id, + gen: self.cfg.generation(), + }); + info!( + self.log, + "version negotiation from state {:?}", self.state + ); + } else { + // Nothing to do here, wait for set_active_request + *state = NegotiationState::WaitActive; } + Ok(NegotiationResult::NotDone) } Message::VersionMismatch { version } => { error!( @@ -1579,10 +1471,10 @@ impl DownstairsClient { "downstairs version is {version}, \ ours is {CRUCIBLE_MESSAGE_VERSION}" ); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Incompatible, - ); + Err(NegotiationError::IncompatibleVersion { + expected: CRUCIBLE_MESSAGE_VERSION, + actual: version, + }) } Message::EncryptedMismatch { expected } => { error!( @@ -1590,10 +1482,10 @@ impl DownstairsClient { "downstairs encrypted is {expected}, ours is {}", self.cfg.encrypted() ); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Incompatible, - ); + Err(NegotiationError::EncryptionMismatch { + expected: self.cfg.encrypted(), + actual: expected, + }) } Message::ReadOnlyMismatch { expected } => { error!( @@ -1601,94 +1493,74 @@ impl DownstairsClient { "downstairs read_only is {expected}, ours is {}", self.cfg.read_only, ); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Incompatible, - ); + Err(NegotiationError::ReadOnlyMismatch { + expected: self.cfg.read_only, + actual: expected, + }) } Message::YouAreNowActive { upstairs_id, session_id, gen, } => { - if self.negotiation_state != NegotiationState::WaitForPromote { + if *state != NegotiationState::WaitForPromote { error!( self.log, - "Received YouAreNowActive out of order! {:?}", - self.negotiation_state - ); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::BadNegotiationOrder, + "Received YouAreNowActive out of order! {state:?}", ); - return Ok(false); + return Err(NegotiationError::OutOfOrder); } - let match_uuid = self.cfg.upstairs_id == upstairs_id; - let match_session = self.cfg.session_id == session_id; + let mut err = None; + if self.cfg.upstairs_id != upstairs_id { + error!( + self.log, + "UUID mismatch in YouAreNowActive: {:?} != {:?}", + self.cfg.upstairs_id, + upstairs_id + ); + err = Some(NegotiationError::UpstairsIdMismatch { + expected: self.cfg.upstairs_id, + actual: upstairs_id, + }); + } + if self.cfg.session_id != session_id { + error!( + self.log, + "Session mismatch in YouAreNowActive: {:?} != {:?}", + self.cfg.session_id, + session_id + ); + err = Some(NegotiationError::SessionIdMismatch { + expected: self.cfg.session_id, + actual: session_id, + }); + } let upstairs_gen = self.cfg.generation(); - let match_gen = upstairs_gen == gen; - let matches_self = match_uuid && match_session && match_gen; - - if !matches_self { + if upstairs_gen != gen { error!( self.log, - "YouAreNowActive didn't match self! 
{} {} {}", - if !match_uuid { - format!( - "UUID {:?} != {:?}", - self.cfg.upstairs_id, upstairs_id - ) - } else { - String::new() - }, - if !match_session { - format!( - "session {:?} != {:?}", - self.cfg.session_id, session_id - ) - } else { - String::new() - }, - if !match_gen { - format!("gen {:?} != {:?}", upstairs_gen, gen) - } else { - String::new() - }, + "generation mismatch in YouAreNowActive: {} != {}", + upstairs_gen, + gen ); - if !match_gen { - let gen_error = format!( - "Generation requested:{} found:{}", - gen, upstairs_gen, - ); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Incompatible, - ); - return Err(CrucibleError::GenerationNumberTooLow( - gen_error, - )); - } else { - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Incompatible, - ); - return Err(CrucibleError::UuidMismatch); - } + err = Some(NegotiationError::GenerationNumberTooLow { + requested: upstairs_gen, + actual: gen, + }); + } + if let Some(e) = err { + Err(e) + } else { + *state = NegotiationState::WaitForRegionInfo; + self.send(Message::RegionInfoPlease); + Ok(NegotiationResult::NotDone) } - - self.negotiation_state = NegotiationState::WaitForRegionInfo; - self.send(Message::RegionInfoPlease); } Message::RegionInfo { region_def } => { - if self.negotiation_state != NegotiationState::WaitForRegionInfo - { + if *state != NegotiationState::WaitForRegionInfo { error!(self.log, "Received RegionInfo out of order!"); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::BadNegotiationOrder, - ); - return Ok(false); + return Err(NegotiationError::OutOfOrder); } info!( self.log, @@ -1701,11 +1573,10 @@ impl DownstairsClient { // collection for each downstairs. if region_def.get_encrypted() != self.cfg.encrypted() { error!(self.log, "encryption expectation mismatch!"); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::Incompatible, - ); - return Ok(false); + return Err(NegotiationError::EncryptionMismatch { + expected: self.cfg.encrypted(), + actual: region_def.get_encrypted(), + }); } /* @@ -1729,7 +1600,7 @@ impl DownstairsClient { if uuid != region_def.uuid() { // If we are replacing the downstairs, then a new UUID // is okay. - if self.state == DsState::Replaced { + if mode == ConnectionMode::Replaced { warn!( self.log, "Replace downstairs uuid:{} with {}", @@ -1809,8 +1680,8 @@ impl DownstairsClient { *ddef = RegionDefinitionStatus::Received(region_def); // Match on the current state of this downstairs - match self.state { - DsState::Offline => { + match mode { + ConnectionMode::Offline => { /* * If we are coming from state Offline, then it means * the downstairs has departed then came back in short @@ -1826,87 +1697,85 @@ impl DownstairsClient { self.log, "send last flush ID to this DS: {}", lf ); - self.negotiation_state = NegotiationState::GetLastFlush; + *state = NegotiationState::GetLastFlush; self.send(Message::LastFlush { last_flush_number: lf, }); } - DsState::WaitActive - | DsState::Faulted - | DsState::Replaced => { + ConnectionMode::New + | ConnectionMode::Faulted + | ConnectionMode::Replaced => { /* * Ask for the current version of all extents. 
*/ - self.negotiation_state = - NegotiationState::GetExtentVersions; + *state = NegotiationState::GetExtentVersions; self.send(Message::ExtentVersionsPlease); } - bad_state => { - panic!( - "[{}] join from invalid state {} {} {:?}", - self.client_id, - bad_state, - self.cfg.upstairs_id, - self.negotiation_state, - ); - } } + Ok(NegotiationResult::NotDone) } Message::LastFlushAck { last_flush_number } => { - if self.negotiation_state != NegotiationState::GetLastFlush { + if *state != NegotiationState::GetLastFlush { error!(self.log, "Received LastFlushAck out of order!"); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::BadNegotiationOrder, - ); - return Ok(false); // TODO should we trigger set_inactive? - } - match self.state { - DsState::Offline => (), - s => panic!("got LastFlushAck in bad state {s:?}"), + return Err(NegotiationError::OutOfOrder); } + assert_eq!( + mode, + ConnectionMode::Offline, + "got LastFlushAck in bad state {:?}", + self.state + ); info!( self.log, "Replied this last flush ID: {last_flush_number}" ); assert_eq!(self.last_flush, last_flush_number); - // Immediately set the state to Active, since we've already - // copied over the jobs. + // Immediately set the state to Active, and return a flag + // indicating that jobs should be replayed. self.checked_state_transition(up_state, DsState::Active); - - self.negotiation_state = NegotiationState::Done; + Ok(NegotiationResult::Replay) } Message::ExtentVersions { gen_numbers, flush_numbers, dirty_bits, } => { - if self.negotiation_state != NegotiationState::GetExtentVersions - { + if *state != NegotiationState::GetExtentVersions { error!(self.log, "Received ExtentVersions out of order!"); - self.abort_negotiation( - up_state, - ClientNegotiationFailed::BadNegotiationOrder, - ); - return Ok(false); // TODO should we trigger set_inactive? + return Err(NegotiationError::OutOfOrder); } - match self.state { - DsState::WaitActive => { - self.checked_state_transition( - up_state, - DsState::WaitQuorum, - ); + let out = match mode { + ConnectionMode::New => { + *state = NegotiationState::WaitQuorum; + NegotiationResult::WaitQuorum } - DsState::Faulted | DsState::Replaced => { - self.checked_state_transition( + // Special case: if a downstairs is replaced while we're + // still trying to go active, then we use the WaitQuorum + // path instead of LiveRepair. + ConnectionMode::Replaced + if matches!( up_state, - DsState::LiveRepairReady, + UpstairsState::Initializing + | UpstairsState::GoActive(..) 
+ ) => + { + *state = NegotiationState::WaitQuorum; + NegotiationResult::WaitQuorum + } + + ConnectionMode::Faulted | ConnectionMode::Replaced => { + *state = NegotiationState::LiveRepairReady; + NegotiationResult::LiveRepair + } + ConnectionMode::Offline => { + panic!( + "got ExtentVersions from invalid state {:?}", + self.state ); } - s => panic!("downstairs in invalid state {s}"), - } + }; /* * Record this downstairs region info for later @@ -1922,11 +1791,10 @@ impl DownstairsClient { if let Some(old_rm) = self.region_metadata.replace(dsr) { warn!(self.log, "new RM replaced this: {:?}", old_rm); } - self.negotiation_state = NegotiationState::Done; + Ok(out) } m => panic!("invalid message in continue_negotiation: {m:?}"), } - Ok(self.negotiation_state == NegotiationState::Done) } /// Sends the next reconciliation job to all clients @@ -1938,8 +1806,17 @@ impl DownstairsClient { job: &mut ReconcileIO, ) { // If someone has moved us out of reconcile, this is a logic error - if self.state != DsState::Reconcile { - panic!("[{}] should still be in reconcile", self.client_id); + if !matches!( + self.state, + DsState::Connecting { + state: NegotiationState::Reconcile, + mode: ConnectionMode::New | ConnectionMode::Replaced + } + ) { + panic!( + "[{}] should still be in reconcile, not {:?}", + self.client_id, self.state + ); } let prev_state = job .state @@ -2048,24 +1925,147 @@ impl DownstairsClient { } } -/// How to handle "promote to active" requests -#[derive(Debug)] -enum PromoteState { - /// Send `PromoteToActive` when the state machine reaches `WaitForPromote` - Waiting, - /// We have already sent `PromoteToActive` - Sent, -} - /// Tracks client negotiation progress -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -enum NegotiationState { - Start, +/// +/// The exact path through negotiation depends on the [`ConnectionMode`]. +/// +/// There are three main paths, shown below: +/// +/// ```text +/// ┌───────┐ +/// │ Start ├────────┐ +/// └───┬───┘ │ +/// │ │ +/// ┌─────▼──────┐ │ +/// │ WaitActive │ │ auto-promote +/// └─────┬──────┘ │ +/// │ │ +/// ┌───────▼────────┐ │ +/// │ WaitForPromote ◄───┘ +/// └───────┬────────┘ +/// │ +/// ┌────────▼──────────┐ +/// │ WaitForRegionInfo │ +/// └──┬──────────────┬─┘ +/// Offline │ │ New / Faulted / Replaced +/// ┌──────▼─────┐ ┌────▼────────────┐ +/// │GetLastFlush│ │GetExtentVersions│ +/// └──────┬─────┘ └─┬─────────────┬─┘ +/// │ │ New │ Faulted / Replaced +/// │ ┌──────▼───┐ ┌────▼──────────┐ +/// │ │WaitQuorum│ │LiveRepairReady│ +/// │ └────┬─────┘ └────┬──────────┘ +/// │ │ │ +/// │ ┌────▼────┐ │ +/// │ │Reconcile│ │ +/// │ └────┬────┘ │ +/// │ │ │ +/// │ ┌───▼──┐ │ +/// └─────► Done ◄────────────┘ +/// └──────┘ +/// ``` +/// +/// `Done` isn't actually present in the state machine; it's indicated by +/// returning a [`NegotiationResult`] other than [`NegotiationResult::NotDone`]. 
+#[derive( + Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, JsonSchema, +)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "type", content = "value")] +pub enum NegotiationState { + /// Initial state, waiting to hear `YesItsMe` from the client + /// + /// Once this message is heard, transitions to either `WaitActive` (if + /// `auto_promote` is `false`) or `WaitQuorum` (if `auto_promote` is `true`) + Start { auto_promote: bool }, + + /// Waiting for activation by the guest + WaitActive, + + /// Waiting to hear `YouAreNowActive` from the client WaitForPromote, + + /// Waiting to hear `RegionInfo` from the client WaitForRegionInfo, + + /// Waiting to hear `LastFlushAck` from the client GetLastFlush, + + /// Waiting to hear `ExtentVersions` from the client GetExtentVersions, - Done, + + /// Waiting for the minimum number of downstairs to be present. + WaitQuorum, + + /// Initial startup, downstairs are repairing from each other. + Reconcile, + + /// Waiting for live-repair to begin + LiveRepairReady, +} + +impl NegotiationState { + /// Checks whether a particular transition is valid + /// + /// See the docstring of [`NegotiationState`] for a drawing of the full + /// state transition diagram + fn is_transition_valid( + mode: ConnectionMode, + prev_state: Self, + next_state: Self, + ) -> bool { + matches!( + (prev_state, next_state, mode), + ( + NegotiationState::Start { auto_promote: true }, + NegotiationState::WaitForPromote, + _ + ) | ( + NegotiationState::Start { + auto_promote: false, + }, + NegotiationState::WaitActive, + _, + ) | ( + NegotiationState::WaitActive, + NegotiationState::WaitForPromote, + _ + ) | ( + NegotiationState::WaitForPromote, + NegotiationState::WaitForRegionInfo, + _ + ) | ( + NegotiationState::WaitForRegionInfo, + NegotiationState::GetLastFlush, + ConnectionMode::Offline + ) | ( + NegotiationState::WaitForRegionInfo, + NegotiationState::GetExtentVersions, + ConnectionMode::New + | ConnectionMode::Faulted + | ConnectionMode::Replaced, + ) | ( + NegotiationState::GetExtentVersions, + NegotiationState::WaitQuorum, + ConnectionMode::New + ) | ( + NegotiationState::WaitQuorum, + NegotiationState::Reconcile, + ConnectionMode::New + ) | ( + NegotiationState::GetExtentVersions, + NegotiationState::LiveRepairReady, + ConnectionMode::Faulted | ConnectionMode::Replaced, + ) + ) + } +} +/// Result value returned when negotiation is complete +pub(crate) enum NegotiationResult { + NotDone, + WaitQuorum, + Replay, + LiveRepair, } /// Result value from [`DownstairsClient::enqueue`] @@ -2188,8 +2188,14 @@ pub enum ClientNegotiationFailed { /// Negotiation message received out of order BadNegotiationOrder, - /// Negotiation says that we are incompatible - Incompatible, + /// Negotiation says that our message versions are incompatible + IncompatibleVersion, + + /// Negotiation says that our session IDs are incompatible + IncompatibleSession, + + /// Negotiation says that region settings are incompatible + IncompatibleSettings, } impl From for ClientStopReason { @@ -2198,6 +2204,27 @@ impl From for ClientStopReason { } } +impl From for ClientNegotiationFailed { + fn from(value: NegotiationError) -> Self { + match value { + NegotiationError::OutOfOrder => Self::BadNegotiationOrder, + NegotiationError::IncompatibleVersion { .. } => { + Self::IncompatibleVersion + } + NegotiationError::ReadOnlyMismatch { .. } + | NegotiationError::EncryptionMismatch { .. } => { + Self::IncompatibleSettings + } + NegotiationError::GenerationZeroIsIllegal { .. 
} + | NegotiationError::GenerationNumberTooLow { .. } + | NegotiationError::UpstairsIdMismatch { .. } + | NegotiationError::SessionIdMismatch { .. } => { + Self::IncompatibleSession + } + } + } +} + /// Subset of [`ClientStopReason`] for faulting a client #[derive( Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, JsonSchema, @@ -2270,8 +2297,9 @@ pub(crate) enum ClientRunResult { #[allow(dead_code)] ReadFailed(anyhow::Error), /// The `DownstairsClient` requested that the task stop, so it did - #[allow(dead_code)] - RequestedStop(ClientStopReason), + /// + /// The reason for the stop will be in the `DsState::Stopping(..)` member + RequestedStop, /// The socket closed cleanly and the task exited Finished, /// One of the queues used to communicate with the main task closed @@ -2287,6 +2315,16 @@ pub(crate) enum ClientRunResult { ReceiveTaskCancelled, } +/// Convert a oneshot result into a `ClientStopReason` +impl From> for ClientRunResult { + fn from(value: Result) -> Self { + match value { + Ok(..) => ClientRunResult::RequestedStop, + Err(..) => ClientRunResult::QueueClosed, + } + } +} + /// Data structure to hold context for the client IO task /// /// Client IO is managed by two tasks: @@ -2404,18 +2442,11 @@ impl ClientIoTask { if self.delay { tokio::select! { s = &mut self.stop => { - warn!(self.log, "client IO task stopped during sleep"); - return match s { - Ok(s) => - ClientRunResult::RequestedStop(s), - Err(e) => { - warn!( - self.log, - "client_stop_rx closed unexpectedly: {e:?}" - ); - ClientRunResult::QueueClosed - } - } + warn!( + self.log, + "client IO task stopped during sleep: {s:?}" + ); + return s.into(); } _ = tokio::time::sleep(CLIENT_RECONNECT_DELAY) => { // this is fine @@ -2436,18 +2467,11 @@ impl ClientIoTask { // Otherwise, continue as usual } s = &mut self.stop => { - warn!(self.log, "client IO task stopped before connecting"); - return match s { - Ok(s) => - ClientRunResult::RequestedStop(s), - Err(e) => { - warn!( - self.log, - "client_stop_rx closed unexpectedly: {e:?}" - ); - ClientRunResult::QueueClosed - } - } + warn!( + self.log, + "client IO task stopped before connecting: {s:?}" + ); + return s.into(); } } @@ -2486,18 +2510,11 @@ impl ClientIoTask { } } s = &mut self.stop => { - warn!(self.log, "client IO task stopped during connection"); - return match s { - Ok(s) => - ClientRunResult::RequestedStop(s), - Err(e) => { - warn!( - self.log, - "client_stop_rx closed unexpectedly: {e:?}" - ); - ClientRunResult::QueueClosed - } - } + warn!( + self.log, + "client IO task stopped during connection: {s:?}" + ); + return s.into(); } }; @@ -2592,19 +2609,8 @@ impl ClientIoTask { } s = &mut self.stop => { - match s { - Ok(s) => { - break ClientRunResult::RequestedStop(s); - } - - Err(e) => { - warn!( - self.log, - "client_stop_rx closed unexpectedly: {e:?}" - ); - break ClientRunResult::QueueClosed; - } - } + info!(self.log, "client stopping due to {s:?}"); + break s.into(); } } } @@ -2662,19 +2668,8 @@ impl ClientIoTask { } } s = &mut self.stop => { - match s { - Ok(s) => { - Err(ClientRunResult::RequestedStop(s)) - } - - Err(e) => { - warn!( - self.log, - "client_stop_rx closed unexpectedly: {e:?}" - ); - Err(ClientRunResult::QueueClosed) - } - } + info!(self.log, "client stopped in write due to {s:?}"); + Err(s.into()) } join_result = self.recv_task.join() => { Err(join_result) @@ -2870,339 +2865,3 @@ pub(crate) fn validate_unencrypted_read_response( } } } - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn downstairs_transition_normal() { - // 
Verify the correct downstairs progression - // New -> WA -> WQ -> Active - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - } - - #[test] - fn downstairs_transition_deactivate_new() { - // Verify deactivate goes to new - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - // Upstairs goes active! - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - client.checked_state_transition( - &UpstairsState::Active, - DsState::Stopping(ClientStopReason::Deactivated), - ); - client.checked_state_transition(&UpstairsState::Active, DsState::New); - } - - #[test] - #[should_panic] - fn downstairs_transition_deactivate_not_new() { - // Verify deactivate goes to new - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Stopping(ClientStopReason::Deactivated), - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_deactivate_not_wa() { - // Verify no deactivate from wa - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Stopping(ClientStopReason::Deactivated), - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_deactivate_not_wq() { - // Verify no deactivate from wq - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Stopping(ClientStopReason::Deactivated), - ); - } - - #[test] - fn downstairs_transition_active_to_faulted() { - // Verify active upstairs can go to faulted - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Faulted, - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_disconnect_no_active() { - // Verify no activation from disconnected - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Stopping(ClientStopReason::Deactivated), - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_same_wa() { - // Verify we can't go to the same state we are in - let mut client = DownstairsClient::test_default(); - 
client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_same_wq() { - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_same_active() { - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_no_new_to_offline() { - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Offline, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Offline, - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_same_offline() { - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Offline, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Offline, - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_backwards() { - // Verify state can't go backwards - // New -> WA -> WQ -> WA - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - } - - #[test] - #[should_panic] - fn downstairs_bad_transition_wq() { - // Verify error when going straight to WQ - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_bad_offline() { - // Verify offline cannot go to WQ - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Offline, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - } - - #[test] - #[should_panic] - fn downstairs_transition_bad_active() { - // Verify active can't go back to WQ - let mut client = 
DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - } - - #[test] - fn downstairs_transition_active_faulted() { - // Verify - let mut client = DownstairsClient::test_default(); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Active, - ); - client.checked_state_transition( - &UpstairsState::Initializing, - DsState::Faulted, - ); - } -} diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index 21eaad871..c69ca5422 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -10,7 +10,7 @@ use crate::{ cdt, client::{ ClientAction, ClientFaultReason, ClientNegotiationFailed, - ClientStopReason, DownstairsClient, EnqueueResult, + ClientStopReason, DownstairsClient, EnqueueResult, NegotiationState, }, guest::GuestBlockRes, io_limits::{IOLimitGuard, IOLimits}, @@ -18,14 +18,15 @@ use crate::{ stats::DownstairsStatOuter, upstairs::{UpstairsConfig, UpstairsState}, AckStatus, ActiveJobs, AllocRingBuffer, BlockRes, Buffer, ClientData, - ClientIOStateCount, ClientId, ClientMap, CrucibleError, DownstairsIO, - DownstairsMend, DsState, ExtentFix, ExtentRepairIDs, IOState, IOStateCount, - IOop, ImpactedBlocks, JobId, Message, RawReadResponse, RawWrite, - ReconcileIO, ReconciliationId, RegionDefinition, ReplaceResult, + ClientIOStateCount, ClientId, ClientMap, ConnectionMode, CrucibleError, + DownstairsIO, DownstairsMend, DsState, ExtentFix, ExtentRepairIDs, IOState, + IOStateCount, IOop, ImpactedBlocks, JobId, Message, RawReadResponse, + RawWrite, ReconcileIO, ReconciliationId, RegionDefinition, ReplaceResult, SnapshotDetails, WorkSummary, }; use crucible_common::{ impacted_blocks::ImpactedAddr, BlockIndex, BlockOffset, ExtentId, + NegotiationError, }; use crucible_protocol::WriteHeader; @@ -442,15 +443,26 @@ impl Downstairs { /// Helper function to set all 3x clients as active, legally #[cfg(test)] pub fn force_active(&mut self) { + let up_state = UpstairsState::GoActive(BlockRes::dummy()); for cid in ClientId::iter() { - for state in - [DsState::WaitActive, DsState::WaitQuorum, DsState::Active] - { + for state in [ + NegotiationState::WaitActive, + NegotiationState::WaitForPromote, + NegotiationState::WaitForRegionInfo, + NegotiationState::GetExtentVersions, + NegotiationState::WaitQuorum, + NegotiationState::Reconcile, + ] { self.clients[cid].checked_state_transition( - &UpstairsState::Initializing, - state, + &up_state, + DsState::Connecting { + state, + mode: ConnectionMode::New, + }, ); } + self.clients[cid] + .checked_state_transition(&up_state, DsState::Active); } } @@ -649,6 +661,16 @@ impl Downstairs { client_id: ClientId, up_state: &UpstairsState, ) { + // Check whether we asked the IO task to stop ourselves + let stopped_due_to_fault = matches!( + self.clients[client_id].state(), + DsState::Stopping(ClientStopReason::Fault(..)) + ); + + // Restart the IO task for that specific client, transitioning to a new + // state. 
+ self.clients[client_id].reinitialize(up_state, self.can_replay); + // If the IO task stops on its own, then under certain circumstances, // we want to skip all of its jobs. (If we requested that the IO task // stop, then whoever made that request is responsible for skipping jobs @@ -657,21 +679,17 @@ impl Downstairs { // Specifically, we want to skip jobs if the only path back online for // that client goes through live-repair; if that client can come back // through replay, then the jobs must remain live. - let client_state = self.clients[client_id].state(); - if matches!( - client_state, - DsState::LiveRepair | DsState::LiveRepairReady - ) || matches!( - client_state, - DsState::Active | DsState::Offline if !self.can_replay - ) { + let now_faulted = matches!( + self.clients[client_id].state(), + DsState::Connecting { + mode: ConnectionMode::Faulted, + .. + } + ); + if now_faulted && !stopped_due_to_fault { self.skip_all_jobs(client_id); } - // Restart the IO task for that specific client, transitioning to a new - // state. - self.clients[client_id].reinitialize(up_state, self.can_replay); - for i in ClientId::iter() { // Clear per-client delay, because we're starting a new session self.clients[i].set_delay_us(0); @@ -679,15 +697,17 @@ impl Downstairs { // Special-case: if a Downstairs goes away midway through initial // reconciliation, then we have to manually abort reconciliation. - if self.clients.iter().any(|c| c.state() == DsState::Reconcile) { + if self.clients.iter().any(|c| { + matches!( + c.state(), + DsState::Connecting { + state: NegotiationState::Reconcile, + .. + } + ) + }) { self.abort_reconciliation(up_state); } - - // If this client is coming back from being offline, then mark that its - // jobs must be replayed when it completes negotiation. - if self.clients[client_id].state() == DsState::Offline { - self.clients[client_id].needs_replay(); - } } /// Returns true if we can deactivate immediately @@ -729,7 +749,13 @@ impl Downstairs { // setting faulted, we return false here and let the faulting framework // take care of clearing out the skipped jobs. This then allows the // requested deactivation to finish. - if self.clients[client_id].state() == DsState::Offline { + if matches!( + self.clients[client_id].state(), + DsState::Connecting { + mode: ConnectionMode::Offline, + .. + } + ) { info!(self.log, "[{}] Offline client moved to Faulted", client_id); self.fault_client( client_id, @@ -792,17 +818,10 @@ impl Downstairs { self.next_id } - /// Sends replay jobs to the given client if `needs_replay` is set - pub(crate) fn check_replay(&mut self, client_id: ClientId) { - if self.clients[client_id].check_replay() { - self.replay_jobs(client_id); - } - } - /// Sends all pending jobs for the given client /// /// Jobs are pending if they have not yet been flushed by this client. 
-    fn replay_jobs(&mut self, client_id: ClientId) {
+    pub(crate) fn replay_jobs(&mut self, client_id: ClientId) {
         let lf = self.clients[client_id].last_flush();
 
         info!(
@@ -840,7 +859,7 @@ impl Downstairs {
     /// Decide if we need repair, and if so create the repair list
     ///
     /// Returns `true` if repair is needed, `false` otherwise
-    pub(crate) fn collate(&mut self) -> Result<bool, CrucibleError> {
+    pub(crate) fn collate(&mut self) -> Result<bool, NegotiationError> {
         let r = self.collate_inner();
         if r.is_err() {
             // If we failed to begin the repair, then assert that nothing has
@@ -849,13 +868,19 @@
             assert!(self.reconcile.is_none());
 
             for c in self.clients.iter() {
-                assert_eq!(c.state(), DsState::WaitQuorum);
+                assert_eq!(
+                    c.state(),
+                    DsState::Connecting {
+                        state: NegotiationState::WaitQuorum,
+                        mode: ConnectionMode::New
+                    }
+                );
             }
         }
         r
     }
 
-    fn collate_inner(&mut self) -> Result<bool, CrucibleError> {
+    fn collate_inner(&mut self) -> Result<bool, NegotiationError> {
         /*
          * Show some (or all if small) of the info from each region.
         *
@@ -918,9 +943,7 @@
         let requested_gen = self.cfg.generation();
         if requested_gen == 0 {
             error!(self.log, "generation number should be at least 1");
-            return Err(CrucibleError::GenerationNumberTooLow(
-                "Generation 0 illegal".to_owned(),
-            ));
+            return Err(NegotiationError::GenerationZeroIsIllegal);
         } else if requested_gen < max_gen {
             /*
              * We refuse to connect. The provided generation number is not
@@ -932,10 +955,10 @@
                 max_gen,
                 requested_gen,
             );
-            return Err(CrucibleError::GenerationNumberTooLow(format!(
-                "found generation number {}, larger than requested: {}",
-                max_gen, requested_gen,
-            )));
+            return Err(NegotiationError::GenerationNumberTooLow {
+                requested: requested_gen,
+                actual: max_gen,
+            });
         } else {
             info!(
                 self.log,
@@ -1865,7 +1888,15 @@ impl Downstairs {
         // If any client have dropped out of repair-readiness (e.g. due to
         // failed reconciliation, timeouts, etc), then we have to kick
         // everything else back to the beginning.
-        if self.clients.iter().any(|c| c.state() != DsState::Reconcile) {
+        if self.clients.iter().any(|c| {
+            !matches!(
+                c.state(),
+                DsState::Connecting {
+                    state: NegotiationState::Reconcile,
+                    ..
+                }
+            )
+        }) {
             // Something has changed, so abort this repair.
             // Mark any downstairs that have not changed as failed and disable
             // them so that they restart.
@@ -1913,7 +1944,13 @@
         // Mark any downstairs that have not changed as failed and disable
         // them so that they restart.
         for (i, c) in self.clients.iter_mut().enumerate() {
-            if c.state() == DsState::Reconcile {
+            if matches!(
+                c.state(),
+                DsState::Connecting {
+                    state: NegotiationState::Reconcile,
+                    ..
+                }
+            ) {
                 // Restart the IO task. This will cause the Upstairs to
                 // deactivate through a ClientAction::TaskStopped.
                 c.abort_negotiation(
@@ -1949,15 +1986,14 @@
     ///
     /// # Panics
     /// If that isn't the case!
- pub(crate) fn on_reconciliation_done(&mut self, from_state: DsState) { + pub(crate) fn on_reconciliation_done(&mut self, did_work: bool) { assert!(self.ds_active.is_empty()); - for (i, c) in self.clients.iter_mut().enumerate() { - assert_eq!(c.state(), from_state, "invalid state for client {i}"); + for c in self.clients.iter_mut() { c.set_active(); } - if from_state == DsState::Reconcile { + if did_work { // reconciliation completed let r = self.reconcile.take().unwrap(); assert!(r.task_list.is_empty()); @@ -1968,11 +2004,9 @@ impl Downstairs { &r, false, /* aborted */ ); } - } else if from_state == DsState::WaitQuorum { + } else { // no reconciliation was required assert!(self.reconcile.is_none()); - } else { - panic!("unexpected from_state {from_state}"); } } @@ -2557,8 +2591,10 @@ impl Downstairs { // as that info is gone to us now, so assume it was true. match self.clients[new_client_id].state() { DsState::Stopping(ClientStopReason::Replacing) - | DsState::Replaced - | DsState::LiveRepairReady + | DsState::Connecting { + mode: ConnectionMode::Replaced, + .. + } | DsState::LiveRepair => { // These states indicate a replacement is in progress. return Ok(ReplaceResult::StartedAlready); @@ -2586,18 +2622,25 @@ impl Downstairs { continue; } match self.clients[client_id].state() { - // XXX there are a bunch of states that aren't ready for IO but - // aren't listed here, e.g. all of the negotiation states. DsState::Stopping(..) - | DsState::Replaced - | DsState::LiveRepairReady - | DsState::LiveRepair => { + | DsState::LiveRepair + | DsState::Connecting { + mode: + ConnectionMode::Replaced + | ConnectionMode::Offline + | ConnectionMode::Faulted, + .. + } => { return Err(CrucibleError::ReplaceRequestInvalid(format!( "Replace {old} failed, downstairs {client_id} is {:?}", self.clients[client_id].state(), ))); } - _ => {} + DsState::Active + | DsState::Connecting { + mode: ConnectionMode::New, + .. + } => {} } } @@ -2624,7 +2667,13 @@ impl Downstairs { client_id: ClientId, up_state: &UpstairsState, ) { - assert_eq!(self.clients[client_id].state(), DsState::Offline); + assert!(matches!( + self.clients[client_id].state(), + DsState::Connecting { + mode: ConnectionMode::Offline, + .. + } + )); let byte_count = self.clients[client_id].total_bytes_outstanding(); let work_count = self.clients[client_id].total_live_work(); @@ -2719,18 +2768,9 @@ impl Downstairs { ClientFaultReason::FailedLiveRepair, ); } - // If connection aborted, and restarted, then the re-negotiation - // could have won this race, and transitioned the reconnecting - // downstairs from LiveRepair to Faulted to LiveRepairReady. - DsState::LiveRepairReady => found_valid_state = true, - // If just a single IO reported failure, we will fault this // downstairs and it won't yet have had a chance to move back - // around to LiveRepairReady yet. - DsState::Faulted => found_valid_state = true, - - // It's also possible for a Downstairs to be in the process of - // stopping, due a fault or disconnection + // around to Connecting yet. DsState::Stopping( ClientStopReason::Replacing | ClientStopReason::Disabled @@ -2739,6 +2779,16 @@ impl Downstairs { ) => { found_valid_state = true; } + + // If connection aborted, and restarted, then the re-negotiation + // could have won this race, and transitioned the reconnecting + // downstairs from LiveRepair to Stopping to Connecting. + DsState::Connecting { + mode: ConnectionMode::Faulted, + .. 
+ } => found_valid_state = true, + + // Other states are invalid _ => {} } // Set repair_info to None, so that the next ExtentFlushClose sees @@ -3239,8 +3289,12 @@ impl Downstairs { */ let ds_state = self.clients[client_id].state(); match ds_state { - DsState::Active | DsState::Reconcile | DsState::LiveRepair => {} - DsState::Faulted => { + DsState::Active | DsState::LiveRepair => {} + DsState::Stopping(ClientStopReason::Fault(..)) + | DsState::Connecting { + mode: ConnectionMode::Faulted, + .. + } => { error!( self.clients[client_id].log, "Dropping job {}, this downstairs is faulted", ds_id, @@ -3370,6 +3424,7 @@ impl Downstairs { "Saw CrucibleError::UpstairsInactive on client {}!", client_id ); + // XXX should we also change the upstairs state here? self.clients[client_id].disable(up_state); } Some(CrucibleError::DecryptionError) => { @@ -3597,20 +3652,7 @@ impl Downstairs { fn repair_test_all_active() -> Self { let mut ds = Self::test_default(); - for cid in ClientId::iter() { - ds.clients[cid].checked_state_transition( - &UpstairsState::Active, - DsState::WaitActive, - ); - ds.clients[cid].checked_state_transition( - &UpstairsState::Active, - DsState::WaitQuorum, - ); - ds.clients[cid].checked_state_transition( - &UpstairsState::Active, - DsState::Active, - ); - } + ds.force_active(); let mut ddef = RegionDefinition::default(); ddef.set_block_size(512); @@ -3629,16 +3671,7 @@ impl Downstairs { // Set one of the clients to want a repair let to_repair = ClientId::new(1); - ds.clients[to_repair] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); - ds.clients[to_repair].checked_state_transition( - &UpstairsState::Active, - DsState::LiveRepairReady, - ); - ds.clients[to_repair].checked_state_transition( - &UpstairsState::Active, - DsState::LiveRepair, - ); + test::move_to_live_repair(&mut ds, to_repair); // At this point you might think it makes sense to run // `self.start_live_repair(&UpstairsState::Active, gw, 3, 0);` @@ -4241,7 +4274,7 @@ impl Downstairs { ClientRunResult::ReadFailed(_) => { DownstairsClientStoppedReason::ReadFailed } - ClientRunResult::RequestedStop(_) => { + ClientRunResult::RequestedStop => { // skip this notification, it fires for *every* Upstairs // deactivation //DownstairsClientStoppedReason::RequestedStop @@ -4355,13 +4388,13 @@ struct DownstairsBackpressureConfig { pub(crate) mod test { use super::{ ClientFaultReason, ClientNegotiationFailed, ClientStopReason, - Downstairs, PendingJob, + ConnectionMode, Downstairs, NegotiationState, PendingJob, }; use crate::{ downstairs::{LiveRepairData, LiveRepairState, ReconcileData}, live_repair::ExtentInfo, upstairs::UpstairsState, - BlockOpWaiter, ClientId, CrucibleError, DsState, ExtentFix, + BlockOpWaiter, BlockRes, ClientId, CrucibleError, DsState, ExtentFix, ExtentRepairIDs, IOState, IOop, ImpactedAddr, ImpactedBlocks, JobId, RawReadResponse, ReconcileIO, ReconcileIOState, ReconciliationId, SnapshotDetails, @@ -4417,23 +4450,60 @@ pub(crate) mod test { } } - fn set_all_reconcile(ds: &mut Downstairs) { - for i in ClientId::iter() { - ds.clients[i].checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitActive, - ); - ds.clients[i].checked_state_transition( - &UpstairsState::Initializing, - DsState::WaitQuorum, - ); - ds.clients[i].checked_state_transition( - &UpstairsState::Initializing, - DsState::Reconcile, + /// Helper function to legally move the given client to live-repair + fn to_live_repair_ready(ds: &mut Downstairs, to_repair: ClientId) { + ds.fault_client( + to_repair, + 
&UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); + let mode = ConnectionMode::Faulted; + for state in [ + NegotiationState::Start { auto_promote: true }, + NegotiationState::WaitForPromote, + NegotiationState::WaitForRegionInfo, + NegotiationState::GetExtentVersions, + NegotiationState::LiveRepairReady, + ] { + ds.clients[to_repair].checked_state_transition( + &UpstairsState::Active, + DsState::Connecting { state, mode }, ); } } + /// Helper function to legally move the given client to live-repair + pub(super) fn move_to_live_repair( + ds: &mut Downstairs, + to_repair: ClientId, + ) { + to_live_repair_ready(ds, to_repair); + ds.clients[to_repair].checked_state_transition( + &UpstairsState::Active, + DsState::LiveRepair, + ); + } + + fn set_all_reconcile(ds: &mut Downstairs) { + let mode = ConnectionMode::New; + let up_state = UpstairsState::GoActive(BlockRes::dummy()); + for cid in ClientId::iter() { + for state in [ + NegotiationState::WaitActive, + NegotiationState::WaitForPromote, + NegotiationState::WaitForRegionInfo, + NegotiationState::GetExtentVersions, + NegotiationState::WaitQuorum, + NegotiationState::Reconcile, + ] { + ds.clients[cid].checked_state_transition( + &up_state, + DsState::Connecting { state, mode }, + ); + } + } + } + #[test] fn work_flush_three_ok() { let mut ds = Downstairs::test_default(); @@ -6053,7 +6123,6 @@ pub(crate) mod test { fn send_next_reconciliation_req_none() { // No repairs on the queue, should return None let mut ds = Downstairs::test_default(); - set_all_reconcile(&mut ds); ds.reconcile = Some(ReconcileData::new([])); @@ -6166,7 +6235,6 @@ pub(crate) mod test { fn reconcile_rep_in_progress_bad1() { // Verify the same downstairs can't mark a job in progress twice let mut ds = Downstairs::test_default(); - set_all_reconcile(&mut ds); let rep_id = ReconciliationId(0); ds.reconcile = Some(ReconcileData::new([ReconcileIO::new( @@ -6277,7 +6345,6 @@ pub(crate) mod test { fn reconcile_repair_inprogress_not_done() { // Verify Done or Skipped works when checking for a complete repair let mut ds = Downstairs::test_default(); - set_all_reconcile(&mut ds); let up_state = UpstairsState::Active; let rep_id = ReconciliationId(1); @@ -6312,7 +6379,6 @@ pub(crate) mod test { // Verify we can't start a new job before the old is finished. // Verify Done or Skipped works when checking for a complete repair let mut ds = Downstairs::test_default(); - set_all_reconcile(&mut ds); let up_state = UpstairsState::Active; let close_id = ReconciliationId(0); @@ -6367,8 +6433,11 @@ pub(crate) mod test { ds.force_active(); // Mark client 1 as faulted - ds.clients[ClientId::new(1)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + ds.fault_client( + ClientId::new(1), + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); // Create a write, enqueue it on both the downstairs // and the guest work queues. @@ -6418,11 +6487,14 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); - // Mark client 1 as faulted - ds.clients[ClientId::new(1)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); - ds.clients[ClientId::new(2)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + // Mark clients 1 and 2 as faulted + for cid in [ClientId::new(1), ClientId::new(2)] { + ds.fault_client( + cid, + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); + } // Create a write, enqueue it on both the downstairs // and the guest work queues. 
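Reviewer note (not part of the patch): the `to_live_repair_ready` helper above walks a faulted client through `Start { auto_promote: true }` → `WaitForPromote` → `WaitForRegionInfo` → `GetExtentVersions` → `LiveRepairReady`, which is exactly the path the new `NegotiationState::is_transition_valid` table permits for `ConnectionMode::Faulted`. Below is a reduced, self-contained sketch of that single-`matches!` table style; the enums are trimmed stand-ins for the real crate types, and most legal triples are elided.

```rust
// Reduced stand-ins for the Crucible enums; only the shape is shown here,
// not the real crate's API.
#[derive(Debug, Copy, Clone)]
#[allow(dead_code)]
enum ConnectionMode {
    New,
    Offline,
    Faulted,
    Replaced,
}

#[derive(Debug, Copy, Clone)]
#[allow(dead_code)]
enum NegotiationState {
    Start { auto_promote: bool },
    WaitActive,
    WaitForPromote,
    WaitForRegionInfo,
    GetLastFlush,
    GetExtentVersions,
    WaitQuorum,
    Reconcile,
    LiveRepairReady,
}

impl NegotiationState {
    /// One `matches!` expression is the whole transition table: every legal
    /// `(prev, next, mode)` triple is listed once, and anything else is
    /// rejected.
    fn is_transition_valid(mode: ConnectionMode, prev: Self, next: Self) -> bool {
        matches!(
            (prev, next, mode),
            (
                NegotiationState::Start { auto_promote: true },
                NegotiationState::WaitForPromote,
                _,
            ) | (
                NegotiationState::WaitForRegionInfo,
                NegotiationState::GetLastFlush,
                ConnectionMode::Offline,
            ) | (
                NegotiationState::GetExtentVersions,
                NegotiationState::LiveRepairReady,
                ConnectionMode::Faulted | ConnectionMode::Replaced,
            ) // ...remaining legal triples elided; see the patch above
        )
    }
}

fn main() {
    // A faulted client may renegotiate into live-repair...
    assert!(NegotiationState::is_transition_valid(
        ConnectionMode::Faulted,
        NegotiationState::GetExtentVersions,
        NegotiationState::LiveRepairReady,
    ));
    // ...but replay (GetLastFlush) is reserved for Offline clients.
    assert!(!NegotiationState::is_transition_valid(
        ConnectionMode::Faulted,
        NegotiationState::WaitForRegionInfo,
        NegotiationState::GetLastFlush,
    ));
    println!("transition table behaves as expected");
}
```

Listing every legal `(prev, next, mode)` triple in one place makes illegal transitions fail closed: anything not named is rejected, instead of each state carrying its own ad-hoc transition logic.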
@@ -6460,8 +6532,11 @@ pub(crate) mod test { // will result in an error back to the guest. let mut ds = Downstairs::test_default(); ds.force_active(); - ds.clients[ClientId::new(2)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + ds.fault_client( + ClientId::new(2), + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); // Create a write, enqueue it on both the downstairs // and the guest work queues. @@ -6501,8 +6576,11 @@ pub(crate) mod test { // from acking back OK for a flush to the guest. let mut ds = Downstairs::test_default(); ds.force_active(); - ds.clients[ClientId::new(1)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + ds.fault_client( + ClientId::new(1), + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); // Create a flush, enqueue it on both the downstairs // and the guest work queues. @@ -6535,10 +6613,13 @@ pub(crate) mod test { // back to the guest. let mut ds = Downstairs::test_default(); ds.force_active(); - ds.clients[ClientId::new(1)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); - ds.clients[ClientId::new(2)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + for cid in [ClientId::new(1), ClientId::new(2)] { + ds.fault_client( + cid, + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); + } // Create a flush, enqueue it on both the downstairs // and the guest work queues. @@ -6560,8 +6641,11 @@ pub(crate) mod test { // error back to the guest. let mut ds = Downstairs::test_default(); ds.force_active(); - ds.clients[ClientId::new(0)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + ds.fault_client( + ClientId::new(0), + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); // Create a flush, enqueue it on both the downstairs // and the guest work queues. @@ -7371,8 +7455,11 @@ pub(crate) mod test { // downstairs has failed. One write, one read, and one flush. let mut ds = Downstairs::test_default(); ds.force_active(); - ds.clients[ClientId::new(0)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + ds.fault_client( + ClientId::new(0), + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); // Create a write let write_one = ds.create_and_enqueue_generic_write_eob(false); @@ -7483,10 +7570,13 @@ pub(crate) mod test { // one downstairs. 
let mut ds = Downstairs::test_default(); ds.force_active(); - ds.clients[ClientId::new(0)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); - ds.clients[ClientId::new(2)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + for cid in [ClientId::new(0), ClientId::new(2)] { + ds.fault_client( + cid, + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); + } // Create a write let write_one = ds.create_and_enqueue_generic_write_eob(false); @@ -7579,9 +7669,10 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); for cid in ClientId::iter() { - ds.clients[cid].checked_state_transition( + ds.fault_client( + cid, &UpstairsState::Active, - DsState::Faulted, + ClientFaultReason::RequestedFault, ); } @@ -7613,9 +7704,10 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); for cid in ClientId::iter() { - ds.clients[cid].checked_state_transition( + ds.fault_client( + cid, &UpstairsState::Active, - DsState::Faulted, + ClientFaultReason::RequestedFault, ); } @@ -7645,9 +7737,10 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); for cid in ClientId::iter() { - ds.clients[cid].checked_state_transition( + ds.fault_client( + cid, &UpstairsState::Active, - DsState::Faulted, + ClientFaultReason::RequestedFault, ); } @@ -7675,9 +7768,10 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); for cid in ClientId::iter() { - ds.clients[cid].checked_state_transition( + ds.fault_client( + cid, &UpstairsState::Active, - DsState::Faulted, + ClientFaultReason::RequestedFault, ); } @@ -7728,9 +7822,10 @@ pub(crate) mod test { let mut ds = Downstairs::test_default(); ds.force_active(); for cid in ClientId::iter() { - ds.clients[cid].checked_state_transition( + ds.fault_client( + cid, &UpstairsState::Active, - DsState::Faulted, + ClientFaultReason::RequestedFault, ); } @@ -9650,19 +9745,7 @@ pub(crate) mod test { // Fault the downstairs let to_repair = ClientId::new(1); - ds.fault_client( - to_repair, - &UpstairsState::Active, - ClientFaultReason::RequestedFault, - ); - for s in [ - DsState::Faulted, - DsState::LiveRepairReady, - DsState::LiveRepair, - ] { - ds.clients[to_repair] - .checked_state_transition(&UpstairsState::Active, s); - } + move_to_live_repair(&mut ds, to_repair); let next_id = ds.peek_next_id().0; ds.repair = Some(LiveRepairData { @@ -9819,19 +9902,7 @@ pub(crate) mod test { // Fault the downstairs let to_repair = ClientId::new(1); - ds.fault_client( - to_repair, - &UpstairsState::Active, - ClientFaultReason::RequestedFault, - ); - for s in [ - DsState::Faulted, - DsState::LiveRepairReady, - DsState::LiveRepair, - ] { - ds.clients[to_repair] - .checked_state_transition(&UpstairsState::Active, s); - } + move_to_live_repair(&mut ds, to_repair); let next_id = ds.peek_next_id().0; @@ -9975,15 +10046,7 @@ pub(crate) mod test { // Fault the downstairs let to_repair = ClientId::new(1); - ds.fault_client( - to_repair, - &UpstairsState::Active, - ClientFaultReason::RequestedFault, - ); - for s in [DsState::Faulted, DsState::LiveRepairReady] { - ds.clients[to_repair] - .checked_state_transition(&UpstairsState::Active, s); - } + to_live_repair_ready(&mut ds, to_repair); // Start the repair normally. 
This enqueues the close & reopen jobs, and // reserves Job IDs for the repair/noop diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index fb7254fbd..984d12d69 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -12,8 +12,10 @@ use crate::guest::Guest; use crate::up_main; use crate::BlockIO; use crate::Buffer; +use crate::ConnectionMode; use crate::CrucibleError; use crate::DsState; +use crate::NegotiationState; use crate::{ IO_CACHED_MAX_BYTES, IO_CACHED_MAX_JOBS, IO_OUTSTANDING_MAX_BYTES, IO_OUTSTANDING_MAX_JOBS, @@ -1562,7 +1564,13 @@ async fn test_byte_fault_condition() { // Check to make sure that happened let ds = harness.guest.downstairs_state().await.unwrap(); - assert_eq!(ds[ClientId::new(0)], DsState::Faulted); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Faulted, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); @@ -1633,7 +1641,13 @@ async fn test_byte_fault_condition_offline() { // Check to make sure that happened let ds = harness.guest.downstairs_state().await.unwrap(); - assert_eq!(ds[ClientId::new(0)], DsState::Offline); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Offline, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); @@ -1662,11 +1676,23 @@ async fn test_byte_fault_condition_offline() { let ds = harness.guest.downstairs_state().await.unwrap(); if (i + 1) * WRITE_SIZE < IO_OUTSTANDING_MAX_BYTES as usize { - assert_eq!(ds[ClientId::new(0)], DsState::Offline); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Offline, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); } else { - assert_eq!(ds[ClientId::new(0)], DsState::Faulted); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Faulted, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); } @@ -1711,7 +1737,13 @@ async fn test_offline_can_deactivate() { // Check to make sure downstairs 1 is now offline. let ds = harness.guest.downstairs_state().await.unwrap(); - assert_eq!(ds[ClientId::new(0)], DsState::Offline); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Offline, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); @@ -1748,7 +1780,13 @@ async fn test_offline_with_io_can_deactivate() { // Check to make sure downstairs 1 is now offline. let ds = harness.guest.downstairs_state().await.unwrap(); - assert_eq!(ds[ClientId::new(0)], DsState::Offline); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Offline, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); @@ -1799,9 +1837,15 @@ async fn test_all_offline_with_io_can_deactivate() { // Check to make sure all downstairs are offline. 
let ds = harness.guest.downstairs_state().await.unwrap(); - assert_eq!(ds[ClientId::new(0)], DsState::Offline); - assert_eq!(ds[ClientId::new(1)], DsState::Offline); - assert_eq!(ds[ClientId::new(2)], DsState::Offline); + for cid in ClientId::iter() { + assert_eq!( + ds[cid], + DsState::Connecting { + mode: ConnectionMode::Offline, + state: NegotiationState::Start { auto_promote: true } + } + ); + } // We must `spawn` here because `read` will wait for the response to // come back before returning @@ -1898,7 +1942,13 @@ async fn test_job_fault_condition() { // Check to make sure that happened let ds = harness.guest.downstairs_state().await.unwrap(); - assert_eq!(ds[ClientId::new(0)], DsState::Faulted); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Faulted, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); @@ -1964,7 +2014,13 @@ async fn test_job_fault_condition_offline() { // Check to make sure that happened let ds = harness.guest.downstairs_state().await.unwrap(); - assert_eq!(ds[ClientId::new(0)], DsState::Offline); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Offline, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); @@ -2007,12 +2063,24 @@ async fn test_job_fault_condition_offline() { let ds = harness.guest.downstairs_state().await.unwrap(); if i + barrier_count < IO_OUTSTANDING_MAX_JOBS { // At this point, we should still be offline - assert_eq!(ds[ClientId::new(0)], DsState::Offline); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Offline, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); } else { // After ds1 is kicked out, we shouldn't see any more messages - assert_eq!(ds[ClientId::new(0)], DsState::Faulted); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Faulted, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); } @@ -2752,7 +2820,13 @@ async fn test_no_send_offline() { // Check to make sure that happened let ds = harness.guest.downstairs_state().await.unwrap(); - assert_eq!(ds[ClientId::new(0)], DsState::Offline); + assert_eq!( + ds[ClientId::new(0)], + DsState::Connecting { + mode: ConnectionMode::Offline, + state: NegotiationState::Start { auto_promote: true } + } + ); assert_eq!(ds[ClientId::new(1)], DsState::Active); assert_eq!(ds[ClientId::new(2)], DsState::Active); info!(harness.log, "DS1 is offline"); diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index e5a105e79..f6cf449a9 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -67,6 +67,7 @@ mod stats; pub use client::{ ClientFaultReason, ClientNegotiationFailed, ClientStopReason, + NegotiationState, }; pub use crucible_common::impacted_blocks::*; @@ -713,166 +714,102 @@ pub(crate) struct RawReadResponse { pub data: bytes::BytesMut, } -/* - * States of a downstairs - * - * This shows the different states a downstairs can be in from the point of - * view of the upstairs. - * - * Double line paths can only be taken if an upstairs is active and goes to - * deactivated. 
- * - * │ - * ┌──┐ ▼ - * bad│ │ │ - * version│ ┌▼───┴──────┐ - * └─┤ ╞═════◄══════════════════╗ - * ┌─────────────► New ╞═════◄════════════════╗ ║ - * │ ┌─────► ├─────◄──────┐ ║ ║ - * │ │ └────┬───┬──┘ │ ║ ║ - * │ │ ▼ └───►───┐ other │ ║ ║ - * │ bad│ ┌────┴──────┐ │ failures ║ ║ - * │ region│ │ Wait │ │ ▲ ║ ║ - * │ │ │ Active ├─►┐ │ │ ║ ║ - * │ │ └────┬──────┘ │ │ │ ║ ║ - * │ │ ┌────┴──────┐ │ └───────┤ ║ ║ - * │ │ │ Wait │ └─────────┤ ║ ║ - * │ └─────┤ Quorum ├──►─────────┤ ║ ║ - * │ └────┬──────┘ │ ║ ║ - * │ ........▼.......... │ ║ ║ - * │failed : ┌────┴──────┐ : │ ║ ║ - * │reconcile : │ Reconcile │ : │ ╔═╝ ║ - * └─────────────┤ ├──►─────────┘ ║ ║ - * : └────┬──────┘ : ║ ║ - * Not Active : │ : ▲ ▲ Not Active - * .............. . . . │. . . . ...................║...║............ - * Active ▼ ║ ║ Active - * ┌────┴──────┐ ┌──────────╨┐ ║ - * ┌─►─┤ Active ├─────►───┤Deactivated│ ║ - * │ │ │ ┌──────┤ ├─◄──────┐ - * │ └─┬───┬───┬─┘ │ └───────────┘ ║ │ - * │ ▼ ▼ ▲ ▲ ║ │ - * │ │ │ │ │ ║ │ - * │ │ │ │ │ ║ │ - * │ │ │ │ │ ║ │ - * │ │ │ │ │ ║ │ - * │ │ │ │ │ ║ │ - * │ │ │ │ │ ║ │ - * │ │ │ │ │ ║ │ - * │ │ ▼ ▲ ▲ ║ │ - * │ │ │ │ │ ▲ │ - * │ │ ┌─┴───┴────┴┐ ┌────────────╨──┐ │ - * │ │ │ Offline │ │ Faulted │ │ - * │ │ │ ├─────►─┤ │ │ - * │ │ └───────────┘ └─┬─┬───────┬─┬─┘ │ - * │ │ ▲ ▲ ▼ ▲ ▲ - * │ └───────────►───────────┘ │ │ │ │ - * │ │ │ │ │ - * │ ┌────────┴─┐ ┌─┴─┴────┴─┐ - * └──────────────────────┤ Live ├─◄─┤ Live │ - * │ Repair │ │ Repair │ - * │ │ │ Ready │ - * └──────────┘ └──────────┘ - * - * - * The downstairs state can go to Disabled from any other state, as that - * transition happens when a message is received from the actual - * downstairs on the other side of the connection.. - * The only path back at that point is for the Upstairs (who will self - * deactivate when it detects this) is to go back to New and through - * the reconcile process. - * ┌───────────┐ - * │ Disabled │ - * └───────────┘ - */ +/// High-level states for a Downstairs +/// +/// The state machine for a Downstairs is relatively simple: +/// +/// ```text +/// ┌────────────┐ +/// ┌────► LiveRepair ├─────┐ +/// start ┌─────────┴┐ └─────┬──────┘ ┌─▼──────┐ +/// ────►│Connecting│ │ │Stopping│ +/// └─▲───────┬┘ ┌─────▼──────┐ └─▲────┬─┘ +/// │ └────► Active ├─────┘ │ +/// │ └─────┬──────┘ │ +/// │ │ │ +/// └─────────────────◄┴─────────────────┘ +/// ``` +/// +/// Complexity is hidden in the `Connecting` state, which wraps a +/// [`NegotiationState`] implementing the negotiation state machine. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "snake_case")] #[serde(tag = "type", content = "value")] pub enum DsState { - /* - * New connection - */ - New, - /* - * Waiting for activation signal. - */ - WaitActive, - /* - * Waiting for the minimum number of downstairs to be present. - */ - WaitQuorum, - /* - * Initial startup, downstairs are repairing from each other. - */ - Reconcile, - /* - * Ready for and/or currently receiving IO - */ + /// New connection + Connecting { + state: NegotiationState, + mode: ConnectionMode, + }, + + /// Ready for and/or currently receiving IO Active, - /* - * IO attempts to this downstairs are failing at too high of a - * rate, or it is not able to keep up, or it is having some - * error such that we can no longer use it. - */ - Faulted, - /* - * This downstairs was failed, but has disconnected and now we - * are ready to repair it. 
- */ - LiveRepairReady, - /* - * This downstairs is undergoing LiveRepair - */ + + /// This downstairs is undergoing LiveRepair LiveRepair, - /* - * This downstairs was active, but is now no longer connected. - * We may have work for it in memory, so a replay is possible - * if this downstairs reconnects in time. - */ - Offline, - /* - * The current downstairs tasks have ended and the replacement has - * begun. - */ - Replaced, /// The IO task for the client is being stopped - Stopping(crate::client::ClientStopReason), + Stopping(ClientStopReason), } + impl std::fmt::Display for DsState { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - DsState::New => { - write!(f, "New") - } - DsState::WaitActive => { + DsState::Connecting { + state: NegotiationState::WaitActive, + .. + } => { write!(f, "WaitActive") } - DsState::WaitQuorum => { + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } => { write!(f, "WaitQuorum") } - DsState::Reconcile => { + DsState::Connecting { + state: NegotiationState::Reconcile, + .. + } => { write!(f, "Reconcile") } + DsState::Connecting { + state: NegotiationState::LiveRepairReady, + .. + } => { + write!(f, "LiveRepairReady") + } DsState::Active => { write!(f, "Active") } - DsState::Faulted => { - write!(f, "Faulted") - } - DsState::LiveRepairReady => { - write!(f, "LiveRepairReady") + DsState::Connecting { + mode: ConnectionMode::New, + .. + } => { + write!(f, "New") } - DsState::LiveRepair => { - write!(f, "LiveRepair") + DsState::Connecting { + mode: ConnectionMode::Faulted, + .. + } => { + write!(f, "Faulted") } - DsState::Offline => { + DsState::Connecting { + mode: ConnectionMode::Offline, + .. + } => { write!(f, "Offline") } - DsState::Replaced => { + DsState::Connecting { + mode: ConnectionMode::Replaced, + .. + } => { write!(f, "Replaced") } + DsState::LiveRepair => { + write!(f, "LiveRepair") + } DsState::Stopping(..) => { write!(f, "Stopping") } @@ -880,6 +817,19 @@ impl std::fmt::Display for DsState { } } +#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum ConnectionMode { + /// Connect through reconciliation once a quorum has come online + New, + /// Replay cached jobs when reconnecting + Offline, + /// Reconnect through live-repair + Faulted, + /// Reconnect through live-repair; the address is allowed to change + Replaced, +} + /* * A unit of work for downstairs that is put into the hashmap. 
*/ diff --git a/upstairs/src/live_repair.rs b/upstairs/src/live_repair.rs index 543d8c3e9..a45d5f4ac 100644 --- a/upstairs/src/live_repair.rs +++ b/upstairs/src/live_repair.rs @@ -1109,10 +1109,26 @@ pub mod repair_test { assert_eq!(up.downstairs.last_repair_extent(), None); assert!(up.downstairs.repair().is_none()); - // Start the LiveRepair - let client = &mut up.downstairs.clients[ClientId::new(1)]; - client.checked_state_transition(&up.state, DsState::Faulted); - client.checked_state_transition(&up.state, DsState::LiveRepairReady); + // Start the LiveRepair, manually walking through states + let client = ClientId::new(1); + up.downstairs.fault_client( + client, + &up.state, + ClientFaultReason::RequestedFault, + ); + let mode = ConnectionMode::Faulted; + for state in [ + NegotiationState::Start { auto_promote: true }, + NegotiationState::WaitForPromote, + NegotiationState::WaitForRegionInfo, + NegotiationState::GetExtentVersions, + NegotiationState::LiveRepairReady, + ] { + up.downstairs.clients[client].checked_state_transition( + &up.state, + DsState::Connecting { state, mode }, + ); + } up.on_repair_check(); assert!(up.downstairs.live_repair_in_progress()); assert_eq!(up.downstairs.last_repair_extent(), Some(ExtentId(0))); diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 8c7c71a20..01d2c11c6 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -2,7 +2,10 @@ //! Data structures specific to Crucible's `struct Upstairs` use crate::{ cdt, - client::{ClientAction, ClientRunResult, ClientStopReason}, + client::{ + ClientAction, ClientNegotiationFailed, ClientRunResult, + ClientStopReason, NegotiationResult, NegotiationState, + }, control::ControlRequest, deferred::{ DeferredBlockOp, DeferredMessage, DeferredQueue, DeferredRead, @@ -12,9 +15,9 @@ use crate::{ extent_from_offset, io_limits::IOLimitGuard, stats::UpStatOuter, - BlockOp, BlockRes, Buffer, ClientId, ClientMap, CrucibleOpts, DsState, - EncryptionContext, GuestIoHandle, Message, RegionDefinition, - RegionDefinitionStatus, SnapshotDetails, WQCounts, + BlockOp, BlockRes, Buffer, ClientId, ClientMap, ConnectionMode, + CrucibleOpts, DsState, EncryptionContext, GuestIoHandle, Message, + RegionDefinition, RegionDefinitionStatus, SnapshotDetails, WQCounts, }; use crucible_client_types::RegionExtentInfo; use crucible_common::{BlockIndex, CrucibleError}; @@ -616,13 +619,21 @@ impl Upstairs { if matches!(&self.state, UpstairsState::Deactivating(..)) { info!(self.log, "checking for deactivation"); for i in ClientId::iter() { + debug!( + self.log, + "client {i} has state {:?}", + self.downstairs.clients[i].state() + ); // Clients become Stopping, then New (when the IO task // completes and the client is restarted). We don't try to // deactivate them _again_ in such cases. if matches!( self.downstairs.clients[i].state(), DsState::Stopping(ClientStopReason::Deactivated) - | DsState::New + | DsState::Connecting { + mode: ConnectionMode::New, + .. + } ) { debug!(self.log, "already deactivated {i}"); } else if self.downstairs.try_deactivate(i, &self.state) { @@ -729,7 +740,13 @@ impl Upstairs { } for cid in ClientId::iter() { - if self.downstairs.clients[cid].state() == DsState::Offline { + if matches!( + self.downstairs.clients[cid].state(), + DsState::Connecting { + mode: ConnectionMode::Offline, + .. + } + ) { self.downstairs.check_gone_too_long(cid, &self.state); } } @@ -861,14 +878,16 @@ impl Upstairs { } /// Checks if a repair is possible. 
If so, checks if any Downstairs is in - /// the [DsState::LiveRepairReady] state, indicating it needs to be - /// repaired. If a Downstairs needs to be repaired, try to start repairing - /// it. When starting the repair fails, this function will schedule a task - /// to retry the repair by setting [Self::repair_check_deadline]. + /// the [DsState::Connecting] state with the negotiation state of + /// [NegotiationState::LiveRepairReady], indicating it needs to be repaired. + /// If a Downstairs needs to be repaired, try to start repairing it. When + /// starting the repair fails, this function will schedule a task to retry + /// the repair by setting [Self::repair_check_deadline]. /// /// If this Upstairs is [UpstairsConfig::read_only], this function will move - /// any Downstairs from [DsState::LiveRepairReady] back to [DsState::Active] - /// without actually performing any repair. + /// any Downstairs from + /// `DsState::Connecting { state: NegotiationState::LiveRepairReady, .. }` + /// back to [DsState::Active] without actually performing any repair. pub(crate) fn on_repair_check(&mut self) { info!(self.log, "Checking if live repair is needed"); if !matches!(self.state, UpstairsState::Active) { @@ -893,11 +912,15 @@ impl Upstairs { // before we begin a live repair. let repair_in_progress = self.downstairs.live_repair_in_progress(); - let any_in_repair_ready = self - .downstairs - .clients - .iter() - .any(|c| c.state() == DsState::LiveRepairReady); + let any_in_repair_ready = self.downstairs.clients.iter().any(|c| { + matches!( + c.state(), + DsState::Connecting { + state: NegotiationState::LiveRepairReady, + .. + } + ) + }); if repair_in_progress { info!(self.log, "Live Repair already running"); @@ -1151,10 +1174,9 @@ impl Upstairs { #[cfg(test)] BlockOp::GetDownstairsState { done } => { - let mut out = crate::ClientData::new(DsState::New); - for i in ClientId::iter() { - out[i] = self.downstairs.clients[i].state(); - } + let out = crate::ClientData::from_fn(|i| { + self.downstairs.clients[i].state() + }); done.send_ok(out); } @@ -1221,6 +1243,12 @@ impl Upstairs { UpstairsState::Initializing => { self.state = UpstairsState::GoActive(res); info!(self.log, "{} active request set", self.cfg.upstairs_id); + + // Notify all clients that they should go active when they hit + // an appropriate state in their negotiation. + for c in self.downstairs.clients.iter_mut() { + c.set_active_request(); + } } UpstairsState::GoActive(..) => { // We have already been sent a request to go active, but we @@ -1232,7 +1260,6 @@ impl Upstairs { self.cfg.upstairs_id ); res.send_err(CrucibleError::UpstairsActivateInProgress); - return; } UpstairsState::Deactivating(..) => { warn!( @@ -1240,7 +1267,6 @@ impl Upstairs { "{} active denied while Deactivating", self.cfg.upstairs_id ); res.send_err(CrucibleError::UpstairsDeactivating); - return; } UpstairsState::Active => { // We are already active, so go ahead and respond again. @@ -1250,14 +1276,8 @@ impl Upstairs { self.cfg.upstairs_id ); res.send_ok(()); - return; } } - // Notify all clients that they should go active when they hit an - // appropriate state in their negotiation. - for c in self.downstairs.clients.iter_mut() { - c.set_active_request(); - } } /// Request that the Upstairs deactivate @@ -1684,42 +1704,52 @@ impl Upstairs { .continue_negotiation(m, &self.state, &mut self.ddef); match r { - // continue_negotiation returns an error if the upstairs - // should go inactive! 
- Err(e) => self.set_inactive(e), - Ok(false) => (), - Ok(true) => { + Err(e) => { + // If we received an error, then abort negotiation + let f: ClientNegotiationFailed = e.into(); + self.downstairs.clients[client_id] + .abort_negotiation(&self.state, f); + + match f { + // Out-of-order messages and failed reconciliation + // may be fixed by reconnecting and giving the + // Downstairs a second chance to get in sync, so + // we'll do that! + ClientNegotiationFailed::BadNegotiationOrder + | ClientNegotiationFailed::FailedReconcile + // If we're doing a live update of the rack, it's + // possible that the Upstairs gets updated before + // the Downstairs, so we'll retry here as well. + | ClientNegotiationFailed::IncompatibleVersion + => (), + + // Incompatibility is likely persistent, so we set + // the upstairs as inactive + ClientNegotiationFailed::IncompatibleSession + | ClientNegotiationFailed::IncompatibleSettings + => { + self.set_inactive(e.into()) + } + } + } + Ok(NegotiationResult::NotDone) => (), + Ok(NegotiationResult::WaitQuorum) => { // Copy the region definition into the Downstairs self.downstairs.set_ddef(self.ddef.get_def().unwrap()); - - // Check to see whether we want to replay jobs (if the - // Downstairs is coming back from being Offline) - // TODO should we only do this in certain new states? - self.downstairs.check_replay(client_id); - - // Negotiation succeeded for this Downstairs, let's see - // what we can do from here - match self.downstairs.clients[client_id].state() { - DsState::Active => (), - - DsState::WaitQuorum => { - // See if we have a quorum - if self.connect_region_set() { - // We connected normally, so there's no need - // to check for live-repair. - self.repair_check_deadline = None; - } - } - - DsState::LiveRepairReady => { - // Immediately check for live-repair - self.repair_check_deadline = - Some(Instant::now()); - } - - s => panic!("bad state after negotiation: {s:?}"), + // See if we have a quorum + if self.connect_region_set() { + // We connected normally, so there's no need + // to check for live-repair. + self.repair_check_deadline = None; } } + Ok(NegotiationResult::Replay) => { + self.downstairs.replay_jobs(client_id); + } + Ok(NegotiationResult::LiveRepair) => { + // Immediately check for live-repair + self.repair_check_deadline = Some(Instant::now()); + } } } @@ -1743,7 +1773,7 @@ impl Upstairs { &self.state, ) { // reconciliation is done, great work everyone - self.on_reconciliation_done(DsState::Reconcile); + self.on_reconciliation_done(true); } } @@ -1859,12 +1889,16 @@ impl Upstairs { * Make sure all downstairs are in the correct state before we * proceed. */ - let not_ready = self - .downstairs - .clients - .iter() - .any(|c| c.state() != DsState::WaitQuorum); - if not_ready { + let ready = self.downstairs.clients.iter().all(|c| { + matches!( + c.state(), + DsState::Connecting { + state: NegotiationState::WaitQuorum, + .. + } + ) + }); + if !ready { info!(self.log, "Waiting for more clients to be ready"); return false; } @@ -1888,7 +1922,7 @@ impl Upstairs { // to reset that activation request. Call // `abort_reconciliation` to abort reconciliation for all // clients. 
- self.set_inactive(e); + self.set_inactive(e.into()); self.downstairs.abort_reconciliation(&self.state); false } @@ -1901,7 +1935,7 @@ impl Upstairs { } Ok(false) => { info!(self.log, "No downstairs reconciliation required"); - self.on_reconciliation_done(DsState::WaitQuorum); + self.on_reconciliation_done(false); info!(self.log, "Set Active after no reconciliation"); true } @@ -1909,10 +1943,10 @@ impl Upstairs { } /// Called when reconciliation is complete - fn on_reconciliation_done(&mut self, from_state: DsState) { + fn on_reconciliation_done(&mut self, did_work: bool) { // This should only ever be called if reconciliation completed // successfully; make some assertions to that effect. - self.downstairs.on_reconciliation_done(from_state); + self.downstairs.on_reconciliation_done(did_work); info!(self.log, "All required reconciliation work is completed"); info!( @@ -2098,7 +2132,7 @@ impl Upstairs { pub(crate) mod test { use super::*; use crate::{ - client::ClientStopReason, + client::{ClientFaultReason, ClientStopReason}, test::{make_encrypted_upstairs, make_upstairs}, Block, BlockOp, BlockOpWaiter, DsState, JobId, }; @@ -2134,9 +2168,7 @@ pub(crate) mod test { let mut up = create_test_upstairs(); // Move our downstairs client fail_id to LiveRepair. - let client = &mut up.downstairs.clients[or_ds]; - client.checked_state_transition(&up.state, DsState::Faulted); - client.checked_state_transition(&up.state, DsState::LiveRepairReady); + to_live_repair_ready(&mut up, or_ds); // Start repairing the downstairs; this also enqueues the jobs up.apply(UpstairsAction::RepairCheck); @@ -2146,7 +2178,7 @@ pub(crate) mod test { assert!(up.repair_check_deadline.is_none()); assert!(up.downstairs.live_repair_in_progress()); - // The first thing that should happen after we start repair_exetnt + // The first thing that should happen after we start repair_extent // is two jobs show up on the work queue, one for close and one for // the eventual re-open. Wait here for those jobs to show up on the // work queue before returning. 
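Reviewer note (not part of the patch): with `continue_negotiation` now returning a four-way `NegotiationResult` and its errors converting into `ClientNegotiationFailed`, the caller's job reduces to a single `match`, seen a few hunks up. Below is a reduced, self-contained sketch of that dispatch shape; the types and the `retryable` helper are stand-ins invented for illustration, not the crate's API.

```rust
/// Reduced stand-in for the patch's `NegotiationResult`.
#[allow(dead_code)]
enum NegotiationResult {
    NotDone,
    WaitQuorum,
    Replay,
    LiveRepair,
}

/// Reduced stand-in for `ClientNegotiationFailed`.
#[derive(Debug, Copy, Clone)]
#[allow(dead_code)]
enum NegotiationFailure {
    BadNegotiationOrder,
    FailedReconcile,
    IncompatibleVersion,
    IncompatibleSession,
    IncompatibleSettings,
}

impl NegotiationFailure {
    /// Out-of-order messages, failed reconciliation, and version skew
    /// (e.g. a rack mid-update) may clear up on reconnect; mismatched
    /// session IDs or region settings will not.
    fn retryable(self) -> bool {
        matches!(
            self,
            Self::BadNegotiationOrder
                | Self::FailedReconcile
                | Self::IncompatibleVersion
        )
    }
}

/// Maps each negotiation outcome to the caller's next step, mirroring the
/// shape of the `match` on `continue_negotiation`'s result above.
fn dispatch(r: Result<NegotiationResult, NegotiationFailure>) -> &'static str {
    match r {
        Err(e) if e.retryable() => "abort negotiation, let the client retry",
        Err(_) => "abort negotiation, set the upstairs inactive",
        Ok(NegotiationResult::NotDone) => "keep negotiating",
        Ok(NegotiationResult::WaitQuorum) => "check for quorum",
        Ok(NegotiationResult::Replay) => "replay unflushed jobs",
        Ok(NegotiationResult::LiveRepair) => "schedule a live-repair check",
    }
}

fn main() {
    assert_eq!(
        dispatch(Err(NegotiationFailure::IncompatibleSession)),
        "abort negotiation, set the upstairs inactive"
    );
    assert_eq!(
        dispatch(Ok(NegotiationResult::Replay)),
        "replay unflushed jobs"
    );
    println!("dispatch behaves as expected");
}
```

Separating retryable from fatal failures up front is what lets a mid-update rack, where the Upstairs may briefly speak a newer message version than a Downstairs, reconnect and try again instead of deactivating.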
@@ -2155,15 +2187,53 @@ pub(crate) mod test { up } + /// Helper function to legally move the given client to live-repair ready + fn to_live_repair_ready(up: &mut Upstairs, to_repair: ClientId) { + up.downstairs.fault_client( + to_repair, + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); + // Restart the IO task (because we'll be faking messages from it) + up.apply(UpstairsAction::Downstairs(DownstairsAction::Client { + client_id: to_repair, + action: ClientAction::TaskStopped(ClientRunResult::RequestedStop), + })); + let mode = ConnectionMode::Faulted; + for state in [ + NegotiationState::WaitForPromote, + NegotiationState::WaitForRegionInfo, + NegotiationState::GetExtentVersions, + NegotiationState::LiveRepairReady, + ] { + up.downstairs.clients[to_repair].checked_state_transition( + &up.state, + DsState::Connecting { state, mode }, + ); + } + } + #[test] fn reconcile_not_ready() { // Verify reconcile returns false when a downstairs is not ready let mut up = Upstairs::test_default(None); - up.ds_transition(ClientId::new(0), DsState::WaitActive); - up.ds_transition(ClientId::new(0), DsState::WaitQuorum); - - up.ds_transition(ClientId::new(1), DsState::WaitActive); - up.ds_transition(ClientId::new(1), DsState::WaitQuorum); + for cid in [ClientId::new(0), ClientId::new(1)] { + for state in [ + NegotiationState::WaitActive, + NegotiationState::WaitForPromote, + NegotiationState::WaitForRegionInfo, + NegotiationState::GetExtentVersions, + NegotiationState::WaitQuorum, + ] { + up.ds_transition( + cid, + DsState::Connecting { + mode: ConnectionMode::New, + state, + }, + ); + } + } let res = up.connect_region_set(); assert!(!res); @@ -2237,14 +2307,20 @@ pub(crate) mod test { up.apply(UpstairsAction::Downstairs(DownstairsAction::Client { client_id, action: ClientAction::TaskStopped( - ClientRunResult::RequestedStop( - ClientStopReason::Deactivated, - ), + ClientRunResult::RequestedStop, ), })); // This causes the downstairs state to be reinitialized - assert_eq!(up.ds_state(client_id), DsState::New); + assert_eq!( + up.ds_state(client_id), + DsState::Connecting { + state: NegotiationState::Start { + auto_promote: false + }, + mode: ConnectionMode::New + } + ); if client_id.get() < 2 { assert!(matches!(up.state, UpstairsState::Deactivating { .. 
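Note how to_live_repair_ready above never jumps straight to LiveRepairReady: it replays the whole negotiation sequence, because checked_state_transition only accepts legal successors. A self-contained sketch of that walk; the state names and their ordering come from the diff, while the guard below is invented for illustration:

    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    enum NegotiationState {
        WaitForPromote,
        WaitForRegionInfo,
        GetExtentVersions,
        LiveRepairReady,
    }

    /// Panics unless `next` directly follows `cur`, mimicking the kind of
    /// ordering check a checked_state_transition-style guard performs.
    fn checked_transition(
        cur: &mut Option<NegotiationState>,
        next: NegotiationState,
    ) {
        use NegotiationState::*;
        let legal = matches!(
            (*cur, next),
            (None, WaitForPromote)
                | (Some(WaitForPromote), WaitForRegionInfo)
                | (Some(WaitForRegionInfo), GetExtentVersions)
                | (Some(GetExtentVersions), LiveRepairReady)
        );
        assert!(legal, "illegal transition {cur:?} -> {next:?}");
        *cur = Some(next);
    }

    fn main() {
        let mut s = None;
        // Same shape as the for-loop in to_live_repair_ready: visit every
        // state in order; skipping one would panic.
        for state in [
            NegotiationState::WaitForPromote,
            NegotiationState::WaitForRegionInfo,
            NegotiationState::GetExtentVersions,
            NegotiationState::LiveRepairReady,
        ] {
            checked_transition(&mut s, state);
        }
        assert_eq!(s, Some(NegotiationState::LiveRepairReady));
    }

The same pattern appears in reconcile_not_ready, which walks two clients through the New-mode sequence up to WaitQuorum and leaves the third behind so that connect_region_set has something to reject.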
})); @@ -3414,8 +3490,7 @@ pub(crate) mod test { up.force_active().unwrap(); // Force client 1 into LiveRepairReady - up.ds_transition(ClientId::new(1), DsState::Faulted); - up.ds_transition(ClientId::new(1), DsState::LiveRepairReady); + to_live_repair_ready(&mut up, ClientId::new(1)); up.on_repair_check(); assert!(up.repair_check_deadline.is_none()); assert!(up.downstairs.live_repair_in_progress()); @@ -3434,10 +3509,9 @@ pub(crate) mod test { let mut up = Upstairs::test_default(Some(ddef)); up.force_active().unwrap(); + // Force clients 1 and 2 into LiveRepairReady for i in [1, 2].into_iter().map(ClientId::new) { - // Force client 1 into LiveRepairReady - up.ds_transition(i, DsState::Faulted); - up.ds_transition(i, DsState::LiveRepairReady); + to_live_repair_ready(&mut up, i); } up.on_repair_check(); assert!(up.repair_check_deadline.is_none()); @@ -3459,8 +3533,7 @@ pub(crate) mod test { let mut up = Upstairs::test_default(Some(ddef)); up.force_active().unwrap(); - up.ds_transition(ClientId::new(1), DsState::Faulted); - up.ds_transition(ClientId::new(1), DsState::LiveRepairReady); + to_live_repair_ready(&mut up, ClientId::new(1)); up.ds_transition(ClientId::new(1), DsState::LiveRepair); // Start the live-repair @@ -3471,8 +3544,7 @@ pub(crate) mod test { // Pretend that DS 0 faulted then came back through to LiveRepairReady; // we won't halt the existing repair, but will configure // repair_check_deadline to check again in the future. - up.ds_transition(ClientId::new(0), DsState::Faulted); - up.ds_transition(ClientId::new(0), DsState::LiveRepairReady); + to_live_repair_ready(&mut up, ClientId::new(0)); up.on_repair_check(); assert!(up.downstairs.live_repair_in_progress()); @@ -3488,8 +3560,7 @@ pub(crate) mod test { let mut up = Upstairs::test_default(Some(ddef)); up.force_active().unwrap(); - up.ds_transition(ClientId::new(1), DsState::Faulted); - up.ds_transition(ClientId::new(1), DsState::LiveRepairReady); + to_live_repair_ready(&mut up, ClientId::new(1)); up.on_repair_check(); assert!(up.repair_check_deadline.is_none()); @@ -3633,9 +3704,7 @@ pub(crate) mod test { up.apply(UpstairsAction::Downstairs(DownstairsAction::Client { client_id, action: ClientAction::TaskStopped( - ClientRunResult::RequestedStop( - ClientStopReason::Deactivated, - ), + ClientRunResult::RequestedStop, ), })); } @@ -3648,7 +3717,15 @@ pub(crate) mod test { // Verify after the ds_missing, all downstairs are New for c in up.downstairs.clients.iter() { - assert_eq!(c.state(), DsState::New); + assert_eq!( + c.state(), + DsState::Connecting { + mode: ConnectionMode::New, + state: NegotiationState::Start { + auto_promote: false + } + } + ); } } @@ -4358,12 +4435,13 @@ pub(crate) mod test { let mut up = make_upstairs(); up.force_active().unwrap(); - up.downstairs.clients[ClientId::new(0)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); - up.downstairs.clients[ClientId::new(1)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); - up.downstairs.clients[ClientId::new(2)] - .checked_state_transition(&UpstairsState::Active, DsState::Faulted); + for i in ClientId::iter() { + up.downstairs.fault_client( + i, + &UpstairsState::Active, + ClientFaultReason::RequestedFault, + ); + } let data = Buffer::new(1, 512); let offset = BlockIndex(7);