-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathmanager.go
589 lines (502 loc) · 13.5 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
package event
import (
"fmt"
"reflect"
"strings"
"sync"
)
// Fallback settings for channel-based (async) firing. NewManager assigns them
// as the initial option values and makeConsumers re-applies them when the
// configured value is not positive.
const (
	// defaultChannelSize is the buffer capacity used for the event channel.
	defaultChannelSize = 100
	// defaultConsumerNum is the number of consumer goroutines reading the channel.
	defaultConsumerNum = 3
)
// Manager is the event manager. It keeps the registered listeners and the
// pre-defined event factories, and dispatches events to the listeners.
type Manager struct {
	// Options holds the manager settings; it is embedded, so its fields
	// (e.g. EnableLock, MatchMode, ConsumerNum, ChannelSize) are promoted.
	Options
	// Mutex guards the eventFc write in AddEventFc and, when EnableLock is
	// set, is held for the duration of FireEvent.
	sync.Mutex

	// wg tracks the consumer goroutines started by makeConsumers.
	wg sync.WaitGroup
	// ch carries events to the consumer goroutines. It is created lazily by
	// makeConsumers on the first FireAsync call.
	ch chan Event
	// oc ensures the consumers are started only once.
	oc sync.Once
	// err is the latest error recorded by a consumer goroutine (see Wait).
	err error

	// name of the manager
	name string
	// pool sync.Pool
	// sample is the template copied by newBasicEvent for every new BasicEvent.
	sample *BasicEvent
	// events stores user custom Event instances that were pre-defined.
	// NOTE(review): not referenced by the code visible in this file - confirm it is still needed.
	events map[string]Event
	// eventFc stores the pre-defined event factory funcs, keyed by event name.
	eventFc map[string]FactoryFunc
	// listeners maps an event name (or pattern) to its ListenerQueue.
	listeners map[string]*ListenerQueue
	// listenedNames records every event name that currently has listeners.
	listenedNames map[string]int
}
// NewM creates an event manager. It is a short alias of NewManager().
func NewM(name string, fns ...OptionFn) *Manager {
	em := NewManager(name, fns...)
	return em
}
// NewManager creates an event manager with the given name.
//
// The manager starts with empty factory and listener storage and with the
// default consumer number and channel size for async firing; the option
// funcs in fns are applied afterwards and may override those defaults.
func NewManager(name string, fns ...OptionFn) *Manager {
	mgr := new(Manager)
	mgr.name = name
	mgr.sample = new(BasicEvent)

	// storage for event factories and listeners
	mgr.eventFc = map[string]FactoryFunc{}
	mgr.listeners = map[string]*ListenerQueue{}
	mgr.listenedNames = map[string]int{}

	// defaults for async fire by goroutine
	mgr.ConsumerNum = defaultConsumerNum
	mgr.ChannelSize = defaultChannelSize

	// user options are applied last
	return mgr.WithOptions(fns...)
}
// WithOptions applies the given option funcs, in order, to the manager's
// Options and returns the same manager for chaining.
func (em *Manager) WithOptions(fns ...OptionFn) *Manager {
	opts := &em.Options
	for i := range fns {
		fns[i](opts)
	}
	return em
}
/*************************************************************
* -- register listeners
*************************************************************/
// AddListener registers an event handler/listener for the named event.
// It is an alias of the method On(); see On() for the priority argument.
func (em *Manager) AddListener(name string, listener Listener, priority ...int) {
	em.On(name, listener, priority...)
}
// Listen registers an event handler/listener for the named event.
// It is an alias of the method On(); see On() for the priority argument.
func (em *Manager) Listen(name string, listener Listener, priority ...int) {
	em.On(name, listener, priority...)
}
// On registers an event handler/listener for the named event.
//
// An optional priority can be given; only the first value is used and it
// defaults to Normal.
//
// Usage:
//
//	em.On("evt0", listener)
//	em.On("evt0", listener, High)
func (em *Manager) On(name string, listener Listener, priority ...int) {
	level := Normal
	if len(priority) != 0 {
		level = priority[0]
	}

	item := &ListenerItem{level, listener}
	em.addListenerItem(name, item)
}
// Subscribe registers all listeners provided by a Subscriber.
// It is an alias of the method AddSubscriber().
func (em *Manager) Subscribe(sbr Subscriber) {
	em.AddSubscriber(sbr)
}
// AddSubscriber add events by subscriber interface.
//
// you can register multi event listeners in a struct func.
// more usage please see README or tests.
func (em *Manager) AddSubscriber(sbr Subscriber) {
for name, listener := range sbr.SubscribedEvents() {
switch lt := listener.(type) {
case Listener:
em.On(name, lt)
// case ListenerFunc:
// em.On(name, lt)
case ListenerItem:
em.addListenerItem(name, <)
default:
panic("event: the value must be an Listener or ListenerItem instance")
}
}
}
// addListenerItem stores li in the listener queue of the named event.
//
// The name is first passed through goodName. A nil listener, or a listener
// that is a struct value (instead of a pointer), is reported through panicf.
// The first listener of a name creates its queue and records the name in
// listenedNames; later ones are pushed onto the existing queue.
func (em *Manager) addListenerItem(name string, li *ListenerItem) {
	name = goodName(name, true)

	switch {
	case li.Listener == nil:
		panicf("event: the event %q listener cannot be empty", name)
	case reflect.ValueOf(li.Listener).Kind() == reflect.Struct:
		panicf("event: %q - struct listener must be pointer", name)
	}

	queue, found := em.listeners[name]
	if !found {
		// first listener for this name
		em.listenedNames[name] = 1
		em.listeners[name] = (&ListenerQueue{}).Push(li)
		return
	}

	// queue exists, append to it
	queue.Push(li)
}
/*************************************************************
* Listener Manage: - trigger event
*************************************************************/
// MustTrigger fires the named event and panics on error.
// It is an alias of the method MustFire().
func (em *Manager) MustTrigger(name string, params M) Event {
	evt := em.MustFire(name, params)
	return evt
}
// MustFire fires the event by name and returns the fired event.
// It panics with the error when Fire() reports one.
func (em *Manager) MustFire(name string, params M) Event {
	err, evt := em.Fire(name, params)
	if err == nil {
		return evt
	}
	panic(err)
}
// Trigger fires the event by name. It is an alias of the method Fire()
// and returns the same (error, Event) pair.
func (em *Manager) Trigger(name string, params M) (error, Event) {
	err, evt := em.Fire(name, params)
	return err, evt
}
// Fire triggers the event by name and calls the matching listeners
// synchronously.
//
// It returns the error produced while handling the event (nil when there is
// none) together with the event instance that was fired. Note the result
// order: the error comes first.
func (em *Manager) Fire(name string, params M) (err error, e Event) {
	// synchronous dispatch: listeners handle the event before returning
	e, err = em.fireByName(name, params, false)
	return err, e
}
// Async fires the named event through the go channel; the listeners are
// called by the consumer goroutines, not by the caller.
//
// Note: if you want to use this method, you should
// call the method Close() after all events are fired.
func (em *Manager) Async(name string, params M) {
	const viaChannel = true
	_, _ = em.fireByName(name, params, viaChannel) // always (nil, nil) for channel firing
}
// FireC fires the named event through the go channel. It behaves the same
// as the method Async().
//
// Note: if you want to use this method, you should
// call the method Close() after all events are fired.
func (em *Manager) FireC(name string, params M) {
	const viaChannel = true
	_, _ = em.fireByName(name, params, viaChannel) // always (nil, nil) for channel firing
}
// fireByName builds the event for the given name and fires it.
//
// The name is passed through goodName. When a factory is registered for the
// name, the event comes from that factory and params (if not nil) is set as
// its data; otherwise a new BasicEvent is created from name and params.
//
// If useCh is true the event is handed to FireAsync and the result is
// always (nil, nil).
//
// On useCh=false:
//   - the listeners handle the event before the call returns.
//   - the created event is returned together with the FireEvent error.
func (em *Manager) fireByName(name string, params M, useCh bool) (e Event, err error) {
	name = goodName(name, false)

	if factory, found := em.eventFc[name]; !found {
		// no pre-defined event: create a new basic event instance
		e = em.newBasicEvent(name, params)
	} else {
		// pre-defined event: make a new instance from the factory
		e = factory()
		if params != nil {
			e.SetData(params)
		}
	}

	if !useCh {
		// call listeners to handle the event
		err = em.FireEvent(e)
		return e, err
	}

	// fire by channel
	em.FireAsync(e)
	return nil, nil
}
// FireEvent fires the given Event instance and calls its listeners.
//
// When EnableLock is set, the manager mutex is held for the whole call.
// The event's aborted flag is reset before dispatching. In ModePath the
// listeners are selected by firePathMode only. Otherwise fireSimpleMode runs
// first and, unless it failed or the event was aborted, the listeners
// registered on Wildcard are called afterwards. Dispatching stops at the
// first listener that returns an error or aborts the event.
func (em *Manager) FireEvent(e Event) (err error) {
	if em.EnableLock {
		em.Lock()
		defer em.Unlock()
	}

	// ensure aborted is false.
	e.Abort(false)
	evtName := e.Name()

	// fire group listeners by wildcard. eg "db.user.*"
	if em.MatchMode == ModePath {
		return em.firePathMode(evtName, e)
	}

	// handle mode: ModeSimple
	if err = em.fireSimpleMode(evtName, e); err != nil || e.IsAborted() {
		return err
	}

	// fire wildcard event listeners
	queue, found := em.listeners[Wildcard]
	if !found {
		return nil
	}
	for _, li := range queue.Sort().Items() {
		if err = li.Listener.Handle(e); err != nil || e.IsAborted() {
			return err
		}
	}
	return nil
}
// fireSimpleMode dispatches the event in ModeSimple: first to the listeners
// registered on the exact event name, then to the group listeners of the
// last name segment.
//
// Example:
//   - event "db.user.add" will trigger listeners on "db.user.add" and then
//     on "db.user.*"
//
// It returns the first listener error, and stops early when a listener
// aborts the event.
func (em *Manager) fireSimpleMode(name string, e Event) (err error) {
	// candidates[0] is the direct name, candidates[1] the optional group name.
	candidates := [2]string{name}
	count := 1

	dot := strings.LastIndexByte(name, '.')
	if dot > 0 && dot < len(name) {
		candidates[1] = name[:dot+1] + Wildcard // "app.*"
		count = 2
	}

	for _, key := range candidates[:count] {
		queue, found := em.listeners[key]
		if !found {
			continue
		}

		// sort by priority before call.
		for _, li := range queue.Sort().Items() {
			if err = li.Listener.Handle(e); err != nil || e.IsAborted() {
				return err
			}
		}
	}
	return nil
}
// firePathMode dispatches the event in ModePath: every registered pattern
// that equals the event name, or that matchNodePath accepts for it (with "."
// as separator), gets its listeners called.
//
// Example:
//   - event "db.user.add" will trigger listeners on the "db.**"
//   - event "db.user.add" will trigger listeners on the "db.user.*"
//
// Patterns are visited in map iteration order. It returns the first listener
// error, and stops early when a listener aborts the event.
func (em *Manager) firePathMode(name string, e Event) (err error) {
	for pattern, queue := range em.listeners {
		if pattern != name && !matchNodePath(pattern, name, ".") {
			continue
		}

		for _, li := range queue.Sort().Items() {
			if err = li.Listener.Handle(e); err != nil || e.IsAborted() {
				return err
			}
		}
	}
	return nil
}
/*************************************************************
* Fire by channel
*************************************************************/
// FireAsync fires the given Event instance asynchronously through the go
// channel.
//
// On the first call the consumer goroutines are started (exactly once). The
// event is then sent on the channel and handled by a consumer via
// FireEvent. The send blocks while the channel buffer is full.
//
// Note: if you want to use this method, you should
// call the method Close() after all events are fired.
//
// Example:
//
//	em := NewManager("test")
//	em.FireAsync(myEvent) // myEvent is any value implementing Event
//	_ = em.CloseWait()
//
// To fire by name and params instead, use Async():
//
//	em.Async("db.user.add", M{"id": 1001})
func (em *Manager) FireAsync(e Event) {
	// make the consumers once
	em.oc.Do(em.makeConsumers)

	// dispatch event
	em.ch <- e
}
// makeConsumers creates the event channel and starts the consumer goroutines
// used for async firing.
//
// Non-positive ConsumerNum / ChannelSize settings fall back to the defaults.
// Every consumer is tracked by em.wg and fires the events it receives until
// the channel is closed. A panic inside a consumer is recovered, stored in
// em.err and ends that consumer.
func (em *Manager) makeConsumers() {
	if em.ConsumerNum <= 0 {
		em.ConsumerNum = defaultConsumerNum
	}
	if em.ChannelSize <= 0 {
		em.ChannelSize = defaultChannelSize
	}

	em.ch = make(chan Event, em.ChannelSize)

	// consume is the body of one consumer goroutine.
	consume := func() {
		defer func() {
			if err := recover(); err != nil {
				em.err = fmt.Errorf("async consum event error: %v", err)
			}
			em.wg.Done()
		}()

		// keep running until channel closed
		for evt := range em.ch {
			_ = em.FireEvent(evt) // ignore async fire error
		}
	}

	// make event consumers
	for started := 0; started < em.ConsumerNum; started++ {
		em.wg.Add(1)
		go consume()
	}
}
// CloseWait closes the event channel and waits until all async events are
// done. A Close() error is returned right away; otherwise the result of
// Wait() is returned.
func (em *Manager) CloseWait() error {
	err := em.Close()
	if err == nil {
		err = em.Wait()
	}
	return err
}
// Wait blocks until all consumer goroutines have finished, then returns the
// latest error recorded by them (nil if there was none).
func (em *Manager) Wait() error {
	em.wg.Wait()
	return em.err
}
// Close closes the event channel (when it has been created) so that no new
// async event can be fired. It always returns nil.
//
// NOTE(review): the channel field is kept after closing, so a second Close()
// or a later FireAsync() would act on a closed channel - confirm callers
// never do that.
func (em *Manager) Close() error {
	if ch := em.ch; ch != nil {
		close(ch)
	}
	return nil
}
// FireBatch fires multiple events at once and collects the errors.
//
// Each argument may be an event name (string), fired through Fire() with nil
// params, or an Event instance, fired through FireEvent(). Arguments of any
// other type are ignored. The returned slice holds one entry per event that
// failed, in firing order; it is nil when nothing failed.
//
// Usage:
//
//	FireBatch("name1", "name2", &MyEvent{})
func (em *Manager) FireBatch(es ...any) (ers []error) {
	for _, item := range es {
		// err is scoped per item, so an ignored (invalid) item can never
		// report the error of the item fired before it a second time.
		var err error

		switch v := item.(type) {
		case string:
			err, _ = em.Fire(v, nil)
		case Event:
			err = em.FireEvent(v)
		default:
			continue // ignore invalid param.
		}

		if err != nil {
			ers = append(ers, err)
		}
	}
	return ers
}
// AsyncFire fires the event in a new goroutine started with the 'go'
// keyword and returns immediately. The FireEvent error is discarded.
func (em *Manager) AsyncFire(e Event) {
	go func() {
		_ = em.FireEvent(e) // fire-and-forget: the error has no receiver
	}()
}
// AwaitFire fires the event in a new goroutine started with the 'go'
// keyword, waits for it to finish and returns the FireEvent error.
func (em *Manager) AwaitFire(e Event) (err error) {
	done := make(chan error)

	go func() {
		done <- em.FireEvent(e)
	}()

	// wait for the result, then release the channel
	err = <-done
	close(done)
	return err
}
/*************************************************************
* Event Manage
*************************************************************/
// AddEvent adds a pre-defined event instance to the manager.
//
// The event is stored as a factory func under its name (passed through
// goodName). When e implements Cloneable the factory returns e.Clone() on
// every call; otherwise it returns the very same instance e each time.
func (em *Manager) AddEvent(e Event) {
	name := goodName(e.Name(), false)

	// default: hand out the same instance
	factory := func() Event {
		return e
	}
	// cloneable events get a fresh copy per call
	if ec, ok := e.(Cloneable); ok {
		factory = func() Event {
			return ec.Clone()
		}
	}

	em.AddEventFc(name, factory)
}
// AddEventFc adds a pre-defined event factory func to the manager under the
// given name, replacing any factory already stored for it.
//
// The write is done while holding the manager mutex. The unlock is deferred
// so the mutex is released even if the map write panics (e.g. on a Manager
// that was not created by NewManager and has no factory storage).
func (em *Manager) AddEventFc(name string, fc FactoryFunc) {
	em.Lock()
	defer em.Unlock()

	em.eventFc[name] = fc
}
// GetEvent gets a pre-defined event by name. The event is produced by
// calling the factory stored for the name; ok is false (and e is nil) when
// no factory is registered.
func (em *Manager) GetEvent(name string) (e Event, ok bool) {
	if factory, found := em.eventFc[name]; found {
		return factory(), true
	}
	return nil, false
}
// HasEvent reports whether a pre-defined event (factory) is registered
// under the given name.
func (em *Manager) HasEvent(name string) bool {
	_, registered := em.eventFc[name]
	return registered
}
// RemoveEvent deletes the pre-defined event (factory) registered under the
// given name. It does nothing when the name is not registered.
func (em *Manager) RemoveEvent(name string) {
	// delete is a no-op for a missing key, so no existence check is needed
	delete(em.eventFc, name)
}
// RemoveEvents removes all registered pre-defined events by replacing the
// factory storage with a new empty map.
func (em *Manager) RemoveEvents() {
	em.eventFc = make(map[string]FactoryFunc)
}
/*************************************************************
* Helper Methods
*************************************************************/
// newBasicEvent creates a new BasicEvent as a copy of em.sample and sets
// the given name and data on the copy.
func (em *Manager) newBasicEvent(name string, data M) *BasicEvent {
	evt := new(BasicEvent)
	*evt = *em.sample // copy the sample, keep the sample itself untouched

	evt.SetName(name)
	evt.SetData(data)
	return evt
}
// HasListeners reports whether listeners are registered directly on the
// given event name. The name is looked up as given.
func (em *Manager) HasListeners(name string) bool {
	_, listened := em.listenedNames[name]
	return listened
}
// Listeners returns all listener queues keyed by event name. The returned
// map is the manager's internal storage, not a copy.
func (em *Manager) Listeners() map[string]*ListenerQueue {
	return em.listeners
}
// ListenersByName returns the listener queue registered for the given event
// name, or nil when there is none.
func (em *Manager) ListenersByName(name string) *ListenerQueue {
	queue := em.listeners[name]
	return queue
}
// ListenersCount returns the number of listeners registered for the given
// event name; 0 when the name has no listener queue.
func (em *Manager) ListenersCount(name string) int {
	queue, found := em.listeners[name]
	if !found {
		return 0
	}
	return queue.Len()
}
// ListenedNames returns the event names that currently have listeners. The
// returned map is the manager's internal storage, not a copy.
func (em *Manager) ListenedNames() map[string]int {
	return em.listenedNames
}
// RemoveListener removes the given listener. With a non-empty name only the
// queue of that event is searched; with an empty name every queue is. A
// queue that becomes empty is dropped from the manager together with its
// listened name.
//
// Usage:
//
//	RemoveListener("", listener)
//	RemoveListener("name", listener) // limit event name.
func (em *Manager) RemoveListener(name string, listener Listener) {
	// drop removes the listener from one queue and deletes the queue from
	// the manager once it is empty.
	drop := func(key string, queue *ListenerQueue) {
		queue.Remove(listener)
		if queue.IsEmpty() {
			delete(em.listeners, key)
			delete(em.listenedNames, key)
		}
	}

	// name is empty. find all listener and remove matched.
	if name == "" {
		for key, queue := range em.listeners {
			drop(key, queue)
		}
		return
	}

	if queue, found := em.listeners[name]; found {
		drop(name, queue)
	}
}
// RemoveListeners removes all listeners of the given event name. It does
// nothing when the name has no listeners.
func (em *Manager) RemoveListeners(name string) {
	if _, listened := em.listenedNames[name]; !listened {
		return
	}

	em.listeners[name].Clear()

	// delete from manager
	delete(em.listeners, name)
	delete(em.listenedNames, name)
}
// Clear clears all data of the manager. It is an alias of the method Reset().
func (em *Manager) Clear() {
	em.Reset()
}
// Reset resets the manager and clears all its data.
//
// Every listener queue is cleared, then the async state (channel, once
// guard, wait group and the latest async error) and the factory/listener
// storage are replaced by fresh empty values. Clearing the error makes sure
// a Wait() after Reset() cannot report a failure from before the reset.
//
// NOTE(review): the channel is dropped without being closed here - confirm
// callers run Close()/CloseWait() first when async firing was used.
func (em *Manager) Reset() {
	// clear all listeners
	for _, lq := range em.listeners {
		lq.Clear()
	}

	// reset the async state
	em.ch = nil
	em.oc = sync.Once{}
	em.wg = sync.WaitGroup{}
	em.err = nil

	// reset the storage
	em.eventFc = make(map[string]FactoryFunc)
	em.listeners = make(map[string]*ListenerQueue)
	em.listenedNames = make(map[string]int)
}