Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-165] Auto release idle connections #963

Merged
merged 10 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ type ClientOptions struct {
// Default prometheus.DefaultRegisterer
MetricsRegisterer prometheus.Registerer

// Release the connection if it is not used for more than ConnectionMaxIdleTime.
// Default is 60 seconds, negative such as -1 to disable.
ConnectionMaxIdleTime time.Duration

EnableTransaction bool

// Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput.
Expand Down
14 changes: 13 additions & 1 deletion pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
defaultOperationTimeout = 30 * time.Second
defaultKeepAliveInterval = 30 * time.Second
defaultMemoryLimitBytes = 64 * 1024 * 1024
defaultConnMaxIdleTime = 180 * time.Second
minConnMaxIdleTime = 60 * time.Second
)

type client struct {
Expand All @@ -56,6 +58,16 @@ func newClient(options ClientOptions) (Client, error) {
logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
}

connectionMaxIdleTime := options.ConnectionMaxIdleTime
if connectionMaxIdleTime == 0 {
connectionMaxIdleTime = defaultConnMaxIdleTime
} else if connectionMaxIdleTime > 0 && connectionMaxIdleTime < minConnMaxIdleTime {
return nil, newError(InvalidConfiguration, fmt.Sprintf("Connection max idle time should be at least %f "+
"seconds", minConnMaxIdleTime.Seconds()))
} else {
logger.Debugf("Disable auto release idle connections")
}

if options.URL == "" {
return nil, newError(InvalidConfiguration, "URL is required for client")
}
Expand Down Expand Up @@ -143,7 +155,7 @@ func newClient(options ClientOptions) (Client, error) {

c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, keepAliveInterval,
maxConnectionsPerHost, logger, metrics),
maxConnectionsPerHost, logger, metrics, connectionMaxIdleTime),
log: logger,
metrics: metrics,
memLimit: internal.NewMemoryLimitController(memLimitBytes),
Expand Down
115 changes: 115 additions & 0 deletions pulsar/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"crypto/tls"
"fmt"
"log"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -1123,3 +1124,117 @@ func TestServiceUrlTLSWithTLSTransportWithBasicAuth(t *testing.T) {
func TestWebServiceUrlTLSWithTLSTransportWithBasicAuth(t *testing.T) {
testTLSTransportWithBasicAuth(t, webServiceURLTLS)
}

func TestConfigureConnectionMaxIdleTime(t *testing.T) {
_, err := NewClient(ClientOptions{
URL: serviceURL,
ConnectionMaxIdleTime: 1 * time.Second,
})

assert.Error(t, err, "Should be failed when the connectionMaxIdleTime is less than minConnMaxIdleTime")

cli, err := NewClient(ClientOptions{
URL: serviceURL,
ConnectionMaxIdleTime: -1, // Disabled
})

assert.Nil(t, err)
cli.Close()

cli, err = NewClient(ClientOptions{
URL: serviceURL,
ConnectionMaxIdleTime: 60 * time.Second,
})

assert.Nil(t, err)
cli.Close()
}

func testSendAndReceive(t *testing.T, producer Producer, consumer Consumer) {
// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(context.Background(), &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
log.Fatal(err)
}
}

// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

expectMsg := fmt.Sprintf("hello-%d", i)
assert.Equal(t, []byte(expectMsg), msg.Payload())
// ack message
err = consumer.Ack(msg)
if err != nil {
return
}
}
}

func TestAutoCloseIdleConnection(t *testing.T) {
cli, err := NewClient(ClientOptions{
URL: serviceURL,
ConnectionMaxIdleTime: -1, // Disable auto release connections first, we will enable it manually later
})

assert.Nil(t, err)

topic := "TestAutoCloseIdleConnection"

// create consumer
consumer1, err := cli.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)

// create producer
producer1, err := cli.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)

testSendAndReceive(t, producer1, consumer1)

pool := cli.(*client).cnxPool

producer1.Close()
consumer1.Close()

assert.NotEqual(t, 0, internal.GetConnectionsCount(&pool))

internal.StartCleanConnectionsTask(&pool, 2*time.Second) // Enable auto idle connections release manually

time.Sleep(6 * time.Second) // Need to wait at least 3 * ConnectionMaxIdleTime

assert.Equal(t, 0, internal.GetConnectionsCount(&pool))

// create consumer
consumer2, err := cli.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
})
assert.Nil(t, err)

// create producer
producer2, err := cli.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)

// Ensure the client still works
testSendAndReceive(t, producer2, consumer2)

producer2.Close()
consumer2.Close()

cli.Close()
}
48 changes: 48 additions & 0 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ type connection struct {
metrics *Metrics

keepAliveInterval time.Duration

lastActive time.Time
}

// connectionOptions defines configurations for creating connection.
Expand Down Expand Up @@ -927,6 +929,52 @@ func (c *connection) UnregisterListener(id uint64) {
delete(c.listeners, id)
}

func (c *connection) ResetLastActive() {
c.Lock()
defer c.Unlock()
c.lastActive = time.Now()
}

func (c *connection) isIdle() bool {
{
c.pendingLock.Lock()
defer c.pendingLock.Unlock()
if len(c.pendingReqs) != 0 {
return false
}
}

{
c.listenersLock.RLock()
shibd marked this conversation as resolved.
Show resolved Hide resolved
defer c.listenersLock.RUnlock()
if len(c.listeners) != 0 {
return false
}
}

{
c.consumerHandlersLock.Lock()
defer c.consumerHandlersLock.Unlock()
if len(c.consumerHandlers) != 0 {
return false
}
}

if len(c.incomingRequestsCh) != 0 || len(c.writeRequestsCh) != 0 {
return false
}
return true
}

func (c *connection) CheckIdle(maxIdleTime time.Duration) bool {
// We don't need to lock here because this method should only be
// called in a single goroutine of the connectionPool
if !c.isIdle() {
c.lastActive = time.Now()
}
return time.Since(c.lastActive) > maxIdleTime
}

// Close closes the connection by
// closing underlying socket connection and closeCh.
// This also triggers callbacks to the ConnectionClosed listeners.
Expand Down
33 changes: 31 additions & 2 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type connectionPool struct {
maxConnectionsPerHost int32
roundRobinCnt int32
keepAliveInterval time.Duration
closeCh chan struct{}

metrics *Metrics
log log.Logger
Expand All @@ -60,8 +61,9 @@ func NewConnectionPool(
keepAliveInterval time.Duration,
maxConnectionsPerHost int,
logger log.Logger,
metrics *Metrics) ConnectionPool {
return &connectionPool{
metrics *Metrics,
connectionMaxIdleTime time.Duration) ConnectionPool {
p := &connectionPool{
connections: make(map[string]*connection),
tlsOptions: tlsOptions,
auth: auth,
Expand All @@ -70,7 +72,10 @@ func NewConnectionPool(
keepAliveInterval: keepAliveInterval,
log: logger,
metrics: metrics,
closeCh: make(chan struct{}),
}
go p.checkAndCleanIdleConnections(connectionMaxIdleTime)
return p
}

func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
Expand Down Expand Up @@ -109,6 +114,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
p.Unlock()
conn.start()
} else {
conn.ResetLastActive()
// we already have a connection
p.Unlock()
}
Expand All @@ -119,6 +125,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U

func (p *connectionPool) Close() {
p.Lock()
close(p.closeCh)
for k, c := range p.connections {
delete(p.connections, k)
c.Close()
Expand All @@ -134,3 +141,25 @@ func (p *connectionPool) getMapKey(addr *url.URL) string {
idx := cnt % p.maxConnectionsPerHost
return fmt.Sprint(addr.Host, '-', idx)
}

func (p *connectionPool) checkAndCleanIdleConnections(maxIdleTime time.Duration) {
if maxIdleTime < 0 {
return
}
for {
select {
case <-p.closeCh:
return
case <-time.After(maxIdleTime):
p.Lock()
for k, c := range p.connections {
if c.CheckIdle(maxIdleTime) {
c.log.Debugf("Closed connection due to inactivity.")
delete(p.connections, k)
c.Close()
}
}
p.Unlock()
}
}
}
33 changes: 33 additions & 0 deletions pulsar/internal/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import "time"

// These method should only be used by tests

func StartCleanConnectionsTask(p *ConnectionPool, connectionMaxIdleTime time.Duration) {
go (*p).(*connectionPool).checkAndCleanIdleConnections(connectionMaxIdleTime)
}

func GetConnectionsCount(p *ConnectionPool) int {
pool := (*p).(*connectionPool)
pool.Lock()
defer pool.Unlock()
return len(pool.connections)
}