-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
eval: add notification method when set enabled called.
- Loading branch information
Showing
5 changed files
with
194 additions
and
9 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
package broker | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/hashicorp/nomad/helper" | ||
) | ||
|
||
// GenericNotifier allows a process to send updates to many subscribers in an | ||
// easy manner. | ||
type GenericNotifier struct { | ||
|
||
// publishCh is the channel used to receive the update which will be sent | ||
// to all subscribers. | ||
publishCh chan interface{} | ||
|
||
// subscribeCh and unsubscribeCh are the channels used to modify the | ||
// subscription membership mapping. | ||
subscribeCh chan chan interface{} | ||
unsubscribeCh chan chan interface{} | ||
} | ||
|
||
// NewGenericNotifier returns a generic notifier which can be used by a process | ||
// to notify many subscribers when a specific update is triggered. | ||
func NewGenericNotifier() *GenericNotifier { | ||
return &GenericNotifier{ | ||
publishCh: make(chan interface{}, 1), | ||
subscribeCh: make(chan chan interface{}, 1), | ||
unsubscribeCh: make(chan chan interface{}, 1), | ||
} | ||
} | ||
|
||
// Notify allows the implementer to notify all subscribers will a specific | ||
// update. There is no guarantee the order in which subscribers receive the | ||
// message which is sent linearly. | ||
func (g *GenericNotifier) Notify(msg interface{}) { | ||
select { | ||
case g.publishCh <- msg: | ||
default: | ||
} | ||
} | ||
|
||
// Run is a long-lived process which handles updating subscribers as well as | ||
// ensuring any update is sent to them. The passed stopCh is used to coordinate | ||
// shutdown. | ||
func (g *GenericNotifier) Run(stopCh <-chan struct{}) { | ||
|
||
// Store our subscribers inline with a map. This map can only be accessed | ||
// via a single channel update at a time, meaning we can manage with | ||
// without using a lock. | ||
subscribers := map[chan interface{}]struct{}{} | ||
|
||
for { | ||
select { | ||
case <-stopCh: | ||
return | ||
case msgCh := <-g.subscribeCh: | ||
subscribers[msgCh] = struct{}{} | ||
case msgCh := <-g.unsubscribeCh: | ||
delete(subscribers, msgCh) | ||
case update := <-g.publishCh: | ||
for subscriberCh := range subscribers { | ||
|
||
// The subscribers channels are buffered, but ensure we don't | ||
// block the whole process on this. | ||
select { | ||
case subscriberCh <- update: | ||
default: | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
// WaitForChange allows a subscriber to wait until there is a notification | ||
// change, or the timeout is reached. The function will block until one | ||
// condition ie met. | ||
func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} { | ||
|
||
// Create a channel and subscribe to any update. This channel is buffered | ||
// to ensure we do not block the main broker process. | ||
updateCh := make(chan interface{}, 1) | ||
g.subscribeCh <- updateCh | ||
|
||
// Create a timeout timer and use the helper to ensure this routine doesn't | ||
// panic and making the stop call clear. | ||
timeoutTimer, timeoutStop := helper.NewSafeTimer(timeout) | ||
|
||
// Defer a function which performs all the required cleanup of the | ||
// subscriber once it has been notified of a change, or reached its wait | ||
// timeout. | ||
defer func() { | ||
g.unsubscribeCh <- updateCh | ||
close(updateCh) | ||
timeoutStop() | ||
}() | ||
|
||
// Enter the main loop which listens for an update or timeout and returns | ||
// this information to the subscriber. | ||
for { | ||
select { | ||
case <-timeoutTimer.C: | ||
return "wait timed out after " + timeout.String() | ||
case update := <-updateCh: | ||
return update | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package broker | ||
|
||
import ( | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/hashicorp/nomad/ci" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestGenericNotifier(t *testing.T) { | ||
ci.Parallel(t) | ||
|
||
// Create the new notifier. | ||
stopChan := make(chan struct{}) | ||
defer close(stopChan) | ||
|
||
notifier := NewGenericNotifier() | ||
go notifier.Run(stopChan) | ||
|
||
// Ensure we have buffered channels. | ||
require.Equal(t, 1, cap(notifier.publishCh)) | ||
require.Equal(t, 1, cap(notifier.subscribeCh)) | ||
require.Equal(t, 1, cap(notifier.unsubscribeCh)) | ||
|
||
// Test that the timeout works. | ||
var timeoutWG sync.WaitGroup | ||
|
||
for i := 0; i < 6; i++ { | ||
go func(wg *sync.WaitGroup) { | ||
wg.Add(1) | ||
msg := notifier.WaitForChange(100 * time.Millisecond) | ||
require.Equal(t, "wait timed out after 100ms", msg) | ||
wg.Done() | ||
}(&timeoutWG) | ||
} | ||
timeoutWG.Wait() | ||
|
||
// Test that all subscribers recieve an update when a single notification | ||
// is sent. | ||
var notifiedWG sync.WaitGroup | ||
|
||
for i := 0; i < 6; i++ { | ||
go func(wg *sync.WaitGroup) { | ||
wg.Add(1) | ||
msg := notifier.WaitForChange(3 * time.Second) | ||
require.Equal(t, "we got an update and not a timeout", msg) | ||
wg.Done() | ||
}(¬ifiedWG) | ||
} | ||
|
||
notifier.Notify("we got an update and not a timeout") | ||
notifiedWG.Wait() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters