-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathsource.go
127 lines (114 loc) · 3.37 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package events
import (
"context"
"fmt"
"github.com/kubemq-io/kubemq-bridges/config"
"github.com/kubemq-io/kubemq-bridges/middleware"
"github.com/kubemq-io/kubemq-bridges/pkg/logger"
"github.com/kubemq-io/kubemq-bridges/pkg/roundrobin"
"github.com/kubemq-io/kubemq-bridges/pkg/uuid"
"github.com/kubemq-io/kubemq-go"
)
type Source struct {
opts options
log *logger.Logger
clients []*kubemq.Client
targets []middleware.Middleware
properties config.Metadata
roundRobin *roundrobin.RoundRobin
loadBalancingMode bool
}
func New() *Source {
return &Source{}
}
func (s *Source) Init(ctx context.Context, connection config.Metadata, properties config.Metadata, bindingName string, log *logger.Logger) error {
s.log = log
if s.log == nil {
s.log = logger.NewLogger("events")
}
var err error
s.opts, err = parseOptions(connection)
if err != nil {
return err
}
s.properties = properties
for i := 0; i < s.opts.sources; i++ {
clientId := fmt.Sprintf("kubemq-bridges_%s_%s", bindingName, s.opts.clientId)
if s.opts.sources > 1 {
clientId = fmt.Sprintf("kubemq-bridges_%s_%s-%d", bindingName, clientId, i)
}
client, err := kubemq.NewClient(ctx,
kubemq.WithAddress(s.opts.host, s.opts.port),
kubemq.WithClientId(clientId),
kubemq.WithTransportType(kubemq.TransportTypeGRPC),
kubemq.WithCheckConnection(true),
kubemq.WithAuthToken(s.opts.authToken),
kubemq.WithMaxReconnects(s.opts.maxReconnects),
kubemq.WithAutoReconnect(s.opts.autoReconnect),
kubemq.WithReconnectInterval(s.opts.reconnectIntervalSeconds))
if err != nil {
return err
}
s.clients = append(s.clients, client)
}
return nil
}
func (s *Source) Start(ctx context.Context, target []middleware.Middleware) error {
s.roundRobin = roundrobin.NewRoundRobin(len(target))
if s.properties != nil {
mode, ok := s.properties["load-balancing"]
if ok && mode == "true" {
s.loadBalancingMode = true
}
}
s.targets = target
if s.opts.sources > 1 && s.opts.group == "" {
s.opts.group = uuid.New().String()
}
for _, client := range s.clients {
errCh := make(chan error, 1)
eventsCh, err := client.SubscribeToEvents(ctx, s.opts.channel, s.opts.group, errCh)
if err != nil {
return fmt.Errorf("error on subscribing to events channel, %w", err)
}
go func(ctx context.Context, eventsCh <-chan *kubemq.Event, errCh chan error) {
s.run(ctx, eventsCh, errCh)
}(ctx, eventsCh, errCh)
}
return nil
}
func (s *Source) run(ctx context.Context, eventsCh <-chan *kubemq.Event, errCh chan error) {
for {
select {
case event := <-eventsCh:
if s.loadBalancingMode {
go func(event *kubemq.Event, target middleware.Middleware) {
_, err := target.Do(ctx, event)
if err != nil {
s.log.Errorf("error received from target, %s", err.Error())
}
}(event, s.targets[s.roundRobin.Next()])
} else {
for _, target := range s.targets {
go func(event *kubemq.Event, target middleware.Middleware) {
_, err := target.Do(ctx, event)
if err != nil {
s.log.Errorf("error received from target, %s", err.Error())
}
}(event, target)
}
}
case err := <-errCh:
s.log.Errorf("error received from kuebmq server, %s", err.Error())
return
case <-ctx.Done():
return
}
}
}
func (s *Source) Stop() error {
for _, client := range s.clients {
_ = client.Close()
}
return nil
}