-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnotifier.go
64 lines (55 loc) · 1.16 KB
/
notifier.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package flux
import "sync"
type Notifier struct {
m sync.RWMutex
subscribers map[interface{}]func(done chan struct{})
}
func NewNotifier() *Notifier {
n := &Notifier{}
n.subscribers = map[interface{}]func(done chan struct{}){}
return n
}
func (s *Notifier) Delete(key interface{}) {
s.delete(key)
}
func (s *Notifier) Watch(key interface{}, f func(done chan struct{})) {
s.set(key, f)
}
func (s *Notifier) Notify() (done chan struct{}) {
wg := &sync.WaitGroup{}
all := s.values()
wg.Add(len(all))
for _, f := range all {
finished := make(chan struct{}, 1)
go func() {
<-finished
wg.Done()
}()
f(finished)
}
done = make(chan struct{}, 1)
go func() {
wg.Wait()
close(done)
}()
return done
}
func (n *Notifier) set(key interface{}, value func(done chan struct{})) {
n.m.Lock()
defer n.m.Unlock()
n.subscribers[key] = value
}
func (n *Notifier) delete(key interface{}) {
n.m.Lock()
defer n.m.Unlock()
delete(n.subscribers, key)
}
func (n *Notifier) values() []func(done chan struct{}) {
n.m.RLock()
defer n.m.RUnlock()
var out []func(done chan struct{})
for _, v := range n.subscribers {
out = append(out, v)
}
return out
}