Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update v10.0.0 #28

Merged
merged 28 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
33483fd
Don't include status groups in contact indexing
rowanseymour Oct 8, 2024
5d26110
Merge pull request #85 from nyaruka/ignore_status_groups
rowanseymour Oct 8, 2024
c01ca7e
Update CHANGELOG.md for v9.2.1
rowanseymour Oct 8, 2024
eaa6514
Add runtime.Runtime like our other go services have
rowanseymour Dec 12, 2024
16ad34e
Merge pull request #86 from nyaruka/runtime
rowanseymour Dec 12, 2024
775ee9f
Send metrics to cloudwatch
rowanseymour Dec 12, 2024
c59d04f
Rename to use DeploymentID
norkans7 Dec 13, 2024
63e54b1
Merge pull request #88 from nyaruka/use-cloudwatch
rowanseymour Dec 13, 2024
1888fe5
Merge pull request #87 from nyaruka/cloudwatch
rowanseymour Dec 13, 2024
2a623d8
Update CHANGELOG.md for v9.3.0
rowanseymour Dec 13, 2024
78d7a98
Remove librato, use latest gocommon
rowanseymour Dec 13, 2024
1f7e63e
Merge pull request #89 from nyaruka/remove_librato
rowanseymour Dec 13, 2024
db5d49f
Update CHANGELOG.md for v9.3.1
rowanseymour Dec 13, 2024
e5ffbaf
Update Dockerfile
rowanseymour Dec 13, 2024
5f03bc7
Update to latest gocommon
rowanseymour Dec 13, 2024
23c2292
Merge pull request #91 from nyaruka/update_gocommon
rowanseymour Dec 13, 2024
5149068
Update CHANGELOG.md for v9.3.2
rowanseymour Dec 13, 2024
f0f5926
Clean up old analytics use
norkans7 Dec 13, 2024
a2fd407
Merge pull request #92 from nyaruka/remove-old-analytics
rowanseymour Dec 13, 2024
229df67
Update CHANGELOG.md for v9.3.3
rowanseymour Dec 13, 2024
36ab028
Update to latest gocommon and tweak default cloudwatch namespace
rowanseymour Dec 16, 2024
8f4eaf6
Merge pull request #93 from nyaruka/latest_gocommon
rowanseymour Dec 16, 2024
9d2af33
Update CHANGELOG.md for v9.3.4
rowanseymour Dec 16, 2024
cdc9aff
Update README.md
rowanseymour Jan 7, 2025
10c87e2
Update CHANGELOG.md for v10.0.0
rowanseymour Jan 7, 2025
1685d62
Merge tag 'v10.0.0' into update/v10.0.0
Robi9 Jan 21, 2025
c8fd235
Update dependencies
Robi9 Jan 21, 2025
0316aa8
Update WENI-CHANGELOG.md
Robi9 Jan 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: CI
on: [push, pull_request]
env:
go-version: "1.22.x"
go-version: "1.23.x"
jobs:
test:
name: Test
Expand Down
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,31 @@
v10.0.0 (2025-01-07)
-------------------------
* Update README.md

v9.3.4 (2024-12-16)
-------------------------
* Update to latest gocommon and tweak default cloudwatch namespace

v9.3.3 (2024-12-13)
-------------------------
* Clean up old analytics use

v9.3.2 (2024-12-13)
-------------------------
* Update to latest gocommon

v9.3.1 (2024-12-13)
-------------------------
* Remove librato, use latest gocommon

v9.3.0 (2024-12-13)
-------------------------
* Send metrics to cloudwatch

v9.2.1 (2024-10-08)
-------------------------
* Don't include status groups in contact indexing

v9.2.0 (2024-07-17)
-------------------------
* Test against postgresql 15
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22
FROM golang:1.23

WORKDIR /usr/src/app

Expand Down
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@ We recommend running it with no changes to the configuration and no parameters,
environment variables to configure it. You can use `% rp-indexer --help` to see a list of the
environment variables and parameters and for more details on each option.

### RapidPro

For use with RapidPro, you will want to configure these settings:

* `INDEXER_DB`: a URL connection string for your RapidPro database or read replica
* `INDEXER_ELASTIC_URL`: the URL for your ElasticSearch endpoint

### AWS services:

* `INDEXER_AWS_ACCESS_KEY_ID`: AWS access key id used to authenticate to AWS
* `INDEXER_AWS_SECRET_ACCESS_KEY` AWS secret access key used to authenticate to AWS
* `INDEXER_AWS_REGION`: AWS region (ex: `eu-west-1`)

Recommended settings for error reporting:
### Logging and error reporting:

* `INDEXER_SENTRY_DSN`: DSN to use when logging errors to Sentry
* `INDEXER_LOG_LEVEL`: logging level to use (default is `info`)

## Development

Expand Down
4 changes: 4 additions & 0 deletions WENI-CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
1.2.2-indexer-10.0.0
----------
* Update to version v10.0.0

1.2.2-indexer-9.2.0
----------
* Update to version v9.2.0
Expand Down
36 changes: 23 additions & 13 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"github.com/getsentry/sentry-go"
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
"github.com/nyaruka/gocommon/aws/cwatch"
indexer "github.com/nyaruka/rp-indexer/v9"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/nyaruka/rp-indexer/v9/runtime"
slogmulti "github.com/samber/slog-multi"
slogsentry "github.com/samber/slog-sentry"
)
Expand All @@ -25,7 +27,7 @@ var (
)

func main() {
cfg := indexer.NewDefaultConfig()
cfg := runtime.NewDefaultConfig()
loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
loader.MustLoad()

Expand All @@ -36,15 +38,14 @@ func main() {
os.Exit(1)
}

rt := &runtime.Runtime{Config: cfg}

// configure our logger
logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level})
slog.SetDefault(slog.New(logHandler))

logger := slog.With("comp", "main")
logger.Info("starting indexer", "version", version, "released", date)

// if we have a DSN entry, try to initialize it
if cfg.SentryDSN != "" {
if rt.Config.SentryDSN != "" {
err := sentry.Init(sentry.ClientOptions{
Dsn: cfg.SentryDSN,
EnableTracing: false,
Expand All @@ -55,7 +56,8 @@ func main() {
}

defer sentry.Flush(2 * time.Second)
logger = slog.New(

logger := slog.New(
slogmulti.Fanout(
logHandler,
slogsentry.Option{Level: slog.LevelError}.NewSentryHandler(),
Expand All @@ -65,24 +67,32 @@ func main() {
slog.SetDefault(logger)
}

db, err := sql.Open("postgres", cfg.DB)
log := slog.With("comp", "main")
log.Info("starting indexer", "version", version, "released", date)

rt.DB, err = sql.Open("postgres", cfg.DB)
if err != nil {
log.Error("unable to connect to database", "error", err)
}

rt.CW, err = cwatch.NewService(rt.Config.AWSAccessKeyID, rt.Config.AWSSecretAccessKey, rt.Config.AWSRegion, rt.Config.CloudwatchNamespace, rt.Config.DeploymentID)
if err != nil {
logger.Error("unable to connect to database")
log.Error("unable to create cloudwatch service", "error", err)
}

idxrs := []indexers.Indexer{
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500),
indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, rt.Config.ContactsShards, rt.Config.ContactsReplicas, 500),
}

if cfg.Rebuild {
if rt.Config.Rebuild {
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
idxr := idxrs[0]
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
logger.Error("error during rebuilding", "error", err, "indexer", idxr.Name())
if _, err := idxr.Index(rt, true, rt.Config.Cleanup); err != nil {
log.Error("error during rebuilding", "error", err, "indexer", idxr.Name())
}
} else {
d := indexer.NewDaemon(cfg, db, idxrs, time.Duration(cfg.Poll)*time.Second)
d := indexer.NewDaemon(rt, idxrs)
d.Start()

handleSignals(d)
Expand Down
38 changes: 0 additions & 38 deletions config.go

This file was deleted.

48 changes: 21 additions & 27 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package indexer

import (
"context"
"database/sql"
"fmt"
"log/slog"
"sync"
"time"

"github.com/nyaruka/gocommon/analytics"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/nyaruka/rp-indexer/v9/runtime"
)

type Daemon struct {
cfg *Config
db *sql.DB
rt *runtime.Runtime
wg *sync.WaitGroup
quit chan bool
indexers []indexers.Indexer
Expand All @@ -24,27 +24,19 @@ type Daemon struct {
}

// NewDaemon creates a new daemon to run the given indexers
func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer, poll time.Duration) *Daemon {
func NewDaemon(rt *runtime.Runtime, ixs []indexers.Indexer) *Daemon {
return &Daemon{
cfg: cfg,
db: db,
rt: rt,
wg: &sync.WaitGroup{},
quit: make(chan bool),
indexers: ixs,
poll: poll,
poll: time.Duration(rt.Config.Poll) * time.Second,
prevStats: make(map[indexers.Indexer]indexers.Stats, len(ixs)),
}
}

// Start starts this daemon
func (d *Daemon) Start() {
// if we have a librato token, configure it
if d.cfg.LibratoToken != "" {
analytics.RegisterBackend(analytics.NewLibrato(d.cfg.LibratoUsername, d.cfg.LibratoToken, d.cfg.InstanceName, time.Second, d.wg))
}

analytics.Start()

for _, i := range d.indexers {
d.startIndexer(i)
}
Expand All @@ -68,7 +60,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
case <-d.quit:
return
case <-time.After(d.poll):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
_, err := indexer.Index(d.rt, d.rt.Config.Rebuild, d.rt.Config.Cleanup)
if err != nil {
log.Error("error during indexing", "error", err)
}
Expand All @@ -85,7 +77,7 @@ func (d *Daemon) startStatsReporter(interval time.Duration) {

go func() {
defer func() {
slog.Info("analytics exiting")
slog.Info("metrics reporter exiting")
d.wg.Done()
}()

Expand All @@ -107,7 +99,7 @@ func (d *Daemon) reportStats(includeLag bool) {
defer cancel()

log := slog.New(slog.Default().Handler())
metrics := make(map[string]float64, len(d.indexers)*2)
metrics := make([]types.MetricDatum, 0, len(d.indexers)*3)

for _, ix := range d.indexers {
stats := ix.Stats()
Expand All @@ -121,9 +113,13 @@ func (d *Daemon) reportStats(includeLag bool) {
rateInPeriod = float64(indexedInPeriod) / (float64(elapsedInPeriod) / float64(time.Second))
}

metrics[ix.Name()+"_indexed"] = float64(indexedInPeriod)
metrics[ix.Name()+"_deleted"] = float64(deletedInPeriod)
metrics[ix.Name()+"_rate"] = rateInPeriod
idxDim := cwatch.Dimension("Index", ix.Name())

metrics = append(metrics,
cwatch.Datum("RecordsIndexed", float64(indexedInPeriod), types.StandardUnitCount, idxDim),
cwatch.Datum("RecordsDeleted", float64(deletedInPeriod), types.StandardUnitCount, idxDim),
cwatch.Datum("IndexingRate", rateInPeriod, types.StandardUnitCountSecond, idxDim),
)

d.prevStats[ix] = stats

Expand All @@ -132,14 +128,13 @@ func (d *Daemon) reportStats(includeLag bool) {
if err != nil {
log.Error("error getting db last modified", "index", ix.Name(), "error", err)
} else {
metrics[ix.Name()+"_lag"] = lag.Seconds()
metrics = append(metrics, cwatch.Datum("IndexingLag", lag.Seconds(), types.StandardUnitSeconds, idxDim))
}
}
}

for k, v := range metrics {
analytics.Gauge("indexer."+k, v)
log = log.With(k, v)
if err := d.rt.CW.Send(ctx, metrics...); err != nil {
log.Error("error putting metrics", "error", err)
}

log.Info("stats reported")
Expand All @@ -151,7 +146,7 @@ func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Du
return 0, fmt.Errorf("error getting ES last modified: %w", err)
}

dbLastModified, err := ix.GetDBLastModified(ctx, d.db)
dbLastModified, err := ix.GetDBLastModified(ctx, d.rt.DB)
if err != nil {
return 0, fmt.Errorf("error getting DB last modified: %w", err)
}
Expand All @@ -162,7 +157,6 @@ func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Du
// Stop stops this daemon
func (d *Daemon) Stop() {
slog.Info("daemon stopping")
analytics.Stop()

close(d.quit)
d.wg.Wait()
Expand Down
Loading
Loading