diff --git a/cmd/yggd/main.go b/cmd/yggd/main.go index c7dd0230..9f58259f 100644 --- a/cmd/yggd/main.go +++ b/cmd/yggd/main.go @@ -66,6 +66,8 @@ func setupDefaultConfig(c *cli.Context) { MQTTConnectRetryInterval: c.Duration(config.FlagNameMQTTConnectRetryInterval), MQTTAutoReconnect: c.Bool(config.FlagNameMQTTAutoReconnect), MQTTReconnectDelay: c.Duration(config.FlagNameMQTTReconnectDelay), + MQTTConnectTimeout: c.Duration(config.FlagNameMQTTConnectTimeout), + MQTTPublishTimeout: c.Duration(config.FlagNameMQTTPublishTimeout), } } @@ -515,6 +517,18 @@ func main() { Value: 0 * time.Second, Hidden: true, }), + altsrc.NewDurationFlag(&cli.DurationFlag{ + Name: config.FlagNameMQTTConnectTimeout, + Usage: "Sets the time to wait before giving up to `DURATION` when connecting to an MQTT broker", + Value: 30 * time.Second, + Hidden: true, + }), + altsrc.NewDurationFlag(&cli.DurationFlag{ + Name: config.FlagNameMQTTPublishTimeout, + Usage: "Sets the time to wait before giving up to `DURATION` when publishing a message to an MQTT broker", + Value: 30 * time.Second, + Hidden: true, + }), } app.EnableBashCompletion = true diff --git a/internal/config/config.go b/internal/config/config.go index 4f8f46a5..b1f72a63 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -28,6 +28,8 @@ const ( FlagNameMQTTConnectRetryInterval = "mqtt-connect-retry-interval" FlagNameMQTTAutoReconnect = "mqtt-auto-reconnect" FlagNameMQTTReconnectDelay = "mqtt-reconnect-delay" + FlagNameMQTTConnectTimeout = "mqtt-connect-timeout" + FlagNameMQTTPublishTimeout = "mqtt-publish-timeout" ) var DefaultConfig = Config{ @@ -96,6 +98,14 @@ type Config struct { // MQTTReconnectDelay is the duration the client with wait before attempting // to reconnect to the MQTT broker. MQTTReconnectDelay time.Duration + + // MQTTConnectTimeout is the duration the client will wait for an MQTT + // connection to be established before giving up. + MQTTConnectTimeout time.Duration + + // MQTTPublishTimeout is the duration the client will wait for an MQTT + // connection to publish a message before giving up. + MQTTPublishTimeout time.Duration } // CreateTLSConfig creates a tls.Config object from the current configuration. diff --git a/internal/transport/mqtt.go b/internal/transport/mqtt.go index 2cfcac12..a9ae5f11 100644 --- a/internal/transport/mqtt.go +++ b/internal/transport/mqtt.go @@ -155,8 +155,13 @@ func (t *MQTT) Connect() error { } }() - if token := t.client.Connect(); token.Wait() && token.Error() != nil { - return fmt.Errorf("cannot connect to broker: %w", token.Error()) + log.Infof("connecting to broker: %v", config.DefaultConfig.Server) + token := t.client.Connect() + if token.WaitTimeout(config.DefaultConfig.MQTTConnectTimeout) { + if token.Error() != nil { + return fmt.Errorf("cannot connect to broker: %w", token.Error()) + } + return fmt.Errorf("cannot connect to broker: connection timeout reached") } return nil } @@ -190,9 +195,15 @@ func (t *MQTT) Tx( opts := t.client.OptionsReader() topic := fmt.Sprintf("%v/%v/%v/out", config.DefaultConfig.PathPrefix, opts.ClientID(), addr) - if token := t.client.Publish(topic, 1, false, data); token.Wait() && token.Error() != nil { - log.Errorf("failed to publish message: %v", token.Error()) - return TxResponseErr, nil, nil, token.Error() + token := t.client.Publish(topic, 1, false, data) + if token.WaitTimeout(config.DefaultConfig.MQTTPublishTimeout) { + if token.Error() != nil { + log.Errorf("failed to publish message: %v", token.Error()) + return TxResponseErr, nil, nil, token.Error() + } + return TxResponseErr, nil, nil, fmt.Errorf( + "cannot publish message: connection timeout reached", + ) } log.Debugf("published message to topic %v", topic)