Skip to content

Commit

Permalink
feat(mqtt): Use Token.WaitTimeout
Browse files Browse the repository at this point in the history
When connecting to a broker and when publishing a message, use the
`Token.WaitTimeout` method instead of `Token.Wait`. `Token.Wait` waits
indefinitely, which can lead to situations when the client never
succeeds in connecting or publishing.

The timeout for each operation can be configured independently by
setting `mqtt-connect-timeout` and `mqtt-publish-timeout`. Both values
default to 30 seconds. The flags are hidden, as they should not commonly
be required to be changed by users.

Signed-off-by: Link Dupont <[email protected]>
Signed-off-by: Link Dupont <[email protected]>
  • Loading branch information
subpop committed Oct 3, 2023
1 parent 1f015e8 commit d69e55f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
16 changes: 15 additions & 1 deletion cmd/yggd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func setupDefaultConfig(c *cli.Context) {
MQTTConnectRetryInterval: c.Duration(config.FlagNameMQTTConnectRetryInterval),
MQTTAutoReconnect: c.Bool(config.FlagNameMQTTAutoReconnect),
MQTTReconnectDelay: c.Duration(config.FlagNameMQTTReconnectDelay),
MQTTConnectTimeout: c.Duration(config.FlagNameMQTTConnectTimeout),
MQTTPublishTimeout: c.Duration(config.FlagNameMQTTPublishTimeout),
}
}

Expand Down Expand Up @@ -346,7 +348,7 @@ func mainAction(c *cli.Context) error {
// from the Transporter
client, transporter, err := setupClient(dispatcher, tlsConfig)
if err != nil {
return err
return cli.Exit(fmt.Errorf("cannot setup client: %w", err), 1)
}

// Create watcher for certificate changes
Expand Down Expand Up @@ -515,6 +517,18 @@ func main() {
Value: 0 * time.Second,
Hidden: true,
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: config.FlagNameMQTTConnectTimeout,
Usage: "Sets the time to wait before giving up to `DURATION` when connecting to an MQTT broker",
Value: 30 * time.Second,
Hidden: true,
}),
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: config.FlagNameMQTTPublishTimeout,
Usage: "Sets the time to wait before giving up to `DURATION` when publishing a message to an MQTT broker",
Value: 30 * time.Second,
Hidden: true,
}),
}

app.EnableBashCompletion = true
Expand Down
10 changes: 10 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
FlagNameMQTTConnectRetryInterval = "mqtt-connect-retry-interval"
FlagNameMQTTAutoReconnect = "mqtt-auto-reconnect"
FlagNameMQTTReconnectDelay = "mqtt-reconnect-delay"
FlagNameMQTTConnectTimeout = "mqtt-connect-timeout"
FlagNameMQTTPublishTimeout = "mqtt-publish-timeout"
)

var DefaultConfig = Config{
Expand Down Expand Up @@ -96,6 +98,14 @@ type Config struct {
// MQTTReconnectDelay is the duration the client with wait before attempting
// to reconnect to the MQTT broker.
MQTTReconnectDelay time.Duration

// MQTTConnectTimeout is the duration the client will wait for an MQTT
// connection to be established before giving up.
MQTTConnectTimeout time.Duration

// MQTTPublishTimeout is the duration the client will wait for an MQTT
// connection to publish a message before giving up.
MQTTPublishTimeout time.Duration
}

// CreateTLSConfig creates a tls.Config object from the current configuration.
Expand Down
19 changes: 17 additions & 2 deletions internal/transport/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,15 @@ func (t *MQTT) Connect() error {
}
}()

if token := t.client.Connect(); token.Wait() && token.Error() != nil {
log.Infof("connecting to broker: %v", config.DefaultConfig.Server)
token := t.client.Connect()
if !token.WaitTimeout(config.DefaultConfig.MQTTConnectTimeout) {
return fmt.Errorf(
"cannot connect to broker: connection timeout: %v elapsed",
config.DefaultConfig.MQTTConnectTimeout,
)
}
if token.Error() != nil {
return fmt.Errorf("cannot connect to broker: %w", token.Error())
}
return nil
Expand Down Expand Up @@ -190,7 +198,14 @@ func (t *MQTT) Tx(
opts := t.client.OptionsReader()
topic := fmt.Sprintf("%v/%v/%v/out", config.DefaultConfig.PathPrefix, opts.ClientID(), addr)

if token := t.client.Publish(topic, 1, false, data); token.Wait() && token.Error() != nil {
token := t.client.Publish(topic, 1, false, data)
if !token.WaitTimeout(config.DefaultConfig.MQTTPublishTimeout) {
return TxResponseErr, nil, nil, fmt.Errorf(
"cannot publish message: connection timeout: %v elapsed",
config.DefaultConfig.MQTTPublishTimeout,
)
}
if token.Error() != nil {
log.Errorf("failed to publish message: %v", token.Error())
return TxResponseErr, nil, nil, token.Error()
}
Expand Down

0 comments on commit d69e55f

Please sign in to comment.