Skip to content

Commit

Permalink
feat!: major internal rewrite for app interface and switch MQTT clien…
Browse files Browse the repository at this point in the history
…t library

BREAKING CHANGE: the app interface the agent
expects has been completely overhauled. Apps now
don't need to access the MQTT client to publish
messages directly. They can instead specify
whether they require polling on an interval or are
app driven (or even run once-only) and the agent
will set up the necessary functionality to provide
it. Additionally, the underlying MQTT client in
use has switched to a newer v5 based one that
should be more performant and feature-ful for
future functionality.

- Switch from bare pahoe MQTT client to autopahoe
  client.
- Apps now only need to specify their
  configuration, subscription and state messages,
  as well as a function to update their internal
  state.
- Optionally, apps can also specify a function to
  return an interval and jitter amount on which
  they should be polled for updates.
- Optionally, apps can also specify a channel of
  messages that should be published as they are
  sent on the channel, for event-driven app
  functionality.
- Switch entities are now supported.
- Configuring the entities of apps has been
  overhauled. You now configure the common entity
  features you need, then specify the type of
  entity (Sensor, Switch, Number, etc.) to
  configure type-specific functionality.
- The Example App has been overhauled with new
  examples of sensors, binary sensors, switch,
  number and button entities and their
  functionality.
  • Loading branch information
joshuar committed May 4, 2024
1 parent b2d0273 commit 7c5c888
Show file tree
Hide file tree
Showing 18 changed files with 826 additions and 534 deletions.
2 changes: 1 addition & 1 deletion cmd/clear.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/spf13/cobra"

"github.com/joshuar/go-hass-anything/v8/internal/agent"
"github.com/joshuar/go-hass-anything/v9/internal/agent"
)

var clearCmd = &cobra.Command{
Expand Down
2 changes: 1 addition & 1 deletion cmd/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package cmd
import (
"github.com/spf13/cobra"

"github.com/joshuar/go-hass-anything/v8/internal/agent"
"github.com/joshuar/go-hass-anything/v9/internal/agent"
)

var configCmd = &cobra.Command{
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/joshuar/go-hass-anything/v8/internal/logging"
"github.com/joshuar/go-hass-anything/v9/internal/logging"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/spf13/cobra"

"github.com/joshuar/go-hass-anything/v8/internal/agent"
"github.com/joshuar/go-hass-anything/v9/internal/agent"
)

var runCmd = &cobra.Command{
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/joshuar/go-hass-anything/v8
module github.com/joshuar/go-hass-anything/v9

go 1.22.0

Expand Down Expand Up @@ -39,6 +39,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/charmbracelet/bubbles v0.18.0
github.com/charmbracelet/bubbletea v0.25.0
github.com/eclipse/paho.golang v0.21.0
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/iancoleman/strcase v0.3.0
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand All @@ -49,6 +50,7 @@ require (
github.com/shirou/gopsutil/v3 v3.24.3
github.com/sourcegraph/conc v0.3.0
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.golang v0.21.0 h1:cxxEReu+iFbA5RrHfRGxJOh8tXZKDywuehneoeBeyn8=
github.com/eclipse/paho.golang v0.21.0/go.mod h1:GHF6vy7SvDbDHBguaUpfuBkEB5G6j0zKxMG4gbh6QRQ=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
Expand Down Expand Up @@ -107,6 +109,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
Expand Down
112 changes: 102 additions & 10 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ package agent
import (
"context"
"sync"
"time"

"github.com/rs/zerolog/log"

ui "github.com/joshuar/go-hass-anything/v8/internal/agent/ui/bubbletea"
"github.com/joshuar/go-hass-anything/v8/pkg/mqtt"
"github.com/joshuar/go-hass-anything/v8/pkg/preferences"
ui "github.com/joshuar/go-hass-anything/v9/internal/agent/ui/bubbletea"
"github.com/joshuar/go-hass-anything/v9/pkg/mqtt"
"github.com/joshuar/go-hass-anything/v9/pkg/preferences"
)

//go:generate go run ../../tools/appgenerator/run.go arg1
Expand All @@ -29,12 +30,47 @@ type agent struct {
version string
}

// App represents an app that the agent can run. All apps have the following
// methods, which define how the app should be configured, current states of its
// entities and any subscriptions it wants to watch.
type App interface {
// Name() is an identifier for the app, used for logging in the agent.
Name() string
// Configuration() returns the messages needed to tell Home Assistant how to
// configure the app and its entities.
Configuration() []*mqtt.Msg
// States() are the messages that reflect the app's current state of the
// entities of the app.
States() []*mqtt.Msg
// Subscriptions() are the topics on which the app wants to subscribe and
// execute a callback in response to a message on that topic.
Subscriptions() []*mqtt.Subscription
Run(ctx context.Context, client *mqtt.Client) error
// Update() is a function that is run at least once by the agent and will
// usually contain the logic to update the states of all the apps entities.
// It may be run multiple times, if the app is also considered a polling
// app. See the definition for PollingApp for details.
Update(ctx context.Context) error
}

// PollingApp represents an app that should be polled for updates on some
// interval. When an app satisfies this interface, the agent will configure a
// goroutine to run the apps Update() function and publish its States().
type PollingApp interface {
// PollConfig defines the interval on which the app should be polled and its
// states updated. A jitter should be defined, that is much less than the
// interval, to add a small variation to the interval to avoid any
// "thundering herd" problems.
PollConfig() (interval, jitter time.Duration)
}

// EventsApp represents an app that will update its States() in response to some
// event(s) it is monitoring. When an app satisfies this interface, the agent
// will configure a goroutine to watch a channel of messages the app sends when
// an event occurs, which will be published to MQTT.
type EventsApp interface {
// MsgCh is a channel of messages that the app generates when some internal
// event occurs and a new message should be published to MQTT.
MsgCh() chan *mqtt.Msg
}

func NewAgent(id, name string) *agent {
Expand Down Expand Up @@ -68,17 +104,19 @@ func (a *agent) Configure() {
}

func Run(ctx context.Context) {
var appsToRun []mqtt.Device
var subscriptions []*mqtt.Subscription
var configs []*mqtt.Msg
for _, app := range AppList {
appsToRun = append(appsToRun, app)
configs = append(configs, app.Configuration()...)
subscriptions = append(subscriptions, app.Subscriptions()...)
}

prefs, err := preferences.LoadPreferences()
if err != nil {
log.Fatal().Err(err).Msg("Could not load preferences.")
}

client, err := mqtt.NewClient(ctx, prefs, appsToRun...)
client, err := mqtt.NewClient(ctx, prefs, subscriptions)
if err != nil {
log.Fatal().Err(err).Msg("Could not connect to broker.")
}
Expand All @@ -92,7 +130,7 @@ func ClearApps(ctx context.Context) {
log.Fatal().Err(err).Msg("Could not load preferences.")
}

client, err := mqtt.NewClient(ctx, prefs)
client, err := mqtt.NewClient(ctx, prefs, nil)
if err != nil {
log.Fatal().Err(err).Msg("Could not connect to broker.")
}
Expand All @@ -113,10 +151,64 @@ func runApps(ctx context.Context, client *mqtt.Client, apps []App) {
wg.Add(1)
go func(a App) {
defer wg.Done()
if err := a.Run(ctx, client); err != nil {
log.Debug().Err(err).Msg("Error running app.")
publishAppConfigs(a, client)
if app, ok := a.(PollingApp); ok {
interval, jitter := app.PollConfig()
log.Info().Dur("interval", interval).Str("app", a.Name()).Msg("Running loop to poll app updates.")
wg.Add(1)
go func() {
defer wg.Wait()
poll(
ctx,
func() {
updateApp(ctx, a)
publishAppStates(a, client)
},
interval,
jitter,
)
}()
}
if app, ok := a.(EventsApp); ok {
updateApp(ctx, a)
wg.Add(1)
go func() {
defer wg.Done()
log.Info().Str("app", a.Name()).Msg("Listening for message events from app to publish.")
for {
select {
case msg := <-app.MsgCh():
if err := client.Publish(msg); err != nil {
log.Error().Err(err).Str("app", a.Name()).Msg("Failed to publish state messages.")
}
case <-ctx.Done():
return
}
}
}()
}
}(app)
}
wg.Wait()
}

func updateApp(ctx context.Context, app App) {
log.Debug().Str("app", app.Name()).Msg("Updating.")
if err := app.Update(ctx); err != nil {
log.Warn().Err(err).Str("app", app.Name()).Msg("App failed to update.")
}
}

func publishAppStates(app App, client *mqtt.Client) {
log.Debug().Str("app", app.Name()).Msg("Publishing states.")
if err := client.Publish(app.States()...); err != nil {
log.Error().Err(err).Str("app", app.Name()).Msg("Failed to publish state messages.")
}
}

func publishAppConfigs(app App, client *mqtt.Client) {
log.Debug().Str("app", app.Name()).Msg("Publishing configs.")
if err := client.Publish(app.Configuration()...); err != nil {
log.Error().Err(err).Str("app", app.Name()).Msg("Failed to publish configuration messages.")
}
}
18 changes: 9 additions & 9 deletions pkg/apps/helpers/pollsensors.go → internal/agent/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,39 @@
// This software is released under the MIT License.
// https://opensource.org/licenses/MIT

package helpers
package agent

import (
"context"
"sync"
"time"

"github.com/lthibault/jitterbug/v2"
"github.com/rs/zerolog/log"
)

// PollSensors is a helper function that will call the passed `updater()`
// poll is a helper function that will call the passed `updater()`
// function around each `interval` duration within the `stdev` duration window.
// Effectively, `updater()` will get called sometime near `interval`, but not
// exactly on it. This can help avoid a "thundering herd" problem of sensors all
// trying to update at the same time.
func PollSensors(ctx context.Context, updater func(), interval, stdev time.Duration) {
func poll(ctx context.Context, updater func(), interval, jitter time.Duration) {
if interval <= 0 || jitter <= 0 {
log.Warn().Dur("interval", interval).Dur("jitter", jitter).Msg("Invalid interval and stdev for polling.")
return
}
updater()
ticker := jitterbug.New(
interval,
&jitterbug.Norm{Stdev: stdev},
&jitterbug.Norm{Stdev: jitter},
)
var wg sync.WaitGroup
wg.Add(1)
go func() {
for {
select {
case <-ctx.Done():
wg.Done()
return
case <-ticker.C:
updater()
}
}
}()
wg.Wait()
}
2 changes: 1 addition & 1 deletion internal/agent/ui/bubbletea/bubbletea.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
tea "github.com/charmbracelet/bubbletea"
"github.com/rs/zerolog/log"

"github.com/joshuar/go-hass-anything/v8/pkg/preferences"
"github.com/joshuar/go-hass-anything/v9/pkg/preferences"
)

type Agent interface {
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"log"
"syscall"

"github.com/joshuar/go-hass-anything/v8/cmd"
"github.com/joshuar/go-hass-anything/v9/cmd"
)

func init() {
Expand Down
Loading

0 comments on commit 7c5c888

Please sign in to comment.