-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconfiguring.go
81 lines (64 loc) · 2.19 KB
/
configuring.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package decs
import (
"github.com/x-cellent/decs/mq"
"github.com/x-cellent/decs/nats"
"github.com/x-cellent/decs/nats/stan"
"github.com/x-cellent/decs/nsq"
"go.uber.org/zap"
"time"
)
func (cbus *CommandBus) Logger() *zap.Logger {
return cbus.log
}
func (cbus *CommandBus) SetLogger(logger *zap.Logger) {
cbus.log = logger
cbus.Recorder.log = logger
}
func (cbus *CommandBus) SetMqProvider(provider mq.Provider, retryInterval, flushDelay time.Duration) {
if cbus.mqProvider != nil && cbus.mqProvider == provider {
return
}
local := provider == nil
if local {
provider = mq.NewLocalProvider()
}
cbus.mqProvider = provider
if local {
cbus.producer = newProducerProxy(nil, cbus)
} else {
cbus.producer = newProducerProxy(mq.NewProducerProxy(cbus.mqProvider, retryInterval, flushDelay), cbus)
}
}
func (cbus *CommandBus) ConfigureLocalProvider(retryInterval, flushDelay time.Duration) {
cbus.SetMqProvider(nil, retryInterval, flushDelay)
}
func (cbus *CommandBus) ConfigureNatsProvider(retryInterval, flushDelay time.Duration, url string) {
cbus.SetMqProvider(nats.NewProvider(cbus.log, url), retryInterval, flushDelay)
}
func (cbus *CommandBus) ConfigureNatsStreamingProvider(retryInterval, flushDelay time.Duration, url, clusterID, clientID string) {
cbus.SetMqProvider(stan.NewProvider(cbus.log, url, clusterID, clientID), retryInterval, flushDelay)
}
func (cbus *CommandBus) ConfigureNsqProvider(retryInterval, flushDelay time.Duration, nsqdTcpAddress, nsqdHttpAddress string, nsqLookupdHttpAddresses ...string) {
cbus.SetMqProvider(nsq.NewProvider(cbus.log, nsqdTcpAddress, nsqdHttpAddress, nsqLookupdHttpAddresses...), retryInterval, flushDelay)
}
func (cbus *CommandBus) HandleLocalCommandsImmediately() {
if cbus.localFilter != nil {
return
}
cbus.localFilter = newLocalCommandFilter(cbus)
}
func (cbus *CommandBus) HandleLocalCommandsOnDemand() {
cbus.localFilter = nil
}
func (cbus *CommandBus) SuspendPublishing() {
cbus.publishingSuspended = true
}
func (cbus *CommandBus) ResumePublishing() {
cbus.publishingSuspended = false
}
func (cbus *CommandBus) SuspendCaching() {
cbus.cachingSuspended = true
}
func (cbus *CommandBus) ResumeCaching() {
cbus.cachingSuspended = false
}