Skip to content

Commit

Permalink
feat(mqtt): Use Token.WaitTimeout
Browse files Browse the repository at this point in the history
When connecting to a broker and when publishing a message, use the
`Token.WaitTimeout` method instead of `Token.Wait`. `Token.Wait` waits
indefinitely, which can lead to situations when the client never
succeeds in connecting or publishing.

The timeout for each operation can be configured independently by
setting `mqtt-connect-timeout` and `mqtt-publish-timeout`. Both values
default to 30 seconds. The flags are hidden, as they should not commonly
be required to be changed by users.

Signed-off-by: Link Dupont <[email protected]>
  • Loading branch information
subpop committed Sep 26, 2023
1 parent 1f015e8 commit 01e623c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 5 deletions.
14 changes: 14 additions & 0 deletions cmd/yggd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func setupDefaultConfig(c *cli.Context) {
MQTTConnectRetryInterval: c.Duration(config.FlagNameMQTTConnectRetryInterval),
MQTTAutoReconnect: c.Bool(config.FlagNameMQTTAutoReconnect),
MQTTReconnectDelay: c.Duration(config.FlagNameMQTTReconnectDelay),
MQTTConnectTimeout: c.Duration(config.FlagNameMQTTConnectTimeout),
MQTTPublishTimeout: c.Duration(config.FlagNameMQTTPublishTimeout),
}
}

Expand Down Expand Up @@ -515,6 +517,18 @@ func main() {
Value: 0 * time.Second,
Hidden: true,
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: config.FlagNameMQTTConnectTimeout,
Usage: "Sets the time to wait before giving up to `DURATION` when connecting to an MQTT broker",
Value: 30 * time.Second,
Hidden: true,
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: config.FlagNameMQTTPublishTimeout,
Usage: "Sets the time to wait before giving up to `DURATION` when publishing a message to an MQTT broker",
Value: 30 * time.Second,
Hidden: true,
}),
}

app.EnableBashCompletion = true
Expand Down
10 changes: 10 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
FlagNameMQTTConnectRetryInterval = "mqtt-connect-retry-interval"
FlagNameMQTTAutoReconnect = "mqtt-auto-reconnect"
FlagNameMQTTReconnectDelay = "mqtt-reconnect-delay"
FlagNameMQTTConnectTimeout = "mqtt-connect-timeout"
FlagNameMQTTPublishTimeout = "mqtt-publish-timeout"
)

var DefaultConfig = Config{
Expand Down Expand Up @@ -96,6 +98,14 @@ type Config struct {
// MQTTReconnectDelay is the duration the client with wait before attempting
// to reconnect to the MQTT broker.
MQTTReconnectDelay time.Duration

// MQTTConnectTimeout is the duration the client will wait for an MQTT
// connection to be established before giving up.
MQTTConnectTimeout time.Duration

// MQTTPublishTimeout is the duration the client will wait for an MQTT
// connection to publish a message before giving up.
MQTTPublishTimeout time.Duration
}

// CreateTLSConfig creates a tls.Config object from the current configuration.
Expand Down
21 changes: 16 additions & 5 deletions internal/transport/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,13 @@ func (t *MQTT) Connect() error {
}
}()

if token := t.client.Connect(); token.Wait() && token.Error() != nil {
return fmt.Errorf("cannot connect to broker: %w", token.Error())
log.Infof("connecting to broker: %v", config.DefaultConfig.Server)
token := t.client.Connect()
if token.WaitTimeout(config.DefaultConfig.MQTTConnectTimeout) {
if token.Error() != nil {
return fmt.Errorf("cannot connect to broker: %w", token.Error())
}
return fmt.Errorf("cannot connect to broker: connection timeout reached")
}
return nil
}
Expand Down Expand Up @@ -190,9 +195,15 @@ func (t *MQTT) Tx(
opts := t.client.OptionsReader()
topic := fmt.Sprintf("%v/%v/%v/out", config.DefaultConfig.PathPrefix, opts.ClientID(), addr)

if token := t.client.Publish(topic, 1, false, data); token.Wait() && token.Error() != nil {
log.Errorf("failed to publish message: %v", token.Error())
return TxResponseErr, nil, nil, token.Error()
token := t.client.Publish(topic, 1, false, data)
if token.WaitTimeout(config.DefaultConfig.MQTTPublishTimeout) {
if token.Error() != nil {
log.Errorf("failed to publish message: %v", token.Error())
return TxResponseErr, nil, nil, token.Error()
}
return TxResponseErr, nil, nil, fmt.Errorf(
"cannot publish message: connection timeout reached",
)
}
log.Debugf("published message to topic %v", topic)

Expand Down

0 comments on commit 01e623c

Please sign in to comment.