Skip to content

Commit

Permalink
Backport 10181 1.7.x (#11295)
Browse files Browse the repository at this point in the history
* Potential data loss in DynamoDB backend (#10181)

fixes #5836

When throttled, DynamoDB may return a 2xx response without committing
all submitted items to the database.

Depending on load, all actions in a BatchWriteItem request may be throttled
with ProvisionedThroughputExceededException, in which case the AWS SDK handles
the retry. If only some of the items were throttled,
ProvisionedThroughputExceededException is not returned to the SDK, and it
is up to us to resubmit the unprocessed items.

Use an exponential backoff, as recommended by the AWS SDK documentation, in
case we are partially throttled repeatedly.

* Add a Changelog entry for 10181

* Fix err shadowing

Co-authored-by: Conrad Lara <[email protected]>
  • Loading branch information
sgmiller and cmlara authored Apr 7, 2021
1 parent b770685 commit 5c13d9e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
3 changes: 3 additions & 0 deletions changelog/10181.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
storage/dynamodb: Handle throttled batch write requests by retrying, without which writes could be lost.
```
40 changes: 34 additions & 6 deletions physical/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dynamodb

import (
"context"
"errors"
"fmt"
"math"
"net/http"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/hashicorp/vault/sdk/helper/awsutil"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/sdk/physical"

"github.com/cenkalti/backoff/v3"
)

const (
Expand Down Expand Up @@ -497,15 +500,40 @@ func (d *DynamoDBBackend) HAEnabled() bool {
func (d *DynamoDBBackend) batchWriteRequests(requests []*dynamodb.WriteRequest) error {
for len(requests) > 0 {
batchSize := int(math.Min(float64(len(requests)), 25))
batch := requests[:batchSize]
batch := map[string][]*dynamodb.WriteRequest{d.table: requests[:batchSize]}
requests = requests[batchSize:]

var err error

d.permitPool.Acquire()
_, err := d.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{
d.table: batch,
},
})

boff := backoff.NewExponentialBackOff()
boff.MaxElapsedTime = 600 * time.Second

for len(batch) > 0 {
var output *dynamodb.BatchWriteItemOutput
output, err = d.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
RequestItems: batch,
})

if err != nil {
break
}

if len(output.UnprocessedItems) == 0 {
break
} else {
duration := boff.NextBackOff()
if duration != backoff.Stop {
batch = output.UnprocessedItems
time.Sleep(duration)
} else {
err = errors.New("dynamodb: timeout handling UnproccessedItems")
break
}
}
}

d.permitPool.Release()
if err != nil {
return err
Expand Down

0 comments on commit 5c13d9e

Please sign in to comment.