Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling partial MQTT packet writes #75

Merged
merged 6 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ This document describes the changes to Minimq between releases.

## Added
## Fixed
* Partial packet writes no longer cause the connection to the broker to break down.
[#74](https://github.com/quartiq/minimq/issues/74)

# Version [0.5.1]
Version 0.5.1 was published on 2021-12-07
Expand Down
33 changes: 29 additions & 4 deletions src/mqtt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct MqttClient<
const MSG_SIZE: usize,
const MSG_COUNT: usize,
> {
pub(crate) network: InterfaceHolder<TcpStack>,
pub(crate) network: InterfaceHolder<TcpStack, MSG_SIZE>,
clock: Clock,
session_state: SessionState<Clock, MSG_SIZE, MSG_COUNT>,
connection_state: StateMachine<Context>,
Expand Down Expand Up @@ -126,6 +126,9 @@ where
_ => {}
}

// Attempt to finish any pending packets.
self.network.finish_write()?;

self.handle_timers()?;

Ok(())
Expand Down Expand Up @@ -203,6 +206,11 @@ where
return Err(Error::NotReady);
}

// We can't subscribe if there's a pending write in the network.
if self.network.has_pending_write() {
return Err(Error::NotReady);
}

let packet_id = self.session_state.get_packet_identifier();

let mut buffer: [u8; MSG_SIZE] = [0; MSG_SIZE];
Expand Down Expand Up @@ -248,6 +256,12 @@ where
/// # Returns
/// True if the client is able to service QoS 1 requests.
pub fn can_publish(&self, qos: QoS) -> bool {
// We cannot publish if there's a pending write in the network stack. That message must be
// completed first.
if self.network.has_pending_write() {
return false;
}

self.session_state.can_publish(qos)
}

Expand All @@ -273,14 +287,15 @@ where
retain: Retain,
properties: &[Property],
) -> Result<(), Error<TcpStack::Error>> {
// TODO: QoS 2 support.
assert!(qos != QoS::ExactlyOnce);

// If we are not yet connected to the broker, we can't transmit a message.
if self.is_connected() == false {
return Ok(());
}

if !self.can_publish(qos) {
return Err(Error::NotReady);
}

debug!(
"Publishing to `{}`: {:?} Props: {:?}",
topic, data, properties
Expand Down Expand Up @@ -353,6 +368,11 @@ where

// Replay QoS 1 messages
for key in self.session_state.pending_publish_ordering.iter() {
// If the network stack cannot send another message, do not attempt to send one.
if self.network.has_pending_write() {
break;
}

let message = self.session_state.pending_publish.get(&key).unwrap();
self.network.write(message)?;
}
Expand Down Expand Up @@ -439,6 +459,11 @@ where
return Ok(());
}

// If there's a pending write, we can't send a ping no matter if it is due.
if self.network.has_pending_write() {
return Ok(());
}

let now = self.clock.try_now()?;

// Note: The ping timeout is set at this point so that it's running even if we fail
Expand Down
36 changes: 31 additions & 5 deletions src/network_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@
//! stack to be used to transmit buffers that may be stored internally in other structs without
//! violating Rust's borrow rules.
use embedded_nal::{nb, SocketAddr, TcpClientStack};
use heapless::Vec;

use crate::Error;

/// Simple structure for maintaining state of the network connection.
pub(crate) struct InterfaceHolder<TcpStack: TcpClientStack> {
pub(crate) struct InterfaceHolder<TcpStack: TcpClientStack, const MSG_SIZE: usize> {
socket: Option<TcpStack::TcpSocket>,
network_stack: TcpStack,
unfinished_packet: Option<Vec<u8, MSG_SIZE>>,
}

impl<TcpStack> InterfaceHolder<TcpStack>
impl<TcpStack, const MSG_SIZE: usize> InterfaceHolder<TcpStack, MSG_SIZE>
where
TcpStack: TcpClientStack,
{
Expand All @@ -24,9 +26,15 @@ where
Self {
socket: None,
network_stack: stack,
unfinished_packet: None,
ryan-summers marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Determine if there is a pending packet write that needs to be completed.
pub fn has_pending_write(&self) -> bool {
self.unfinished_packet.is_some()
}

/// Determine if an TCP connection exists and is connected.
pub fn tcp_connected(&mut self) -> Result<bool, Error<TcpStack::Error>> {
if self.socket.is_none() {
Expand Down Expand Up @@ -66,6 +74,10 @@ where
/// * `remote` - The address of the remote to connect to.
pub fn connect(&mut self, remote: SocketAddr) -> Result<(), Error<TcpStack::Error>> {
let socket = self.socket.as_mut().ok_or(Error::NotReady)?;

// Drop any pending unfinished packets, as we're establishing a new connection.
self.unfinished_packet.take();

self.network_stack
.connect(socket, remote)
.map_err(|err| match err {
Expand All @@ -79,6 +91,9 @@ where
/// # Args
/// * `packet` - The packet to write.
pub fn write(&mut self, packet: &[u8]) -> Result<(), Error<TcpStack::Error>> {
ryan-summers marked this conversation as resolved.
Show resolved Hide resolved
// If there's an unfinished packet pending, it's invalid to try to write a new one.
assert!(self.unfinished_packet.is_none());

let socket = self.socket.as_mut().ok_or(Error::NotReady)?;
self.network_stack
.send(socket, &packet)
Expand All @@ -88,13 +103,24 @@ where
})
.and_then(|written| {
if written != packet.len() {
Err(Error::WriteFail)
} else {
Ok(())
// Note(unwrap): The packet should always be smaller than a single message.
self.unfinished_packet
.replace(Vec::from_slice(&packet[written..]).unwrap());
}

Ok(())
})
}

/// Finish writing an MQTT control packet to the interface if one exists.
pub fn finish_write(&mut self) -> Result<(), Error<TcpStack::Error>> {
if let Some(ref packet) = self.unfinished_packet.take() {
self.write(packet.as_slice())?;
}

Ok(())
}

/// Read data from the TCP interface.
///
/// # Args
Expand Down