From 01e623c1b555603a7deb422312464aac81610a8c Mon Sep 17 00:00:00 2001
From: Link Dupont
Date: Tue, 26 Sep 2023 10:45:00 -0400
Subject: [PATCH] feat(mqtt): Use Token.WaitTimeout
When connecting to a broker and when publishing a message, use the
`Token.WaitTimeout` method instead of `Token.Wait`. `Token.Wait` waits
indefinitely, which can lead to situations when the client never
succeeds in connecting or publishing.
The timeout for each operation can be configured independently by
setting `mqtt-connect-timeout` and `mqtt-publish-timeout`. Both values
default to 30 seconds. The flags are hidden, as they should not commonly
be required to be changed by users.
Signed-off-by: Link Dupont
---
cmd/yggd/main.go | 14 ++++++++++++++
internal/config/config.go | 10 ++++++++++
internal/transport/mqtt.go | 21 ++++++++++++++++-----
3 files changed, 40 insertions(+), 5 deletions(-)
diff --git a/cmd/yggd/main.go b/cmd/yggd/main.go
index c7dd0230..9f58259f 100644
--- a/cmd/yggd/main.go
+++ b/cmd/yggd/main.go
@@ -66,6 +66,8 @@ func setupDefaultConfig(c *cli.Context) {
MQTTConnectRetryInterval: c.Duration(config.FlagNameMQTTConnectRetryInterval),
MQTTAutoReconnect: c.Bool(config.FlagNameMQTTAutoReconnect),
MQTTReconnectDelay: c.Duration(config.FlagNameMQTTReconnectDelay),
+ MQTTConnectTimeout: c.Duration(config.FlagNameMQTTConnectTimeout),
+ MQTTPublishTimeout: c.Duration(config.FlagNameMQTTPublishTimeout),
}
}
@@ -515,6 +517,18 @@ func main() {
Value: 0 * time.Second,
Hidden: true,
}),
+ altsrc.NewDurationFlag(&cli.DurationFlag{
+ Name: config.FlagNameMQTTConnectTimeout,
+ Usage: "Sets the time to wait before giving up to `DURATION` when connecting to an MQTT broker",
+ Value: 30 * time.Second,
+ Hidden: true,
+ }),
+ altsrc.NewDurationFlag(&cli.DurationFlag{
+ Name: config.FlagNameMQTTPublishTimeout,
+ Usage: "Sets the time to wait before giving up to `DURATION` when publishing a message to an MQTT broker",
+ Value: 30 * time.Second,
+ Hidden: true,
+ }),
}
app.EnableBashCompletion = true
diff --git a/internal/config/config.go b/internal/config/config.go
index 4f8f46a5..b1f72a63 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -28,6 +28,8 @@ const (
FlagNameMQTTConnectRetryInterval = "mqtt-connect-retry-interval"
FlagNameMQTTAutoReconnect = "mqtt-auto-reconnect"
FlagNameMQTTReconnectDelay = "mqtt-reconnect-delay"
+ FlagNameMQTTConnectTimeout = "mqtt-connect-timeout"
+ FlagNameMQTTPublishTimeout = "mqtt-publish-timeout"
)
var DefaultConfig = Config{
@@ -96,6 +98,14 @@ type Config struct {
// MQTTReconnectDelay is the duration the client with wait before attempting
// to reconnect to the MQTT broker.
MQTTReconnectDelay time.Duration
+
+ // MQTTConnectTimeout is the duration the client will wait for an MQTT
+ // connection to be established before giving up.
+ MQTTConnectTimeout time.Duration
+
+ // MQTTPublishTimeout is the duration the client will wait for an MQTT
+ // connection to publish a message before giving up.
+ MQTTPublishTimeout time.Duration
}
// CreateTLSConfig creates a tls.Config object from the current configuration.
diff --git a/internal/transport/mqtt.go b/internal/transport/mqtt.go
index 2cfcac12..a9ae5f11 100644
--- a/internal/transport/mqtt.go
+++ b/internal/transport/mqtt.go
@@ -155,8 +155,13 @@ func (t *MQTT) Connect() error {
}
}()
- if token := t.client.Connect(); token.Wait() && token.Error() != nil {
- return fmt.Errorf("cannot connect to broker: %w", token.Error())
+ log.Infof("connecting to broker: %v", config.DefaultConfig.Server)
+ token := t.client.Connect()
+ if token.WaitTimeout(config.DefaultConfig.MQTTConnectTimeout) {
+ if token.Error() != nil {
+ return fmt.Errorf("cannot connect to broker: %w", token.Error())
+ }
+ return fmt.Errorf("cannot connect to broker: connection timeout reached")
}
return nil
}
@@ -190,9 +195,15 @@ func (t *MQTT) Tx(
opts := t.client.OptionsReader()
topic := fmt.Sprintf("%v/%v/%v/out", config.DefaultConfig.PathPrefix, opts.ClientID(), addr)
- if token := t.client.Publish(topic, 1, false, data); token.Wait() && token.Error() != nil {
- log.Errorf("failed to publish message: %v", token.Error())
- return TxResponseErr, nil, nil, token.Error()
+ token := t.client.Publish(topic, 1, false, data)
+ if token.WaitTimeout(config.DefaultConfig.MQTTPublishTimeout) {
+ if token.Error() != nil {
+ log.Errorf("failed to publish message: %v", token.Error())
+ return TxResponseErr, nil, nil, token.Error()
+ }
+ return TxResponseErr, nil, nil, fmt.Errorf(
+ "cannot publish message: connection timeout reached",
+ )
}
log.Debugf("published message to topic %v", topic)