Skip to content

Commit

Permalink
Merge pull request #1 from banked/refactor/env-configuration-support
Browse files Browse the repository at this point in the history
Support environment variable based configuration
  • Loading branch information
krak3n authored Dec 12, 2022
2 parents 2877803 + 47078f1 commit 7af7f57
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 75 deletions.
50 changes: 35 additions & 15 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"runtime/debug"
Expand All @@ -9,10 +11,11 @@ import (
"github.com/jackc/pgx"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sethvargo/go-envconfig"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"

"github.com/ihippik/wal-listener/config"
"github.com/banked/wal-listener/v2/config"
)

// logger log levels.
Expand All @@ -24,7 +27,7 @@ const (
)

func getVersion() string {
var version = "unknown"
version := "unknown"

info, ok := debug.ReadBuildInfo()
if ok {
Expand Down Expand Up @@ -52,6 +55,10 @@ func getConf(path string) (*config.Config, error) {
return nil, fmt.Errorf("unable to decode into config struct: %w", err)
}

if err := envconfig.Process(context.Background(), &cfg); err != nil {
return nil, fmt.Errorf("unable to process environment variables into config struct: %w", err)
}

return &cfg, nil
}

Expand Down Expand Up @@ -107,7 +114,7 @@ func createStream(logger *logrus.Entry, js nats.JetStreamContext, streamName str
}

if stream == nil {
var streamSubjects = streamName + ".*"
streamSubjects := streamName + ".*"

if _, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Expand All @@ -122,6 +129,30 @@ func createStream(logger *logrus.Entry, js nats.JetStreamContext, streamName str
return nil
}

func initNats(cfg config.NatsCfg) (*nats.Conn, error) {
opts := []nats.Option{}

if mtls := cfg.MTLS; mtls.Enabled {
if mtls.InsecureSkipVerify {
opts = append(
opts,
nats.Secure(&tls.Config{InsecureSkipVerify: true}),
)
} else {
opts = append(
opts,
nats.ClientCert(
fmt.Sprintf("%s/%s", mtls.CertPath, mtls.CertFile),
fmt.Sprintf("%s/%s", mtls.CertPath, mtls.KeyFile),
),
nats.RootCAs(fmt.Sprintf("%s/%s", mtls.CertPath, mtls.CAFile)),
)
}
}

return nats.Connect(cfg.Address, opts...)
}

func initSentry(dsn string, logger *logrus.Entry) {
if len(dsn) == 0 {
logger.Warnln("empty Sentry DSN")
Expand All @@ -143,18 +174,7 @@ func initSentry(dsn string, logger *logrus.Entry) {
}

// initPgxConnections initialise db and replication connections.
func initPgxConnections(cfg config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn, error) {
pgxConf := pgx.ConnConfig{
// TODO logger
LogLevel: pgx.LogLevelInfo,
Logger: pgxLogger{},
Host: cfg.Host,
Port: cfg.Port,
Database: cfg.Name,
User: cfg.User,
Password: cfg.Password,
}

func initPgxConnections(pgxConf pgx.ConnConfig) (*pgx.Conn, *pgx.ReplicationConn, error) {
pgConn, err := pgx.Connect(pgxConf)
if err != nil {
return nil, nil, fmt.Errorf("db connection: %w", err)
Expand Down
14 changes: 10 additions & 4 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"fmt"
"os"

"github.com/nats-io/nats.go"
"github.com/jackc/pgx"
"github.com/sirupsen/logrus"
"github.com/urfave/cli/v2"

"github.com/ihippik/wal-listener/listener"
"github.com/banked/wal-listener/v2/listener"
)

func main() {
Expand Down Expand Up @@ -49,7 +49,7 @@ func main() {

go initMetrics(cfg.Monitoring.PromAddr, logger)

natsConn, err := nats.Connect(cfg.Nats.Address)
natsConn, err := initNats(cfg.Nats)
if err != nil {
return fmt.Errorf("nats connection: %w", err)
}
Expand All @@ -64,7 +64,12 @@ func main() {
return fmt.Errorf("create Nats stream: %w", err)
}

conn, rConn, err := initPgxConnections(cfg.Database)
pgxConf, err := pgx.ParseURI(cfg.Database.DSN)
if err != nil {
return fmt.Errorf("failed to parse database DSN: %w", err)
}

conn, rConn, err := initPgxConnections(pgxConf)
if err != nil {
return fmt.Errorf("pgx connection: %w", err)
}
Expand All @@ -76,6 +81,7 @@ func main() {
rConn,
listener.NewNatsPublisher(js),
listener.NewBinaryParser(binary.BigEndian),
fmt.Sprintf("%s_%s", cfg.Listener.SlotName, pgxConf.Database),
)

if err := service.Process(c.Context); err != nil {
Expand Down
55 changes: 31 additions & 24 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,52 +6,59 @@ import (
"github.com/asaskevich/govalidator"
)

// Config for wal-listener/
// Config for wal-listener
type Config struct {
Listener ListenerCfg
Database DatabaseCfg
Nats NatsCfg
Logger LoggerCfg
Monitoring MonitoringCfg
Listener ListenerCfg `env:",prefix=LISTENER"`
Database DatabaseCfg `env:",prefix=DB_"`
Nats NatsCfg `env:",prefix=NATS"`
Logger LoggerCfg `env:",prefix=LOG"`
Monitoring MonitoringCfg `env:",prefix=MONITOR"`
}

// ListenerCfg path of the listener config.
type ListenerCfg struct {
SlotName string `valid:"required"`
AckTimeout time.Duration
RefreshConnection time.Duration `valid:"required"`
HeartbeatInterval time.Duration `valid:"required"`
Filter FilterStruct
SlotName string `valid:"required" env:"SLOT_NAME"`
AckTimeout time.Duration `env:"ACK_TIMEOUT"`
RefreshConnection time.Duration `valid:"required" env:"REFRESH_TIMEOUT"`
HeartbeatInterval time.Duration `valid:"required" env:"HEARTBEAT_INTERVAL"`
Filter FilterStruct `env:"FILTER"`
TopicsMap map[string]string
}

// NatsCfg path of the NATS config.
type NatsCfg struct {
Address string `valid:"required"`
StreamName string `valid:"required"`
TopicPrefix string
Address string `valid:"required" env:"ADDRESS"`
StreamName string `valid:"required" env:"STREAM_NAME"`
TopicPrefix string `env:"TOPIC_PREFIX"`
MTLS NatsMtlsCfg `env:"MTLS"`
}

// Nats TLS Coniguration
type NatsMtlsCfg struct {
CertPath string `env:"CERT_PATH,default=/etc/nats-certs/clients"`
CertFile string `env:"CERT_FILE,default=tls.crt"`
KeyFile string `env:"KEY_FILE,default=tls.key"`
CAFile string `env:"CA_FILE,default=ca.crt"`
InsecureSkipVerify bool `env:"INSECURE_SKIP_VERIFY,default=false"`
Enabled bool `env:"ENABLED,default=false"`
}

// MonitoringCfg monitoring configuration.
type MonitoringCfg struct {
SentryDSN string
PromAddr string
SentryDSN string `env:"SENTRY_DSN`
PromAddr string `env:"PROMETHEUS_ADDRESS"`
}

// LoggerCfg path of the logger config.
type LoggerCfg struct {
Caller bool
Level string
Format string
Caller bool `env:"CALLER"`
Level string `env:"LEVEL"`
Format string `env:"FORMAT"`
}

// DatabaseCfg path of the PostgreSQL DB config.
type DatabaseCfg struct {
Host string `valid:"required"`
Port uint16 `valid:"required"`
Name string `valid:"required"`
User string `valid:"required"`
Password string `valid:"required"`
DSN string `valid:"required" env:"DSN,overwrite"`
}

// FilterStruct incoming WAL message filter.
Expand Down
30 changes: 6 additions & 24 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ func TestConfig_Validate(t *testing.T) {
HeartbeatInterval: 10,
},
Database: DatabaseCfg{
Host: "host",
Port: 10,
Name: "db",
User: "usr",
Password: "pass",
DSN: "http://user:password@localhost:5432/name",
},
Nats: NatsCfg{
Address: "addr",
Expand All @@ -50,11 +46,7 @@ func TestConfig_Validate(t *testing.T) {
HeartbeatInterval: 10,
},
Database: DatabaseCfg{
Host: "host",
Port: 10,
Name: "db",
User: "usr",
Password: "pass",
DSN: "http://user:password@localhost:5432/name",
},
Nats: NatsCfg{
Address: "addr",
Expand All @@ -74,17 +66,15 @@ func TestConfig_Validate(t *testing.T) {
HeartbeatInterval: 10,
},
Database: DatabaseCfg{
Name: "db",
User: "usr",
Password: "pass",
DSN: "",
},
Nats: NatsCfg{
Address: "addr",
StreamName: "stream",
TopicPrefix: "prefix",
},
},
wantErr: errors.New("Database.Host: non zero value required;Database.Port: non zero value required"),
wantErr: errors.New("Database.DSN: non zero value required"),
},
{
name: "empty nats addr cfg",
Expand All @@ -96,11 +86,7 @@ func TestConfig_Validate(t *testing.T) {
HeartbeatInterval: 10,
},
Database: DatabaseCfg{
Host: "host",
Port: 10,
Name: "db",
User: "usr",
Password: "pass",
DSN: "http://user:password@localhost:5432/name",
},
Nats: NatsCfg{
StreamName: "stream",
Expand All @@ -119,11 +105,7 @@ func TestConfig_Validate(t *testing.T) {
HeartbeatInterval: 10,
},
Database: DatabaseCfg{
Host: "host",
Port: 10,
Name: "db",
User: "usr",
Password: "pass",
DSN: "http://user:password@localhost:5432/name",
},
Nats: NatsCfg{
Address: "addr",
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/ihippik/wal-listener
module github.com/banked/wal-listener/v2

go 1.19

Expand All @@ -11,6 +11,7 @@ require (
github.com/magiconair/properties v1.8.6
github.com/nats-io/nats.go v1.16.0
github.com/prometheus/client_golang v1.13.0
github.com/sethvargo/go-envconfig v0.8.3
github.com/sirupsen/logrus v1.9.0
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.8.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sethvargo/go-envconfig v0.8.3 h1:dXyUrDCJvCm3ybP7yNpiux93qoSORvuH23bdsgFfiJ0=
github.com/sethvargo/go-envconfig v0.8.3/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
Expand Down
9 changes: 5 additions & 4 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sirupsen/logrus"

"github.com/ihippik/wal-listener/config"
"github.com/banked/wal-listener/v2/config"
)

const errorBufferSize = 100
Expand Down Expand Up @@ -85,11 +85,12 @@ func NewWalListener(
repl replication,
publ publisher,
parser parser,
slotName string,
) *Listener {
return &Listener{
log: log,
slotName: fmt.Sprintf("%s_%s", cfg.Listener.SlotName, cfg.Database.Name),
cfg: cfg,
log: log,
slotName: slotName,
publisher: publ,
repository: repo,
replicator: repl,
Expand Down Expand Up @@ -279,7 +280,7 @@ func (l *Listener) Stream(ctx context.Context) {
}

if msg.ServerHeartbeat != nil {
//FIXME panic if there have been no messages for a long time.
// FIXME panic if there have been no messages for a long time.
l.log.WithFields(logrus.Fields{
"server_wal_end": msg.ServerHeartbeat.ServerWalEnd,
"server_time": msg.ServerHeartbeat.ServerTime,
Expand Down
2 changes: 1 addition & 1 deletion listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/ihippik/wal-listener/config"
"github.com/banked/wal-listener/v2/config"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion listener/nats_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/google/uuid"
"github.com/nats-io/nats.go"

"github.com/ihippik/wal-listener/config"
"github.com/banked/wal-listener/v2/config"
)

// NatsPublisher represent event publisher.
Expand Down
2 changes: 1 addition & 1 deletion listener/nats_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package listener
import (
"testing"

"github.com/ihippik/wal-listener/config"
"github.com/banked/wal-listener/v2/config"
)

func TestEvent_GetSubjectName(t *testing.T) {
Expand Down

0 comments on commit 7af7f57

Please sign in to comment.