Skip to content

Commit

Permalink
Remove auto flush from TCP client
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz committed Mar 15, 2024
1 parent 00949da commit 8cbadb2
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 113 deletions.
1 change: 0 additions & 1 deletion alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ var (
WithAuth = tcp.WithAuth
WithBufferCapacity = tcp.WithBufferCapacity
WithFileNameLimit = tcp.WithFileNameLimit
WithInitBufferSize = tcp.WithInitBufferSize
WithTls = tcp.WithTls
WithTlsInsecureSkipVerify = tcp.WithTlsInsecureSkipVerify
)
Expand Down
5 changes: 2 additions & 3 deletions pkg/buffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ const (
type Buffer struct {
bytes.Buffer

BufCap int
InitBufSizeBytes int
FileNameLimit int
BufCap int
FileNameLimit int

lastMsgPos int
lastErr error
Expand Down
7 changes: 5 additions & 2 deletions pkg/http/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package http

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
Expand Down Expand Up @@ -164,10 +165,10 @@ func WithRetryTimeout(t time.Duration) LineSenderOption {
}

// WithInitBuffer size sets the desired initial buffer capacity
// in bytes to be used when sending ILP messages. Defaults to 0.
// in bytes to be used when sending ILP messages. Defaults to 128KB.
func WithInitBufferSize(sizeInBytes int) LineSenderOption {
return func(s *LineSender) {
s.buf.InitBufSizeBytes = sizeInBytes
s.buf.BufCap = sizeInBytes
}
}

Expand Down Expand Up @@ -286,6 +287,8 @@ func NewLineSender(opts ...LineSenderOption) (*LineSender, error) {
}
s.uri += fmt.Sprintf("://%s/write", s.address)

s.buf.Buffer = *bytes.NewBuffer(make([]byte, 0, s.buf.BufCap))

return s, nil
}

Expand Down
45 changes: 4 additions & 41 deletions pkg/tcp/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ type LineSender struct {
keyId string // Erased once auth is done.
key string // Erased once auth is done.

autoFlushRows int

conn net.Conn
}

Expand Down Expand Up @@ -136,32 +134,6 @@ func WithFileNameLimit(limit int) LineSenderOption {
}
}

// WithInitBuffer size sets the desired initial buffer capacity
// in bytes to be used when sending ILP messages. Defaults to 0.
func WithInitBufferSize(sizeInBytes int) LineSenderOption {
return func(s *LineSender) {
s.buf.InitBufSizeBytes = sizeInBytes
}
}

// WithAutoFlushDisabled turns off auto-flushing behavior.
// To send ILP messages, the user must either call Flush() or
// wait until the buffer capacity is exceeded (in bytes).
func WithAutoFlushDisabled() LineSenderOption {
return func(s *LineSender) {
s.autoFlushRows = 0
}
}

// WithAutoFlushRows sets the number of buffered rows that
// must be breached in order to trigger an auto-flush.
// Defaults to 600.
func WithAutoFlushRows(rows int) LineSenderOption {
return func(s *LineSender) {
s.autoFlushRows = rows
}
}

// NewLineSender creates new InfluxDB Line Protocol (ILP) sender. Each
// sender corresponds to a single TCP connection. Sender should
// not be called concurrently by multiple goroutines.
Expand All @@ -177,8 +149,7 @@ func NewLineSender(ctx context.Context, opts ...LineSenderOption) (*LineSender,
address: "127.0.0.1:9009",
tlsMode: tlsDisabled,

buf: buffer.NewBuffer(),
autoFlushRows: 600,
buf: buffer.NewBuffer(),
}

for _, opt := range opts {
Expand Down Expand Up @@ -263,8 +234,7 @@ func NewLineSender(ctx context.Context, opts ...LineSenderOption) (*LineSender,
}

s.conn = conn

s.buf.Buffer = *bytes.NewBuffer(make([]byte, s.buf.InitBufSizeBytes, s.buf.BufCap))
s.buf.Buffer = *bytes.NewBuffer(make([]byte, 0, s.buf.BufCap))

return s, nil

Expand Down Expand Up @@ -311,8 +281,6 @@ func optsFromConf(config string) ([]LineSenderOption, error) {

}
switch k {
case "init_buf_size":
opts = append(opts, WithInitBufferSize(parsedVal))
case "max_buf_size":
opts = append(opts, WithBufferCapacity(parsedVal))
default:
Expand Down Expand Up @@ -508,9 +476,8 @@ func (s *LineSender) Flush(ctx context.Context) error {
// bytes.Buffer grows as 2*cap+n, so we use 3x as the threshold.
if s.buf.Cap() > 3*s.buf.BufCap {
// Shrink the buffer back to desired capacity.
s.buf.Buffer = *bytes.NewBuffer(make([]byte, s.buf.InitBufSizeBytes, s.buf.BufCap))
s.buf.Buffer = *bytes.NewBuffer(make([]byte, 0, s.buf.BufCap))
}

return nil
}

Expand Down Expand Up @@ -538,6 +505,7 @@ func (s *LineSender) At(ctx context.Context, ts time.Time) error {
if ts.IsZero() {
sendTs = false
}

err := s.buf.At(ts, sendTs)
if err != nil {
return err
Expand All @@ -546,10 +514,5 @@ func (s *LineSender) At(ctx context.Context, ts time.Time) error {
if s.buf.Len() > s.buf.BufCap {
return s.Flush(ctx)
}

if s.autoFlushRows > 0 && s.buf.MsgCount() == s.autoFlushRows {
return s.Flush(ctx)
}

return nil
}
71 changes: 5 additions & 66 deletions pkg/tcp/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ type configTestCase struct {
func TestHappyCasesFromConf(t *testing.T) {

var (
user = "test-user"
token = "test-token"
initBufSize = 999
maxBufSize = 1000
user = "test-user"
token = "test-token"
maxBufSize = 1000
)

testServer, err := utils.NewTestTcpServer(utils.ReadAndDiscard)
Expand All @@ -74,11 +73,10 @@ func TestHappyCasesFromConf(t *testing.T) {
},
{
name: "init_buf_size and max_buf_size",
config: fmt.Sprintf("tcp::addr=%s;init_buf_size=%d;max_buf_size=%d",
addr, initBufSize, maxBufSize),
config: fmt.Sprintf("tcp::addr=%s;max_buf_size=%d",
addr, maxBufSize),
expectedOpts: []LineSenderOption{
WithAddress(addr),
WithInitBufferSize(initBufSize),
WithBufferCapacity(maxBufSize),
},
},
Expand Down Expand Up @@ -234,65 +232,6 @@ func TestErrorOnContextDeadline(t *testing.T) {
t.Fail()
}

func TestAutoFlush(t *testing.T) {
ctx := context.Background()
autoFlushRows := 10

srv, err := utils.NewTestTcpServer(utils.ReadAndDiscard)
assert.NoError(t, err)
defer srv.Close()

sender, err := NewLineSender(
ctx,
WithAddress(srv.Addr()),
WithAutoFlushRows(autoFlushRows),
)
assert.NoError(t, err)
defer sender.Close()

// Send autoFlushRows - 1 messages and ensure all are buffered
for i := 0; i < autoFlushRows-1; i++ {
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)
}

assert.Equal(t, autoFlushRows-1, sender.buf.MsgCount())

// Send one additional message and ensure that all are flushed
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)

assert.Equal(t, 0, sender.buf.MsgCount())
}

func TestAutoFlushDisabled(t *testing.T) {
ctx := context.Background()
autoFlushRows := 10

srv, err := utils.NewTestTcpServer(utils.ReadAndDiscard)
assert.NoError(t, err)
defer srv.Close()

// opts are processed sequentially, so AutoFlushDisabled will
// override AutoFlushRows
sender, err := NewLineSender(
ctx,
WithAddress(srv.Addr()),
WithAutoFlushRows(autoFlushRows),
WithAutoFlushDisabled(),
)
assert.NoError(t, err)
defer sender.Close()

// Send autoFlushRows + 1 messages and ensure all are buffered
for i := 0; i < autoFlushRows+1; i++ {
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)
}

assert.Equal(t, autoFlushRows+1, sender.buf.MsgCount())
}

func BenchmarkLineSenderBatch1000(b *testing.B) {
ctx := context.Background()

Expand Down

0 comments on commit 8cbadb2

Please sign in to comment.