diff --git a/cmd/clear.go b/cmd/clear.go index 3b3a6d9..57ecc37 100644 --- a/cmd/clear.go +++ b/cmd/clear.go @@ -15,7 +15,7 @@ import ( "github.com/spf13/cobra" - "github.com/joshuar/go-hass-anything/v8/internal/agent" + "github.com/joshuar/go-hass-anything/v9/internal/agent" ) var clearCmd = &cobra.Command{ diff --git a/cmd/configure.go b/cmd/configure.go index 9cbb516..19734b4 100644 --- a/cmd/configure.go +++ b/cmd/configure.go @@ -8,7 +8,7 @@ package cmd import ( "github.com/spf13/cobra" - "github.com/joshuar/go-hass-anything/v8/internal/agent" + "github.com/joshuar/go-hass-anything/v9/internal/agent" ) var configCmd = &cobra.Command{ diff --git a/cmd/root.go b/cmd/root.go index a97cff7..165db35 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -8,7 +8,7 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" - "github.com/joshuar/go-hass-anything/v8/internal/logging" + "github.com/joshuar/go-hass-anything/v9/internal/logging" ) const ( diff --git a/cmd/run.go b/cmd/run.go index 227c422..4e7e320 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -15,7 +15,7 @@ import ( "github.com/spf13/cobra" - "github.com/joshuar/go-hass-anything/v8/internal/agent" + "github.com/joshuar/go-hass-anything/v9/internal/agent" ) var runCmd = &cobra.Command{ diff --git a/go.mod b/go.mod index 42c8e5e..8fb2ec4 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/joshuar/go-hass-anything/v8 +module github.com/joshuar/go-hass-anything/v9 go 1.22.0 @@ -39,6 +39,7 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 github.com/charmbracelet/bubbles v0.18.0 github.com/charmbracelet/bubbletea v0.25.0 + github.com/eclipse/paho.golang v0.21.0 github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/iancoleman/strcase v0.3.0 github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -49,6 +50,7 @@ require ( github.com/shirou/gopsutil/v3 v3.24.3 github.com/sourcegraph/conc v0.3.0 github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 diff --git a/go.sum b/go.sum index fe8f35e..7e42064 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.golang v0.21.0 h1:cxxEReu+iFbA5RrHfRGxJOh8tXZKDywuehneoeBeyn8= +github.com/eclipse/paho.golang v0.21.0/go.mod h1:GHF6vy7SvDbDHBguaUpfuBkEB5G6j0zKxMG4gbh6QRQ= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= @@ -107,6 +109,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY= +golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 8ae37be..0ab1c04 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -8,12 +8,13 @@ package agent import ( "context" "sync" + "time" "github.com/rs/zerolog/log" - ui "github.com/joshuar/go-hass-anything/v8/internal/agent/ui/bubbletea" - "github.com/joshuar/go-hass-anything/v8/pkg/mqtt" - "github.com/joshuar/go-hass-anything/v8/pkg/preferences" + ui "github.com/joshuar/go-hass-anything/v9/internal/agent/ui/bubbletea" + "github.com/joshuar/go-hass-anything/v9/pkg/mqtt" + "github.com/joshuar/go-hass-anything/v9/pkg/preferences" ) //go:generate go run ../../tools/appgenerator/run.go arg1 @@ -29,12 +30,47 @@ type agent struct { version string } +// App represents an app that the agent can run. All apps have the following +// methods, which define how the app should be configured, current states of its +// entities and any subscriptions it wants to watch. type App interface { + // Name() is an identifier for the app, used for logging in the agent. Name() string + // Configuration() returns the messages needed to tell Home Assistant how to + // configure the app and its entities. Configuration() []*mqtt.Msg + // States() are the messages that reflect the app's current state of the + // entities of the app. States() []*mqtt.Msg + // Subscriptions() are the topics on which the app wants to subscribe and + // execute a callback in response to a message on that topic. Subscriptions() []*mqtt.Subscription - Run(ctx context.Context, client *mqtt.Client) error + // Update() is a function that is run at least once by the agent and will + // usually contain the logic to update the states of all the apps entities. + // It may be run multiple times, if the app is also considered a polling + // app. See the definition for PollingApp for details. + Update(ctx context.Context) error +} + +// PollingApp represents an app that should be polled for updates on some +// interval. When an app satisfies this interface, the agent will configure a +// goroutine to run the apps Update() function and publish its States(). +type PollingApp interface { + // PollConfig defines the interval on which the app should be polled and its + // states updated. A jitter should be defined, that is much less than the + // interval, to add a small variation to the interval to avoid any + // "thundering herd" problems. + PollConfig() (interval, jitter time.Duration) +} + +// EventsApp represents an app that will update its States() in response to some +// event(s) it is monitoring. When an app satisfies this interface, the agent +// will configure a goroutine to watch a channel of messages the app sends when +// an event occurs, which will be published to MQTT. +type EventsApp interface { + // MsgCh is a channel of messages that the app generates when some internal + // event occurs and a new message should be published to MQTT. + MsgCh() chan *mqtt.Msg } func NewAgent(id, name string) *agent { @@ -68,9 +104,11 @@ func (a *agent) Configure() { } func Run(ctx context.Context) { - var appsToRun []mqtt.Device + var subscriptions []*mqtt.Subscription + var configs []*mqtt.Msg for _, app := range AppList { - appsToRun = append(appsToRun, app) + configs = append(configs, app.Configuration()...) + subscriptions = append(subscriptions, app.Subscriptions()...) } prefs, err := preferences.LoadPreferences() @@ -78,7 +116,7 @@ func Run(ctx context.Context) { log.Fatal().Err(err).Msg("Could not load preferences.") } - client, err := mqtt.NewClient(ctx, prefs, appsToRun...) + client, err := mqtt.NewClient(ctx, prefs, subscriptions) if err != nil { log.Fatal().Err(err).Msg("Could not connect to broker.") } @@ -92,7 +130,7 @@ func ClearApps(ctx context.Context) { log.Fatal().Err(err).Msg("Could not load preferences.") } - client, err := mqtt.NewClient(ctx, prefs) + client, err := mqtt.NewClient(ctx, prefs, nil) if err != nil { log.Fatal().Err(err).Msg("Could not connect to broker.") } @@ -113,10 +151,64 @@ func runApps(ctx context.Context, client *mqtt.Client, apps []App) { wg.Add(1) go func(a App) { defer wg.Done() - if err := a.Run(ctx, client); err != nil { - log.Debug().Err(err).Msg("Error running app.") + publishAppConfigs(a, client) + if app, ok := a.(PollingApp); ok { + interval, jitter := app.PollConfig() + log.Info().Dur("interval", interval).Str("app", a.Name()).Msg("Running loop to poll app updates.") + wg.Add(1) + go func() { + defer wg.Wait() + poll( + ctx, + func() { + updateApp(ctx, a) + publishAppStates(a, client) + }, + interval, + jitter, + ) + }() + } + if app, ok := a.(EventsApp); ok { + updateApp(ctx, a) + wg.Add(1) + go func() { + defer wg.Done() + log.Info().Str("app", a.Name()).Msg("Listening for message events from app to publish.") + for { + select { + case msg := <-app.MsgCh(): + if err := client.Publish(msg); err != nil { + log.Error().Err(err).Str("app", a.Name()).Msg("Failed to publish state messages.") + } + case <-ctx.Done(): + return + } + } + }() } }(app) } wg.Wait() } + +func updateApp(ctx context.Context, app App) { + log.Debug().Str("app", app.Name()).Msg("Updating.") + if err := app.Update(ctx); err != nil { + log.Warn().Err(err).Str("app", app.Name()).Msg("App failed to update.") + } +} + +func publishAppStates(app App, client *mqtt.Client) { + log.Debug().Str("app", app.Name()).Msg("Publishing states.") + if err := client.Publish(app.States()...); err != nil { + log.Error().Err(err).Str("app", app.Name()).Msg("Failed to publish state messages.") + } +} + +func publishAppConfigs(app App, client *mqtt.Client) { + log.Debug().Str("app", app.Name()).Msg("Publishing configs.") + if err := client.Publish(app.Configuration()...); err != nil { + log.Error().Err(err).Str("app", app.Name()).Msg("Failed to publish configuration messages.") + } +} diff --git a/pkg/apps/helpers/pollsensors.go b/internal/agent/polling.go similarity index 63% rename from pkg/apps/helpers/pollsensors.go rename to internal/agent/polling.go index fe42ff7..8fe135a 100644 --- a/pkg/apps/helpers/pollsensors.go +++ b/internal/agent/polling.go @@ -3,39 +3,39 @@ // This software is released under the MIT License. // https://opensource.org/licenses/MIT -package helpers +package agent import ( "context" - "sync" "time" "github.com/lthibault/jitterbug/v2" + "github.com/rs/zerolog/log" ) -// PollSensors is a helper function that will call the passed `updater()` +// poll is a helper function that will call the passed `updater()` // function around each `interval` duration within the `stdev` duration window. // Effectively, `updater()` will get called sometime near `interval`, but not // exactly on it. This can help avoid a "thundering herd" problem of sensors all // trying to update at the same time. -func PollSensors(ctx context.Context, updater func(), interval, stdev time.Duration) { +func poll(ctx context.Context, updater func(), interval, jitter time.Duration) { + if interval <= 0 || jitter <= 0 { + log.Warn().Dur("interval", interval).Dur("jitter", jitter).Msg("Invalid interval and stdev for polling.") + return + } updater() ticker := jitterbug.New( interval, - &jitterbug.Norm{Stdev: stdev}, + &jitterbug.Norm{Stdev: jitter}, ) - var wg sync.WaitGroup - wg.Add(1) go func() { for { select { case <-ctx.Done(): - wg.Done() return case <-ticker.C: updater() } } }() - wg.Wait() } diff --git a/internal/agent/ui/bubbletea/bubbletea.go b/internal/agent/ui/bubbletea/bubbletea.go index a7c4629..4a1e7f9 100644 --- a/internal/agent/ui/bubbletea/bubbletea.go +++ b/internal/agent/ui/bubbletea/bubbletea.go @@ -9,7 +9,7 @@ import ( tea "github.com/charmbracelet/bubbletea" "github.com/rs/zerolog/log" - "github.com/joshuar/go-hass-anything/v8/pkg/preferences" + "github.com/joshuar/go-hass-anything/v9/pkg/preferences" ) type Agent interface { diff --git a/main.go b/main.go index 218709c..ccba3ee 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,7 @@ import ( "log" "syscall" - "github.com/joshuar/go-hass-anything/v8/cmd" + "github.com/joshuar/go-hass-anything/v9/cmd" ) func init() { diff --git a/pkg/apps/exampleapp/exampleapp.go b/pkg/apps/exampleapp/exampleapp.go index 839afaf..a4f8e0a 100644 --- a/pkg/apps/exampleapp/exampleapp.go +++ b/pkg/apps/exampleapp/exampleapp.go @@ -8,25 +8,26 @@ package exampleapp import ( "context" "encoding/json" + "errors" "os" "os/exec" "strconv" "time" "github.com/carlmjohnson/requests" - MQTT "github.com/eclipse/paho.mqtt.golang" + "github.com/eclipse/paho.golang/paho" "github.com/rs/zerolog/log" "github.com/shirou/gopsutil/v3/load" - "github.com/joshuar/go-hass-anything/v8/pkg/apps/helpers" - "github.com/joshuar/go-hass-anything/v8/pkg/hass" - "github.com/joshuar/go-hass-anything/v8/pkg/mqtt" - "github.com/joshuar/go-hass-anything/v8/pkg/preferences" - "github.com/joshuar/go-hass-anything/v8/pkg/web" + mqtthass "github.com/joshuar/go-hass-anything/v9/pkg/hass" + mqttapi "github.com/joshuar/go-hass-anything/v9/pkg/mqtt" + "github.com/joshuar/go-hass-anything/v9/pkg/preferences" + "github.com/joshuar/go-hass-anything/v9/pkg/web" ) const ( appName = "exampleApp" + appID = "example_app" weatherURL = "http://wttr.in/?format=j1" weatherURLpref = "weatherURL" ) @@ -38,12 +39,15 @@ var defaultPrefs = map[string]any{ type exampleApp struct { loadData *load.AvgStat config *preferences.AppPreferences - weatherTopics *hass.Topics - buttonTopics *hass.Topics - numberTopics *hass.Topics - loadTopics []*hass.Topics + weatherEntity *mqtthass.SensorEntity + buttonEntity *mqtthass.ButtonEntity + numberEntity *mqtthass.NumberEntity[int] + switchEntity *mqtthass.SwitchEntity + msgCh chan *mqttapi.Msg + loadEntities []*mqtthass.SensorEntity weatherData []byte - number int + numberState int + switchState bool } // New sets up our example app. We make use of the preference loading/saving in @@ -52,7 +56,9 @@ type exampleApp struct { // preferences our app needs in this file by providing a map[string]any that // maps preferences to values. func New(_ context.Context) (*exampleApp, error) { - app := &exampleApp{} + app := &exampleApp{ + msgCh: make(chan *mqttapi.Msg), + } // load our app config. if we don't have a config, set some defaults p, err := preferences.LoadAppPreferences(app.Name()) if os.IsNotExist(err) { @@ -116,7 +122,7 @@ func (a *exampleApp) Timeout() time.Duration { } // getLoadAvgs fetches the load averages for the system running -// go-hass-anything, using the very handy gopsutil package +// go-hass-anything, using the very handy gopsutil package. func (a *exampleApp) getLoadAvgs(ctx context.Context) error { var l *load.AvgStat var err error @@ -137,28 +143,22 @@ func (a *exampleApp) Name() string { // Configuration is called when our app is first registered in Home Assistant and // will return configuration messages for the data our app will send/receive. -func (a *exampleApp) Configuration() []*mqtt.Msg { - var msgs []*mqtt.Msg - - // Fetch the topic prefix from the agent preferences. Usually, this will - // default to "homeassistant". - var topicPrefix string - appPrefs, err := preferences.LoadPreferences() - if err != nil { - log.Warn().Err(err).Msg("Could not load app preferences.") - } - topicPrefix = appPrefs.GetTopicPrefix() +func (a *exampleApp) Configuration() []*mqttapi.Msg { + var msgs []*mqttapi.Msg // deviceInfo is used to associate each sensor to our example app device in Home Assistant - deviceInfo := &hass.Device{ - Name: appName, - Identifiers: []string{"exampleApp01"}, + deviceInfo := &mqtthass.Device{ + Name: appName, + Identifiers: []string{"exampleApp01"}, + URL: "https://github.com/joshuar/go-hass-anything", + Manufacturer: "go-hass-anything", + Model: appID, } // originInfo is used by Home Assistant to indicate where the device/sensors // came from. In this case, we fill out the details to indicate Go Hass // Anything. - originInfo := &hass.Origin{ + originInfo := &mqtthass.Origin{ Name: "Go Hass Anything", URL: "https://github.com/joshuar/go-hass-anything", } @@ -172,61 +172,76 @@ func (a *exampleApp) Configuration() []*mqtt.Msg { // ValueTemplate to extract out the value we are interested in. We could // create more sensors and extract other values out of this response if // desired. - weatherEntity := hass.NewEntityByName[float32]("ExampleApp Weather Temp", appName, topicPrefix). - AsSensor(). + a.weatherEntity = mqtthass.AsSensor(mqtthass.NewEntity(appName, "Weather Temp", ""). WithDeviceInfo(deviceInfo). WithOriginInfo(originInfo). WithStateClassMeasurement(). WithDeviceClass("temperature"). WithUnits("°C"). - WithValueTemplate("{{ value_json.current_condition[0].temp_C }}") - a.weatherTopics = weatherEntity.GetTopics() - if msg, err := hass.MarshalConfig(weatherEntity); err != nil { - log.Error().Err(err).Str("entity", weatherEntity.Entity.Name).Msg("Could not marshal config for entity.") + WithValueTemplate("{{ value_json.current_condition[0].temp_C }}"). + WithStateCallback(a.weatherStateCallback)) + if msg, err := a.weatherEntity.MarshalConfig(); err != nil { + log.Error().Err(err).Str("entity", a.weatherEntity.Name).Msg("Could not marshal config for entity.") } else { msgs = append(msgs, msg) } // we have three sensors for the loadavgs for _, l := range []string{"1", "5", "15"} { - loadEntity := hass.NewEntityByID[float64]("example_app_load"+l, appName, topicPrefix). - AsSensor(). + loadEntity := mqtthass.AsSensor(mqtthass.NewEntity(appName, "Load Average "+l+" min", "load_avg_"+l). WithDeviceInfo(deviceInfo). WithOriginInfo(originInfo). WithStateClassMeasurement(). - WithValueTemplate("{{ value }}") - a.loadTopics = append(a.loadTopics, loadEntity.GetTopics()) - if msg, err := hass.MarshalConfig(loadEntity); err != nil { - log.Error().Err(err).Str("entity", loadEntity.Entity.Name).Msg("Could not marshal config for entity.") + WithValueTemplate("{{ value }}"). + WithStateCallback(a.loadStateCallback)) + if msg, err := loadEntity.MarshalConfig(); err != nil { + log.Error().Err(err).Str("entity", loadEntity.Name).Msg("Could not marshal config for entity.") } else { msgs = append(msgs, msg) } + a.loadEntities = append(a.loadEntities, loadEntity) } // we also have a button that when pressed in Home Assistant, will perform // an action. - buttonEntity := hass.NewEntityByID[bool]("example_app_button", appName, topicPrefix). - AsButton(). - WithCommandCallback(buttonCallback) - a.buttonTopics = buttonEntity.GetTopics() - if msg, err := hass.MarshalConfig(buttonEntity); err != nil { - log.Error().Err(err).Str("entity", buttonEntity.Entity.Name).Msg("Could not marshal config for entity.") + a.buttonEntity = mqtthass.AsButton(mqtthass.NewEntity(appName, "Button", ""). + WithDeviceInfo(deviceInfo). + WithOriginInfo(originInfo). + WithCommandCallback(buttonCommandCallback)) + if msg, err := a.buttonEntity.MarshalConfig(); err != nil { + log.Error().Err(err).Str("entity", a.buttonEntity.Name).Msg("Could not marshal config for entity.") } else { msgs = append(msgs, msg) } - // and we have a number slider entity. we need to track the state of the + // we have a number slider entity. we need to track the state of the // slider entity, and we set a default value. - a.number = 50 + a.numberState = 50 // create our number entity. - numberEntity := hass.NewEntityByID[int]("example_app_number", appName, topicPrefix). - AsNumber(1, 0, 100, hass.NumberSlider). - WithCommandCallback(a.numberCallback). - WithValueTemplate("{{ value_json.value }}") - // we retrieve the topics for the number entity and store them for re-use. - a.numberTopics = numberEntity.GetTopics() - if msg, err := hass.MarshalConfig(numberEntity); err != nil { - log.Error().Err(err).Str("entity", numberEntity.Entity.Name).Msg("Could not marshal config for entity.") + a.numberEntity = mqtthass.AsNumber(mqtthass.NewEntity(appName, "Number", ""). + WithDeviceInfo(deviceInfo). + WithOriginInfo(originInfo). + WithStateClassMeasurement(). + WithCommandCallback(a.numberCommandCallback). + WithStateCallback(a.numberStateCallback). + WithValueTemplate("{{ value_json.value }}"), + 1, 0, 250, mqtthass.NumberSlider) + if msg, err := a.numberEntity.MarshalConfig(); err != nil { + log.Error().Err(err).Str("entity", a.numberEntity.Name).Msg("Could not marshal config for entity.") + } else { + msgs = append(msgs, msg) + } + + // we have a switch entity. + a.switchEntity = mqtthass.AsSwitch(mqtthass.NewEntity(appName, "Switch", ""). + WithDeviceInfo(deviceInfo). + WithOriginInfo(originInfo). + WithCommandCallback(a.switchCommandCallback). + WithStateCallback(a.switchStateCallback). + WithValueTemplate("{{ value }}"), + true).AsTypeSwitch() + if msg, err := a.switchEntity.MarshalConfig(); err != nil { + log.Error().Err(err).Str("entity", a.switchEntity.Name).Msg("Could not marshal config for entity.") } else { msgs = append(msgs, msg) } @@ -236,34 +251,44 @@ func (a *exampleApp) Configuration() []*mqtt.Msg { } // States is called when we want to send our sensor data to Home Assistant. -func (a *exampleApp) States() []*mqtt.Msg { - var msgs []*mqtt.Msg +func (a *exampleApp) States() []*mqttapi.Msg { + var msgs []*mqttapi.Msg // we retrieve the weather data and create a message to publish it to its // state topic. - msgs = append(msgs, - mqtt.NewMsg(a.weatherTopics.State, a.weatherData)) + weatherState, err := a.weatherEntity.MarshalState() + if err != nil { + log.Warn().Err(err).Msg("Unable to marshal weather state to MQTT message.") + } else { + msgs = append(msgs, weatherState) + } - // we retrieve our load avgs aand reate msgs to publish to each of their + // we retrieve our load avgs and reate msgs to publish to each of their // state topics. - for i := range 3 { - var l float64 - switch i { - case 0: - l = a.loadData.Load1 - case 1: - l = a.loadData.Load5 - case 2: - l = a.loadData.Load15 + for i, l := range []string{"1", "5", "15"} { + loadState, err := a.loadEntities[i].MarshalState(l) + if err != nil { + log.Warn().Err(err).Msg("Unable to marshal load state to MQTT message.") + } else { + msgs = append(msgs, loadState) } - msgs = append(msgs, - mqtt.NewMsg(a.loadTopics[i].State, - json.RawMessage(strconv.FormatFloat(l, 'f', -1, 64)))) } // we create a msg to publish the current number value to its state topic - msgs = append(msgs, - mqtt.NewMsg(a.numberTopics.State, json.RawMessage(`{ "value": `+strconv.Itoa(a.number)+` }`))) + numberState, err := a.numberEntity.MarshalState() + if err != nil { + log.Warn().Err(err).Msg("Unable to marshal number state to MQTT message.") + } else { + msgs = append(msgs, numberState) + } + + // we create a msg to publish the current state of the switch + switchState, err := a.switchEntity.MarshalState() + if err != nil { + log.Warn().Err(err).Msg("Unable to marshal switch state to MQTT message.") + } else { + msgs = append(msgs, switchState) + } // return all the messages for publication. return msgs @@ -271,21 +296,33 @@ func (a *exampleApp) States() []*mqtt.Msg { // Subscriptions is called once to register the callbacks we will use when Home // Assistant sends back messages on any command topics. -func (a *exampleApp) Subscriptions() []*mqtt.Subscription { - var msgs []*mqtt.Subscription - - msgs = append(msgs, - // we add a subscription to watch for requests for the button press. - &mqtt.Subscription{ - Topic: a.buttonTopics.Command, - Callback: buttonCallback, - }, - // we add a subscription to watch for requests for changing the number - // value. - &mqtt.Subscription{ - Topic: a.numberTopics.Command, - Callback: a.numberCallback, - }) +func (a *exampleApp) Subscriptions() []*mqttapi.Subscription { + var msgs []*mqttapi.Subscription + + // we add a subscription to watch for requests for changing the number + // value. + buttonSub, err := a.buttonEntity.MarshalSubscription() + if err != nil { + log.Warn().Err(err).Msg("Unable to marshal button subscription.") + } else { + msgs = append(msgs, buttonSub) + } + // we add a subscription to watch for requests for changing the number + // value. + numberSub, err := a.numberEntity.MarshalSubscription() + if err != nil { + log.Warn().Err(err).Msg("Unable to marshal number subscription.") + } else { + msgs = append(msgs, numberSub) + } + // we add a subscription to watch for requests for changing the switch + // value. + switchSub, err := a.switchEntity.MarshalSubscription() + if err != nil { + log.Warn().Err(err).Msg("Unable to marshal switch subscription.") + } else { + msgs = append(msgs, switchSub) + } // return all the messages for publication. return msgs @@ -294,63 +331,106 @@ func (a *exampleApp) Subscriptions() []*mqtt.Subscription { // Run is the function that the agent calls to start our app. In it, we create // our app struct, register our app (if needed), listen for our button press, // then set up a loop to send our sensor data. -func (a *exampleApp) Run(ctx context.Context, client *mqtt.Client) error { - log.Info().Str("appName", appName).Msg("Starting app.") - - // add our subscriptions - if err := client.Subscribe(a.Subscriptions()...); err != nil { - log.Error().Err(err).Msg("Could not activate subscriptions.") +func (a *exampleApp) Update(ctx context.Context) error { + var errs error + // get the weather + if err := a.getWeather(ctx); err != nil { + errs = errors.Join(errs, errors.New("could not get weather data")) } + // get the load averages + if err := a.getLoadAvgs(ctx); err != nil { + errs = errors.Join(errs, errors.New("could not get load averages")) + } + return errs +} - // create a function we will use whenever we want to send our app state - sendState := func() { - // get the weather - if err := a.getWeather(ctx); err != nil { - log.Error().Err(err).Msg("Could not get weather data.") - } - // get the load averages - if err := a.getLoadAvgs(ctx); err != nil { - log.Error().Err(err).Msg("Could not get load averages.") - } - // send our data - if err := client.Publish(a.States()...); err != nil { - log.Error().Err(err).Msg("Failed to publish state.") +func (a *exampleApp) PollConfig() (interval, jitter time.Duration) { + return time.Minute, time.Second * 5 +} + +func (a *exampleApp) MsgCh() chan *mqttapi.Msg { + return a.msgCh +} + +func (a *exampleApp) weatherStateCallback(_ ...any) (json.RawMessage, error) { + return a.weatherData, nil +} + +func (a *exampleApp) loadStateCallback(args ...any) (json.RawMessage, error) { + var value float64 + if l, ok := args[0].(string); !ok { + return nil, errors.New("could not determine which load was requested") + } else { + switch l { + case "1": + value = a.loadData.Load1 + case "5": + value = a.loadData.Load5 + case "15": + value = a.loadData.Load15 } } - - // we use a helper function, PollSensors, that takes care of setting up a - // ticker to run every minute (with a little bit of jitter) to send our data - helpers.PollSensors(ctx, sendState, time.Minute, time.Second*5) - return nil + return json.RawMessage(strconv.FormatFloat(value, 'f', -1, 64)), nil } -// buttonCallback is our callback function that is run when somebody presses the +// buttonCommandCallback is our callback function that is run when somebody presses the // button we created in Home Assistant. In this case, we use xdg-open to open // the Home Assistant homepage. But we could do other things, or many things at // once and/or react based on the response data we got (the MQTT.Message // parameter). -func buttonCallback(_ MQTT.Client, msg MQTT.Message) { - log.Info().Str("topic", msg.Topic()).RawJSON("payload", msg.Payload()).Msg("Received button press request.") +func buttonCommandCallback(_ *paho.Publish) { + log.Info().Msg("Button was pressed.") log.Info().Msg("Opening Home Assistant homepage.") if err := exec.Command("xdg-open", "https://home-assistant.io").Run(); err != nil { log.Warn().Err(err).Msg("Could not execute xdg-open.") } } -// numberCallback is our callback function for when a request to change the value is +func (a *exampleApp) numberStateCallback(_ ...any) (json.RawMessage, error) { + return json.RawMessage(`{ "value": ` + strconv.Itoa(a.numberState) + ` }`), nil +} + +// numberCommandCallback is our callback function for when a request to change the value is // received on MQTT, we set our state internally and publish back on the // state topic for any listeners. -func (a *exampleApp) numberCallback(client MQTT.Client, msg MQTT.Message) { - if newValue, err := strconv.Atoi(string(msg.Payload())); err != nil { +func (a *exampleApp) numberCommandCallback(p *paho.Publish) { + if newValue, err := strconv.Atoi(string(p.Payload)); err != nil { log.Warn().Err(err).Msg("Could not parse new value for number.") } else { - log.Info().Str("topic", msg.Topic()).Int("value", newValue).Msg("Received request to change number.") - a.number = newValue - token := client.Publish(a.numberTopics.State, 0, false, []byte(`{ "value": `+string(msg.Payload())+` }`)) - if token.Wait() && token.Error() != nil { - log.Warn().Err(token.Error()).Msg("Failed to publish new state to MQTT.") - } else { - log.Info().Str("topic", a.numberTopics.State).Int("value", a.number).Msg("Published new number state.") - } + log.Info().Int("value", newValue).Msg("Number was changed.") + a.numberState = newValue + } +} + +// switchCallback is our callback function for when the switch entity is +// manipulated in Home Assistant. +func (a *exampleApp) switchStateCallback(_ ...any) (json.RawMessage, error) { + switch a.switchState { + case true: + return json.RawMessage(`ON`), nil + default: + return json.RawMessage(`OFF`), nil + } +} + +// switchCommandCallback is our callback function for when the switch entity is +// manipulated in Home Assistant. +func (a *exampleApp) switchCommandCallback(p *paho.Publish) { + // Record the new state. + state := string(p.Payload) + switch state { + case "ON": + log.Info().Msg("Switch was turned on.") + a.switchState = true + case "OFF": + log.Info().Msg("Switch was turned off.") + a.switchState = false + } + // Publish a message with the new state. + msg, err := a.switchEntity.MarshalState() + if err != nil { + log.Warn().Msg("Unable to marshal new state message.") + } else { + a.msgCh <- msg } } diff --git a/pkg/hass/entities.go b/pkg/hass/entities.go index 4902bee..330f6e9 100644 --- a/pkg/hass/entities.go +++ b/pkg/hass/entities.go @@ -7,41 +7,39 @@ package hass import ( "encoding/json" - "errors" + "fmt" "strings" - mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/eclipse/paho.golang/paho" "github.com/iancoleman/strcase" + "golang.org/x/exp/constraints" "golang.org/x/text/cases" "golang.org/x/text/language" - mqttapi "github.com/joshuar/go-hass-anything/v8/pkg/mqtt" + mqttapi "github.com/joshuar/go-hass-anything/v9/pkg/mqtt" ) -// EntityConfig is a type used by apps to represent a Home Assistant entity and -// some additional fields for manipulating that entity in the app. Specify the -// type T to indicate the type of value this entity uses, which will be used to -// configure any additional fields/parameters that need to be created for the -// entity type. For example, for a number entity, specifying T as int will -// ensure the min, max and step parameters are also treated as ints. -type EntityConfig[T any] struct { - Entity *Entity[T] - App string - CommandCallback func(mqtt.Client, mqtt.Message) +var HomeAssistantTopic = "homeassistant" + +// entityConfig contains fields for defining the configuration of the entity. +type entityConfig struct { + CommandCallback func(p *paho.Publish) StateCallback func(args ...any) (json.RawMessage, error) - AttributesCallback func() (json.RawMessage, error) - ConfigTopic string - topicPrefix string + AttributesCallback func(args ...any) (json.RawMessage, error) + App string + NodeID string + EntityType EntityType } -// Entity represents a generic entity in Home Assistant. The fields are common +// entity represents a generic entity in Home Assistant. The fields are common // across any specific entity. -type Entity[T any] struct { - numberEntity[T] +type entity struct { + *entityConfig `json:"-"` Origin *Origin `json:"origin,omitempty"` Device *Device `json:"device,omitempty"` UnitOfMeasurement string `json:"unit_of_measurement,omitempty"` StateClass string `json:"state_class,omitempty"` + ConfigTopic string `json:"-"` CommandTopic string `json:"command_topic,omitempty"` StateTopic string `json:"state_topic"` ValueTemplate string `json:"value_template"` @@ -52,17 +50,123 @@ type Entity[T any] struct { AttributesTopic string `json:"json_attributes_topic,omitempty"` AttributesTemplate string `json:"json_attributes_template,omitempty"` DeviceClass string `json:"device_class,omitempty"` + StateExpiry int `json:"expire_after,omitempty"` +} + +// MarshalState will generate an *mqtt.Msg for a given entity, that can be used +// to publish the entity's state to the MQTT bus. +func (e *entity) MarshalState(args ...any) (*mqttapi.Msg, error) { + var state json.RawMessage + var err error + if e.StateCallback == nil { + return nil, fmt.Errorf("entity %s for app %s does not have a state callback function", e.UniqueID, e.App) + } + if state, err = e.StateCallback(args...); err != nil { + return nil, err + } + return mqttapi.NewMsg(e.StateTopic, state), nil +} + +func (e *entity) MarshalAttributes(args ...any) (*mqttapi.Msg, error) { + var state json.RawMessage + var err error + if e.AttributesCallback == nil { + return nil, fmt.Errorf("entity %s for app %s does not have a state callback function", e.UniqueID, e.App) + } + if state, err = e.AttributesCallback(args...); err != nil { + return nil, err + } + return mqttapi.NewMsg(e.AttributesTopic, state), nil +} + +// MarshallSubscription will generate an *mqtt.Subscription for a given entity, +// which can be used to subscribe to an entity's command topic and execute a +// callback on messages. +func (e *entity) MarshalSubscription() (*mqttapi.Subscription, error) { + if e.CommandCallback == nil { + return nil, fmt.Errorf("entity %s for app %s does not have a command callback function", e.UniqueID, e.App) + } + msg := &mqttapi.Subscription{ + Topic: e.CommandTopic, + Callback: e.CommandCallback, + } + return msg, nil +} + +type SensorEntity struct { + *entity +} + +func (e *SensorEntity) MarshalConfig() (*mqttapi.Msg, error) { + var cfg []byte + var err error + if cfg, err = json.Marshal(e); err != nil { + return nil, err + } + return mqttapi.NewMsg(e.ConfigTopic, cfg), nil +} + +type BinarySensorEntity struct { + *entity +} + +func (e *BinarySensorEntity) MarshalConfig() (*mqttapi.Msg, error) { + var cfg []byte + var err error + if cfg, err = json.Marshal(e); err != nil { + return nil, err + } + return mqttapi.NewMsg(e.ConfigTopic, cfg), nil +} + +type ButtonEntity struct { + *entity +} + +func (e *ButtonEntity) MarshalConfig() (*mqttapi.Msg, error) { + var cfg []byte + var err error + if cfg, err = json.Marshal(e); err != nil { + return nil, err + } + return mqttapi.NewMsg(e.ConfigTopic, cfg), nil } -// numberEntity represents the fields specifically for number entities in Home -// Assistant. -type numberEntity[T any] struct { +type NumberEntity[T constraints.Ordered] struct { + *entity Min T `json:"min,omitempty"` Max T `json:"max,omitempty"` Step T `json:"step,omitempty"` Mode string `json:"mode,omitempty"` } +func (e *NumberEntity[T]) MarshalConfig() (*mqttapi.Msg, error) { + var cfg []byte + var err error + if cfg, err = json.Marshal(e); err != nil { + return nil, err + } + return mqttapi.NewMsg(e.ConfigTopic, cfg), nil +} + +type SwitchEntity struct { + *entity + Optimistic bool `json:"optimistic,omitempty"` +} + +func (e *SwitchEntity) MarshalConfig() (*mqttapi.Msg, error) { + var cfg []byte + var err error + if cfg, err = json.Marshal(e); err != nil { + return nil, err + } + return mqttapi.NewMsg(e.ConfigTopic, cfg), nil +} + +type EntityConstraint[T constraints.Ordered] interface { + ~*SensorEntity | ~*BinarySensorEntity | ~*ButtonEntity | ~*NumberEntity[T] | ~*SwitchEntity +} + // Device contains information about the device an entity is a part of to tie it // into the device registry in Home Assistant. type Device struct { @@ -80,7 +184,7 @@ type Device struct { // Origin contains information about the app that is responsible for the entity. // It is used by Home Assistant for logging and display purposes. type Origin struct { - Name string `json:"name,omitempty"` + Name string `json:"name"` Version string `json:"sw_version,omitempty"` URL string `json:"support_url,omitempty"` } @@ -96,120 +200,50 @@ type Topics struct { Attributes string } -// MarshalConfig will generate an *mqtt.Msg for a given entity, that can be used -// to publish the required config for the entity to the MQTT bus. -func MarshalConfig[T any](e *EntityConfig[T]) (*mqttapi.Msg, error) { - var cfg []byte - var err error - if cfg, err = json.Marshal(e.Entity); err != nil { - return nil, err - } - return mqttapi.NewMsg(e.ConfigTopic, cfg), nil -} - -// MarshalState will generate an *mqtt.Msg for a given entity, that can be used -// to publish the entity's state to the MQTT bus. -func MarshalState[T any](e *EntityConfig[T]) (*mqttapi.Msg, error) { - var state json.RawMessage - var err error - if e.StateCallback == nil { - return nil, errors.New("entity does not have a state callback function") - } - if state, err = e.StateCallback(); err != nil { - return nil, err - } - return mqttapi.NewMsg(e.Entity.StateTopic, state), nil -} - -// MarshallSubscription will generate an *mqtt.Subscription for a given entity, -// which can be used to subscribe to an entity's command topic and execute a -// callback on messages. -func MarshalSubscription[T any](e *EntityConfig[T]) (*mqttapi.Subscription, error) { - if e.CommandCallback == nil { - return nil, errors.New("entity does not have a command callback function") - } - msg := &mqttapi.Subscription{ - Topic: e.Entity.CommandTopic, - Callback: e.CommandCallback, - } - return msg, nil -} - -// NewEntityByName will create a new entity and config based off the given name -// and app. Use this function where you don't care about the id of the -// underlying sensor in Home Assistant. The id will be derived from the name by -// converting it to snake_case. -func NewEntityByName[T any](name, app, prefix string) *EntityConfig[T] { - return &EntityConfig[T]{ - Entity: &Entity[T]{ - Name: FormatName(name), - UniqueID: FormatID(name), - }, - App: strings.ToLower(app), - topicPrefix: prefix, +func NewEntity(app, name, id string) *entity { + name = FormatName(name) + if id != "" { + id = FormatID(app + "_" + id) + } else { + id = FormatID(app + "_" + name) } -} -// NewEntityByID will create a new entity and config based off the given id and -// app. Use this when you want to ensure the exact format of the id for the -// underlying sensor in Home Assistant. The name will be derived from the id. -func NewEntityByID[T any](id, app, prefix string) *EntityConfig[T] { - return &EntityConfig[T]{ - Entity: &Entity[T]{ - Name: FormatName(id), - UniqueID: id, + return &entity{ + entityConfig: &entityConfig{ + App: app, }, - App: strings.ToLower(app), - topicPrefix: prefix, + UniqueID: id, + Name: name, } } -// AsSensor will configure appropriate MQTT topics to represent a Home Assistant sensor. -func (e *EntityConfig[T]) AsSensor() *EntityConfig[T] { - prefix := strings.Join([]string{e.topicPrefix, "sensor", e.App, e.Entity.UniqueID}, "/") - e.ConfigTopic = prefix + "/config" - e.Entity.StateTopic = prefix + "/state" - e.Entity.AttributesTopic = prefix + "/attributes" +func (e *entity) WithNodeID(id string) *entity { + e.NodeID = FormatID(id) return e } -// AsBinarySensor will configure appropriate MQTT topics to represent a Home Assistant binary_sensor. -func (e *EntityConfig[T]) AsBinarySensor() *EntityConfig[T] { - prefix := strings.Join([]string{e.topicPrefix, "binary_sensor", e.App, e.Entity.UniqueID}, "/") - e.ConfigTopic = prefix + "/config" - e.Entity.StateTopic = prefix + "/state" - e.Entity.AttributesTopic = prefix + "/attributes" - return e -} - -// AsButton will configure appropriate MQTT topics to represent a Home Assistant button. -func (e *EntityConfig[T]) AsButton() *EntityConfig[T] { - prefix := strings.Join([]string{e.topicPrefix, "button", e.App, e.Entity.UniqueID}, "/") - e.ConfigTopic = prefix + "/config" - e.Entity.CommandTopic = prefix + "/toggle" - e.Entity.AttributesTopic = prefix + "/attributes" - return e -} - -// AsNumber will configure appropriate MQTT topics to represent a Home Assistant -// number. See also https://www.home-assistant.io/integrations/number.mqtt/ -func (e *EntityConfig[T]) AsNumber(step, min, max T, mode NumberMode) *EntityConfig[T] { - prefix := strings.Join([]string{e.topicPrefix, "number", e.App, e.Entity.UniqueID}, "/") +func (e *entity) setTopics(t EntityType) { + var prefix string + if e.NodeID != "" { + prefix = strings.Join([]string{HomeAssistantTopic, t.String(), e.NodeID, e.UniqueID}, "/") + } else { + prefix = strings.Join([]string{HomeAssistantTopic, t.String(), e.UniqueID}, "/") + } e.ConfigTopic = prefix + "/config" - e.Entity.CommandTopic = prefix + "/set" - e.Entity.StateTopic = prefix + "/state" - e.Entity.AttributesTopic = prefix + "/attributes" - e.Entity.Step = step - e.Entity.Min = min - e.Entity.Max = max - e.Entity.Mode = mode.String() - return e + e.StateTopic = prefix + "/state" + if e.CommandCallback != nil { + e.CommandTopic = prefix + "/set" + } + if e.AttributesTemplate != "" || e.AttributesCallback != nil { + e.AttributesTopic = prefix + "/attributes" + } + e.EntityType = t } // WithAttributesTemplate configures the passed in template to be used to extract the // value of the attributes in Home Assistant. -func (e *EntityConfig[T]) WithAttributesTemplate(t string) *EntityConfig[T] { - e.Entity.AttributesTemplate = t +func (e *entity) WithAttributesTemplate(t string) *entity { + e.AttributesTemplate = t return e } @@ -217,7 +251,7 @@ func (e *EntityConfig[T]) WithAttributesTemplate(t string) *EntityConfig[T] { // to be run whenever the attributes of the entity are needed. If this callback // is to be used, then the WithAttributesTopic() builder function should also be // called to set-up the attributes topic. -func (e *EntityConfig[T]) WithAttributesCallback(c func() (json.RawMessage, error)) *EntityConfig[T] { +func (e *entity) WithAttributesCallback(c func(args ...any) (json.RawMessage, error)) *entity { e.AttributesCallback = c return e } @@ -226,7 +260,7 @@ func (e *EntityConfig[T]) WithAttributesCallback(c func() (json.RawMessage, erro // be run when a message is received on the command topic of the entity. It // doesn't make sense to add this for entities that don't have a command topic, // like regular sensors. -func (e *EntityConfig[T]) WithCommandCallback(c func(mqtt.Client, mqtt.Message)) *EntityConfig[T] { +func (e *entity) WithCommandCallback(c func(p *paho.Publish)) *entity { e.CommandCallback = c return e } @@ -237,27 +271,27 @@ func (e *EntityConfig[T]) WithCommandCallback(c func(mqtt.Client, mqtt.Message)) // be useful to use this where you have a single state that represents many // entities. In such cases, it would be better to manually send the state in // your own code. -func (e *EntityConfig[T]) WithStateCallback(c func(args ...any) (json.RawMessage, error)) *EntityConfig[T] { +func (e *entity) WithStateCallback(c func(args ...any) (json.RawMessage, error)) *entity { e.StateCallback = c return e } // WithDeviceInfo adds the passed in device info to the entity config. -func (e *EntityConfig[T]) WithDeviceInfo(d *Device) *EntityConfig[T] { - e.Entity.Device = d +func (e *entity) WithDeviceInfo(d *Device) *entity { + e.Device = d return e } // WithOriginInfo adds the passed in origin info to the entity config. -func (e *EntityConfig[T]) WithOriginInfo(o *Origin) *EntityConfig[T] { - e.Entity.Origin = o +func (e *entity) WithOriginInfo(o *Origin) *entity { + e.Origin = o return e } // WithOriginInfo adds a pre-filled origin that references go-hass-agent // to the entity config. -func (e *EntityConfig[T]) WithDefaultOriginInfo() *EntityConfig[T] { - e.Entity.Origin = &Origin{ +func (e *entity) WithDefaultOriginInfo() *entity { + e.Origin = &Origin{ Name: "Go Hass Anything", URL: "https://github.com/joshuar/go-hass-anything", } @@ -266,59 +300,114 @@ func (e *EntityConfig[T]) WithDefaultOriginInfo() *EntityConfig[T] { // WithValueTemplate configures the passed in template to be used to extract the // value of the entity in Home Assistant. -func (e *EntityConfig[T]) WithValueTemplate(t string) *EntityConfig[T] { - e.Entity.ValueTemplate = t +func (e *entity) WithValueTemplate(t string) *entity { + e.ValueTemplate = t return e } // WithStateClassMeasurement configures the State Class for the entity to be "measurement". -func (e *EntityConfig[T]) WithStateClassMeasurement() *EntityConfig[T] { - e.Entity.StateClass = "measurement" +func (e *entity) WithStateClassMeasurement() *entity { + e.StateClass = "measurement" return e } // WithStateClassMeasurement configures the State Class for the entity to be "total". -func (e *EntityConfig[T]) WithStateClassTotal() *EntityConfig[T] { - e.Entity.StateClass = "total" +func (e *entity) WithStateClassTotal() *entity { + e.StateClass = "total" return e } // WithStateClassMeasurement configures the State Class for the entity to be "total_increasing". -func (e *EntityConfig[T]) WithStateClassTotalIncreasing() *EntityConfig[T] { - e.Entity.StateClass = "total_increasing" +func (e *entity) WithStateClassTotalIncreasing() *entity { + e.StateClass = "total_increasing" return e } // WithDeviceClass configures the Device Class for the entity. -func (e *EntityConfig[T]) WithDeviceClass(d string) *EntityConfig[T] { - e.Entity.DeviceClass = d +func (e *entity) WithDeviceClass(d string) *entity { + e.DeviceClass = d return e } // WithUnits adds a unit of measurement to the entity. -func (e *EntityConfig[T]) WithUnits(u string) *EntityConfig[T] { - e.Entity.UnitOfMeasurement = u +func (e *entity) WithUnits(u string) *entity { + e.UnitOfMeasurement = u return e } // WithIcon adds an icon to the entity. -func (e *EntityConfig[T]) WithIcon(i string) *EntityConfig[T] { - e.Entity.Icon = i +func (e *entity) WithIcon(i string) *entity { + e.Icon = i + return e +} + +func (e *entity) WithStateExpiry(i int) *entity { + e.StateExpiry = i return e } // GetTopics returns a Topic struct containing the topics configured for this // entity. If an entity does not have a particular topic (due to not having some // functionality), the topic value will be an empty string. -func (e *EntityConfig[T]) GetTopics() *Topics { +func (e *entity) GetTopics() *Topics { return &Topics{ Config: e.ConfigTopic, - Command: e.Entity.CommandTopic, - State: e.Entity.StateTopic, - Attributes: e.Entity.AttributesTopic, + Command: e.CommandTopic, + State: e.StateTopic, + Attributes: e.AttributesTopic, } } +func AsSensor(e *entity) *SensorEntity { + e.setTopics(Sensor) + return &SensorEntity{ + entity: e, + } +} + +func AsBinarySensor(e *entity) *BinarySensorEntity { + e.setTopics(BinarySensor) + return &BinarySensorEntity{ + entity: e, + } +} + +func AsButton(e *entity) *ButtonEntity { + e.setTopics(Button) + return &ButtonEntity{ + entity: e, + } +} + +func AsNumber[T constraints.Ordered](e *entity, step, min, max T, mode NumberMode) *NumberEntity[T] { + e.setTopics(Number) + return &NumberEntity[T]{ + entity: e, + Step: step, + Min: min, + Max: max, + Mode: mode.String(), + } +} + +func AsSwitch(e *entity, optimistic bool) *SwitchEntity { + e.setTopics(Switch) + return &SwitchEntity{ + entity: e, + Optimistic: optimistic, + } +} + +func (e *SwitchEntity) AsTypeSwitch() *SwitchEntity { + e.DeviceClass = "switch" + return e +} + +func (e *SwitchEntity) AsTypeOutlet() *SwitchEntity { + e.DeviceClass = "outlet" + return e +} + // FormatName will take a string s and format it with appropriate spacing // between words and capitalised the first letter of each word. For example // someString becomes Some String. The new string is then an appropriate format diff --git a/pkg/hass/entityTypeStrings.go b/pkg/hass/entityTypeStrings.go new file mode 100644 index 0000000..083b51e --- /dev/null +++ b/pkg/hass/entityTypeStrings.go @@ -0,0 +1,28 @@ +// Code generated by "stringer -type=EntityType -output entityTypeStrings.go -linecomment"; DO NOT EDIT. + +package hass + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[Unknown-0] + _ = x[Sensor-1] + _ = x[BinarySensor-2] + _ = x[Button-3] + _ = x[Number-4] + _ = x[Switch-5] +} + +const _EntityType_name = "unknownsensorbinary_sensorbuttonnumberswitch" + +var _EntityType_index = [...]uint8{0, 7, 13, 26, 32, 38, 44} + +func (i EntityType) String() string { + if i < 0 || i >= EntityType(len(_EntityType_index)-1) { + return "EntityType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _EntityType_name[_EntityType_index[i]:_EntityType_index[i+1]] +} diff --git a/pkg/hass/entityTypes.go b/pkg/hass/entityTypes.go new file mode 100644 index 0000000..e0fd10d --- /dev/null +++ b/pkg/hass/entityTypes.go @@ -0,0 +1,23 @@ +// Copyright (c) 2024 Joshua Rich +// +// This software is released under the MIT License. +// https://opensource.org/licenses/MIT + +package hass + +//go:generate stringer -type=EntityType -output entityTypeStrings.go -linecomment +const ( + Unknown EntityType = iota // unknown + // An entity with some kind of value, numeric or string. + Sensor // sensor + // An entity with a boolean value. + BinarySensor // binary_sensor + // An entity that changes state when activated. + Button // button + // An entity that is a number (float or int) with a range of values. + Number // number + // An entity that changes state between ON and OFF. + Switch // switch +) + +type EntityType int diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go new file mode 100644 index 0000000..f8b51ec --- /dev/null +++ b/pkg/mqtt/client.go @@ -0,0 +1,169 @@ +// Copyright (c) 2023 Joshua Rich +// +// This software is released under the MIT License. +// https://opensource.org/licenses/MIT + +package mqtt + +import ( + "context" + "net/url" + "strconv" + "sync" + "time" + + "github.com/eclipse/paho.golang/autopaho" + "github.com/eclipse/paho.golang/paho" + "github.com/rs/zerolog/log" +) + +type prefs interface { + GetMQTTServer() string + GetMQTTUser() string + GetMQTTPassword() string + GetTopicPrefix() string +} + +type Device interface { + Name() string + Configuration() []*Msg + States() []*Msg + Subscriptions() []*Subscription +} + +// Subscription represents a listener on a specific Topic, that will pass any +// messages sent to that topic to the Callback function. +type Subscription struct { + Callback func(p *paho.Publish) + Topic string +} + +// Client is the connection to the MQTT broker. +type Client struct { + conn *autopaho.ConnectionManager + mu sync.Mutex +} + +// Publish will send the list of messages it is passed to the broker that the +// client is connected to. Any errors in publihsing will be returned. +func (c *Client) Publish(msgs ...*Msg) error { + c.mu.Lock() + defer c.mu.Unlock() + err := publish(c.conn, msgs...) + return err +} + +func (c *Client) Unpublish(msgs ...*Msg) error { + var newMsgs []*Msg + for _, msg := range msgs { + newMsgs = append(newMsgs, NewMsg(msg.Topic, []byte(``))) + } + c.mu.Lock() + defer c.mu.Unlock() + err := publish(c.conn, newMsgs...) + return err +} + +func NewClient(ctx context.Context, prefs prefs, subscriptions []*Subscription) (*Client, error) { + var subOpts []paho.SubscribeOptions + router := paho.NewStandardRouter() + + for _, s := range subscriptions { + log.Trace().Str("topic", s.Topic).Msg("Adding subscription for topic.") + subOpts = append(subOpts, paho.SubscribeOptions{Topic: s.Topic, QoS: 1}) + router.RegisterHandler(s.Topic, s.Callback) + } + statusTopic := prefs.GetTopicPrefix() + "/status" + subOpts = append(subOpts, paho.SubscribeOptions{Topic: statusTopic, QoS: 1}) + router.RegisterHandler(statusTopic, func(p *paho.Publish) { + switch msg := string(p.Payload); msg { + case "online": + log.Debug().Msg("Home Assistant online.") + case "offline": + log.Debug().Msg("Home Assistant offline.") + } + }) + + // We will connect to the Eclipse test server (note that you may see messages that other users publish) + u, err := url.Parse(prefs.GetMQTTServer()) + if err != nil { + panic(err) + } + + connOpts := genConnOpts(ctx, u, subOpts, router) + + c, err := autopaho.NewConnection(ctx, connOpts) // starts process; will reconnect until context cancelled + if err != nil { + return nil, err + } + // Wait for the connection to come up + if err := c.AwaitConnection(ctx); err != nil { + return nil, err + } + return &Client{conn: c}, nil +} + +func genConnOpts(ctx context.Context, server *url.URL, subOpts []paho.SubscribeOptions, router *paho.StandardRouter) autopaho.ClientConfig { + clientID := "go_hass_anything_" + strconv.Itoa(time.Now().Second()) + + connOpts := autopaho.ClientConfig{ + ServerUrls: []*url.URL{server}, + KeepAlive: 20, // Keepalive message should be sent every 20 seconds + // CleanStartOnInitialConnection defaults to false. Setting this to true will clear the session on the first connection. + CleanStartOnInitialConnection: false, + // SessionExpiryInterval - Seconds that a session will survive after disconnection. + // It is important to set this because otherwise, any queued messages will be lost if the connection drops and + // the server will not queue messages while it is down. The specific setting will depend upon your needs + // (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 0xFFFFFFFE = 136 years, 0xFFFFFFFF = don't expire) + SessionExpiryInterval: 60, + OnConnectionUp: func(cm *autopaho.ConnectionManager, _ *paho.Connack) { + log.Debug().Msg("MQTT connection up.") + // Subscribing in the OnConnectionUp callback is recommended (ensures the subscription is reestablished if + // the connection drops) + if _, err := cm.Subscribe(ctx, &paho.Subscribe{Subscriptions: subOpts}); err != nil { + log.Warn().Err(err).Msg("Failed to add subscriptions.") + } + log.Debug().Msg("Subscriptions added.") + }, + OnConnectError: func(err error) { log.Error().Err(err).Msg("Error establishing MQTT connection.") }, + // eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection + ClientConfig: paho.ClientConfig{ + // If you are using QOS 1/2, then it's important to specify a client id (which must be unique) + ClientID: clientID, + // OnPublishReceived is a slice of functions that will be called when a message is received. + // You can write the function(s) yourself or use the supplied Router + OnPublishReceived: []func(paho.PublishReceived) (bool, error){ + func(pr paho.PublishReceived) (bool, error) { + log.Trace().Str("topic", pr.Packet.Topic).Msg("Routing message to handler.") + router.Route(pr.Packet.Packet()) + return true, nil // we assume that the router handles all messages (todo: amend router API) + }, + }, + OnClientError: func(err error) { log.Error().Err(err).Msg("Client error.") }, + OnServerDisconnect: func(d *paho.Disconnect) { + if d.Properties != nil { + log.Debug().Str("reason", d.Properties.ReasonString).Msg("Server requested disconnect.") + } else { + log.Debug().Interface("code", d.ReasonCode).Msg("Server requested disconnect.") + } + }, + }, + } + return connOpts +} + +func publish(c *autopaho.ConnectionManager, msgs ...*Msg) error { + var errs error + for _, msg := range msgs { + log.Trace().Str("topic", msg.Topic).Bool("retain", msg.Retained).RawJSON("payload", msg.Message).Msg("Publishing message.") + // Publish a test message (use PublishViaQueue if you don't want to wait for a response) + if _, err := c.Publish(context.TODO(), &paho.Publish{ + QoS: 1, + Topic: msg.Topic, + Payload: []byte(msg.Message), + }); err != nil { + log.Error().Err(err).Str("topic", msg.Topic).Msg("Error publishing message.") + } + } + return errs +} diff --git a/pkg/mqtt/mqtt.go b/pkg/mqtt/mqtt.go deleted file mode 100644 index 792cf88..0000000 --- a/pkg/mqtt/mqtt.go +++ /dev/null @@ -1,228 +0,0 @@ -// Copyright (c) 2023 Joshua Rich -// -// This software is released under the MIT License. -// https://opensource.org/licenses/MIT - -package mqtt - -import ( - "context" - "encoding/json" - "fmt" - "os" - "strconv" - "time" - - "github.com/cenkalti/backoff/v4" - MQTT "github.com/eclipse/paho.mqtt.golang" - "github.com/rs/zerolog/log" - "github.com/sourcegraph/conc/pool" -) - -type prefs interface { - GetMQTTServer() string - GetMQTTUser() string - GetMQTTPassword() string - GetTopicPrefix() string -} - -type Device interface { - Name() string - Configuration() []*Msg - States() []*Msg - Subscriptions() []*Subscription -} - -// Subscription represents a listener on a specific Topic, that will pass any -// messages sent to that topic to the Callback function. -type Subscription struct { - Callback func(MQTT.Client, MQTT.Message) - Topic string - QOS byte - Retained bool -} - -// Client is the connection to the MQTT broker. -type Client struct { - conn MQTT.Client - options *MQTT.ClientOptions -} - -// Publish will send the list of messages it is passed to the broker that the -// client is connected to. Any errors in publihsing will be returned. -func (c *Client) Publish(msgs ...*Msg) error { - return publish(c.conn, msgs...) -} - -// Subscribe will parse the list of subscriptions and listen on their topics, -// passing any received messages to their callback functions. Any error in -// setting up a subscription will be returned. -func (c *Client) Subscribe(subs ...*Subscription) error { - return subscribe(c.conn, subs...) -} - -func (c *Client) Unpublish(msgs ...*Msg) error { - var newMsgs []*Msg - for _, msg := range msgs { - newMsgs = append(newMsgs, NewMsg(msg.Topic, []byte(``))) - } - return publish(c.conn, newMsgs...) -} - -// Connect will establish a new connection to the MQTT service with a generated configuration -func (c *Client) connect(ctx context.Context) error { - c.conn = MQTT.NewClient(c.options) - - connect := func() error { - if token := c.conn.Connect(); token.Wait() && token.Error() != nil { - return token.Error() - } - return nil - } - err := backoff.Retry(connect, backoff.WithContext(backoff.NewExponentialBackOff(), ctx)) - if err != nil { - return err - } - - log.Debug().Msg("Connected to MQTT server.") - return nil -} - -func NewClient(ctx context.Context, prefs prefs, devices ...Device) (*Client, error) { - statusTopic := prefs.GetTopicPrefix() + "/status" - onConnectCallback := genOnConnectHandler(statusTopic, devices...) - connOpts := genConnOpts(prefs, onConnectCallback) - c := &Client{ - options: connOpts, - } - if err := c.connect(ctx); err != nil { - return nil, err - } - return c, nil -} - -func genConnOpts(prefs prefs, callback MQTT.OnConnectHandler) *MQTT.ClientOptions { - hostname, _ := os.Hostname() - clientid := hostname + strconv.Itoa(time.Now().Second()) - - connOpts := MQTT.NewClientOptions(). - AddBroker(prefs.GetMQTTServer()). - SetClientID(clientid). - SetCleanSession(true). - SetKeepAlive(10 * time.Second). - SetAutoReconnect(true). - SetOnConnectHandler(callback) - - if prefs.GetMQTTUser() != "" { - connOpts.SetUsername(prefs.GetMQTTUser()) - if prefs.GetMQTTPassword() != "" { - connOpts.SetPassword(prefs.GetMQTTPassword()) - } - } - - return connOpts -} - -func genOnConnectHandler(statustopic string, devices ...Device) MQTT.OnConnectHandler { - var configs []*Msg - var subscriptions []*Subscription - - for _, d := range devices { - configs = append(configs, d.Configuration()...) - subscriptions = append(subscriptions, d.Subscriptions()...) - } - HAStatusSub := &Subscription{ - Topic: statustopic, - Callback: func(c MQTT.Client, m MQTT.Message) { - switch msg := string(m.Payload()); msg { - case "online": - log.Debug().Msg("Home Assistant Online.") - p := pool.New().WithErrors() - for _, device := range devices { - p.Go(func() error { - return publish(c, device.Configuration()...) - }) - p.Go(func() error { - return publish(c, device.States()...) - }) - p.Go(func() error { - return subscribe(c, device.Subscriptions()...) - }) - } - if err := p.Wait(); err != nil { - log.Error().Err(err).Msg("Failed to re-register app with Home Assistant.") - } - case "offline": - log.Debug().Msg("Home Assistant Offline.") - } - }, - } - subscriptions = append(subscriptions, HAStatusSub) - redoFunc := func(c MQTT.Client) { - p := pool.New().WithErrors() - p.Go(func() error { - return publish(c, configs...) - - }) - p.Go(func() error { - return subscribe(c, subscriptions...) - }) - if err := p.Wait(); err != nil { - log.Error().Err(err).Msg("Failed to re-send config/subscriptions after restart.") - } - } - return redoFunc -} - -func publish(c MQTT.Client, msgs ...*Msg) error { - p := pool.New().WithErrors() - for _, msg := range msgs { - log.Trace().Str("topic", msg.Topic).Bool("retain", msg.Retained).Msg("Publishing message.") - p.Go(func() error { - if token := c.Publish(msg.Topic, msg.QOS, msg.Retained, []byte(msg.Message)); token.Wait() && token.Error() != nil { - return fmt.Errorf("failed to pusblish message to topic %s: %v", msg.Topic, token.Error()) - } - return nil - }) - } - return p.Wait() -} - -func subscribe(c MQTT.Client, subs ...*Subscription) error { - p := pool.New().WithErrors() - for _, sub := range subs { - log.Trace().Str("topic", sub.Topic).Bool("retain", sub.Retained).Msg("Subscribing to topic.") - p.Go(func() error { - if token := c.Subscribe(sub.Topic, sub.QOS, sub.Callback); token.Wait() && token.Error() != nil { - return fmt.Errorf("failed to subscribe to topic %s: %v", sub.Topic, token.Error()) - } - return nil - }) - } - return p.Wait() -} - -// Msg represents a message that can be sent or received on the MQTT bus. -type Msg struct { - Topic string - Message json.RawMessage - QOS byte - Retained bool -} - -// Retain sets the Retained status of a Msg to true, ensuring that it will be -// retained on the MQTT bus when sent. -func (m *Msg) Retain() *Msg { - m.Retained = true - return m -} - -// NewMsg is a convenience function to create a new Msg with a given topic and -// message body. The returned Msg can be further customised directly for -// specifying retention and QoS parameters. -func NewMsg(topic string, msg json.RawMessage) *Msg { - return &Msg{ - Topic: topic, - Message: msg, - } -} diff --git a/pkg/mqtt/msg.go b/pkg/mqtt/msg.go new file mode 100644 index 0000000..f530ba3 --- /dev/null +++ b/pkg/mqtt/msg.go @@ -0,0 +1,33 @@ +// Copyright (c) 2024 Joshua Rich +// +// This software is released under the MIT License. +// https://opensource.org/licenses/MIT + +package mqtt + +import "encoding/json" + +// Msg represents a message that can be sent or received on the MQTT bus. +type Msg struct { + Topic string + Message json.RawMessage + QOS byte + Retained bool +} + +// Retain sets the Retained status of a Msg to true, ensuring that it will be +// retained on the MQTT bus when sent. +func (m *Msg) Retain() *Msg { + m.Retained = true + return m +} + +// NewMsg is a convenience function to create a new Msg with a given topic and +// message body. The returned Msg can be further customised directly for +// specifying retention and QoS parameters. +func NewMsg(topic string, msg json.RawMessage) *Msg { + return &Msg{ + Topic: topic, + Message: msg, + } +} diff --git a/tools/appgenerator/init.go.tmpl b/tools/appgenerator/init.go.tmpl index 8ea5b56..8ff247e 100644 --- a/tools/appgenerator/init.go.tmpl +++ b/tools/appgenerator/init.go.tmpl @@ -7,9 +7,9 @@ import ( "github.com/rs/zerolog/log" {{ range $app := . -}} {{ if (eq $app "exampleapp") }} -"github.com/joshuar/go-hass-anything/v8/pkg/apps/{{$app}}" +"github.com/joshuar/go-hass-anything/v9/pkg/apps/{{$app}}" {{ else }} -"github.com/joshuar/go-hass-anything/v8/apps/{{$app}}" +"github.com/joshuar/go-hass-anything/v9/apps/{{$app}}" {{ end }} {{ end -}} )