feat: adding WriteFailedCallback to let the user control failed writes
vlastahajek committed Aug 18, 2021
1 parent 15fbdc5 commit abbb220
Showing 8 changed files with 230 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@
### Features
- [#264](https://github.com/influxdata/influxdb-client-go/pull/264) Synced generated server API with the latest [oss.yml](https://github.com/influxdata/openapi/blob/master/contracts/oss.yml).
- [#271](https://github.com/influxdata/influxdb-client-go/pull/271) Use exponential _random_ retry strategy
- [#273](https://github.com/influxdata/influxdb-client-go/pull/273) Added `WriteFailedCallback` for `WriteAPI`, allowing the user to be _synchronously_ notified about failed writes and to decide on further batch processing.

### Bug fixes
- [#270](https://github.com/influxdata/influxdb-client-go/pull/270) Fixed duplicate `Content-Type` header in requests to management API
27 changes: 27 additions & 0 deletions README.md
@@ -204,6 +204,33 @@ func main() {
client.Close()
}
```
### Handling of failed async writes
By default, WriteAPI automatically retries failed writes.
A write is retried when it fails because of a connection error or when the server returns an HTTP status code >= 429.
The retry algorithm uses a random exponential strategy to compute the retry delay.
The delay before the next retry attempt is a random value in the interval _retryInterval * exponentialBase^(attempts)_ to _retryInterval * exponentialBase^(attempts+1)_.
If a batch repeatedly fails to write, WriteAPI keeps retrying until _maxRetries_ is reached or the overall retry time of the batch exceeds _maxRetryTime_.
The default parameters (part of the WriteOptions) are:
- _retryInterval_=5,000ms
- _exponentialBase_=2
- _maxRetryDelay_=125,000ms
- _maxRetries_=5
- _maxRetryTime_=180,000ms
With the default parameters, retry delays are randomly distributed within these ranges (in milliseconds):
1. 5,000-10,000
1. 10,000-20,000
1. 20,000-40,000
1. 40,000-80,000
1. 80,000-125,000
Setting _retryInterval_ to 0 disables the retry strategy, and any failed write causes the batch to be discarded.
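
As an illustration only (not part of this change), the following Go sketch computes a delay following the strategy described above; the function and parameter names are hypothetical and do not belong to the client's API:
```go
package main

import (
	"fmt"
	"math/rand"
)

// retryDelayMs returns a random delay in milliseconds for the given retry attempt:
// a value in [retryInterval * base^attempts, retryInterval * base^(attempts+1)),
// capped at maxRetryDelay.
func retryDelayMs(retryInterval, base, maxRetryDelay, attempts uint) uint {
	low := retryInterval
	for i := uint(0); i < attempts; i++ {
		low *= base
	}
	high := low * base
	delay := low
	if high > low {
		delay += uint(rand.Intn(int(high - low)))
	}
	if delay > maxRetryDelay {
		delay = maxRetryDelay
	}
	return delay
}

func main() {
	// With the defaults (5,000ms interval, base 2, 125,000ms cap) this prints
	// values from the ranges listed above: 5,000-10,000, 10,000-20,000, ...
	for attempt := uint(0); attempt < 5; attempt++ {
		fmt.Printf("attempt %d: %dms\n", attempt, retryDelayMs(5000, 2, 125000, attempt))
	}
}
```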
[WriteFailedCallback](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2/api#WriteFailedCallback) allows advanced control over retrying.
It is called synchronously whenever an async write fails, and its return value decides further handling of the batch: if it returns `true`, WriteAPI continues retrying the batch; if it returns `false`, the batch is discarded.
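
A minimal sketch of registering the callback is shown below (not part of this commit); the server URL, token, organization, bucket, and the retry limit are illustrative values:
```go
package main

import (
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"github.com/influxdata/influxdb-client-go/v2/api/http"
)

func main() {
	// Hypothetical connection parameters.
	client := influxdb2.NewClient("http://localhost:8086", "my-token")
	writeAPI := client.WriteAPI("my-org", "my-bucket")

	// Give up on a batch after it has already been retried twice;
	// returning true keeps the batch in the retry queue, false discards it.
	writeAPI.SetWriteFailedCallback(func(batch string, error http.Error, retryAttempts uint) bool {
		return retryAttempts < 2
	})

	// ... write points with writeAPI.WritePoint(...) or writeAPI.WriteRecord(...) ...

	writeAPI.Flush()
	client.Close()
}
```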
### Reading async errors
[Errors()](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2/api#WriteAPI.Errors) method returns a channel for reading errors which occur during async writes. This channel is unbuffered and it
17 changes: 17 additions & 0 deletions api/write.go
@@ -15,6 +15,12 @@ import (
iwrite "github.com/influxdata/influxdb-client-go/v2/internal/write"
)

// WriteFailedCallback is synchronously notified when a non-blocking write fails.
// batch contains the complete payload, error holds detailed error information,
// retryAttempts is the number of retries so far, 0 if the write failed on the first attempt.
// It must return true if WriteAPI should continue retrying; returning false discards the batch.
type WriteFailedCallback func(batch string, error http2.Error, retryAttempts uint) bool

// WriteAPI is Write client interface with non-blocking methods for writing time series data asynchronously in batches into an InfluxDB server.
// WriteAPI can be used concurrently.
// When using multiple goroutines for writing, use a single WriteAPI instance in all goroutines.
@@ -33,6 +39,9 @@ type WriteAPI interface {
// Must be called before performing any writes for errors to be collected.
// The chan is unbuffered and must be drained or the writer will block.
Errors() <-chan error
// SetWriteFailedCallback sets callback allowing custom handling of failed writes.
// If callback returns true, failed batch will be retried, otherwise discarded.
SetWriteFailedCallback(cb WriteFailedCallback)
}

// WriteAPIImpl provides main implementation for WriteAPI
@@ -78,6 +87,14 @@ func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions
return w
}

// SetWriteFailedCallback sets callback allowing custom handling of failed writes.
// If callback returns true, failed batch will be retried, otherwise discarded.
func (w *WriteAPIImpl) SetWriteFailedCallback(cb WriteFailedCallback) {
w.service.SetBatchErrorCallback(func(batch *iwrite.Batch, error2 http2.Error) bool {
return cb(batch.Batch, error2, batch.RetryAttempts)
})
}

// Errors returns a channel for reading errors which occur during async writes.
// Must be called before performing any writes for errors to be collected.
// The chan is unbuffered and must be drained or the writer will block.
51 changes: 45 additions & 6 deletions api/write_test.go
@@ -6,6 +6,7 @@ package api

import (
"fmt"
"math"
"strings"
"sync"
"testing"
@@ -83,23 +84,23 @@ func TestGzipWithFlushing(t *testing.T) {
}
func TestFlushInterval(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(500))
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(10))
points := test.GenPoints(5)
for _, p := range points {
writeAPI.WritePoint(p)
}
require.Len(t, service.Lines(), 0)
<-time.After(time.Millisecond * 600)
<-time.After(time.Millisecond * 15)
require.Len(t, service.Lines(), 5)
writeAPI.Close()

service.Close()
writeAPI = NewWriteAPI("my-org", "my-bucket", service, writeAPI.writeOptions.SetFlushInterval(2000))
writeAPI = NewWriteAPI("my-org", "my-bucket", service, writeAPI.writeOptions.SetFlushInterval(50))
for _, p := range points {
writeAPI.WritePoint(p)
}
require.Len(t, service.Lines(), 0)
<-time.After(time.Millisecond * 2100)
<-time.After(time.Millisecond * 60)
require.Len(t, service.Lines(), 5)

writeAPI.Close()
@@ -118,7 +119,7 @@ func TestRetry(t *testing.T) {
service.Close()
service.SetReplyError(&http.Error{
StatusCode: 429,
RetryAfter: 5,
RetryAfter: 1,
})
for i := 0; i < 5; i++ {
writeAPI.WritePoint(points[i])
@@ -131,7 +132,7 @@ }
}
writeAPI.waitForFlushing()
require.Len(t, service.Lines(), 0)
<-time.After(5*time.Second + 50*time.Millisecond)
<-time.After(time.Second + 50*time.Millisecond)
for i := 10; i < 15; i++ {
writeAPI.WritePoint(points[i])
}
@@ -168,3 +169,41 @@ func TestWriteError(t *testing.T) {
require.NotNil(t, recErr)
writeAPI.Close()
}

func TestWriteErrorCallback(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetLogLevel(log.DebugLevel)
service.SetReplyError(&http.Error{
StatusCode: 429,
Code: "write",
Message: "error",
})
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(1).SetRetryInterval(1))
writeAPI.SetWriteFailedCallback(func(batch string, error http.Error, retryAttempts uint) bool {
return retryAttempts < 2
})
points := test.GenPoints(10)
// first two batches will be discarded by callback after 3 write attempts for each
for i, j := 0, 0; i < 6; i++ {
writeAPI.WritePoint(points[i])
writeAPI.waitForFlushing()
w := int(math.Pow(5, float64(j)))
fmt.Printf("Waiting %dms\n", w)
<-time.After(time.Duration(w) * time.Millisecond)
j++
if j == 3 {
j = 0
}
}
service.SetReplyError(nil)
writeAPI.SetWriteFailedCallback(func(batch string, error http.Error, retryAttempts uint) bool {
return true
})
for i := 6; i < 10; i++ {
writeAPI.WritePoint(points[i])
}
writeAPI.waitForFlushing()
assert.Len(t, service.Lines(), 8)

writeAPI.Close()
}
7 changes: 5 additions & 2 deletions internal/write/queue.go
@@ -31,15 +31,18 @@ func (q *queue) pop() *Batch {
if el != nil {
q.list.Remove(el)
batch := el.Value.(*Batch)
batch.evicted = true
batch.Evicted = true
return batch
}
return nil
}

func (q *queue) first() *Batch {
el := q.list.Front()
return el.Value.(*Batch)
if el != nil {
return el.Value.(*Batch)
}
return nil
}

func (q *queue) isEmpty() bool {
4 changes: 3 additions & 1 deletion internal/write/queue_test.go
@@ -13,7 +13,9 @@ import (
func TestQueue(t *testing.T) {
que := newQueue(2)
assert.True(t, que.isEmpty())
b := &Batch{batch: "batch", retryDelay: 3, retryAttempts: 3}
assert.Nil(t, que.first())
assert.Nil(t, que.pop())
b := &Batch{Batch: "batch", RetryDelay: 3, RetryAttempts: 3}
que.push(b)
assert.False(t, que.isEmpty())
b2 := que.pop()
91 changes: 60 additions & 31 deletions internal/write/service.go
@@ -29,36 +29,42 @@ import (
// Batch holds information for sending points batch
type Batch struct {
// lines to send
batch string
Batch string
// current retry delay
retryDelay uint
RetryDelay uint
// retry attempts so far
retryAttempts uint
RetryAttempts uint
// true if it was removed from queue
evicted bool
Evicted bool
// time where this batch expires
expires time.Time
Expires time.Time
}

// NewBatch creates new batch
func NewBatch(data string, retryDelay uint, expireDelayMs uint) *Batch {
return &Batch{
batch: data,
retryDelay: retryDelay,
expires: time.Now().Add(time.Duration(expireDelayMs) * time.Millisecond),
Batch: data,
RetryDelay: retryDelay,
Expires: time.Now().Add(time.Duration(expireDelayMs) * time.Millisecond),
}
}

// BatchErrorCallback is synchronously notified when a non-blocking write fails.
// Returning true means WriteAPI should continue retrying; returning false discards the batch.
type BatchErrorCallback func(batch *Batch, error2 http2.Error) bool

// Service is responsible for reliable writing of batches
type Service struct {
org string
bucket string
httpService http2.Service
url string
lastWriteAttempt time.Time
retryQueue *queue
lock sync.Mutex
writeOptions *write.Options
org string
bucket string
httpService http2.Service
url string
lastWriteAttempt time.Time
retryQueue *queue
lock sync.Mutex
writeOptions *write.Options
retryExponentialBase uint
errorCb BatchErrorCallback
}

// NewService creates new write service
@@ -76,7 +82,21 @@ func NewService(org string, bucket string, httpService http2.Service, options *w
params.Set("precision", precisionToString(options.Precision()))
u.RawQuery = params.Encode()
writeURL := u.String()
return &Service{org: org, bucket: bucket, httpService: httpService, url: writeURL, writeOptions: options, retryQueue: newQueue(int(retryBufferLimit))}
return &Service{
org: org,
bucket: bucket,
httpService: httpService,
url: writeURL,
writeOptions: options,
retryQueue: newQueue(int(retryBufferLimit)),
retryExponentialBase: 2,
}
}

// SetBatchErrorCallback sets callback allowing custom handling of failed writes.
// If callback returns true, failed batch will be retried, otherwise discarded.
func (w *Service) SetBatchErrorCallback(cb BatchErrorCallback) {
w.errorCb = cb
}

// HandleWrite handles writes of batches and handles retrying.
@@ -105,7 +125,7 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
if !retrying {
b := w.retryQueue.first()
// Can we write? In case of retryable error we must wait a bit
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.retryDelay))) {
if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(b.RetryDelay))) {
retrying = true
} else {
log.Warn("Write proc: cannot write yet, storing batch to queue")
@@ -117,7 +137,6 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}
if retrying {
batchToWrite = w.retryQueue.first()
batchToWrite.retryAttempts++
if batch != nil { //store actual batch to retry queue
if w.retryQueue.push(batch) {
log.Warn("Write proc: Retry buffer full, discarding oldest batch")
@@ -128,37 +147,47 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
}
// write batch
if batchToWrite != nil {
if time.Now().After(batchToWrite.expires) {
if !batchToWrite.evicted {
if time.Now().After(batchToWrite.Expires) {
if !batchToWrite.Evicted {
w.retryQueue.pop()
}
return fmt.Errorf("write failed (attempts %d): max retry time exceeded", batchToWrite.retryAttempts)
return fmt.Errorf("write failed (attempts %d): max retry time exceeded", batchToWrite.RetryAttempts)
}
perror := w.WriteBatch(ctx, batchToWrite)
if perror != nil {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
log.Errorf("Write error: %s\nBatch kept for retrying\n", perror.Error())
if perror.RetryAfter > 0 {
batchToWrite.retryDelay = perror.RetryAfter * 1000
batchToWrite.RetryDelay = perror.RetryAfter * 1000
} else {
batchToWrite.retryDelay = w.computeRetryDelay(batchToWrite.retryAttempts)
batchToWrite.RetryDelay = w.computeRetryDelay(batchToWrite.RetryAttempts)
}
if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) {
log.Warn("Callback rejected batch, discarding")
if !batchToWrite.Evicted {
w.retryQueue.pop()
}
return perror
}
if batchToWrite.retryAttempts == 0 {
// store new batch (not taken from queue)
if !batchToWrite.Evicted && batchToWrite != w.retryQueue.first() {
if w.retryQueue.push(batch) {
log.Warn("Retry buffer full, discarding oldest batch")
}
} else if batchToWrite.retryAttempts == w.writeOptions.MaxRetries() {
} else if batchToWrite.RetryAttempts == w.writeOptions.MaxRetries() {
log.Warn("Reached maximum number of retries, discarding batch")
if !batchToWrite.evicted {
if !batchToWrite.Evicted {
w.retryQueue.pop()
}
}
batchToWrite.RetryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", batchToWrite.RetryDelay)
} else {
log.Errorf("Write error: %s\n", perror.Error())
}
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.retryAttempts, perror)
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
}
if retrying && !batchToWrite.evicted {
if retrying && !batchToWrite.Evicted {
w.retryQueue.pop()
}
batchToWrite = nil
@@ -198,10 +227,10 @@ func pow(x, y uint) uint {
func (w *Service) WriteBatch(ctx context.Context, batch *Batch) *http2.Error {
var body io.Reader
var err error
body = strings.NewReader(batch.batch)
body = strings.NewReader(batch.Batch)

if log.Level() >= ilog.DebugLevel {
log.Debugf("Writing batch: %s", batch.batch)
log.Debugf("Writing batch: %s", batch.Batch)
}
if w.writeOptions.UseGZip() {
body, err = gzip.CompressWithGzip(body)