Skip to content

Commit

Permalink
Merge pull request #4208 from stack72/aws-dynamodb_streams
Browse files Browse the repository at this point in the history
provider/aws: DynamoDB Table StreamSpecifications
  • Loading branch information
phinze committed Dec 9, 2015
2 parents 238dfff + 8b79881 commit 5884323
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 0 deletions.
65 changes: 65 additions & 0 deletions builtin/providers/aws/resource_aws_dynamodb_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/hashicorp/terraform/helper/hashcode"
"strings"
)

// Number of times to retry if a throttling-related exception occurs
Expand Down Expand Up @@ -158,6 +159,21 @@ func resourceAwsDynamoDbTable() *schema.Resource {
return hashcode.String(buf.String())
},
},
"stream_enabled": &schema.Schema{
Type: schema.TypeBool,
Optional: true,
Computed: true,
},
"stream_view_type": &schema.Schema{
Type: schema.TypeString,
Optional: true,
Computed: true,
StateFunc: func(v interface{}) string {
value := v.(string)
return strings.ToUpper(value)
},
ValidateFunc: validateStreamViewType,
},
},
}
}
Expand Down Expand Up @@ -263,6 +279,16 @@ func resourceAwsDynamoDbTableCreate(d *schema.ResourceData, meta interface{}) er
req.GlobalSecondaryIndexes = globalSecondaryIndexes
}

if _, ok := d.GetOk("stream_enabled"); ok {

req.StreamSpecification = &dynamodb.StreamSpecification{
StreamEnabled: aws.Bool(d.Get("stream_enabled").(bool)),
StreamViewType: aws.String(d.Get("stream_view_type").(string)),
}

fmt.Printf("[DEBUG] Adding StreamSpecifications to the table")
}

attemptCount := 1
for attemptCount <= DYNAMODB_MAX_THROTTLE_RETRIES {
output, err := dynamodbconn.CreateTable(req)
Expand Down Expand Up @@ -340,6 +366,25 @@ func resourceAwsDynamoDbTableUpdate(d *schema.ResourceData, meta interface{}) er
waitForTableToBeActive(d.Id(), meta)
}

if d.HasChange("stream_enabled") || d.HasChange("stream_view_type") {
req := &dynamodb.UpdateTableInput{
TableName: aws.String(d.Id()),
}

req.StreamSpecification = &dynamodb.StreamSpecification{
StreamEnabled: aws.Bool(d.Get("stream_enabled").(bool)),
StreamViewType: aws.String(d.Get("stream_view_type").(string)),
}

_, err := dynamodbconn.UpdateTable(req)

if err != nil {
return err
}

waitForTableToBeActive(d.Id(), meta)
}

if d.HasChange("global_secondary_index") {
log.Printf("[DEBUG] Changed GSI data")
req := &dynamodb.UpdateTableInput{
Expand Down Expand Up @@ -587,6 +632,11 @@ func resourceAwsDynamoDbTableRead(d *schema.ResourceData, meta interface{}) erro
log.Printf("[DEBUG] Added GSI: %s - Read: %d / Write: %d", gsi["name"], gsi["read_capacity"], gsi["write_capacity"])
}

if table.StreamSpecification != nil {
d.Set("stream_view_type", table.StreamSpecification.StreamViewType)
d.Set("stream_enabled", table.StreamSpecification.StreamEnabled)
}

err = d.Set("global_secondary_index", gsiList)
if err != nil {
return err
Expand Down Expand Up @@ -751,3 +801,18 @@ func waitForTableToBeActive(tableName string, meta interface{}) error {
return nil

}

func validateStreamViewType(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
viewTypes := map[string]bool {
"KEYS_ONLY": true,
"NEW_IMAGE": true,
"OLD_IMAGE": true,
"NEW_AND_OLD_IMAGES": true,
}

if !viewTypes[value] {
errors = append(errors, fmt.Errorf("%q be a valid DynamoDB StreamViewType", k))
}
return
}
101 changes: 101 additions & 0 deletions builtin/providers/aws/resource_aws_dynamodb_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,66 @@ func TestAccAWSDynamoDbTable(t *testing.T) {
})
}

func TestAccAWSDynamoDbTable_streamSpecification(t *testing.T) {
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSDynamoDbTableDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccAWSDynamoDbConfigStreamSpecification,
Check: resource.ComposeTestCheckFunc(
testAccCheckInitialAWSDynamoDbTableExists("aws_dynamodb_table.basic-dynamodb-table"),
resource.TestCheckResourceAttr(
"aws_dynamodb_table.basic-dynamodb-table", "stream_enabled", "true"),
resource.TestCheckResourceAttr(
"aws_dynamodb_table.basic-dynamodb-table", "stream_view_type", "KEYS_ONLY"),
),
},
},
})
}

func TestResourceAWSDynamoDbTableStreamViewType_validation(t *testing.T) {
cases := []struct {
Value string
ErrCount int
}{
{
Value: "KEYS-ONLY",
ErrCount: 1,
},
{
Value: "RANDOM-STRING",
ErrCount: 1,
},
{
Value: "KEYS_ONLY",
ErrCount: 0,
},
{
Value: "NEW_AND_OLD_IMAGES",
ErrCount: 0,
},
{
Value: "NEW_IMAGE",
ErrCount: 0,
},
{
Value: "OLD_IMAGE",
ErrCount: 0,
},
}

for _, tc := range cases {
_, errors := validateStreamViewType(tc.Value, "aws_dynamodb_table_stream_view_type")

if len(errors) != tc.ErrCount {
t.Fatalf("Expected the DynamoDB stream_view_type to trigger a validation error")
}
}
}

func testAccCheckAWSDynamoDbTableDestroy(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).dynamodbconn

Expand Down Expand Up @@ -295,3 +355,44 @@ resource "aws_dynamodb_table" "basic-dynamodb-table" {
}
}
`

const testAccAWSDynamoDbConfigStreamSpecification = `
resource "aws_dynamodb_table" "basic-dynamodb-table" {
name = "TerraformTestStreamTable"
read_capacity = 10
write_capacity = 20
hash_key = "TestTableHashKey"
range_key = "TestTableRangeKey"
attribute {
name = "TestTableHashKey"
type = "S"
}
attribute {
name = "TestTableRangeKey"
type = "S"
}
attribute {
name = "TestLSIRangeKey"
type = "N"
}
attribute {
name = "TestGSIRangeKey"
type = "S"
}
local_secondary_index {
name = "TestTableLSI"
range_key = "TestLSIRangeKey"
projection_type = "ALL"
}
global_secondary_index {
name = "InitialTestTableGSI"
hash_key = "TestTableHashKey"
range_key = "TestGSIRangeKey"
write_capacity = 10
read_capacity = 10
projection_type = "KEYS_ONLY"
}
stream_enabled = true
stream_view_type = "KEYS_ONLY"
}
`
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ parameter.
* `non_key_attributes` - (Optional) Only required with *INCLUDE* as a
projection type; a list of attributes to project into the index. These
do not need to be defined as attributes on the table.
* `stream_enabled` - (Optional) Indicates whether Streams is to be enabled (true) or disabled (false).
* `stream_view_type` - (Optional) When an item in the table is modified, StreamViewType determines what information is written to the table's stream. Valid values are KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES.

For `global_secondary_index` objects only, you need to specify
`write_capacity` and `read_capacity` in the same way you would for the
Expand Down

0 comments on commit 5884323

Please sign in to comment.