Skip to content

Commit

Permalink
fix grpc client listeners race (#399)
Browse files Browse the repository at this point in the history
  • Loading branch information
binbin0325 authored Feb 9, 2022
1 parent bf71d0d commit 494f91e
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 53 deletions.
38 changes: 0 additions & 38 deletions clients/client_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ func TestSetConfigClient(t *testing.T) {
*constant.NewServerConfig(
ip,
8848,
//constant.WithScheme("http"),
//constant.WithContextPath("/nacos"),
),
}

Expand Down Expand Up @@ -67,40 +65,4 @@ func TestSetConfigClient(t *testing.T) {
assert.Nil(t, err)
assert.True(t, reflect.DeepEqual(nacosClientFromMap, nacosClientFromStruct))
})

t.Run("registry", func(t *testing.T) {
client, err := NewNamingClient(
vo.NacosClientParam{
ClientConfig: &cc,
ServerConfigs: sc,
},
)
if err != nil {
t.Fatal(err)
return
}
serviceName := "golang-sms@grpc"
_, err = client.RegisterInstance(vo.RegisterInstanceParam{
Ip: "f",
Port: 8840,
ServiceName: serviceName,
Weight: 10,
Enable: true,
Healthy: true,
Ephemeral: true,
Metadata: map[string]string{"idc": "shanghai-xs"},
})
if err != nil {
t.Fatal(err)
return
}
is, err := client.GetService(vo.GetServiceParam{
ServiceName: serviceName,
})
if err != nil {
t.Fatal(err)
}
t.Logf("is %#v", is)
})

}
24 changes: 19 additions & 5 deletions clients/naming_client/naming_http/beat_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net/http"
"strconv"
"sync"
"sync/atomic"
"time"

Expand All @@ -42,6 +43,7 @@ type BeatReactor struct {
beatThreadSemaphore *semaphore.Weighted
beatRecordMap cache.ConcurrentMap
clientCfg constant.ClientConfig
mux *sync.Mutex
}

const DefaultBeatThreadNum = 20
Expand All @@ -56,29 +58,41 @@ func NewBeatReactor(clientCfg constant.ClientConfig, nacosServer *nacos_server.N
br.beatThreadCount = DefaultBeatThreadNum
br.beatRecordMap = cache.NewConcurrentMap()
br.beatThreadSemaphore = semaphore.NewWeighted(int64(br.beatThreadCount))
br.mux = new(sync.Mutex)
return br
}

func buildKey(serviceName string, ip string, port uint64) string {
return serviceName + constant.NAMING_INSTANCE_ID_SPLITTER + ip + constant.NAMING_INSTANCE_ID_SPLITTER + strconv.Itoa(int(port))
}

func (br *BeatReactor) AddBeatInfo(serviceName string, beatInfo model.BeatInfo) {
func (br *BeatReactor) AddBeatInfo(serviceName string, beatInfo *model.BeatInfo) {
logger.Infof("adding beat: <%s> to beat map", util.ToJsonString(beatInfo))
k := buildKey(serviceName, beatInfo.Ip, beatInfo.Port)
br.beatMap.Set(k, &beatInfo)
go br.sendInstanceBeat(k, &beatInfo)
defer br.mux.Unlock()
br.mux.Lock()
if data, ok := br.beatMap.Get(k); ok {
beatInfo = data.(*model.BeatInfo)
atomic.StoreInt32(&beatInfo.State, int32(model.StateShutdown))
br.beatMap.Remove(k)
}
br.beatMap.Set(k, beatInfo)
beatInfo.Metadata = util.DeepCopyMap(beatInfo.Metadata)
go br.sendInstanceBeat(k, beatInfo)
}

func (br *BeatReactor) RemoveBeatInfo(serviceName string, ip string, port uint64) {
logger.Infof("remove beat: %s@%s:%d from beat map", serviceName, ip, port)
k := buildKey(serviceName, ip, port)
defer br.mux.Unlock()
br.mux.Lock()
data, exist := br.beatMap.Get(k)
if exist {
beatInfo := data.(*model.BeatInfo)
atomic.StoreInt32(&beatInfo.State, int32(model.StateShutdown))
}
br.beatMap.Remove(k)

}

func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *model.BeatInfo) {
Expand All @@ -92,7 +106,7 @@ func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *model.BeatInfo) {
}

//进行心跳通信
beatInterval, err := br.SendBeat(*beatInfo)
beatInterval, err := br.SendBeat(beatInfo)
if err != nil {
logger.Errorf("beat to server return error:%+v", err)
br.beatThreadSemaphore.Release(1)
Expand All @@ -112,7 +126,7 @@ func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *model.BeatInfo) {
}
}

func (br *BeatReactor) SendBeat(info model.BeatInfo) (int64, error) {
func (br *BeatReactor) SendBeat(info *model.BeatInfo) (int64, error) {
logger.Infof("namespaceId:<%s> sending beat to server:<%s>",
br.clientCfg.NamespaceId, util.ToJsonString(info))
params := map[string]string{}
Expand Down
6 changes: 3 additions & 3 deletions clients/naming_client/naming_http/beat_reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestBeatReactor_AddBeatInfo(t *testing.T) {
br := NewBeatReactor(constant.ClientConfig{}, &nacos_server.NacosServer{})
serviceName := "Test"
groupName := "public"
beatInfo := model.BeatInfo{
beatInfo := &model.BeatInfo{
Ip: "127.0.0.1",
Port: 8080,
Metadata: map[string]string{},
Expand All @@ -49,7 +49,7 @@ func TestBeatReactor_RemoveBeatInfo(t *testing.T) {
br := NewBeatReactor(constant.ClientConfig{}, &nacos_server.NacosServer{})
serviceName := "Test"
groupName := "public"
beatInfo1 := model.BeatInfo{
beatInfo1 := &model.BeatInfo{
Ip: "127.0.0.1",
Port: 8080,
Metadata: map[string]string{},
Expand All @@ -58,7 +58,7 @@ func TestBeatReactor_RemoveBeatInfo(t *testing.T) {
Weight: 1,
}
br.AddBeatInfo(util.GetGroupName(serviceName, groupName), beatInfo1)
beatInfo2 := model.BeatInfo{
beatInfo2 := &model.BeatInfo{
Ip: "127.0.0.2",
Port: 8080,
Metadata: map[string]string{},
Expand Down
2 changes: 1 addition & 1 deletion clients/naming_client/naming_http/naming_http_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (proxy *NamingHttpProxy) RegisterInstance(serviceName string, groupName str
return false, err
}
if instance.Ephemeral {
beatInfo := model.BeatInfo{
beatInfo := &model.BeatInfo{
Ip: instance.Ip,
Port: instance.Port,
Metadata: instance.Metadata,
Expand Down
2 changes: 2 additions & 0 deletions common/remote/rpc/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func NewGrpcClient(clientName string, nacosServer *nacos_server.NacosServer) *Gr
},
}
rpcClient.executeClient = rpcClient
listeners := make([]IConnectionEventListener, 0, 8)
rpcClient.connectionEventListeners.Store(listeners)
return rpcClient
}

Expand Down
16 changes: 10 additions & 6 deletions common/remote/rpc/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type RpcClient struct {
rpcClientStatus RpcClientStatus
eventChan chan ConnectionEvent
reconnectionChan chan ReconnectContext
connectionEventListeners []IConnectionEventListener
connectionEventListeners atomic.Value
lastActiveTimeStamp time.Time
executeClient IRpcClient
nacosServer *nacos_server.NacosServer
Expand Down Expand Up @@ -282,7 +282,10 @@ func (r *RpcClient) RegisterServerRequestHandler(request func() rpc_request.IReq

func (r *RpcClient) RegisterConnectionListener(listener IConnectionEventListener) {
logger.Infof("%s register connection listener [%+v] to current client", r.Name, reflect.TypeOf(listener))
r.connectionEventListeners = append(r.connectionEventListeners, listener)
listeners := r.connectionEventListeners.Load()
connectionEventListeners := listeners.([]IConnectionEventListener)
connectionEventListeners = append(connectionEventListeners, listener)
r.connectionEventListeners.Store(connectionEventListeners)
}

func (r *RpcClient) switchServerAsync(recommendServerInfo ServerInfo, onRequestFail bool) {
Expand Down Expand Up @@ -358,11 +361,12 @@ func (r *RpcClient) closeConnection() {

// Notify when client new connected.
func (r *RpcClient) notifyConnectionEvent(event ConnectionEvent) {
if len(r.connectionEventListeners) == 0 {
listeners := r.connectionEventListeners.Load().([]IConnectionEventListener)
if len(listeners) == 0 {
return
}
logger.Infof("%s notify %s event to listeners.", r.Name, event.toString())
for _, v := range r.connectionEventListeners {
for _, v := range listeners {
if event.isConnected() {
v.OnConnected()
}
Expand Down Expand Up @@ -462,8 +466,8 @@ func (r *RpcClient) Request(request rpc_request.IRequest, timeoutMills int64) (r
var currentErr error
for retryTimes < constant.REQUEST_DOMAIN_RETRY_TIME && util.CurrentMillis() < start+timeoutMills {
if r.currentConnection == nil || !r.IsRunning() {
currentErr = waitReconnect(timeoutMills, &retryTimes, request, errors.New(fmt.Sprintf(
"Client not connected, current status:%s", r.rpcClientStatus.getDesc())))
currentErr = waitReconnect(timeoutMills, &retryTimes, request,
fmt.Errorf("client not connected, current status:%s", r.rpcClientStatus.getDesc()))
continue
}
response, err := r.currentConnection.request(request, timeoutMills, r)
Expand Down
8 changes: 8 additions & 0 deletions util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,11 @@ func GetUrlFormedMap(source map[string]string) (urlEncoded string) {
urlEncoded = urlEncoder.Encode()
return
}

func DeepCopyMap(params map[string]string) map[string]string {
result := make(map[string]string, len(params))
for k, v := range params {
result[k] = v
}
return result
}

0 comments on commit 494f91e

Please sign in to comment.