Skip to content

Commit

Permalink
Tweak DynamoDb table update logic
Browse files Browse the repository at this point in the history
crossplane-contrib#839

The above PR recently fixed this controller such that it was possible to
configure PAY_PER_REQUEST and SSE, but part of doing this involved potentially
making a no-op update and ignoring the resulting error. This commit avoids the
no-op update by (hopefully) improving the logic that determines which update(s)
are needed.

Signed-off-by: Nic Cope <[email protected]>
  • Loading branch information
negz committed Nov 11, 2021
1 parent 824b1bf commit bcc68d1
Showing 1 changed file with 83 additions and 6 deletions.
89 changes: 83 additions & 6 deletions pkg/controller/dynamodb/table/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"sort"
"time"

awsgo "github.com/aws/aws-sdk-go/aws"
svcsdk "github.com/aws/aws-sdk-go/service/dynamodb"
svcsdkapi "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -167,9 +166,18 @@ func lateInitialize(in *svcapitypes.TableParameters, t *svcsdk.DescribeTableOutp
WriteCapacityUnits: t.Table.ProvisionedThroughput.WriteCapacityUnits,
}
}
if in.SSESpecification == nil && t.Table.SSEDescription != nil {
in.SSESpecification = &svcapitypes.SSESpecification{
SSEType: t.Table.SSEDescription.SSEType,
if t.Table.SSEDescription != nil {
if in.SSESpecification == nil {
in.SSESpecification = &svcapitypes.SSESpecification{}
}
if in.SSESpecification.Enabled == nil && t.Table.SSEDescription.Status != nil {
in.SSESpecification.Enabled = aws.Bool(*t.Table.SSEDescription.Status == string(svcapitypes.SSEStatus_ENABLED))
}
if in.SSESpecification.KMSMasterKeyID == nil && t.Table.SSEDescription.KMSMasterKeyArn != nil {
in.SSESpecification.KMSMasterKeyID = t.Table.SSEDescription.KMSMasterKeyArn
}
if in.SSESpecification.SSEType == nil && t.Table.SSEDescription.SSEType != nil {
in.SSESpecification.SSEType = t.Table.SSEDescription.SSEType
}
}
if in.StreamSpecification == nil && t.Table.StreamSpecification != nil {
Expand All @@ -178,6 +186,11 @@ func lateInitialize(in *svcapitypes.TableParameters, t *svcsdk.DescribeTableOutp
StreamViewType: t.Table.StreamSpecification.StreamViewType,
}
}

if in.BillingMode == nil && t.Table.BillingModeSummary != nil {
in.BillingMode = t.Table.BillingModeSummary.BillingMode
}

return nil
}

Expand Down Expand Up @@ -270,10 +283,24 @@ func createPatch(in *svcsdk.DescribeTableOutput, target *svcapitypes.TableParame
}

func isUpToDate(cr *svcapitypes.Table, resp *svcsdk.DescribeTableOutput) (bool, error) {
// A table that's currently updating or creating can't be updated, so we
// temporarily consider it to be up-to-date no matter what.
switch aws.StringValue(cr.Status.AtProvider.TableStatus) {
case string(svcapitypes.TableStatus_SDK_UPDATING), string(svcapitypes.TableStatus_SDK_CREATING):
return true, nil
}

// Similarly, a table that's currently updating its SSE status can't be
// updated, so we temporarily consider it to be up-to-date.
if cr.Status.AtProvider.SSEDescription != nil && aws.StringValue(cr.Status.AtProvider.SSEDescription.Status) == string(svcapitypes.SSEStatus_UPDATING) {
return true, nil
}

patch, err := createPatch(resp, &cr.Spec.ForProvider)
if err != nil {
return false, err
}

return cmp.Equal(&svcapitypes.TableParameters{}, patch,
cmpopts.IgnoreTypes(&xpv1.Reference{}, &xpv1.Selector{}, []xpv1.Reference{}),
cmpopts.IgnoreFields(svcapitypes.TableParameters{}, "Region", "Tags", "GlobalSecondaryIndexes", "KeySchema", "LocalSecondaryIndexes", "CustomTableParameters")), nil
Expand All @@ -283,19 +310,41 @@ type updateClient struct {
client svcsdkapi.DynamoDBAPI
}

<<<<<<< HEAD
func (e *updateClient) preUpdate(_ context.Context, cr *svcapitypes.Table, u *svcsdk.UpdateTableInput) error {
switch aws.StringValue(cr.Status.AtProvider.TableStatus) {
case string(svcapitypes.TableStatus_SDK_UPDATING), string(svcapitypes.TableStatus_SDK_CREATING):
return nil
}
t, err := e.client.DescribeTable(&svcsdk.DescribeTableInput{TableName: aws.String(meta.GetExternalName(cr))})
=======
func (e *updateClient) preUpdate(ctx context.Context, cr *svcapitypes.Table, u *svcsdk.UpdateTableInput) error {

filtered := &svcsdk.UpdateTableInput{TableName: aws.String(meta.GetExternalName(cr))}

// The AWS API requires us to do one kind of update at a time per
// https://github.com/aws/aws-sdk-go/blob/v1.34.32/service/dynamodb/api.go#L5605
// This means that we need to return a filtered UpdateTableInput that
// contains at most one thing that needs updating. In order to be
// eventually consistent (i.e. to eventually update all the things, one
// on each reconcile pass) we need to determine the 'next' thing to
// update on each pass. This means we need to diff actual vs desired
// state here inside preUpdate. Unfortunately we read the actual state
// during Observe, but don't typically pass it to update. We could stash
// the observed state in a cache during postObserve then read it here,
// but we typically prefer to be as stateless as possible even if it
// means redundant API calls.
out, err := e.client.DescribeTableWithContext(ctx, &svcsdk.DescribeTableInput{TableName: aws.String(meta.GetExternalName(cr))})
>>>>>>> ed980161... Tweak DynamoDb table update logic
if err != nil {
return aws.Wrap(err, errDescribe)
}

newUpdateObj := &svcsdk.UpdateTableInput{
TableName: aws.String(meta.GetExternalName(cr)),
p, err := createPatch(out, &cr.Spec.ForProvider)
if err != nil {
return err
}
<<<<<<< HEAD
// NOTE(muvaf): AWS API prohibits doing those calls in the same call.
// See https://github.com/aws/aws-sdk-go/blob/v1.34.32/service/dynamodb/api.go#L5605
switch {
Expand All @@ -310,10 +359,38 @@ func (e *updateClient) preUpdate(_ context.Context, cr *svcapitypes.Table, u *sv
newUpdateObj.StreamSpecification = u.StreamSpecification
default:
return errors.New("only provisionedThroughput and streamSpecification updates are supported")
=======

// TODO(muvaf): Implement ReplicaUpdates and GlobalSecondaryIndexUpdates.
switch {
case p.ProvisionedThroughput != nil:
filtered.ProvisionedThroughput = u.ProvisionedThroughput
case p.StreamSpecification != nil:
// NOTE(muvaf): Unless StreamEnabled is changed, updating stream
// specification won't work.
filtered.StreamSpecification = u.StreamSpecification
case p.SSESpecification != nil:
// NOTE(negz): Attempting to update the KMSMasterKeyId to its
// current value returns an error
filtered.SSESpecification = &svcsdk.SSESpecification{
Enabled: u.SSESpecification.Enabled,
SSEType: u.SSESpecification.SSEType,
}
if p.SSESpecification.KMSMasterKeyID != nil {
filtered.SSESpecification.KMSMasterKeyId = u.SSESpecification.KMSMasterKeyId
}
case p.BillingMode != nil:
filtered.BillingMode = u.BillingMode
>>>>>>> ed980161... Tweak DynamoDb table update logic
}
// TODO(muvaf): ReplicationGroupUpdate and GlobalSecondaryIndexUpdate features
// are not implemented yet.

<<<<<<< HEAD
*u = *newUpdateObj
=======
*u = *filtered

>>>>>>> ed980161... Tweak DynamoDb table update logic
return nil
}

0 comments on commit bcc68d1

Please sign in to comment.