diff --git a/README.md b/README.md index c46d399..107e0a4 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,15 @@ $connectionSettings = (new \PhpMqtt\Client\ConnectionSettings) // of pending messages without acknowledgement. The value cannot be less than 1 second. ->setResendTimeout(10) + // This flag determines whether the client will try to reconnect automatically + // if it notices a disconnect while sending data. + // The setting cannot be used together with the clean session flag. + ->setReconnectAutomatically(false) + + // Defines the maximum number of reconnect attempts until the client gives up. + // This setting is only relevant if setReconnectAutomatically() is set to true. + ->setMaxReconnectAttempts(3) + // The keep alive interval is the number of seconds the client will wait without sending a message // until it sends a keep alive signal (ping) to the broker. The value cannot be less than 1 second // and may not be higher than 65535 seconds. A reasonable value is 10 seconds (the default). diff --git a/src/Concerns/ValidatesConfiguration.php b/src/Concerns/ValidatesConfiguration.php index fff3c76..3bdb99b 100644 --- a/src/Concerns/ValidatesConfiguration.php +++ b/src/Concerns/ValidatesConfiguration.php @@ -43,6 +43,10 @@ protected function ensureConnectionSettingsAreValid(ConnectionSettings $settings throw new ConfigurationInvalidException('The keep alive interval must be a value in the range of 1 to 65535 seconds.'); } + if ($settings->getMaxReconnectAttempts() < 1) { + throw new ConfigurationInvalidException('The maximum reconnect attempts cannot be fewer than 1.'); + } + if ($settings->getUsername() !== null && trim($settings->getUsername()) === '') { throw new ConfigurationInvalidException('The username may not consist of white space only.'); } diff --git a/src/ConnectionSettings.php b/src/ConnectionSettings.php index 017e5cd..b33a58d 100644 --- a/src/ConnectionSettings.php +++ b/src/ConnectionSettings.php @@ -17,6 +17,8 @@ class ConnectionSettings private int $socketTimeout = 5; private int $resendTimeout = 10; private int $keepAliveInterval = 10; + private bool $reconnectAutomatically = false; + private int $maxReconnectAttempts = 3; private ?string $lastWillTopic = null; private ?string $lastWillMessage = null; private int $lastWillQualityOfService = 0; @@ -175,6 +177,55 @@ public function getKeepAliveInterval(): int return $this->keepAliveInterval; } + /** + * This flag determines whether the client will try to reconnect automatically, + * if it notices a disconnect while sending data. + * The setting cannot be used together with the clean session flag. + * + * @param bool $reconnectAutomatically + * @return ConnectionSettings + */ + public function setReconnectAutomatically(bool $reconnectAutomatically): ConnectionSettings + { + $copy = clone $this; + + $copy->reconnectAutomatically = $reconnectAutomatically; + + return $copy; + } + + /** + * @return bool + */ + public function shouldReconnectAutomatically(): bool + { + return $this->reconnectAutomatically; + } + + /** + * Defines the maximum number of reconnect attempts until the client gives up. This setting + * is only relevant if {@see setReconnectAutomatically()} is set to true. + * + * @param int $maxReconnectAttempts + * @return ConnectionSettings + */ + public function setMaxReconnectAttempts(int $maxReconnectAttempts): ConnectionSettings + { + $copy = clone $this; + + $copy->maxReconnectAttempts = $maxReconnectAttempts; + + return $copy; + } + + /** + * @return int + */ + public function getMaxReconnectAttempts(): int + { + return $this->maxReconnectAttempts; + } + /** * If the broker should publish a last will message in the name of the client when the client * disconnects abruptly, this setting defines the topic on which the message will be published. diff --git a/src/MqttClient.php b/src/MqttClient.php index b4afe2f..ea389bd 100644 --- a/src/MqttClient.php +++ b/src/MqttClient.php @@ -11,6 +11,7 @@ use PhpMqtt\Client\Contracts\MqttClient as ClientContract; use PhpMqtt\Client\Contracts\Repository; use PhpMqtt\Client\Exceptions\ClientNotConnectedToBrokerException; +use PhpMqtt\Client\Exceptions\ConfigurationInvalidException; use PhpMqtt\Client\Exceptions\ConnectingToBrokerFailedException; use PhpMqtt\Client\Exceptions\DataTransferException; use PhpMqtt\Client\Exceptions\InvalidMessageException; @@ -30,15 +31,16 @@ */ class MqttClient implements ClientContract { - use GeneratesRandomClientIds, - OffersHooks, - ValidatesConfiguration; + use GeneratesRandomClientIds; + use OffersHooks; + use ValidatesConfiguration; const MQTT_3_1 = '3.1'; - const QOS_AT_MOST_ONCE = 0; - const QOS_AT_LEAST_ONCE = 1; - const QOS_EXACTLY_ONCE = 2; + const QOS_AT_MOST_ONCE = 0; + const QOS_AT_LEAST_ONCE = 1; + const QOS_EXACTLY_ONCE = 2; + const SOCKET_READ_BUFFER_SIZE = 8192; private string $host; private int $port; @@ -121,11 +123,29 @@ public function connect( $this->ensureConnectionSettingsAreValid($this->settings); + // Because a clean session would make reconnects inherently more complex since all subscriptions would need to be replayed after reconnecting, + // we simply do not allow using these two features together. + if ($useCleanSession && $this->settings->shouldReconnectAutomatically()) { + throw new ConfigurationInvalidException('Automatic reconnects cannot be used together with the clean session flag.'); + } + // When a clean session is requested, we have to reset the repository to forget about persisted states. if ($useCleanSession) { $this->repository->reset(); } + $this->connectInternal($useCleanSession); + } + + /** + * Connect to the MQTT broker using the configured settings. + * + * @param bool $useCleanSession + * @return void + * @throws ConnectingToBrokerFailedException + */ + protected function connectInternal(bool $useCleanSession = false): void + { try { $this->establishSocketConnection(); $this->performConnectionHandshake($useCleanSession); @@ -373,6 +393,30 @@ protected function performConnectionHandshake(bool $useCleanSession = false): vo } } + /** + * Attempts to reconnect to the broker. If a connection cannot be established within the configured number of retries, + * the last caught exception is thrown. + * + * @return void + * @throws ConnectingToBrokerFailedException + */ + protected function reconnect(): void + { + $maxReconnectAttempts = $this->settings->getMaxReconnectAttempts(); + + for ($i = 1; $i <= $maxReconnectAttempts; $i++) { + try { + $this->connectInternal(); + + return; + } catch (ConnectingToBrokerFailedException $e) { + if ($i === $maxReconnectAttempts) { + throw $e; + } + } + } + } + /** * {@inheritDoc} */ @@ -518,7 +562,7 @@ protected function publishMessage( $data = $this->messageProcessor->buildPublishMessage($topic, $message, $qualityOfService, $retain, $messageId, $isDuplicate); - $this->writeToSocket($data); + $this->writeToSocketWithAutoReconnect($data); } /** @@ -543,7 +587,7 @@ public function subscribe(string $topicFilter, callable $callback = null, int $q $this->repository->addPendingOutgoingMessage($pendingMessage); $data = $this->messageProcessor->buildSubscribeMessage($messageId, $subscriptions); - $this->writeToSocket($data); + $this->writeToSocketWithAutoReconnect($data); } /** @@ -562,7 +606,7 @@ public function unsubscribe(string $topicFilter): void $this->repository->addPendingOutgoingMessage($pendingMessage); $data = $this->messageProcessor->buildUnsubscribeMessage($messageId, $topicFilters); - $this->writeToSocket($data); + $this->writeToSocketWithAutoReconnect($data); } /** @@ -831,7 +875,7 @@ protected function handleMessage(Message $message): void // PINGREQ if ($message->getType()->equals(MessageType::PING_REQUEST())) { // Respond with PINGRESP. - $this->writeToSocket($this->messageProcessor->buildPingResponseMessage()); + $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPingResponseMessage()); return; } } @@ -919,14 +963,14 @@ protected function resendPendingMessages(): void ]); $data = $this->messageProcessor->buildSubscribeMessage($pendingMessage->getMessageId(), $pendingMessage->getSubscriptions(), true); - $this->writeToSocket($data); + $this->writeToSocketWithAutoReconnect($data); } elseif ($pendingMessage instanceof UnsubscribeRequest) { $this->logger->debug('Re-sending pending unsubscribe request to the broker.', [ 'messageId' => $pendingMessage->getMessageId(), ]); $data = $this->messageProcessor->buildUnsubscribeMessage($pendingMessage->getMessageId(), $pendingMessage->getTopicFilters(), true); - $this->writeToSocket($data); + $this->writeToSocketWithAutoReconnect($data); } else { throw new InvalidMessageException('Unexpected message type encountered while resending pending messages.'); } @@ -947,7 +991,7 @@ protected function sendPublishAcknowledgement(int $messageId): void { $this->logger->debug('Sending publish acknowledgement to the broker (message id: {messageId}).', ['messageId' => $messageId]); - $this->writeToSocket($this->messageProcessor->buildPublishAcknowledgementMessage($messageId)); + $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPublishAcknowledgementMessage($messageId)); } /** @@ -961,7 +1005,7 @@ protected function sendPublishReceived(int $messageId): void { $this->logger->debug('Sending publish received message to the broker (message id: {messageId}).', ['messageId' => $messageId]); - $this->writeToSocket($this->messageProcessor->buildPublishReceivedMessage($messageId)); + $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPublishReceivedMessage($messageId)); } /** @@ -975,7 +1019,7 @@ protected function sendPublishRelease(int $messageId): void { $this->logger->debug('Sending publish release message to the broker (message id: {messageId}).', ['messageId' => $messageId]); - $this->writeToSocket($this->messageProcessor->buildPublishReleaseMessage($messageId)); + $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPublishReleaseMessage($messageId)); } /** @@ -989,7 +1033,7 @@ protected function sendPublishComplete(int $messageId): void { $this->logger->debug('Sending publish complete message to the broker (message id: {messageId}).', ['messageId' => $messageId]); - $this->writeToSocket($this->messageProcessor->buildPublishCompleteMessage($messageId)); + $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPublishCompleteMessage($messageId)); } /** @@ -1002,7 +1046,7 @@ protected function ping(): void { $this->logger->debug('Sending ping to the broker to keep the connection alive.'); - $this->writeToSocket($this->messageProcessor->buildPingRequestMessage()); + $this->writeToSocketWithAutoReconnect($this->messageProcessor->buildPingRequestMessage()); } /** @@ -1017,12 +1061,45 @@ protected function sendDisconnect(): void $this->logger->debug('Sending disconnect package to the broker.'); - $this->writeToSocket($data); + $this->writeToSocketWithAutoReconnect($data); + } + + /** + * Writes some data to the socket. If a {@see $length} is given, and it is shorter + * than the data, only {@see $length} amount of bytes will be sent. + * If configured, this method will try to reconnect in case of transmission errors. + * + * @param string $data + * @param int|null $length + * @return void + * @throws DataTransferException + */ + protected function writeToSocketWithAutoReconnect(string $data, int $length = null): void + { + try { + $this->writeToSocket($data, $length); + } catch (DataTransferException $e) { + if (!$this->settings->shouldReconnectAutomatically()) { + throw $e; + } + + try { + $this->reconnect(); + } catch (ConnectingToBrokerFailedException $exception) { + $this->logger->error('Automatically reconnecting to the broker while writing data to the socket failed.'); + + // Throw the original exception. + throw $e; + } + + // Retry writing to the socket. If this fails again, the exception is thrown as-is. + $this->writeToSocket($data, $length); + } } /** - * Writes some data to the socket. If a $length is given and it is shorter - * than the data, only $length amount of bytes will be sent. + * Writes some data to the socket. If a {@see $length} is given, and it is shorter + * than the data, only {@see $length} amount of bytes will be sent. * * @param string $data * @param int|null $length @@ -1054,16 +1131,50 @@ protected function writeToSocket(string $data, int $length = null): void } /** - * Reads data from the socket. If the second parameter $withoutBlocking is set to true, - * a maximum of $limit bytes will be read and returned. If $withoutBlocking is set to false, - * the method will wait until $limit bytes have been received. + * Reads data from the socket. If the second parameter {@see $withoutBlocking} is set to true, + * a maximum of {@see $limit} bytes will be read and returned. If {@see $withoutBlocking} is set to false, + * the method will wait until {@see $limit} bytes have been received. + * If configured, this method will try to reconnect in case of transmission errors. + * + * @param int $limit + * @param bool $withoutBlocking + * @return string + * @throws DataTransferException + */ + protected function readFromSocketWithAutoReconnect(int $limit = self::SOCKET_READ_BUFFER_SIZE, bool $withoutBlocking = false): string + { + try { + return $this->readFromSocket($limit, $withoutBlocking); + } catch (DataTransferException $e) { + if (!$this->settings->shouldReconnectAutomatically()) { + throw $e; + } + + try { + $this->reconnect(); + } catch (ConnectingToBrokerFailedException $exception) { + $this->logger->error('Automatically reconnecting to the broker while reading data from the socket failed.'); + + // Throw the original exception. + throw $e; + } + + // Retry writing to the socket. If this fails again, the exception is thrown as-is. + return $this->readFromSocket($limit, $withoutBlocking); + } + } + + /** + * Reads data from the socket. If the second parameter {@see $withoutBlocking} is set to true, + * a maximum of {@see $limit} bytes will be read and returned. If {@see $withoutBlocking} is set to false, + * the method will wait until {@see $limit} bytes have been received. * * @param int $limit * @param bool $withoutBlocking * @return string * @throws DataTransferException */ - protected function readFromSocket(int $limit = 8192, bool $withoutBlocking = false): string + protected function readFromSocket(int $limit = self::SOCKET_READ_BUFFER_SIZE, bool $withoutBlocking = false): string { if ($withoutBlocking) { $result = fread($this->socket, $limit); @@ -1110,22 +1221,25 @@ protected function readFromSocket(int $limit = 8192, bool $withoutBlocking = fal /** * Reads all the available data from the socket using non-blocking mode. Essentially this means - * that {@see MqttClient::readFromSocket()} is called over and over again, as long as data is + * that {@see MqttClient::readFromSocketWithAutoReconnect()} is called over and over again, as long as data is * returned. * + * @param bool $withAutoReconnectIfConfigured * @return string * @throws DataTransferException */ - protected function readAllAvailableDataFromSocket(): string + protected function readAllAvailableDataFromSocket(bool $withAutoReconnectIfConfigured = false): string { $result = ''; while (true) { - $buffer = $this->readFromSocket(8192, true); + $buffer = $withAutoReconnectIfConfigured + ? $this->readFromSocketWithAutoReconnect(self::SOCKET_READ_BUFFER_SIZE, true) + : $this->readFromSocket(self::SOCKET_READ_BUFFER_SIZE, true); $result .= $buffer; - if (strlen($buffer) < 8192) { + if (strlen($buffer) < self::SOCKET_READ_BUFFER_SIZE) { break; } }