Add an everlasting, exponentially increasing retry.
This fixes an issue in the `mc` application: during
`mc mirror -w`, the underlying call to ListenBucketNotification
is expected to run continuously. If the server crashes, the
client should retry and wait for the server to come back.

Fixes minio/mc#1889
harshavardhana committed Nov 20, 2016
1 parent 675b291 commit c692048
Showing 2 changed files with 63 additions and 14 deletions.
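
For context, here is a sketch of how a caller such as `mc mirror -w` consumes this API. It is not part of this commit; the endpoint and credentials are placeholders, and the exact ListenBucketNotification signature is assumed from contemporary minio-go documentation. With this change, the range loop keeps delivering events across server restarts instead of terminating on the first network error:

	package main

	import (
		"log"

		minio "github.com/minio/minio-go"
	)

	func main() {
		// Endpoint and credentials are placeholders for illustration.
		s3Client, err := minio.New("play.minio.io:9000", "ACCESS-KEY", "SECRET-KEY", true)
		if err != nil {
			log.Fatalln(err)
		}

		// Closing doneCh tears down the listen loop and its retry timer.
		doneCh := make(chan struct{})
		defer close(doneCh)

		// With this commit, the returned channel keeps delivering events
		// across server restarts instead of closing on the first error.
		for info := range s3Client.ListenBucketNotification("mybucket", "myprefix/", ".jpg", []string{
			"s3:ObjectCreated:*",
			"s3:ObjectRemoved:*",
		}, doneCh) {
			if info.Err != nil {
				log.Println(info.Err)
				continue
			}
			log.Println(info)
		}
	}
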
25 changes: 11 additions & 14 deletions api-notification.go
@@ -22,6 +22,7 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"time"
 )
 
 // GetBucketNotification - get bucket notification at a given path.
@@ -143,7 +144,14 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even
 	}
 
 	// Continously run and listen on bucket notification.
-	for {
+	// Create a done channel to control 'ListObjects' go routine.
+	doneCh := make(chan struct{}, 1)
+
+	// Indicate to our routine to exit cleanly upon return.
+	defer close(doneCh)
+
+	// Wait on the jitter retry loop.
+	for range c.newRetryTimerContinous(time.Second, time.Second*30, MaxJitter, doneCh) {
 		urlValues := make(url.Values)
 		urlValues.Set("prefix", prefix)
 		urlValues.Set("suffix", suffix)
@@ -155,10 +163,7 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even
 			queryValues: urlValues,
 		})
 		if err != nil {
-			notificationInfoCh <- NotificationInfo{
-				Err: err,
-			}
-			return
+			continue
 		}
 
 		// Validate http response, upon error return quickly.
@@ -180,10 +185,7 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even
 		for bio.Scan() {
 			var notificationInfo NotificationInfo
 			if err = json.Unmarshal(bio.Bytes(), &notificationInfo); err != nil {
-				notificationInfoCh <- NotificationInfo{
-					Err: err,
-				}
-				return
+				continue
 			}
 			// Send notifications on channel only if there are events received.
 			if len(notificationInfo.Records) > 0 {
@@ -200,12 +202,7 @@ func (c Client) ListenBucketNotification(bucketName, prefix, suffix string, even
 			// and re-connect.
 			if err == io.ErrUnexpectedEOF {
 				resp.Body.Close()
-				continue
-			}
-			notificationInfoCh <- NotificationInfo{
-				Err: err,
-			}
-			return
 			}
 		}
 	}(notificationInfoCh)
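
Note the shape of the change above: transport errors, JSON decode errors, and unexpected EOFs no longer push an error onto notificationInfoCh and return; the goroutine falls through to the next tick of the retry channel and reconnects. A minimal, self-contained sketch of that attempt-channel pattern (the names attempts and flaky are illustrative, not the library's):

	package main

	import (
		"errors"
		"fmt"
		"time"
	)

	// attempts emits 0, 1, 2, ... until done is closed, sleeping between sends,
	// mirroring the retry channel the diff above ranges over.
	func attempts(delay time.Duration, done <-chan struct{}) <-chan int {
		ch := make(chan int)
		go func() {
			defer close(ch)
			for i := 0; ; i++ {
				select {
				case ch <- i:
				case <-done:
					return
				}
				time.Sleep(delay)
			}
		}()
		return ch
	}

	func main() {
		done := make(chan struct{})
		defer close(done)

		// flaky stands in for the HTTP request; it fails a few times, then succeeds.
		flaky := func(i int) error {
			if i < 3 {
				return errors.New("server not ready")
			}
			return nil
		}

		for i := range attempts(100*time.Millisecond, done) {
			if err := flaky(i); err != nil {
				fmt.Println("attempt", i, "failed:", err)
				continue // like the `continue`s above: wait for the next timer tick
			}
			fmt.Println("attempt", i, "succeeded")
			break
		}
	}
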
52 changes: 52 additions & 0 deletions retry-continous.go
@@ -0,0 +1,52 @@
package minio

import "time"

// newRetryTimerContinous creates a timer with exponentially increasing delays forever.
func (c Client) newRetryTimerContinous(unit time.Duration, cap time.Duration, jitter float64, doneCh chan struct{}) <-chan int {
	attemptCh := make(chan int)

	// normalize jitter to the range [0, 1.0]
	if jitter < NoJitter {
		jitter = NoJitter
	}
	if jitter > MaxJitter {
		jitter = MaxJitter
	}

	// computes the exponential backoff duration according to
	// https://www.awsarchitectureblog.com/2015/03/backoff.html
	exponentialBackoffWait := func(attempt int) time.Duration {
		// 1<<uint(attempt) below could overflow, so limit the value of attempt
		maxAttempt := 30
		if attempt > maxAttempt {
			attempt = maxAttempt
		}
		// sleep = random_between(0, min(cap, base * 2 ** attempt))
		sleep := unit * time.Duration(1<<uint(attempt))
		if sleep > cap {
			sleep = cap
		}
		if jitter != NoJitter {
			sleep -= time.Duration(c.random.Float64() * float64(sleep) * jitter)
		}
		return sleep
	}

	go func() {
		defer close(attemptCh)
		var nextBackoff int
		for {
			select {
			// Attempts starts.
			case attemptCh <- nextBackoff:
				nextBackoff++
			case <-doneCh:
				// Stop the routine.
				return
			}
			time.Sleep(exponentialBackoffWait(nextBackoff))
		}
	}()
	return attemptCh
}
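
To make the parameters passed from api-notification.go concrete: with unit = 1s and cap = 30s, the pre-jitter delays are 1s, 2s, 4s, 8s, 16s, and then 30s forever; MaxJitter then subtracts a random fraction of each delay, leaving a sleep in (0, base]. A standalone sketch (not part of the commit) that prints the schedule:

	package main

	import (
		"fmt"
		"math/rand"
		"time"
	)

	func main() {
		unit, cap := time.Second, 30*time.Second
		jitter := 1.0 // MaxJitter

		r := rand.New(rand.NewSource(time.Now().UnixNano()))
		for attempt := 0; attempt < 8; attempt++ {
			// base = min(cap, unit * 2^attempt), mirroring exponentialBackoffWait.
			base := unit * time.Duration(1<<uint(attempt))
			if base > cap {
				base = cap
			}
			// Jitter subtracts up to jitter*base, as in the function above.
			sleep := base - time.Duration(r.Float64()*float64(base)*jitter)
			fmt.Printf("attempt %d: base %v, jittered %v\n", attempt, base, sleep)
		}
	}
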
