Skip to content

Commit

Permalink
Add ability for notifications when one of the agent tokens is updated (
Browse files Browse the repository at this point in the history
…#8301)

Co-authored-by: Chris Piraino <[email protected]>
  • Loading branch information
2 people authored and hashicorp-ci committed Jul 14, 2020
1 parent 3330282 commit 625055a
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 0 deletions.
110 changes: 110 additions & 0 deletions agent/token/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,28 @@ const (
TokenSourceAPI TokenSource = true
)

type TokenKind int

const (
TokenKindAgent TokenKind = iota
TokenKindAgentMaster
TokenKindUser
TokenKindReplication
)

type watcher struct {
kind TokenKind
ch chan<- struct{}
}

// Notifier holds the channel used to notify a watcher
// of token updates as well as some internal tracking
// information to allow for deregistering the notifier.
type Notifier struct {
id int
Ch <-chan struct{}
}

// Store is used to hold the special ACL tokens used by Consul agents. It is
// designed to update the tokens on the fly, so the token store itself should be
// plumbed around and used to get tokens at runtime, don't save the resulting
Expand Down Expand Up @@ -52,17 +74,96 @@ type Store struct {
// replicationTokenSource indicates where this token originated from
replicationTokenSource TokenSource

watchers map[int]watcher
watcherIndex int

// enterpriseTokens contains tokens only used in consul-enterprise
enterpriseTokens
}

// Notify will set up a watch for when tokens of the desired kind is changed
func (t *Store) Notify(kind TokenKind) Notifier {
// buffering ensures that notifications aren't missed if the watcher
// isn't already in a select and that our notifications don't
// block returning from the Update* methods.
ch := make(chan struct{}, 1)

w := watcher{
kind: kind,
ch: ch,
}

t.l.Lock()
defer t.l.Unlock()
if t.watchers == nil {
t.watchers = make(map[int]watcher)
}
// we specifically want to avoid the zero-value to prevent accidental stop-notification requests
t.watcherIndex += 1
t.watchers[t.watcherIndex] = w

return Notifier{id: t.watcherIndex, Ch: ch}
}

// StopNotify stops the token store from sending notifications to the specified notifiers chan
func (t *Store) StopNotify(n Notifier) {
t.l.Lock()
defer t.l.Unlock()
delete(t.watchers, n.id)
}

// anyKindAllowed returns true if any of the kinds in the `check` list are
// set to be allowed in the `allowed` map.
//
// Note: this is mostly just a convenience to simplify the code in
// sendNotificationLocked and prevent more nested looping with breaks/continues
// and other state tracking.
func anyKindAllowed(allowed TokenKind, check []TokenKind) bool {
for _, kind := range check {
if allowed == kind {
return true
}
}
return false
}

// sendNotificationLocked will iterate through all watchers and notify them that a
// token they are watching has been updated.
//
// NOTE: this function explicitly does not attempt to send the kind or new token value
// along through the channel. With that approach watchers could potentially miss updates
// if the buffered chan fills up. Instead with this approach we just notify that any
// token they care about has been udpated and its up to the caller to retrieve the
// new value (after receiving from the chan). With this approach its entirely possible
// for the watcher to be notified twice before actually retrieving the token after the first
// read from the chan. This is better behavior than missing events. It can cause some
// churn temporarily but in common cases its not expected that these tokens would be updated
// frequently enough to cause this to happen.
func (t *Store) sendNotificationLocked(kinds ...TokenKind) {
for _, watcher := range t.watchers {
if !anyKindAllowed(watcher.kind, kinds) {
// ignore this watcher as it doesn't want events for these kinds of token
continue
}

select {
case watcher.ch <- struct{}{}:
default:
// its already pending a notification
}
}
}

// UpdateUserToken replaces the current user token in the store.
// Returns true if it was changed.
func (t *Store) UpdateUserToken(token string, source TokenSource) bool {
t.l.Lock()
changed := (t.userToken != token || t.userTokenSource != source)
t.userToken = token
t.userTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindUser)
}
t.l.Unlock()
return changed
}
Expand All @@ -74,6 +175,9 @@ func (t *Store) UpdateAgentToken(token string, source TokenSource) bool {
changed := (t.agentToken != token || t.agentTokenSource != source)
t.agentToken = token
t.agentTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindAgent)
}
t.l.Unlock()
return changed
}
Expand All @@ -85,6 +189,9 @@ func (t *Store) UpdateAgentMasterToken(token string, source TokenSource) bool {
changed := (t.agentMasterToken != token || t.agentMasterTokenSource != source)
t.agentMasterToken = token
t.agentMasterTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindAgentMaster)
}
t.l.Unlock()
return changed
}
Expand All @@ -96,6 +203,9 @@ func (t *Store) UpdateReplicationToken(token string, source TokenSource) bool {
changed := (t.replicationToken != token || t.replicationTokenSource != source)
t.replicationToken = token
t.replicationTokenSource = source
if changed {
t.sendNotificationLocked(TokenKindReplication)
}
t.l.Unlock()
return changed
}
Expand Down
91 changes: 91 additions & 0 deletions agent/token/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,94 @@ func TestStore_AgentMasterToken(t *testing.T) {
s.UpdateAgentMasterToken("", TokenSourceConfig)
verify(false, "", "nope", "master", "another")
}

func TestStore_Notify(t *testing.T) {
t.Parallel()
s := new(Store)

newNotification := func(t *testing.T, s *Store, kind TokenKind) Notifier {
n := s.Notify(kind)
require.NotNil(t, n.Ch)
return n
}

requireNotNotified := func(t *testing.T, ch <-chan struct{}) {
require.Empty(t, ch)
}

requireNotifiedOnce := func(t *testing.T, ch <-chan struct{}) {
require.Len(t, ch, 1)
// drain the channel
<-ch
// just to be safe
require.Empty(t, ch)
}

agentNotifier := newNotification(t, s, TokenKindAgent)
userNotifier := newNotification(t, s, TokenKindUser)
agentMasterNotifier := newNotification(t, s, TokenKindAgentMaster)
replicationNotifier := newNotification(t, s, TokenKindReplication)
replicationNotifier2 := newNotification(t, s, TokenKindReplication)

// perform an update of the user token
require.True(t, s.UpdateUserToken("edcae2a2-3b51-4864-b412-c7a568f49cb1", TokenSourceConfig))
// do it again to ensure it doesn't block even though nothing has read from the 1 buffered chan yet
require.True(t, s.UpdateUserToken("47788919-f944-476a-bda5-446d64be1df8", TokenSourceAPI))

// ensure notifications were sent to the user and all notifiers
requireNotNotified(t, agentNotifier.Ch)
requireNotifiedOnce(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)

// now update the agent token which should send notificaitons to the agent and all notifier
require.True(t, s.UpdateAgentToken("5d748ec2-d536-461f-8e2a-1f7eae98d559", TokenSourceAPI))

requireNotifiedOnce(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)

// now update the agent master token which should send notificaitons to the agent master and all notifier
require.True(t, s.UpdateAgentMasterToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI))

requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotifiedOnce(t, agentMasterNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)

// now update the replication token which should send notificaitons to the replication and all notifier
require.True(t, s.UpdateReplicationToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI))

requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotifiedOnce(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
requireNotifiedOnce(t, replicationNotifier2.Ch)

s.StopNotify(replicationNotifier2)

// now update the replication token which should send notificaitons to the replication and all notifier
require.True(t, s.UpdateReplicationToken("eb0b56b9-fa65-4ae1-902a-c64457c62ac6", TokenSourceAPI))

requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotifiedOnce(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
requireNotNotified(t, replicationNotifier2.Ch)

// request updates but that are not changes
require.False(t, s.UpdateAgentToken("5d748ec2-d536-461f-8e2a-1f7eae98d559", TokenSourceAPI))
require.False(t, s.UpdateAgentMasterToken("789badc8-f850-43e1-8742-9b9f484957cc", TokenSourceAPI))
require.False(t, s.UpdateUserToken("47788919-f944-476a-bda5-446d64be1df8", TokenSourceAPI))
require.False(t, s.UpdateReplicationToken("eb0b56b9-fa65-4ae1-902a-c64457c62ac6", TokenSourceAPI))

// ensure that notifications were not sent
requireNotNotified(t, agentNotifier.Ch)
requireNotNotified(t, userNotifier.Ch)
requireNotNotified(t, replicationNotifier.Ch)
requireNotNotified(t, agentMasterNotifier.Ch)
}

0 comments on commit 625055a

Please sign in to comment.