diff --git a/changelog/10181.txt b/changelog/10181.txt
new file mode 100644
index 000000000000..ccff6b8d4e3e
--- /dev/null
+++ b/changelog/10181.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+storage/dynamodb: Handle throttled batch write requests by retrying, without which writes could be lost.
+```
diff --git a/physical/dynamodb/dynamodb.go b/physical/dynamodb/dynamodb.go
index d24920d0b404..ff5f7a2bcf01 100644
--- a/physical/dynamodb/dynamodb.go
+++ b/physical/dynamodb/dynamodb.go
@@ -2,6 +2,7 @@ package dynamodb
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"net/http"
@@ -27,6 +28,8 @@ import (
 	"github.com/hashicorp/vault/sdk/helper/awsutil"
 	"github.com/hashicorp/vault/sdk/helper/consts"
 	"github.com/hashicorp/vault/sdk/physical"
+
+	"github.com/cenkalti/backoff/v3"
 )
 
 const (
@@ -497,15 +500,40 @@ func (d *DynamoDBBackend) HAEnabled() bool {
 func (d *DynamoDBBackend) batchWriteRequests(requests []*dynamodb.WriteRequest) error {
 	for len(requests) > 0 {
 		batchSize := int(math.Min(float64(len(requests)), 25))
-		batch := requests[:batchSize]
+		batch := map[string][]*dynamodb.WriteRequest{d.table: requests[:batchSize]}
 		requests = requests[batchSize:]
 
+		var err error
+
 		d.permitPool.Acquire()
-		_, err := d.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
-			RequestItems: map[string][]*dynamodb.WriteRequest{
-				d.table: batch,
-			},
-		})
+
+		boff := backoff.NewExponentialBackOff()
+		boff.MaxElapsedTime = 600 * time.Second
+
+		for len(batch) > 0 {
+			var output *dynamodb.BatchWriteItemOutput
+			output, err = d.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
+				RequestItems: batch,
+			})
+
+			if err != nil {
+				break
+			}
+
+			if len(output.UnprocessedItems) == 0 {
+				break
+			} else {
+				duration := boff.NextBackOff()
+				if duration != backoff.Stop {
+					batch = output.UnprocessedItems
+					time.Sleep(duration)
+				} else {
+					err = errors.New("dynamodb: timeout handling UnprocessedItems")
+					break
+				}
+			}
+		}
+
 		d.permitPool.Release()
 
 		if err != nil {
 			return err