Skip to content

Commit

Permalink
Add nats username and password
Browse files Browse the repository at this point in the history
  • Loading branch information
xorkevin committed Apr 23, 2023
1 parent b3533cd commit 1a4f1e2
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
17 changes: 11 additions & 6 deletions service/events/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ func (s *NatsService) handlePing(ctx context.Context, m *lifecycle.Manager[natsC
if s.hbfailed < s.hbmaxfail {
s.log.WarnErr(ctx, kerrors.WithMsg(err, "Failed to ping events server"),
klog.AString("addr", s.addr),
klog.AString("username", client.auth.Username),
)
return
}
s.log.Err(ctx, kerrors.WithMsg(err, "Failed max pings to events server"),
klog.AString("addr", s.addr),
klog.AString("username", client.auth.Username),
)
s.hbfailed = 0
// first invalidate cached secret in order to ensure that construct client
Expand All @@ -130,28 +132,29 @@ func (s *NatsService) handlePing(ctx context.Context, m *lifecycle.Manager[natsC

type (
natsauth struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}
)

func (s *NatsService) handleGetClient(ctx context.Context, m *lifecycle.Manager[natsClient]) (*natsClient, error) {
var secret natsauth
var auth natsauth
{
client := m.Load(ctx)
if err := s.config.GetSecret(ctx, "auth", 0, &secret); err != nil {
if err := s.config.GetSecret(ctx, "auth", 0, &auth); err != nil {
return client, kerrors.WithMsg(err, "Invalid secret")
}
if secret.Password == "" {
if auth.Username == "" {
return client, kerrors.WithKind(nil, governor.ErrInvalidConfig, "Empty auth")
}
if secret == client.auth {
if auth == client.auth {
return client, nil
}
}

conn, err := nats.Connect(fmt.Sprintf("nats://%s", s.addr),
nats.Name(s.clientname),
nats.Token(secret.Password),
nats.UserInfo(auth.Username, auth.Password),
nats.PingInterval(s.hbinterval),
nats.MaxPingsOutstanding(s.hbmaxfail),
)
Expand All @@ -172,12 +175,13 @@ func (s *NatsService) handleGetClient(ctx context.Context, m *lifecycle.Manager[

s.log.Info(ctx, "Established connection to event stream",
klog.AString("addr", s.addr),
klog.AString("username", auth.Username),
)

client := &natsClient{
client: conn,
jetstream: jetstream,
auth: secret,
auth: auth,
}
m.Store(client)

Expand All @@ -189,6 +193,7 @@ func (s *NatsService) closeClient(ctx context.Context, client *natsClient) {
client.client.Close()
s.log.Info(ctx, "Closed events connection",
klog.AString("addr", s.addr),
klog.AString("username", client.auth.Username),
)
}
}
Expand Down
33 changes: 23 additions & 10 deletions service/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,16 @@ func (s *Service) handlePing(ctx context.Context, m *lifecycle.Manager[pubsubCli
}
s.hbfailed++
if s.hbfailed < s.hbmaxfail {
s.log.WarnErr(ctx, kerrors.WithMsg(err, "Failed to ping pubsub server"), klog.AString("addr", s.addr))
s.log.WarnErr(ctx, kerrors.WithMsg(err, "Failed to ping pubsub server"),
klog.AString("addr", s.addr),
klog.AString("username", client.auth.Username),
)
return
}
s.log.Err(ctx, kerrors.WithMsg(err, "Failed max pings to pubsub server"), klog.AString("addr", s.addr))
s.log.Err(ctx, kerrors.WithMsg(err, "Failed max pings to pubsub server"),
klog.AString("addr", s.addr),
klog.AString("username", client.auth.Username),
)
s.hbfailed = 0
// first invalidate cached secret in order to ensure that construct client
// will use refreshed auth
Expand Down Expand Up @@ -182,28 +188,29 @@ func (e errClient) Error() string {

type (
natsauth struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
}
)

func (s *Service) handleGetClient(ctx context.Context, m *lifecycle.Manager[pubsubClient]) (*pubsubClient, error) {
var secret natsauth
var auth natsauth
{
client := m.Load(ctx)
if err := s.config.GetSecret(ctx, "auth", 0, &secret); err != nil {
if err := s.config.GetSecret(ctx, "auth", 0, &auth); err != nil {
return client, kerrors.WithMsg(err, "Invalid secret")
}
if secret.Password == "" {
if auth.Username == "" {
return client, kerrors.WithKind(nil, governor.ErrInvalidConfig, "Empty auth")
}
if secret == client.auth {
if auth == client.auth {
return client, nil
}
}

conn, err := nats.Connect(fmt.Sprintf("nats://%s", s.addr),
nats.Name(s.clientname),
nats.Token(secret.Password),
nats.UserInfo(auth.Username, auth.Password),
nats.PingInterval(s.hbinterval),
nats.MaxPingsOutstanding(s.hbmaxfail),
)
Expand All @@ -218,11 +225,14 @@ func (s *Service) handleGetClient(ctx context.Context, m *lifecycle.Manager[pubs

m.Stop(ctx)

s.log.Info(ctx, "Established connection to event stream", klog.AString("addr", s.addr))
s.log.Info(ctx, "Established connection to event stream",
klog.AString("addr", s.addr),
klog.AString("username", auth.Username),
)

client := &pubsubClient{
client: conn,
auth: secret,
auth: auth,
}
m.Store(client)

Expand All @@ -232,7 +242,10 @@ func (s *Service) handleGetClient(ctx context.Context, m *lifecycle.Manager[pubs
func (s *Service) closeClient(ctx context.Context, client *pubsubClient) {
if client != nil && !client.client.IsClosed() {
client.client.Close()
s.log.Info(ctx, "Closed pubsub connection", klog.AString("addr", s.addr))
s.log.Info(ctx, "Closed pubsub connection",
klog.AString("addr", s.addr),
klog.AString("username", client.auth.Username),
)
}
}

Expand Down

0 comments on commit 1a4f1e2

Please sign in to comment.