Skip to content

Commit

Permalink
Allow to have multiple connections per broker (#276)
Browse files Browse the repository at this point in the history
- Allow to have multiple connections per broker
  • Loading branch information
merlimat authored Jun 11, 2020
1 parent 9ed8af8 commit c979046
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 12 deletions.
3 changes: 3 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type ClientOptions struct {

// Configure whether the Pulsar client verify the validity of the host name from broker (default: false)
TLSValidateHostname bool

// Max number of connections to a single broker that will kept in the pool. (Default: 1 connection)
MaxConnectionsPerBroker int
}

type Client interface {
Expand Down
7 changes: 6 additions & 1 deletion pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ func newClient(options ClientOptions) (Client, error) {
operationTimeout = defaultOperationTimeout
}

maxConnectionsPerHost := options.MaxConnectionsPerBroker
if maxConnectionsPerHost <= 0 {
maxConnectionsPerHost = 1
}

c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout),
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost),
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil)
Expand Down
42 changes: 31 additions & 11 deletions pulsar/internal/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package internal

import (
"fmt"
"net/url"
"sync"
"sync/atomic"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/auth"
Expand All @@ -37,23 +39,31 @@ type ConnectionPool interface {
}

type connectionPool struct {
pool sync.Map
connectionTimeout time.Duration
tlsOptions *TLSOptions
auth auth.Provider
pool sync.Map
connectionTimeout time.Duration
tlsOptions *TLSOptions
auth auth.Provider
maxConnectionsPerHost int32
roundRobinCnt int32
}

// NewConnectionPool init connection pool.
func NewConnectionPool(tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration) ConnectionPool {
func NewConnectionPool(
tlsOptions *TLSOptions,
auth auth.Provider,
connectionTimeout time.Duration,
maxConnectionsPerHost int) ConnectionPool {
return &connectionPool{
tlsOptions: tlsOptions,
auth: auth,
connectionTimeout: connectionTimeout,
tlsOptions: tlsOptions,
auth: auth,
connectionTimeout: connectionTimeout,
maxConnectionsPerHost: int32(maxConnectionsPerHost),
}
}

func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error) {
cachedCnx, found := p.pool.Load(logicalAddr.Host)
key := p.getMapKey(logicalAddr)
cachedCnx, found := p.pool.Load(key)
if found {
cnx := cachedCnx.(*connection)
log.Debug("Found connection in cache:", cnx.logicalAddr, cnx.physicalAddr)
Expand All @@ -63,14 +73,15 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, physicalAddr *url.U
return cnx, nil
}
// The cached connection is failed
p.pool.Delete(logicalAddr.Host)
p.pool.Delete(key)
log.Debug("Removed failed connection from pool:", cnx.logicalAddr, cnx.physicalAddr)
}

// Try to create a new connection
newConnection := newConnection(logicalAddr, physicalAddr, p.tlsOptions, p.connectionTimeout, p.auth)
newCnx, wasCached := p.pool.LoadOrStore(logicalAddr.Host, newConnection)
newCnx, wasCached := p.pool.LoadOrStore(key, newConnection)
cnx := newCnx.(*connection)

if !wasCached {
cnx.start()
} else {
Expand All @@ -89,3 +100,12 @@ func (p *connectionPool) Close() {
return true
})
}

func (p *connectionPool) getMapKey(addr *url.URL) string {
cnt := atomic.AddInt32(&p.roundRobinCnt, 1)
if cnt < 0 {
cnt = -cnt
}
idx := cnt % p.maxConnectionsPerHost
return fmt.Sprint(addr.Host, '-', idx)
}

0 comments on commit c979046

Please sign in to comment.