From 9232726e5b42db29d61c50c71e15f529b38d3e3b Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 17 Aug 2022 13:44:33 +0200 Subject: [PATCH 1/6] Refactoring MQTT state management --- CHANGELOG.md | 3 + Cargo.toml | 5 +- src/lib.rs | 4 +- src/mqtt_client.rs | 415 ++++++++++++++++++++------------------ src/session_state.rs | 2 +- tests/integration_test.rs | 20 +- 6 files changed, 244 insertions(+), 205 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5888e06..5bd48b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ This document describes the changes to Minimq between releases. ## Added * Allow configuration of non-default broker port numbers +## Changed +* [breaking] The client is no longer publicly exposed, and is instead accessible via `Minimq::client()` + ## Fixed # [0.5.3] - 2022-02-14 diff --git a/Cargo.toml b/Cargo.toml index d1be8fc..098f253 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,9 +19,12 @@ bit_field = "0.10.0" enum-iterator = "0.6.0" heapless = "0.7" log = {version = "0.4", optional = true} -smlang = "0.4" embedded-time = "0.12" +[dependencies.smlang] +git = "https://github.com/ryan-summers/smlang-rs" +branch = "feature/versatile-guard-errors" + [dependencies.embedded-nal] version = "0.6" diff --git a/src/lib.rs b/src/lib.rs index 17c7003..80e0045 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,8 +44,8 @@ //! let mut subscribed = false; //! //! loop { -//! if mqtt.client.is_connected() && !subscribed { -//! mqtt.client.subscribe("topic", &[]).unwrap(); +//! if mqtt.client().is_connected() && !subscribed { +//! mqtt.client().subscribe("topic", &[]).unwrap(); //! subscribed = true; //! } //! diff --git a/src/mqtt_client.rs b/src/mqtt_client.rs index e89d103..eee0bcf 100644 --- a/src/mqtt_client.rs +++ b/src/mqtt_client.rs @@ -22,144 +22,89 @@ mod sm { statemachine! { transitions: { - *Restart + GotSocket = ConnectTransport, - ConnectTransport + Connect = ConnectBroker, - ConnectBroker + SentConnect = Establishing, - ConnectBroker + Disconnect = Restart, + *Init + Update [ reconnect ] = Restart, + Restart + Update [ connect_to_broker ] = Establishing, Establishing + ReceivedConnAck = Active, - Establishing + Disconnect = Restart, - Active + Disconnect = Restart, - } + _ + Disconnect [ reconnect ] = Restart, + }, + custom_guard_error: true, } - - pub struct Context; - - impl StateMachineContext for Context {} } -use sm::{Context, Events, StateMachine, States}; +use sm::{Events, StateMachine, States}; -/// The general structure for managing MQTT via Minimq. -pub struct Minimq -where - TcpStack: TcpClientStack, - Clock: embedded_time::Clock, -{ - pub client: MqttClient, - packet_reader: PacketReader, -} - -/// A client for interacting with an MQTT Broker. -pub struct MqttClient< +struct ClientContext< TcpStack: TcpClientStack, Clock: embedded_time::Clock, const MSG_SIZE: usize, const MSG_COUNT: usize, > { pub(crate) network: InterfaceHolder, - clock: Clock, session_state: SessionState, - connection_state: StateMachine, will: Option>, } -impl - MqttClient +impl sm::StateMachineContext + for ClientContext where TcpStack: TcpClientStack, Clock: embedded_time::Clock, { - fn process(&mut self) -> Result<(), Error> { - // Potentially update the state machine depending on the current socket connection status. - if !self.network.tcp_connected()? { - self.connection_state.process_event(Events::Disconnect).ok(); - } else { - self.connection_state.process_event(Events::Connect).ok(); - } - - match *self.connection_state.state() { - // In the RESTART state, we need to reopen the TCP socket. - States::Restart => { - self.network.allocate_socket()?; - - self.connection_state - .process_event(Events::GotSocket) - .unwrap(); - } + type GuardError = crate::Error; - // In the connect transport state, we need to connect our TCP socket to the broker. - States::ConnectTransport => { - self.network.connect(self.session_state.broker)?; - } - - // Next, connect to the broker via the MQTT protocol. - States::ConnectBroker => { - let properties = [ - // Tell the broker our maximum packet size. - Property::MaximumPacketSize(MSG_SIZE as u32), - // The session does not expire. - Property::SessionExpiryInterval(u32::MAX), - ]; - - let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; - let packet = serialize::connect_message( - &mut buffer, - self.session_state.client_id.as_str().as_bytes(), - self.session_state.keepalive_interval(), - &properties, - // Only perform a clean start if we do not have any session state. - !self.session_state.is_present(), - self.will.as_ref(), - )?; - - info!("Sending CONNECT"); - self.network.write(packet)?; - - self.connection_state - .process_event(Events::SentConnect) - .unwrap(); - } + fn reconnect(&mut self) -> Result<(), Self::GuardError> { + self.network.allocate_socket()?; + self.network.connect(self.session_state.broker)?; + Ok(()) + } - States::Establishing => {} - _ => {} + fn connect_to_broker(&mut self) -> Result<(), Self::GuardError> { + if !self.network.tcp_connected()? { + return Err(Error::NotReady); } - // Attempt to finish any pending packets. - self.network.finish_write()?; - - self.handle_timers()?; - - Ok(()) - } + let properties = [ + // Tell the broker our maximum packet size. + Property::MaximumPacketSize(MSG_SIZE as u32), + // The session does not expire. + Property::SessionExpiryInterval(u32::MAX), + ]; - /// Specify the Will message to be sent if the client disconnects. - /// - /// # Args - /// * `topic` - The topic to send the message on - /// * `data` - The message to transmit - /// * `qos` - The quality of service at which to send the message. - /// * `retained` - Specifies whether the will message should be retained by the broker. - /// * `properties` - Any properties to send with the will message. - pub fn set_will( - &mut self, - topic: &str, - data: &[u8], - qos: QoS, - retained: Retain, - properties: &[Property], - ) -> Result<(), Error> { - let mut will = Will::new(topic, data, properties)?; - will.retained(retained); - will.qos(qos); + let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; + let packet = serialize::connect_message( + &mut buffer, + self.session_state.client_id.as_str().as_bytes(), + self.session_state.keepalive_interval(), + &properties, + // Only perform a clean start if we do not have any session state. + !self.session_state.is_present(), + self.will.as_ref(), + )?; + + info!("Sending CONNECT"); + self.network.write(packet)?; - self.will.replace(will); Ok(()) } +} - fn reset(&mut self) { - self.connection_state.process_event(Events::Disconnect).ok(); - } +pub struct MqttClient< + TcpStack: TcpClientStack, + Clock: embedded_time::Clock, + const MSG_SIZE: usize, + const MSG_COUNT: usize, +> { + clock: Clock, + sm: sm::StateMachine>, +} +impl< + TcpStack: TcpClientStack, + Clock: embedded_time::Clock, + const MSG_SIZE: usize, + const MSG_COUNT: usize, + > MqttClient +{ /// Configure the MQTT keep-alive interval. /// /// # Note @@ -176,13 +121,14 @@ where &mut self, interval_seconds: u16, ) -> Result<(), Error> { - if (self.connection_state.state() == &States::Active) - || (self.connection_state.state() == &States::Establishing) - { + if (self.sm.state() == &States::Active) || (self.sm.state() == &States::Establishing) { return Err(Error::NotReady); } - self.session_state.set_keepalive(interval_seconds); + self.sm + .context_mut() + .session_state + .set_keepalive(interval_seconds); Ok(()) } @@ -194,11 +140,43 @@ where /// # Args /// * `port` - The Port number to connect to. pub fn set_broker_port(&mut self, port: u16) -> Result<(), Error> { - if *self.connection_state.state() != States::Restart { + if self.sm.state() != &States::Restart { return Err(Error::NotReady); } - self.session_state.broker.set_port(port); + self.sm.context_mut().session_state.broker.set_port(port); + Ok(()) + } + + /// Determine if the client has established a connection with the broker. + /// + /// # Returns + /// True if the client is connected to the broker. + pub fn is_connected(&mut self) -> bool { + self.sm.state() == &States::Active + } + + /// Specify the Will message to be sent if the client disconnects. + /// + /// # Args + /// * `topic` - The topic to send the message on + /// * `data` - The message to transmit + /// * `qos` - The quality of service at which to send the message. + /// * `retained` - Specifies whether the will message should be retained by the broker. + /// * `properties` - Any properties to send with the will message. + pub fn set_will( + &mut self, + topic: &str, + data: &[u8], + qos: QoS, + retained: Retain, + properties: &[Property], + ) -> Result<(), Error> { + let mut will = Will::new(topic, data, properties)?; + will.retained(retained); + will.qos(qos); + + self.sm.context_mut().will.replace(will); Ok(()) } @@ -216,27 +194,33 @@ where topic: &'a str, properties: &[Property<'b>], ) -> Result<(), Error> { - if self.connection_state.state() != &States::Active { + if self.sm.state() != &States::Active { return Err(Error::NotReady); } + let ClientContext { + network, + session_state, + .. + } = self.sm.context_mut(); + // We can't subscribe if there's a pending write in the network. - if self.network.has_pending_write() { + if network.has_pending_write() { return Err(Error::NotReady); } - let packet_id = self.session_state.get_packet_identifier(); + let packet_id = session_state.get_packet_identifier(); let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; let packet = serialize::subscribe_message(&mut buffer, topic, packet_id, properties)?; - self.network.write(packet).and_then(|_| { + network.write(packet).and_then(|_| { info!("Subscribing to `{}`: {}", topic, packet_id); - self.session_state + session_state .pending_subscriptions .push(packet_id) .map_err(|_| Error::Unsupported)?; - self.session_state.increment_packet_identifier(); + session_state.increment_packet_identifier(); Ok(()) }) } @@ -246,15 +230,12 @@ where /// # Returns /// True if any subscriptions are waiting for confirmation from the broker. pub fn subscriptions_pending(&self) -> bool { - !self.session_state.pending_subscriptions.is_empty() - } - - /// Determine if the client has established a connection with the broker. - /// - /// # Returns - /// True if the client is connected to the broker. - pub fn is_connected(&mut self) -> bool { - self.connection_state.state() == &States::Active + !self + .sm + .context() + .session_state + .pending_subscriptions + .is_empty() } /// Get the count of unacknowledged QoS 1 messages. @@ -262,7 +243,7 @@ where /// # Returns /// Number of pending messages with QoS 1. pub fn pending_messages(&self, qos: QoS) -> usize { - self.session_state.pending_messages(qos) + self.sm.context().session_state.pending_messages(qos) } /// Determine if the client is able to process QoS 1 publish requess. @@ -272,11 +253,11 @@ where pub fn can_publish(&self, qos: QoS) -> bool { // We cannot publish if there's a pending write in the network stack. That message must be // completed first. - if self.network.has_pending_write() { + if self.sm.context().network.has_pending_write() { return false; } - self.session_state.can_publish(qos) + self.sm.context().session_state.can_publish(qos) } /// Publish a message over MQTT. @@ -315,83 +296,121 @@ where topic, data, properties ); + let ClientContext { + session_state, + network, + .. + } = self.sm.context_mut(); + // If QoS 0 the ID will be ignored - let id = self.session_state.get_packet_identifier(); + let id = session_state.get_packet_identifier(); let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; let packet = serialize::publish_message(&mut buffer, topic, data, qos, retain, id, properties)?; - self.network.write(packet)?; - self.session_state.increment_packet_identifier(); + network.write(packet)?; + session_state.increment_packet_identifier(); if qos == QoS::AtLeastOnce { - self.session_state.handle_publish(qos, id, packet); + session_state.handle_publish(qos, id, packet); + } + + Ok(()) + } + + fn handle_disconnection(&mut self) -> Result<(), Error> { + match self.sm.process_event(Events::Disconnect) { + Err(sm::Error::GuardFailed(error)) => return Err(error), + other => other.unwrap(), + }; + + Ok(()) + } + + fn update(&mut self) -> Result<(), Error> { + self.sm.process_event(Events::Update).ok(); + + // Potentially update the state machine depending on the current socket connection status. + if !self.sm.context_mut().network.tcp_connected()? && self.sm.state() != &States::Restart { + self.handle_disconnection()?; } + // Attempt to finish any pending packets. + self.sm.context_mut().network.finish_write()?; + + self.handle_timers()?; + Ok(()) } + fn reset(&mut self) -> Result<(), Error> { + self.handle_disconnection() + } + fn handle_connection_acknowledge( &mut self, acknowledge: ConnAck, ) -> Result<(), Error> { - if self.connection_state.state() != &States::Establishing { + if self.sm.state() != &States::Establishing { return Err(Error::Protocol(ProtocolError::Invalid)); } - let mut result = Ok(()); - if acknowledge.reason_code != 0 { return Err(Error::Failed(acknowledge.reason_code)); } - if !acknowledge.session_present { - if self.session_state.is_present() { - result = Err(Error::SessionReset); - } + self.sm.process_event(Events::ReceivedConnAck).unwrap(); - // Reset the session state upon connection with a broker that doesn't have a - // session state saved for us. - self.session_state.reset(); - } + let ClientContext { + session_state, + network, + .. + } = &mut self.sm.context_mut(); + + let session_reset = !acknowledge.session_present && session_state.is_present(); - self.connection_state - .process_event(Events::ReceivedConnAck) - .unwrap(); + // Reset the session state upon connection with a broker that doesn't have a session state + // saved for us. + if !acknowledge.session_present { + session_state.reset(); + } for property in acknowledge.properties { match property { Property::MaximumPacketSize(size) => { - self.session_state.maximum_packet_size.replace(size); + session_state.maximum_packet_size.replace(size); } Property::AssignedClientIdentifier(id) => { - self.session_state.client_id = + session_state.client_id = String::from_str(id).or(Err(Error::ProvidedClientIdTooLong))?; } Property::ServerKeepAlive(keep_alive) => { - self.session_state.set_keepalive(keep_alive); + session_state.set_keepalive(keep_alive); } _prop => info!("Ignoring property: {:?}", _prop), }; } // Now that we are connected, we have session state that will be persisted. - self.session_state - .register_connection(self.clock.try_now()?); + session_state.register_connection(self.clock.try_now()?); // Replay QoS 1 messages - for key in self.session_state.pending_publish_ordering.iter() { + for key in session_state.pending_publish_ordering.iter() { // If the network stack cannot send another message, do not attempt to send one. - if self.network.has_pending_write() { + if network.has_pending_write() { break; } - let message = self.session_state.pending_publish.get(key).unwrap(); - self.network.write(message)?; + let message = session_state.pending_publish.get(key).unwrap(); + network.write(message)?; } - result + if session_reset { + Err(Error::SessionReset) + } else { + Ok(()) + } } fn handle_packet<'a, F>( @@ -413,7 +432,7 @@ where } // All other packets must be received in the active state. - if !self.is_connected() { + if self.sm.state() != &States::Active { error!( "Received invalid packet outside of connected state: {:?}", packet @@ -425,20 +444,19 @@ where ReceivedPacket::Publish(info) => { // Call a handler function to deal with the received data. f(self, info.topic, info.payload, &info.properties); - - Ok(()) } ReceivedPacket::PubAck(ack) => { // No matter the status code the message is considered acknowledged at this point - self.session_state.handle_puback(ack.packet_identifier); - - Ok(()) + self.sm + .context_mut() + .session_state + .handle_puback(ack.packet_identifier); } ReceivedPacket::SubAck(subscribe_acknowledge) => { - match self - .session_state + let session_state = &mut self.sm.context_mut().session_state; + match session_state .pending_subscriptions .iter() .position(|v| *v == subscribe_acknowledge.packet_identifier) @@ -447,34 +465,33 @@ where error!("Got bad suback: {:?}", subscribe_acknowledge); return Err(Error::Protocol(ProtocolError::Invalid)); } - Some(index) => self.session_state.pending_subscriptions.swap_remove(index), + Some(index) => session_state.pending_subscriptions.swap_remove(index), }; if subscribe_acknowledge.reason_code != 0 { return Err(Error::Failed(subscribe_acknowledge.reason_code)); } - - Ok(()) } ReceivedPacket::PingResp => { // Cancel the ping response timeout. - self.session_state.register_ping_response(); - Ok(()) + self.sm.context_mut().session_state.register_ping_response(); } - _ => Err(Error::Unsupported), - } + _ => return Err(Error::Unsupported), + }; + + Ok(()) } fn handle_timers(&mut self) -> Result<(), Error> { // If we are not connected, there's no session state to manage. - if self.connection_state.state() != &States::Active { + if self.sm.state() != &States::Active { return Ok(()); } // If there's a pending write, we can't send a ping no matter if it is due. - if self.network.has_pending_write() { + if self.sm.context().network.has_pending_write() { return Ok(()); } @@ -484,10 +501,8 @@ where // to write the ping message. This is intentional incase the underlying transport // mechanism has stalled. The ping timeout will then allow us to recover the // underlying TCP connection. - match self.session_state.handle_ping(now) { - Err(()) => { - self.connection_state.process_event(Events::Disconnect).ok(); - } + match self.sm.context_mut().session_state.handle_ping(now) { + Err(()) => self.reset()?, Ok(true) => { let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; @@ -495,7 +510,7 @@ where // Note: If we fail to serialize or write the packet, the ping timeout timer is // still running, so we will recover the TCP connection in the future. let packet = serialize::ping_req_message(&mut buffer)?; - self.network.write(packet)?; + self.sm.context_mut().network.write(packet)?; } Ok(false) => {} @@ -505,6 +520,16 @@ where } } +/// The general structure for managing MQTT via Minimq. +pub struct Minimq +where + TcpStack: TcpClientStack, + Clock: embedded_time::Clock, +{ + client: MqttClient, + packet_reader: PacketReader, +} + impl< TcpStack: TcpClientStack, Clock: embedded_time::Clock, @@ -537,11 +562,12 @@ impl< let minimq = Minimq { client: MqttClient { - network: InterfaceHolder::new(network_stack), clock, - session_state, - connection_state: StateMachine::new(Context), - will: None, + sm: StateMachine::new(ClientContext { + network: InterfaceHolder::new(network_stack), + session_state, + will: None, + }), }, packet_reader: PacketReader::new(), }; @@ -563,19 +589,19 @@ impl< &[Property<'a>], ), { - self.client.process()?; + self.client.update()?; // If the connection is no longer active, reset the packet reader state and return. There's // nothing more we can do. - if self.client.connection_state.state() != &States::Active - && self.client.connection_state.state() != &States::Establishing + if self.client.sm.state() != &States::Active + && self.client.sm.state() != &States::Establishing { self.packet_reader.reset(); return Ok(()); } let mut buf: [u8; 1024] = [0; 1024]; - let received = self.client.network.read(&mut buf)?; + let received = self.client.sm.context_mut().network.read(&mut buf)?; if received > 0 { debug!("Received {} bytes", received); } @@ -589,7 +615,7 @@ impl< } Err(e) => { - self.client.reset(); + self.client.reset()?; self.packet_reader.reset(); return Err(Error::Protocol(e)); } @@ -613,4 +639,9 @@ impl< Ok(()) } + + /// Directly access the MQTT client. + pub fn client(&mut self) -> &mut MqttClient { + &mut self.client + } } diff --git a/src/session_state.rs b/src/session_state.rs index f293a03..b74665d 100644 --- a/src/session_state.rs +++ b/src/session_state.rs @@ -137,7 +137,7 @@ impl self.active } - pub fn get_packet_identifier(&mut self) -> u16 { + pub fn get_packet_identifier(&self) -> u16 { self.packet_id } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 20fe109..f388b84 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -6,6 +6,7 @@ use std_embedded_time::StandardClock; #[test] fn main() -> std::io::Result<()> { env_logger::init(); + log::info!("Starting test"); let stack = std_embedded_nal::Stack::default(); let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); @@ -13,13 +14,13 @@ fn main() -> std::io::Result<()> { Minimq::<_, _, 256, 16>::new(localhost, "", stack, StandardClock::default()).unwrap(); // Use a keepalive interval for the client. - mqtt.client.set_keepalive_interval(60).unwrap(); + mqtt.client().set_keepalive_interval(60).unwrap(); let mut published = false; let mut subscribed = false; let mut responses = 0; - mqtt.client + mqtt.client() .set_will( "exit", "Test complete".as_bytes(), @@ -56,17 +57,18 @@ fn main() -> std::io::Result<()> { } }) .unwrap(); + let client = mqtt.client(); if !subscribed { - if mqtt.client.is_connected() { - mqtt.client.subscribe("response", &[]).unwrap(); - mqtt.client.subscribe("request", &[]).unwrap(); + if client.is_connected() { + client.subscribe("response", &[]).unwrap(); + client.subscribe("request", &[]).unwrap(); subscribed = true; } - } else if !mqtt.client.subscriptions_pending() && !published { + } else if !client.subscriptions_pending() && !published { println!("PUBLISH request"); let properties = [Property::ResponseTopic("response")]; - mqtt.client + client .publish( "request", "Ping".as_bytes(), @@ -76,7 +78,7 @@ fn main() -> std::io::Result<()> { ) .unwrap(); - mqtt.client + client .publish( "request", "Ping".as_bytes(), @@ -87,7 +89,7 @@ fn main() -> std::io::Result<()> { .unwrap(); // The message cannot be ack'd until the next poll call - assert_eq!(1, mqtt.client.pending_messages(QoS::AtLeastOnce)); + assert_eq!(1, client.pending_messages(QoS::AtLeastOnce)); published = true; } From 0df3b2c41c9c526a9271cf440acdd877b8a0c3a2 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 17 Aug 2022 13:55:42 +0200 Subject: [PATCH 2/6] Reorganizing --- src/mqtt_client.rs | 73 ++++++++++++++++++--------------------- tests/integration_test.rs | 1 - 2 files changed, 34 insertions(+), 40 deletions(-) diff --git a/src/mqtt_client.rs b/src/mqtt_client.rs index eee0bcf..39540c8 100644 --- a/src/mqtt_client.rs +++ b/src/mqtt_client.rs @@ -105,6 +105,30 @@ impl< const MSG_COUNT: usize, > MqttClient { + /// Specify the Will message to be sent if the client disconnects. + /// + /// # Args + /// * `topic` - The topic to send the message on + /// * `data` - The message to transmit + /// * `qos` - The quality of service at which to send the message. + /// * `retained` - Specifies whether the will message should be retained by the broker. + /// * `properties` - Any properties to send with the will message. + pub fn set_will( + &mut self, + topic: &str, + data: &[u8], + qos: QoS, + retained: Retain, + properties: &[Property], + ) -> Result<(), Error> { + let mut will = Will::new(topic, data, properties)?; + will.retained(retained); + will.qos(qos); + + self.sm.context_mut().will.replace(will); + Ok(()) + } + /// Configure the MQTT keep-alive interval. /// /// # Note @@ -148,38 +172,6 @@ impl< Ok(()) } - /// Determine if the client has established a connection with the broker. - /// - /// # Returns - /// True if the client is connected to the broker. - pub fn is_connected(&mut self) -> bool { - self.sm.state() == &States::Active - } - - /// Specify the Will message to be sent if the client disconnects. - /// - /// # Args - /// * `topic` - The topic to send the message on - /// * `data` - The message to transmit - /// * `qos` - The quality of service at which to send the message. - /// * `retained` - Specifies whether the will message should be retained by the broker. - /// * `properties` - Any properties to send with the will message. - pub fn set_will( - &mut self, - topic: &str, - data: &[u8], - qos: QoS, - retained: Retain, - properties: &[Property], - ) -> Result<(), Error> { - let mut will = Will::new(topic, data, properties)?; - will.retained(retained); - will.qos(qos); - - self.sm.context_mut().will.replace(will); - Ok(()) - } - /// Subscribe to a topic. /// /// # Note @@ -225,7 +217,6 @@ impl< }) } - /// Determine if any subscriptions are waiting for completion. /// /// # Returns /// True if any subscriptions are waiting for confirmation from the broker. @@ -238,6 +229,14 @@ impl< .is_empty() } + /// Determine if the client has established a connection with the broker. + /// + /// # Returns + /// True if the client is connected to the broker. + pub fn is_connected(&mut self) -> bool { + self.sm.state() == &States::Active + } + /// Get the count of unacknowledged QoS 1 messages. /// /// # Returns @@ -344,10 +343,6 @@ impl< Ok(()) } - fn reset(&mut self) -> Result<(), Error> { - self.handle_disconnection() - } - fn handle_connection_acknowledge( &mut self, acknowledge: ConnAck, @@ -502,7 +497,7 @@ impl< // mechanism has stalled. The ping timeout will then allow us to recover the // underlying TCP connection. match self.sm.context_mut().session_state.handle_ping(now) { - Err(()) => self.reset()?, + Err(()) => self.handle_disconnection()?, Ok(true) => { let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; @@ -615,7 +610,7 @@ impl< } Err(e) => { - self.client.reset()?; + self.client.handle_disconnection()?; self.packet_reader.reset(); return Err(Error::Protocol(e)); } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index f388b84..90a8129 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -6,7 +6,6 @@ use std_embedded_time::StandardClock; #[test] fn main() -> std::io::Result<()> { env_logger::init(); - log::info!("Starting test"); let stack = std_embedded_nal::Stack::default(); let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); From bd3d3976c7a99ea273033ba65b9832891408ca4d Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 17 Aug 2022 14:04:21 +0200 Subject: [PATCH 3/6] Adding ReceiveMaximum to the connect request --- src/mqtt_client.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mqtt_client.rs b/src/mqtt_client.rs index 39540c8..639145d 100644 --- a/src/mqtt_client.rs +++ b/src/mqtt_client.rs @@ -68,6 +68,7 @@ where Property::MaximumPacketSize(MSG_SIZE as u32), // The session does not expire. Property::SessionExpiryInterval(u32::MAX), + Property::ReceiveMaximum(MSG_COUNT as u16), ]; let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; From 9a4cdeaf0dd2a20a5e8aa94c655e6f9bee421b80 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 17 Aug 2022 14:04:56 +0200 Subject: [PATCH 4/6] Updating CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bd48b1..b3544d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This document describes the changes to Minimq between releases. * [breaking] The client is no longer publicly exposed, and is instead accessible via `Minimq::client()` ## Fixed +* The `ReceiveMaximum` property is now sent in the connection request to the broker # [0.5.3] - 2022-02-14 From 48396ecdb27c33eaff00b64fc3842b992deb2813 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 19 Aug 2022 15:46:25 +0200 Subject: [PATCH 5/6] Removing state rework --- CHANGELOG.md | 3 - src/lib.rs | 4 +- src/mqtt_client.rs | 356 ++++++++++++++++++-------------------- src/session_state.rs | 2 +- tests/integration_test.rs | 19 +- 5 files changed, 177 insertions(+), 207 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b3544d9..acf1011 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,9 +7,6 @@ This document describes the changes to Minimq between releases. ## Added * Allow configuration of non-default broker port numbers -## Changed -* [breaking] The client is no longer publicly exposed, and is instead accessible via `Minimq::client()` - ## Fixed * The `ReceiveMaximum` property is now sent in the connection request to the broker diff --git a/src/lib.rs b/src/lib.rs index 80e0045..17c7003 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,8 +44,8 @@ //! let mut subscribed = false; //! //! loop { -//! if mqtt.client().is_connected() && !subscribed { -//! mqtt.client().subscribe("topic", &[]).unwrap(); +//! if mqtt.client.is_connected() && !subscribed { +//! mqtt.client.subscribe("topic", &[]).unwrap(); //! subscribed = true; //! } //! diff --git a/src/mqtt_client.rs b/src/mqtt_client.rs index 639145d..55091ee 100644 --- a/src/mqtt_client.rs +++ b/src/mqtt_client.rs @@ -22,90 +22,117 @@ mod sm { statemachine! { transitions: { - *Init + Update [ reconnect ] = Restart, - Restart + Update [ connect_to_broker ] = Establishing, + *Restart + GotSocket = ConnectTransport, + ConnectTransport + Connect = ConnectBroker, + ConnectBroker + SentConnect = Establishing, + ConnectBroker + Disconnect = Restart, Establishing + ReceivedConnAck = Active, - _ + Disconnect [ reconnect ] = Restart, - }, - custom_guard_error: true, + Establishing + Disconnect = Restart, + Active + Disconnect = Restart, + } } + + pub struct Context; + + impl StateMachineContext for Context {} } -use sm::{Events, StateMachine, States}; +use sm::{Context, Events, StateMachine, States}; -struct ClientContext< +/// The general structure for managing MQTT via Minimq. +pub struct Minimq +where + TcpStack: TcpClientStack, + Clock: embedded_time::Clock, +{ + pub client: MqttClient, + packet_reader: PacketReader, +} + +/// A client for interacting with an MQTT Broker. +pub struct MqttClient< TcpStack: TcpClientStack, Clock: embedded_time::Clock, const MSG_SIZE: usize, const MSG_COUNT: usize, > { pub(crate) network: InterfaceHolder, + clock: Clock, session_state: SessionState, + connection_state: StateMachine, will: Option>, } -impl sm::StateMachineContext - for ClientContext +impl + MqttClient where TcpStack: TcpClientStack, Clock: embedded_time::Clock, { - type GuardError = crate::Error; + fn process(&mut self) -> Result<(), Error> { + // Potentially update the state machine depending on the current socket connection status. + if !self.network.tcp_connected()? { + self.connection_state.process_event(Events::Disconnect).ok(); + } else { + self.connection_state.process_event(Events::Connect).ok(); + } - fn reconnect(&mut self) -> Result<(), Self::GuardError> { - self.network.allocate_socket()?; - self.network.connect(self.session_state.broker)?; - Ok(()) - } + match *self.connection_state.state() { + // In the RESTART state, we need to reopen the TCP socket. + States::Restart => { + self.network.allocate_socket()?; - fn connect_to_broker(&mut self) -> Result<(), Self::GuardError> { - if !self.network.tcp_connected()? { - return Err(Error::NotReady); + self.connection_state + .process_event(Events::GotSocket) + .unwrap(); + } + + // In the connect transport state, we need to connect our TCP socket to the broker. + States::ConnectTransport => { + self.network.connect(self.session_state.broker)?; + } + + // Next, connect to the broker via the MQTT protocol. + States::ConnectBroker => { + let properties = [ + // Tell the broker our maximum packet size. + Property::MaximumPacketSize(MSG_SIZE as u32), + // The session does not expire. + Property::SessionExpiryInterval(u32::MAX), + Property::ReceiveMaximum(MSG_COUNT as u16), + ]; + + let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; + let packet = serialize::connect_message( + &mut buffer, + self.session_state.client_id.as_str().as_bytes(), + self.session_state.keepalive_interval(), + &properties, + // Only perform a clean start if we do not have any session state. + !self.session_state.is_present(), + self.will.as_ref(), + )?; + + info!("Sending CONNECT"); + self.network.write(packet)?; + + self.connection_state + .process_event(Events::SentConnect) + .unwrap(); + } + + States::Establishing => {} + _ => {} } - let properties = [ - // Tell the broker our maximum packet size. - Property::MaximumPacketSize(MSG_SIZE as u32), - // The session does not expire. - Property::SessionExpiryInterval(u32::MAX), - Property::ReceiveMaximum(MSG_COUNT as u16), - ]; + // Attempt to finish any pending packets. + self.network.finish_write()?; - let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; - let packet = serialize::connect_message( - &mut buffer, - self.session_state.client_id.as_str().as_bytes(), - self.session_state.keepalive_interval(), - &properties, - // Only perform a clean start if we do not have any session state. - !self.session_state.is_present(), - self.will.as_ref(), - )?; - - info!("Sending CONNECT"); - self.network.write(packet)?; + self.handle_timers()?; Ok(()) } -} - -pub struct MqttClient< - TcpStack: TcpClientStack, - Clock: embedded_time::Clock, - const MSG_SIZE: usize, - const MSG_COUNT: usize, -> { - clock: Clock, - sm: sm::StateMachine>, -} -impl< - TcpStack: TcpClientStack, - Clock: embedded_time::Clock, - const MSG_SIZE: usize, - const MSG_COUNT: usize, - > MqttClient -{ /// Specify the Will message to be sent if the client disconnects. /// /// # Args @@ -126,10 +153,14 @@ impl< will.retained(retained); will.qos(qos); - self.sm.context_mut().will.replace(will); + self.will.replace(will); Ok(()) } + fn reset(&mut self) { + self.connection_state.process_event(Events::Disconnect).ok(); + } + /// Configure the MQTT keep-alive interval. /// /// # Note @@ -146,14 +177,13 @@ impl< &mut self, interval_seconds: u16, ) -> Result<(), Error> { - if (self.sm.state() == &States::Active) || (self.sm.state() == &States::Establishing) { + if (self.connection_state.state() == &States::Active) + || (self.connection_state.state() == &States::Establishing) + { return Err(Error::NotReady); } - self.sm - .context_mut() - .session_state - .set_keepalive(interval_seconds); + self.session_state.set_keepalive(interval_seconds); Ok(()) } @@ -165,11 +195,11 @@ impl< /// # Args /// * `port` - The Port number to connect to. pub fn set_broker_port(&mut self, port: u16) -> Result<(), Error> { - if self.sm.state() != &States::Restart { + if *self.connection_state.state() != States::Restart { return Err(Error::NotReady); } - self.sm.context_mut().session_state.broker.set_port(port); + self.session_state.broker.set_port(port); Ok(()) } @@ -187,47 +217,37 @@ impl< topic: &'a str, properties: &[Property<'b>], ) -> Result<(), Error> { - if self.sm.state() != &States::Active { + if self.connection_state.state() != &States::Active { return Err(Error::NotReady); } - let ClientContext { - network, - session_state, - .. - } = self.sm.context_mut(); - // We can't subscribe if there's a pending write in the network. - if network.has_pending_write() { + if self.network.has_pending_write() { return Err(Error::NotReady); } - let packet_id = session_state.get_packet_identifier(); + let packet_id = self.session_state.get_packet_identifier(); let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; let packet = serialize::subscribe_message(&mut buffer, topic, packet_id, properties)?; - network.write(packet).and_then(|_| { + self.network.write(packet).and_then(|_| { info!("Subscribing to `{}`: {}", topic, packet_id); - session_state + self.session_state .pending_subscriptions .push(packet_id) .map_err(|_| Error::Unsupported)?; - session_state.increment_packet_identifier(); + self.session_state.increment_packet_identifier(); Ok(()) }) } + /// Determine if any subscriptions are waiting for completion. /// /// # Returns /// True if any subscriptions are waiting for confirmation from the broker. pub fn subscriptions_pending(&self) -> bool { - !self - .sm - .context() - .session_state - .pending_subscriptions - .is_empty() + !self.session_state.pending_subscriptions.is_empty() } /// Determine if the client has established a connection with the broker. @@ -235,7 +255,7 @@ impl< /// # Returns /// True if the client is connected to the broker. pub fn is_connected(&mut self) -> bool { - self.sm.state() == &States::Active + self.connection_state.state() == &States::Active } /// Get the count of unacknowledged QoS 1 messages. @@ -243,7 +263,7 @@ impl< /// # Returns /// Number of pending messages with QoS 1. pub fn pending_messages(&self, qos: QoS) -> usize { - self.sm.context().session_state.pending_messages(qos) + self.session_state.pending_messages(qos) } /// Determine if the client is able to process QoS 1 publish requess. @@ -253,11 +273,11 @@ impl< pub fn can_publish(&self, qos: QoS) -> bool { // We cannot publish if there's a pending write in the network stack. That message must be // completed first. - if self.sm.context().network.has_pending_write() { + if self.network.has_pending_write() { return false; } - self.sm.context().session_state.can_publish(qos) + self.session_state.can_publish(qos) } /// Publish a message over MQTT. @@ -296,51 +316,20 @@ impl< topic, data, properties ); - let ClientContext { - session_state, - network, - .. - } = self.sm.context_mut(); - // If QoS 0 the ID will be ignored - let id = session_state.get_packet_identifier(); + let id = self.session_state.get_packet_identifier(); let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; let packet = serialize::publish_message(&mut buffer, topic, data, qos, retain, id, properties)?; - network.write(packet)?; - session_state.increment_packet_identifier(); + self.network.write(packet)?; + self.session_state.increment_packet_identifier(); if qos == QoS::AtLeastOnce { - session_state.handle_publish(qos, id, packet); - } - - Ok(()) - } - - fn handle_disconnection(&mut self) -> Result<(), Error> { - match self.sm.process_event(Events::Disconnect) { - Err(sm::Error::GuardFailed(error)) => return Err(error), - other => other.unwrap(), - }; - - Ok(()) - } - - fn update(&mut self) -> Result<(), Error> { - self.sm.process_event(Events::Update).ok(); - - // Potentially update the state machine depending on the current socket connection status. - if !self.sm.context_mut().network.tcp_connected()? && self.sm.state() != &States::Restart { - self.handle_disconnection()?; + self.session_state.handle_publish(qos, id, packet); } - // Attempt to finish any pending packets. - self.sm.context_mut().network.finish_write()?; - - self.handle_timers()?; - Ok(()) } @@ -348,65 +337,62 @@ impl< &mut self, acknowledge: ConnAck, ) -> Result<(), Error> { - if self.sm.state() != &States::Establishing { + if self.connection_state.state() != &States::Establishing { return Err(Error::Protocol(ProtocolError::Invalid)); } + let mut result = Ok(()); + if acknowledge.reason_code != 0 { return Err(Error::Failed(acknowledge.reason_code)); } - self.sm.process_event(Events::ReceivedConnAck).unwrap(); - - let ClientContext { - session_state, - network, - .. - } = &mut self.sm.context_mut(); - - let session_reset = !acknowledge.session_present && session_state.is_present(); - - // Reset the session state upon connection with a broker that doesn't have a session state - // saved for us. if !acknowledge.session_present { - session_state.reset(); + if self.session_state.is_present() { + result = Err(Error::SessionReset); + } + + // Reset the session state upon connection with a broker that doesn't have a + // session state saved for us. + self.session_state.reset(); } + self.connection_state + .process_event(Events::ReceivedConnAck) + .unwrap(); + for property in acknowledge.properties { match property { Property::MaximumPacketSize(size) => { - session_state.maximum_packet_size.replace(size); + self.session_state.maximum_packet_size.replace(size); } Property::AssignedClientIdentifier(id) => { - session_state.client_id = + self.session_state.client_id = String::from_str(id).or(Err(Error::ProvidedClientIdTooLong))?; } Property::ServerKeepAlive(keep_alive) => { - session_state.set_keepalive(keep_alive); + self.session_state.set_keepalive(keep_alive); } _prop => info!("Ignoring property: {:?}", _prop), }; } // Now that we are connected, we have session state that will be persisted. - session_state.register_connection(self.clock.try_now()?); + self.session_state + .register_connection(self.clock.try_now()?); // Replay QoS 1 messages - for key in session_state.pending_publish_ordering.iter() { + for key in self.session_state.pending_publish_ordering.iter() { // If the network stack cannot send another message, do not attempt to send one. - if network.has_pending_write() { + if self.network.has_pending_write() { break; } - let message = session_state.pending_publish.get(key).unwrap(); - network.write(message)?; + let message = self.session_state.pending_publish.get(key).unwrap(); + self.network.write(message)?; } - if session_reset { - Err(Error::SessionReset) - } else { - Ok(()) - } + result } fn handle_packet<'a, F>( @@ -428,7 +414,7 @@ impl< } // All other packets must be received in the active state. - if self.sm.state() != &States::Active { + if !self.is_connected() { error!( "Received invalid packet outside of connected state: {:?}", packet @@ -440,19 +426,20 @@ impl< ReceivedPacket::Publish(info) => { // Call a handler function to deal with the received data. f(self, info.topic, info.payload, &info.properties); + + Ok(()) } ReceivedPacket::PubAck(ack) => { // No matter the status code the message is considered acknowledged at this point - self.sm - .context_mut() - .session_state - .handle_puback(ack.packet_identifier); + self.session_state.handle_puback(ack.packet_identifier); + + Ok(()) } ReceivedPacket::SubAck(subscribe_acknowledge) => { - let session_state = &mut self.sm.context_mut().session_state; - match session_state + match self + .session_state .pending_subscriptions .iter() .position(|v| *v == subscribe_acknowledge.packet_identifier) @@ -461,33 +448,34 @@ impl< error!("Got bad suback: {:?}", subscribe_acknowledge); return Err(Error::Protocol(ProtocolError::Invalid)); } - Some(index) => session_state.pending_subscriptions.swap_remove(index), + Some(index) => self.session_state.pending_subscriptions.swap_remove(index), }; if subscribe_acknowledge.reason_code != 0 { return Err(Error::Failed(subscribe_acknowledge.reason_code)); } + + Ok(()) } ReceivedPacket::PingResp => { // Cancel the ping response timeout. - self.sm.context_mut().session_state.register_ping_response(); + self.session_state.register_ping_response(); + Ok(()) } - _ => return Err(Error::Unsupported), - }; - - Ok(()) + _ => Err(Error::Unsupported), + } } fn handle_timers(&mut self) -> Result<(), Error> { // If we are not connected, there's no session state to manage. - if self.sm.state() != &States::Active { + if self.connection_state.state() != &States::Active { return Ok(()); } // If there's a pending write, we can't send a ping no matter if it is due. - if self.sm.context().network.has_pending_write() { + if self.network.has_pending_write() { return Ok(()); } @@ -497,8 +485,10 @@ impl< // to write the ping message. This is intentional incase the underlying transport // mechanism has stalled. The ping timeout will then allow us to recover the // underlying TCP connection. - match self.sm.context_mut().session_state.handle_ping(now) { - Err(()) => self.handle_disconnection()?, + match self.session_state.handle_ping(now) { + Err(()) => { + self.connection_state.process_event(Events::Disconnect).ok(); + } Ok(true) => { let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE]; @@ -506,7 +496,7 @@ impl< // Note: If we fail to serialize or write the packet, the ping timeout timer is // still running, so we will recover the TCP connection in the future. let packet = serialize::ping_req_message(&mut buffer)?; - self.sm.context_mut().network.write(packet)?; + self.network.write(packet)?; } Ok(false) => {} @@ -516,16 +506,6 @@ impl< } } -/// The general structure for managing MQTT via Minimq. -pub struct Minimq -where - TcpStack: TcpClientStack, - Clock: embedded_time::Clock, -{ - client: MqttClient, - packet_reader: PacketReader, -} - impl< TcpStack: TcpClientStack, Clock: embedded_time::Clock, @@ -558,12 +538,11 @@ impl< let minimq = Minimq { client: MqttClient { + network: InterfaceHolder::new(network_stack), clock, - sm: StateMachine::new(ClientContext { - network: InterfaceHolder::new(network_stack), - session_state, - will: None, - }), + session_state, + connection_state: StateMachine::new(Context), + will: None, }, packet_reader: PacketReader::new(), }; @@ -585,19 +564,19 @@ impl< &[Property<'a>], ), { - self.client.update()?; + self.client.process()?; // If the connection is no longer active, reset the packet reader state and return. There's // nothing more we can do. - if self.client.sm.state() != &States::Active - && self.client.sm.state() != &States::Establishing + if self.client.connection_state.state() != &States::Active + && self.client.connection_state.state() != &States::Establishing { self.packet_reader.reset(); return Ok(()); } let mut buf: [u8; 1024] = [0; 1024]; - let received = self.client.sm.context_mut().network.read(&mut buf)?; + let received = self.client.network.read(&mut buf)?; if received > 0 { debug!("Received {} bytes", received); } @@ -611,7 +590,7 @@ impl< } Err(e) => { - self.client.handle_disconnection()?; + self.client.reset(); self.packet_reader.reset(); return Err(Error::Protocol(e)); } @@ -635,9 +614,4 @@ impl< Ok(()) } - - /// Directly access the MQTT client. - pub fn client(&mut self) -> &mut MqttClient { - &mut self.client - } } diff --git a/src/session_state.rs b/src/session_state.rs index b74665d..f293a03 100644 --- a/src/session_state.rs +++ b/src/session_state.rs @@ -137,7 +137,7 @@ impl self.active } - pub fn get_packet_identifier(&self) -> u16 { + pub fn get_packet_identifier(&mut self) -> u16 { self.packet_id } diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 90a8129..20fe109 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -13,13 +13,13 @@ fn main() -> std::io::Result<()> { Minimq::<_, _, 256, 16>::new(localhost, "", stack, StandardClock::default()).unwrap(); // Use a keepalive interval for the client. - mqtt.client().set_keepalive_interval(60).unwrap(); + mqtt.client.set_keepalive_interval(60).unwrap(); let mut published = false; let mut subscribed = false; let mut responses = 0; - mqtt.client() + mqtt.client .set_will( "exit", "Test complete".as_bytes(), @@ -56,18 +56,17 @@ fn main() -> std::io::Result<()> { } }) .unwrap(); - let client = mqtt.client(); if !subscribed { - if client.is_connected() { - client.subscribe("response", &[]).unwrap(); - client.subscribe("request", &[]).unwrap(); + if mqtt.client.is_connected() { + mqtt.client.subscribe("response", &[]).unwrap(); + mqtt.client.subscribe("request", &[]).unwrap(); subscribed = true; } - } else if !client.subscriptions_pending() && !published { + } else if !mqtt.client.subscriptions_pending() && !published { println!("PUBLISH request"); let properties = [Property::ResponseTopic("response")]; - client + mqtt.client .publish( "request", "Ping".as_bytes(), @@ -77,7 +76,7 @@ fn main() -> std::io::Result<()> { ) .unwrap(); - client + mqtt.client .publish( "request", "Ping".as_bytes(), @@ -88,7 +87,7 @@ fn main() -> std::io::Result<()> { .unwrap(); // The message cannot be ack'd until the next poll call - assert_eq!(1, client.pending_messages(QoS::AtLeastOnce)); + assert_eq!(1, mqtt.client.pending_messages(QoS::AtLeastOnce)); published = true; } From 06bfcdd5c5f6821a0df416a31770f388ba990739 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 19 Aug 2022 15:46:45 +0200 Subject: [PATCH 6/6] Reverting TOML file --- Cargo.toml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 098f253..d1be8fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,12 +19,9 @@ bit_field = "0.10.0" enum-iterator = "0.6.0" heapless = "0.7" log = {version = "0.4", optional = true} +smlang = "0.4" embedded-time = "0.12" -[dependencies.smlang] -git = "https://github.com/ryan-summers/smlang-rs" -branch = "feature/versatile-guard-errors" - [dependencies.embedded-nal] version = "0.6"