Skip to content

Commit

Permalink
switch internal buffer to private to simplify sender public api
Browse files Browse the repository at this point in the history
  • Loading branch information
sklarsa committed Mar 12, 2024
1 parent 66f8c9b commit 4752c47
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 57 deletions.
42 changes: 21 additions & 21 deletions pkg/http/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var (
// utilize a global transport for connection pooling. A sender
// should not be called concurrently by multiple goroutines.
type LineSender struct {
buffer.Buffer
buf *buffer.Buffer

address string

Expand Down Expand Up @@ -169,7 +169,7 @@ func WithRetryTimeout(t time.Duration) LineSenderOption {
// in bytes to be used when sending ILP messages. Defaults to 0.
func WithInitBufferSize(sizeInBytes int) LineSenderOption {
return func(s *LineSender) {
s.InitBufSizeBytes = sizeInBytes
s.buf.InitBufSizeBytes = sizeInBytes
}
}

Expand Down Expand Up @@ -219,7 +219,7 @@ func NewLineSender(opts ...LineSenderOption) (*LineSender, error) {
retryTimeout: 10 * time.Second,
autoFlushRows: 75000,

Buffer: *buffer.NewBuffer(),
buf: buffer.NewBuffer(),
}

for _, opt := range opts {
Expand Down Expand Up @@ -395,21 +395,21 @@ func (s *LineSender) Flush(ctx context.Context) error {
return errors.New("cannot flush a closed LineSender")
}

err := s.LastErr()
s.ClearLastErr()
err := s.buf.LastErr()
s.buf.ClearLastErr()
if err != nil {
s.DiscardPendingMsg()
s.buf.DiscardPendingMsg()
return err
}
if s.HasTable() {
s.DiscardPendingMsg()
if s.buf.HasTable() {
s.buf.DiscardPendingMsg()
return errors.New("pending ILP message must be finalized with At or AtNow before calling Flush")
}

req, err = http.NewRequest(
http.MethodPost,
s.uri,
s,
s.buf,
)
if err != nil {
return err
Expand Down Expand Up @@ -467,7 +467,7 @@ func (s *LineSender) Flush(ctx context.Context) error {
// '\n', '\r', '?', ',', ”', '"', '\', '/', ':', ')', '(', '+', '*',
// '%', '~', starting '.', trailing '.', or a non-printable char.
func (s *LineSender) Table(name string) *LineSender {
s.Buffer.Table(name)
s.buf.Table(name)
return s
}

Expand All @@ -478,7 +478,7 @@ func (s *LineSender) Table(name string) *LineSender {
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) Symbol(name, val string) *LineSender {
s.Buffer.Symbol(name, val)
s.buf.Symbol(name, val)
return s
}

Expand All @@ -489,7 +489,7 @@ func (s *LineSender) Symbol(name, val string) *LineSender {
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) Int64Column(name string, val int64) *LineSender {
s.Buffer.Int64Column(name, val)
s.buf.Int64Column(name, val)
return s
}

Expand All @@ -503,7 +503,7 @@ func (s *LineSender) Int64Column(name string, val int64) *LineSender {
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) Long256Column(name string, val *big.Int) *LineSender {
s.Buffer.Long256Column(name, val)
s.buf.Long256Column(name, val)
return s
}

Expand All @@ -514,7 +514,7 @@ func (s *LineSender) Long256Column(name string, val *big.Int) *LineSender {
// '\n', '\r', '?', '.', ',', ”', '"', '\\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) TimestampColumn(name string, ts time.Time) *LineSender {
s.Buffer.TimestampColumn(name, ts)
s.buf.TimestampColumn(name, ts)
return s
}

Expand All @@ -525,7 +525,7 @@ func (s *LineSender) TimestampColumn(name string, ts time.Time) *LineSender {
// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) Float64Column(name string, val float64) *LineSender {
s.Buffer.Float64Column(name, val)
s.buf.Float64Column(name, val)
return s
}

Expand All @@ -535,7 +535,7 @@ func (s *LineSender) Float64Column(name string, val float64) *LineSender {
// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) StringColumn(name, val string) *LineSender {
s.Buffer.StringColumn(name, val)
s.buf.StringColumn(name, val)
return s
}

Expand All @@ -545,7 +545,7 @@ func (s *LineSender) StringColumn(name, val string) *LineSender {
// '\n', '\r', '?', '.', ',', ”', '"', '\', '/', ':', ')', '(', '+',
// '-', '*' '%%', '~', or a non-printable char.
func (s *LineSender) BoolColumn(name string, val bool) *LineSender {
s.Buffer.BoolColumn(name, val)
s.buf.BoolColumn(name, val)
return s
}

Expand All @@ -559,7 +559,7 @@ func (s *LineSender) Close(ctx context.Context) error {
return nil
}

if s.autoFlushRows > 0 && s.Buffer.Len() > 0 {
if s.autoFlushRows > 0 && s.buf.Len() > 0 {
err := s.Flush(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -604,12 +604,12 @@ func (s *LineSender) At(ctx context.Context, ts time.Time) error {
if ts.IsZero() {
sendTs = false
}
err := s.Buffer.At(ts, sendTs)
err := s.buf.At(ts, sendTs)
if err != nil {
return err
}

if s.MsgCount() == s.autoFlushRows {
if s.buf.MsgCount() == s.autoFlushRows {
return s.Flush(ctx)
}

Expand All @@ -620,7 +620,7 @@ func (s *LineSender) At(ctx context.Context, ts time.Time) error {
func (s *LineSender) makeRequest(ctx context.Context, req *http.Request) (bool, error) {
// reqTimeout = ( request.len() / min_throughput ) + request_timeout
// nb: conversion from int to time.Duration is in milliseconds
reqTimeout := time.Duration(s.Len()/s.minThroughputBytesPerSecond)*time.Second + s.requestTimeout
reqTimeout := time.Duration(s.buf.Len()/s.minThroughputBytesPerSecond)*time.Second + s.requestTimeout
reqCtx, cancel := context.WithTimeout(ctx, reqTimeout)
defer cancel()

Expand Down
18 changes: 9 additions & 9 deletions pkg/http/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestErrorOnFlushWhenMessageIsPending(t *testing.T) {
err = sender.Flush(ctx)

assert.ErrorContains(t, err, "pending ILP message must be finalized with At or AtNow before calling Flush")
assert.Empty(t, sender.Messages())
assert.Empty(t, sender.buf.Messages())
}

func TestErrorOnContextDeadlineHttp(t *testing.T) {
Expand Down Expand Up @@ -291,13 +291,13 @@ func TestAutoFlush(t *testing.T) {
assert.NoError(t, err)
}

assert.Equal(t, autoFlushRows-1, sender.MsgCount())
assert.Equal(t, autoFlushRows-1, sender.buf.MsgCount())

// Send one additional message and ensure that all are flushed
err = sender.Table(testTable).StringColumn("bar", "baz").AtNow(ctx)
assert.NoError(t, err)

assert.Equal(t, 0, sender.MsgCount())
assert.Equal(t, 0, sender.buf.MsgCount())
}

func TestAutoFlushDisabled(t *testing.T) {
Expand All @@ -324,7 +324,7 @@ func TestAutoFlushDisabled(t *testing.T) {
assert.NoError(t, err)
}

assert.Equal(t, autoFlushRows+1, sender.MsgCount())
assert.Equal(t, autoFlushRows+1, sender.buf.MsgCount())
}

func TestSenderDoubleClose(t *testing.T) {
Expand Down Expand Up @@ -381,11 +381,11 @@ func TestAutoFlushWhenSenderIsClosed(t *testing.T) {

err = sender.Table(testTable).Symbol("abc", "def").AtNow(ctx)
assert.NoError(t, err)
assert.NotEmpty(t, sender.Messages())
assert.NotEmpty(t, sender.buf.Messages())

err = sender.Close(ctx)
assert.NoError(t, err)
assert.Empty(t, sender.Messages())
assert.Empty(t, sender.buf.Messages())
}

func TestNoFlushWhenSenderIsClosedAndAutoFlushIsDisabled(t *testing.T) {
Expand All @@ -403,11 +403,11 @@ func TestNoFlushWhenSenderIsClosedAndAutoFlushIsDisabled(t *testing.T) {

err = sender.Table(testTable).Symbol("abc", "def").AtNow(ctx)
assert.NoError(t, err)
assert.NotEmpty(t, sender.Messages())
assert.NotEmpty(t, sender.buf.Messages())

err = sender.Close(ctx)
assert.NoError(t, err)
assert.NotEmpty(t, sender.Messages())
assert.NotEmpty(t, sender.buf.Messages())
}

func TestBufferClearAfterFlush(t *testing.T) {
Expand All @@ -428,7 +428,7 @@ func TestBufferClearAfterFlush(t *testing.T) {
assert.NoError(t, err)

utils.ExpectLines(t, srv.BackCh, []string{fmt.Sprintf("%s,abc=def", testTable)})
assert.Zero(t, sender.Buffer.Len())
assert.Zero(t, sender.buf.Len())

err = sender.Table(testTable).Symbol("ghi", "jkl").AtNow(ctx)
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit 4752c47

Please sign in to comment.