Skip to content

Commit

Permalink
Merge 6aa3f5c into 95c99f0
Browse files Browse the repository at this point in the history
  • Loading branch information
wenxuwan authored Apr 11, 2021
2 parents 95c99f0 + 6aa3f5c commit 02d9190
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 119 deletions.
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:z
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/coredns/coredns v1.1.2 h1:bAFHrSsBeTeRG5W3Nf2su3lUGw7Npw2UKeCJm/3A638=
github.com/coredns/coredns v1.1.2/go.mod h1:zASH/MVDgR6XZTbxvOnsZfffS+31vg6Ackf/wo1+AM0=
github.com/coreos/bbolt v1.3.4 h1:0VqjxUwoTLxM3PmsSIk0hI2ao6gTtButQ2z8FT4//yo=
github.com/coreos/bbolt v1.3.4/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
Expand Down Expand Up @@ -227,7 +228,6 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 h1:Ujru1hufTHVb++eG6OuNDKMxZnGIvF6o/u8q/8h2+I4=
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
Expand Down
2 changes: 0 additions & 2 deletions protocol/dubbo/dubbo_invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ func InitTest(t *testing.T) (protocol.Protocol, *common.URL) {
ConnectionNum: 2,
HeartbeatPeriod: "5s",
SessionTimeout: "20s",
PoolTTL: 600,
PoolSize: 64,
GettySessionParam: getty.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
Expand Down
2 changes: 0 additions & 2 deletions protocol/dubbo/dubbo_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ func initDubboInvokerTest() {
ConnectionNum: 1,
HeartbeatPeriod: "3s",
SessionTimeout: "20s",
PoolTTL: 600,
PoolSize: 64,
GettySessionParam: getty.GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
Expand Down
6 changes: 0 additions & 6 deletions remoting/getty/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@ type (
SessionTimeout string `default:"60s" yaml:"session_timeout" json:"session_timeout,omitempty"`
sessionTimeout time.Duration

// Connection Pool
PoolSize int `default:"2" yaml:"pool_size" json:"pool_size,omitempty"`
PoolTTL int `default:"180" yaml:"pool_ttl" json:"pool_ttl,omitempty"`

// grpool
GrPoolSize int `default:"0" yaml:"gr_pool_size" json:"gr_pool_size,omitempty"`
QueueLen int `default:"0" yaml:"queue_len" json:"queue_len,omitempty"`
Expand All @@ -116,8 +112,6 @@ func GetDefaultClientConfig() ClientConfig {
ConnectionNum: 16,
HeartbeatPeriod: "30s",
SessionTimeout: "180s",
PoolSize: 4,
PoolTTL: 600,
GrPoolSize: 200,
QueueLen: 64,
QueueNumber: 10,
Expand Down
2 changes: 1 addition & 1 deletion remoting/getty/getty_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (c *Client) Connect(url *common.URL) error {
initClient(url.Protocol)
c.conf = *clientConf
// new client
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
c.pool = newGettyRPCClientConnPool(c)
c.pool.sslEnabled = url.GetParamBool(constant.SSL_ENABLED_KEY, false)

// codec
Expand Down
4 changes: 1 addition & 3 deletions remoting/getty/getty_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func getClient(url *common.URL) *Client {
}

func testClient_Call(t *testing.T, svr *Server, url *common.URL, c *Client) {
c.pool = newGettyRPCClientConnPool(c, clientConf.PoolSize, time.Duration(int(time.Second)*clientConf.PoolTTL))
c.pool = newGettyRPCClientConnPool(c)

testGetBigPkg(t, c)
testGetUser(t, c)
Expand Down Expand Up @@ -342,8 +342,6 @@ func InitTest(t *testing.T) (*Server, *common.URL) {
ConnectionNum: 2,
HeartbeatPeriod: "5s",
SessionTimeout: "20s",
PoolTTL: 600,
PoolSize: 64,
GettySessionParam: GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
Expand Down
99 changes: 17 additions & 82 deletions remoting/getty/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (c *gettyRPCClient) removeSession(session getty.Session) {
}
}()
if removeFlag {
c.pool.safeRemove(c)
c.pool.resetConn()
c.close()
}
}
Expand Down Expand Up @@ -325,123 +325,58 @@ func (c *gettyRPCClient) close() error {

type gettyRPCClientPool struct {
rpcClient *Client
size int // size of []*gettyRPCClient
ttl int64 // ttl of every gettyRPCClient, it is checked when getConn
sslEnabled bool
closed bool

sync.Mutex
conns []*gettyRPCClient
conn *gettyRPCClient
}

func newGettyRPCClientConnPool(rpcClient *Client, size int, ttl time.Duration) *gettyRPCClientPool {
func newGettyRPCClientConnPool(rpcClient *Client) *gettyRPCClientPool {
return &gettyRPCClientPool{
rpcClient: rpcClient,
size: size,
ttl: int64(ttl.Seconds()),
// init capacity : 2
conns: make([]*gettyRPCClient, 0, 2),
closed: false,
}
}

func (p *gettyRPCClientPool) close() {
p.Lock()
conns := p.conns
p.conns = nil
conn := p.conn
p.conn = nil
p.closed = true
p.Unlock()
for _, conn := range conns {
if conn != nil {
conn.close()
}
}

func (p *gettyRPCClientPool) getGettyRpcClient(addr string) (*gettyRPCClient, error) {
p.Lock()
defer p.Unlock()
conn, connErr := p.get()
if connErr == nil && conn == nil {
// create new conn
rpcClientConn, rpcErr := newGettyRPCClientConn(p, addr)
if rpcErr == nil {
p.put(rpcClientConn)
p.conn = rpcClientConn
}
return rpcClientConn, perrors.WithStack(rpcErr)
}
return conn, perrors.WithStack(connErr)
}

func (p *gettyRPCClientPool) get() (*gettyRPCClient, error) {
now := time.Now().Unix()

p.Lock()
defer p.Unlock()
if p.conns == nil {
if p.closed {
return nil, errClientPoolClosed
}
for num := len(p.conns); num > 0; {
var conn *gettyRPCClient
if num != 1 {
conn = p.conns[rand.Int31n(int32(num))]
} else {
conn = p.conns[0]
}
// This will recreate gettyRpcClient for remove last position
// p.conns = p.conns[:len(p.conns)-1]

if d := now - conn.getActive(); d > p.ttl {
p.remove(conn)
go conn.close()
num = len(p.conns)
continue
}
conn.updateActive(now) // update active time
return conn, nil
if p.conn != nil {
return p.conn, nil
}
return nil, nil
}

func (p *gettyRPCClientPool) put(conn *gettyRPCClient) {
if conn == nil || conn.getActive() == 0 {
return
}
p.Lock()
defer p.Unlock()
if p.conns == nil {
return
}
// check whether @conn has existed in p.conns or not.
for i := range p.conns {
if p.conns[i] == conn {
return
}
}
if len(p.conns) >= p.size {
// delete @conn from client pool
// p.remove(conn)
conn.close()
return
}
p.conns = append(p.conns, conn)
}

func (p *gettyRPCClientPool) remove(conn *gettyRPCClient) {
if conn == nil || conn.getActive() == 0 {
return
}

if p.conns == nil {
return
}

if len(p.conns) > 0 {
for idx, c := range p.conns {
if conn == c {
p.conns = append(p.conns[:idx], p.conns[idx+1:]...)
break
}
}
}
}

func (p *gettyRPCClientPool) safeRemove(conn *gettyRPCClient) {
func (p *gettyRPCClientPool) resetConn() {
p.Lock()
defer p.Unlock()

p.remove(conn)
p.conn = nil
}
23 changes: 3 additions & 20 deletions remoting/getty/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package getty

import (
"testing"
"time"
)

import (
Expand All @@ -28,24 +27,8 @@ import (

func TestGetConnFromPool(t *testing.T) {
var rpcClient Client

clientPoll := newGettyRPCClientConnPool(&rpcClient, 1, time.Duration(5*time.Second))

var conn1 gettyRPCClient
conn1.active = time.Now().Unix()
clientPoll.put(&conn1)
assert.Equal(t, 1, len(clientPoll.conns))

var conn2 gettyRPCClient
conn2.active = time.Now().Unix()
clientPoll.put(&conn2)
assert.Equal(t, 1, len(clientPoll.conns))
conn, err := clientPoll.get()
assert.Nil(t, err)
assert.Equal(t, &conn1, conn)
time.Sleep(6 * time.Second)
conn, err = clientPoll.get()
assert.Nil(t, conn)
clientPoll := newGettyRPCClientConnPool(&rpcClient)
cli, err := clientPoll.get()
assert.Nil(t, cli)
assert.Nil(t, err)
assert.Equal(t, 0, len(clientPoll.conns))
}
2 changes: 0 additions & 2 deletions remoting/getty/readwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ func getServer(t *testing.T) (*Server, *common.URL) {
ConnectionNum: 2,
HeartbeatPeriod: "5s",
SessionTimeout: "20s",
PoolTTL: 600,
PoolSize: 64,
GettySessionParam: GettySessionParam{
CompressEncoding: false,
TcpNoDelay: true,
Expand Down

0 comments on commit 02d9190

Please sign in to comment.