Skip to content

Commit

Permalink
Simplify global transport
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz committed Mar 18, 2024
1 parent 5396fd9 commit 7a211ba
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 40 deletions.
4 changes: 1 addition & 3 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

package questdb

import "sync/atomic"

type (
Buffer = buffer
ConfigData = configData
Expand All @@ -34,7 +32,7 @@ type (
)

var (
ClientCt *atomic.Int64 = &clientCt
GlobalTransport = globalTransport
)

func NewBuffer(initBufSize int, fileNameLimit int) Buffer {
Expand Down
83 changes: 52 additions & 31 deletions http_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,37 @@ import (
"time"
)

type globalHttpTransport struct {
transport *http.Transport
// clientCt is used to track the number of open httpLineSenders
// If the clientCt reaches 0, meaning all senders have been
// closed, the global transport closes all idle connections to
// free up resources
clientCt atomic.Int64
}

func (t *globalHttpTransport) ClientCount() int64 {
return t.clientCt.Load()
}

func (t *globalHttpTransport) RegisterClient() {
t.clientCt.Add(1)
}

func (t *globalHttpTransport) UnregisterClient() {
newCt := t.clientCt.Add(-1)
if newCt == 0 {
t.transport.CloseIdleConnections()
}
}

var (
// We use a shared http transport to pool connections
// across HttpLineSenders
// TODO(puzpuzpuz): we can't use a single global transport for both plain text and TLS
globalTransport *http.Transport = newHttpTransport(false)

// clientCt is used to track the number of open HttpLineSenders
// If the clientCt reaches 0, meaning all HttpLineSenders have been
// closed, the globalTransport closes all idle connections to
// free up resources
clientCt atomic.Int64
globalTransport *globalHttpTransport = &globalHttpTransport{transport: newHttpTransport()}
)

func newHttpTransport(disableKeepAlives bool) *http.Transport {
func newHttpTransport() *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
MaxConnsPerHost: 0,
Expand All @@ -60,7 +77,6 @@ func newHttpTransport(disableKeepAlives bool) *http.Transport {
IdleConnTimeout: 120 * time.Second,
TLSHandshakeTimeout: defaultRequestTimeout,
TLSClientConfig: &tls.Config{},
DisableKeepAlives: disableKeepAlives,
}
}

Expand Down Expand Up @@ -91,21 +107,24 @@ type httpLineSender struct {
token string
tlsMode tlsMode

client http.Client
uri string
closed bool
transport *http.Transport
client http.Client
uri string
closed bool

// Global transport is used unless a custom transport was provided.
globalTransport *globalHttpTransport
}

func newHttpLineSender(conf *lineSenderConfig) (*httpLineSender, error) {
var transport *http.Transport

s := &httpLineSender{
address: conf.address,
minThroughputBytesPerSecond: conf.minThroughput,
requestTimeout: conf.requestTimeout,
retryTimeout: conf.retryTimeout,
autoFlushRows: conf.autoFlushRows,
autoFlushInterval: conf.autoFlushInterval,
transport: conf.httpTransport,
tlsMode: conf.tlsMode,
user: conf.httpUser,
pass: conf.httpPass,
Expand All @@ -114,23 +133,28 @@ func newHttpLineSender(conf *lineSenderConfig) (*httpLineSender, error) {
buf: newBuffer(conf.initBufferSize, conf.fileNameLimit),
}

if s.transport == nil {
s.transport = globalTransport
}

if s.tlsMode == tlsInsecureSkipVerify {
s.transport = newHttpTransport(true) // no keep alive
s.transport.TLSClientConfig = &tls.Config{}
s.transport.TLSClientConfig.InsecureSkipVerify = true
if conf.httpTransport != nil {
// Use custom transport.
transport = conf.httpTransport
} else if s.tlsMode == tlsInsecureSkipVerify {
// We can't use the global transport in case of skipped TLS verification.
// Instead, create a single-time transport with disabled keep-alives.
transport = newHttpTransport()
transport.DisableKeepAlives = true
transport.TLSClientConfig.InsecureSkipVerify = true
} else {
// Otherwise, use the global transport.
s.globalTransport = globalTransport
transport = globalTransport.transport
}

s.client = http.Client{
Transport: s.transport,
Transport: transport,
Timeout: 0,
}

if s.transport == globalTransport {
clientCt.Add(1)
if s.globalTransport != nil {
s.globalTransport.RegisterClient()
}

s.uri = "http"
Expand Down Expand Up @@ -290,11 +314,8 @@ func (s *httpLineSender) Close(ctx context.Context) error {

s.closed = true

if s.transport == globalTransport {
newCt := clientCt.Add(-1)
if newCt == 0 {
globalTransport.CloseIdleConnections()
}
if s.globalTransport != nil {
s.globalTransport.UnregisterClient()
}

return err
Expand Down
4 changes: 2 additions & 2 deletions http_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestCustomTransportAndTlsInit(t *testing.T) {
// s1 and s2 have successfully instantiated a sender
// using the global transport and should be registered in the
// global transport client count
assert.Equal(t, int64(2), qdb.ClientCt.Load())
assert.Equal(t, int64(2), qdb.GlobalTransport.ClientCount())

// Closing the client with the custom transport should not impact
// the global transport client count
Expand All @@ -481,7 +481,7 @@ func TestCustomTransportAndTlsInit(t *testing.T) {
s1.Close(ctx)
s2.Close(ctx)
s3.Close(ctx)
assert.Equal(t, int64(0), qdb.ClientCt.Load())
assert.Equal(t, int64(0), qdb.GlobalTransport.ClientCount())
}

func BenchmarkHttpLineSenderBatch1000(b *testing.B) {
Expand Down
File renamed without changes.
5 changes: 1 addition & 4 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,6 @@ func WithAddress(addr string) LineSenderOption {
// but skips server certificate verification. Useful in test
// environments with self-signed certificates. Do not use in
// production environments.
//
// Only available for the TCP sender.
//
// For the HTTP sender, use WithHttpTransport.
func WithTlsInsecureSkipVerify() LineSenderOption {
return func(s *lineSenderConfig) {
s.tlsMode = tlsInsecureSkipVerify
Expand All @@ -349,6 +345,7 @@ func WithTlsInsecureSkipVerify() LineSenderOption {
// WithHttpTransport sets the client's http transport to the
// passed pointer instead of the global transport. This can be
// used for customizing the http transport used by the HttpLineSender.
// WithTlsInsecureSkipVerify is ignored when this option is in use.
//
// Only available for the HTTP sender.
func WithHttpTransport(t *http.Transport) LineSenderOption {
Expand Down

0 comments on commit 7a211ba

Please sign in to comment.