-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroadcast.go
118 lines (109 loc) · 2.73 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package broadcast
type broadcaster[T any] struct {
input chan T
reg chan chan<- T
unreg chan chan<- T
registers map[chan<- T]struct{}
done chan struct{}
}
type Broadcaster[T any] interface {
// Register a new channel to receive messages.
Register(chan<- T)
// Unregister a channel that no longer receive messages.
Unregister(chan<- T)
// Subbmit a new message to all subscribers.
Subbmit(T)
// Close the broadcaster. Using of closed broadcaster or close a closed broadcaster will result in panic.
Close()
}
// Create a new broadcaster with the given input channel buffer length.
// If ubable to send messages to registered channels, the broadcaster will be blocked
// and ubable to send messages to other channels.
func NewBroadcaster[T any](buflen int) Broadcaster[T] {
b := &broadcaster[T]{
input: make(chan T, buflen),
reg: make(chan chan<- T),
unreg: make(chan chan<- T),
registers: make(map[chan<- T]struct{}),
done: make(chan struct{}),
}
go b.run()
return b
}
// Create a new broadcaster with the given input channel buffer length.
// This broadcaster won't be blocked if unable to send messages to some registered channels.
// Instead, these channels will be ignored.
func NewUnblockedBroadcaster[T any](buflen int) Broadcaster[T] {
b := &broadcaster[T]{
input: make(chan T, buflen),
reg: make(chan chan<- T),
unreg: make(chan chan<- T),
registers: make(map[chan<- T]struct{}),
done: make(chan struct{}),
}
go b.unblockedrun()
return b
}
func (b *broadcaster[T]) run() {
for {
select {
// Send message to all subscribers.
case m := <-b.input:
for ch := range b.registers {
ch <- m
}
// Add a new subscriber.
case ch := <-b.reg:
b.registers[ch] = struct{}{}
// Delete a subscriber.
case ch := <-b.unreg:
delete(b.registers, ch)
// Terminate the broadcaster.
case <-b.done:
close(b.input)
close(b.reg)
close(b.unreg)
return
}
}
}
func (b *broadcaster[T]) unblockedrun() {
for {
select {
// Send message to all subscribers.
case m := <-b.input:
for ch := range b.registers {
select {
// If able to send message to this channel
case ch <- m:
// Else, ignore this channel
default:
}
}
// Add a new subscriber.
case ch := <-b.reg:
b.registers[ch] = struct{}{}
// Delete a subscriber.
case ch := <-b.unreg:
delete(b.registers, ch)
// Terminate the broadcaster.
case <-b.done:
close(b.input)
close(b.reg)
close(b.unreg)
return
}
}
}
func (b *broadcaster[T]) Register(ch chan<- T) {
b.reg <- ch
}
func (b *broadcaster[T]) Unregister(ch chan<- T) {
b.unreg <- ch
}
func (b *broadcaster[T]) Subbmit(m T) {
b.input <- m
}
func (b *broadcaster[T]) Close() {
close(b.done)
}