This repository has been archived by the owner on Oct 29, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathnacos.go
147 lines (129 loc) · 3.76 KB
/
nacos.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package grpc_resolver_nacos
import (
"context"
"fmt"
"sort"
"strings"
"time"
"github.com/pkg/errors"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
// schemeName is the URL scheme reported by NacosResolver.Scheme and used as
// the prefix of the DSN rebuilt in Build.
const schemeName = "nacos"
// interval is the pause between two consecutive instance fetches in
// watchNacosService.
const interval = 5 * time.Second
// init registers a zero-value NacosResolver with the gRPC resolver registry
// when the package is loaded.
func init() {
	resolver.Register(new(NacosResolver))
}
// NacosResolver is the type registered with the gRPC resolver registry in
// init; Build also returns instances of it as the resolver for a target.
type NacosResolver struct {
// cancelFunc cancels the context shared by the goroutines started in Build.
// It is only set on instances returned by Build.
cancelFunc context.CancelFunc
}
// Build creates a resolver for the given target.
//
// It rebuilds a DSN from the scheme name and the host and path of the target
// URL, parses it with parseURL, and then starts two goroutines that share a
// cancellable context: watchNacosService, which produces endpoint lists, and
// populateEndpoints, which pushes them to cc. The returned resolver holds the
// cancel function for that context.
//
// A DSN that parseURL rejects yields a nil resolver and the wrapped error.
// opts is not used.
func (r *NacosResolver) Build(url resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	parts := []string{schemeName + ":/", url.URL.Host, url.URL.Path}
	cfg, err := parseURL(strings.Join(parts, "/"))
	if err != nil {
		return nil, errors.Wrap(err, "Wrong URL")
	}

	watchCtx, stop := context.WithCancel(context.Background())
	updates := make(chan []serviceInfo)

	go watchNacosService(watchCtx, cfg, updates)
	go populateEndpoints(watchCtx, cc, updates)

	return &NacosResolver{cancelFunc: stop}, nil
}
// Scheme returns schemeName ("nacos").
func (r *NacosResolver) Scheme() string {
return schemeName
}
// ResolveNow is a no-op: it ignores its options and triggers nothing.
// Endpoints are refreshed by the periodic polling in watchNacosService.
func (r *NacosResolver) ResolveNow(resolver.ResolveNowOptions) {}
// Close cancels the context shared by the goroutines started in Build, which
// makes them stop.
//
// It is a no-op when cancelFunc is nil, as on a zero-value NacosResolver such
// as the one registered in init; calling a nil func would panic.
func (r *NacosResolver) Close() {
	if r.cancelFunc == nil {
		return
	}
	r.cancelFunc()
}
// serviceInfo describes one endpoint fetched by watchNacosService.
type serviceInfo struct {
// Address is the endpoint formatted as "ip:port".
Address string
// Weight is the instance weight converted to int.
Weight int
}
// watchNacosService polls GetHealthyInstances for the service described by
// config and sends every successfully fetched endpoint list on out until ctx
// is cancelled.
//
// The first fetch happens immediately; each later fetch (after a success or a
// failure) is preceded by a wait of interval. Failed fetches are logged and
// retried. Polling runs in an inner goroutine whose results are relayed to
// out by this function; the inner goroutine is stopped by closing quit, which
// happens on every return path.
//
// The function never closes out.
func watchNacosService(ctx context.Context, config *NacosConfig, out chan<- []serviceInfo) {
	res := make(chan []serviceInfo)
	quit := make(chan struct{})
	// Close quit on every exit so the poller goroutine returns and doesn't leak.
	// Do NOT close res because that can lead to panics in the goroutine.
	// res will be garbage collected at some point.
	defer close(quit)

	go func() {
		first := true
		for {
			if !first {
				// Wait between polls, but wake up immediately when asked to
				// quit instead of sleeping through the whole interval.
				timer := time.NewTimer(interval)
				select {
				case <-timer.C:
				case <-quit:
					timer.Stop()
					return
				}
			}
			first = false

			inss, err := GetHealthyInstances(config.ServiceName, config.Clusters, config.GroupName, config.NacosClient)
			if err != nil {
				select {
				case <-quit:
					return
				default:
					grpclog.Errorf("[Nacos resolver] Couldn't fetch endpoints. label={%s}; error={%v}", config.Label, err)
					continue
				}
			}
			grpclog.Infof("[Nacos resolver] %d endpoints fetched for label={%s}",
				len(inss),
				config.Label,
			)

			ee := make([]serviceInfo, 0, len(inss))
			for _, s := range inss {
				address := fmt.Sprintf("%s:%d", s.Ip, s.Port)
				ee = append(ee, serviceInfo{Address: address, Weight: int(s.Weight)})
			}

			select {
			case res <- ee:
			case <-quit:
				return
			}
		}
	}()

	for {
		// If in the below select both channels have values that can be read,
		// Go picks one pseudo-randomly.
		// But when the context is canceled we want to act upon it immediately.
		if ctx.Err() != nil {
			return
		}
		select {
		case ee := <-res:
			// The consumer of out may already have stopped because of the
			// same cancellation; never block on the send without also
			// watching ctx, otherwise this goroutine would hang forever.
			select {
			case out <- ee:
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}
// populateEndpoints pushes every endpoint list received on input to
// clientConn until ctx is cancelled.
//
// Each received list is de-duplicated (identical address/weight pairs collapse
// into one entry), converted to resolver.Address values that carry the weight
// as a balancer attribute under WeightAttributeKey, and sorted by address
// before being handed to clientConn.UpdateState. An error returned by
// UpdateState is logged; the loop keeps running and the next received list is
// pushed as usual.
func populateEndpoints(ctx context.Context, clientConn resolver.ClientConn, input <-chan []serviceInfo) {
	for {
		select {
		case cc := <-input:
			connsSet := make(map[serviceInfo]struct{}, len(cc))
			for _, c := range cc {
				connsSet[c] = struct{}{}
			}

			conns := make([]resolver.Address, 0, len(connsSet))
			for c := range connsSet {
				conns = append(conns, resolver.Address{
					Addr:               c.Address,
					BalancerAttributes: attributes.New(WeightAttributeKey{}, WeightAddrInfo{Weight: c.Weight}),
				})
			}

			// Map iteration order is random; sort so an unchanged endpoint
			// set is not presented to the balancer in a different order.
			sort.Sort(byAddressString(conns))

			if err := clientConn.UpdateState(resolver.State{Addresses: conns}); err != nil {
				grpclog.Errorf("[Nacos resolver] Couldn't update client connection state. error={%v}", err)
			}
		case <-ctx.Done():
			grpclog.Info("[Nacos resolver] Watch has been finished")
			return
		}
	}
}
// byAddressString implements sort.Interface for a slice of resolver.Address,
// ordering the entries by their Addr field in increasing string order.
type byAddressString []resolver.Address

// Len returns the number of addresses in the slice.
func (a byAddressString) Len() int {
	return len(a)
}

// Less reports whether the Addr of element i sorts before the Addr of element j.
func (a byAddressString) Less(i, j int) bool {
	left, right := a[i].Addr, a[j].Addr
	return left < right
}

// Swap exchanges the elements at indexes i and j.
func (a byAddressString) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}