Skip to content

Commit

Permalink
Fix: use blocking socket only during publishing if configured (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
Namoshek authored Jan 13, 2023
1 parent b1759ef commit dc40a6d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 15 deletions.
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,11 @@ $connectionSettings = (new \PhpMqtt\Client\ConnectionSettings)
// The password used for authentication when connecting to the broker.
->setPassword(null)

// Whether to use a blocking socket or not. By default, the socket is non-blocking,
// which is required when using subscriptions and/or {@see MqttClient::loop()}.
// In rare cases, it might be required to use a blocking socket though. One such example
// is when sending large messages (e.g. binaries) and the broker has a limited receive buffer.
//
// Note: When using a blocking socket, the MQTT client can get stuck if the socket is broken
// or when the broker does not consume the sent data fast enough. Use with caution.
// Whether to use a blocking socket when publishing messages or not.
// Normally, this setting can be ignored. When publishing large messages with multiple kilobytes in size,
// a blocking socket may be required if the receipt buffer of the broker is not large enough.
//
// Note: This setting has no effect on subscriptions, only on the publishing of messages.
->useBlockingSocket(false)

// The connect timeout defines the maximum amount of seconds the client will try to establish
Expand Down
12 changes: 5 additions & 7 deletions src/ConnectionSettings.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,11 @@ public function getPassword(): ?string
}

/**
* Whether to use a blocking socket or not. By default, the socket is non-blocking,
* which is required when using subscriptions and/or {@see MqttClient::loop()}.
* In rare cases, it might be required to use a blocking socket though. One such example
* is when sending large messages (e.g. binaries) and the broker has a limited receive buffer.
* Whether to use a blocking socket when publishing messages or not.
* Normally, this setting can be ignored. When publishing large messages with multiple kilobytes in size,
* a blocking socket may be required if the receipt buffer of the broker is not large enough.
*
* Note: When using a blocking socket, the MQTT client can get stuck if the socket is broken
* or when the broker does not consume the sent data fast enough. Use with caution.
* Note: This setting has no effect on subscriptions, only on the publishing of messages.
*
* @param bool $useBlockingSocket
* @return ConnectionSettings
Expand All @@ -97,7 +95,7 @@ public function useBlockingSocket(bool $useBlockingSocket): ConnectionSettings
{
$copy = clone $this;

$copy->useBlockingModeForSocket = $useBlockingSocket;
$copy->useBlockingSocket = $useBlockingSocket;

return $copy;
}
Expand Down
10 changes: 9 additions & 1 deletion src/MqttClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ protected function establishSocketConnection(): void
}

stream_set_timeout($socket, $this->settings->getSocketTimeout());
stream_set_blocking($socket, $this->settings->shouldUseBlockingSocket());
stream_set_blocking($socket, false);

$this->logger->debug('Socket opened and ready to use.');

Expand Down Expand Up @@ -1140,8 +1140,16 @@ protected function writeToSocket(string $data, int $length = null): void
$calculatedLength = strlen($data);
$length = min($length ?? $calculatedLength, $calculatedLength);

if ($this->settings->shouldUseBlockingSocket()) {
socket_set_blocking($this->socket, true);
}

$result = @fwrite($this->socket, $data, $length);

if ($this->settings->shouldUseBlockingSocket()) {
socket_set_blocking($this->socket, false);
}

if ($result === false || $result !== $length) {
$this->logger->error('Sending data over the socket to the broker failed.');
throw new DataTransferException(
Expand Down

0 comments on commit dc40a6d

Please sign in to comment.