Skip to content

Commit

Permalink
Merge branch 'ewbankkit-issue-6443'
Browse files Browse the repository at this point in the history
  • Loading branch information
bflad committed Aug 30, 2019
2 parents b0470db + e42490b commit d096c88
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 127 deletions.
204 changes: 169 additions & 35 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"github.com/hashicorp/terraform/helper/validation"
)

const (
firehoseDeliveryStreamStatusDeleted = "DESTROYED"
)

func cloudWatchLoggingOptionsSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeSet,
Expand Down Expand Up @@ -594,8 +598,19 @@ func flattenProcessingConfiguration(pc *firehose.ProcessingConfiguration, roleAr

func flattenKinesisFirehoseDeliveryStream(d *schema.ResourceData, s *firehose.DeliveryStreamDescription) error {
d.Set("version_id", s.VersionId)
d.Set("arn", *s.DeliveryStreamARN)
d.Set("arn", s.DeliveryStreamARN)
d.Set("name", s.DeliveryStreamName)

sseOptions := map[string]interface{}{
"enabled": false,
}
if s.DeliveryStreamEncryptionConfiguration != nil && aws.StringValue(s.DeliveryStreamEncryptionConfiguration.Status) == firehose.DeliveryStreamEncryptionStatusEnabled {
sseOptions["enabled"] = true
}
if err := d.Set("server_side_encryption", []map[string]interface{}{sseOptions}); err != nil {
return fmt.Errorf("error setting server_side_encryption: %s", err)
}

if len(s.Destinations) > 0 {
destination := s.Destinations[0]
if destination.RedshiftDestinationDescription != nil {
Expand Down Expand Up @@ -681,11 +696,29 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {

"tags": tagsSchema(),

"server_side_encryption": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
ConflictsWith: []string{"kinesis_source_configuration"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enabled": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
},
},
},

"kinesis_source_configuration": {
Type: schema.TypeList,
ForceNew: true,
Optional: true,
MaxItems: 1,
Type: schema.TypeList,
ForceNew: true,
Optional: true,
MaxItems: 1,
ConflictsWith: []string{"server_side_encryption"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"kinesis_stream_arn": {
Expand Down Expand Up @@ -2138,26 +2171,27 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
return fmt.Errorf("error creating Kinesis Firehose Delivery Stream: %s", err)
}

stateConf := &resource.StateChangeConf{
Pending: []string{"CREATING"},
Target: []string{"ACTIVE"},
Refresh: firehoseStreamStateRefreshFunc(conn, sn),
Timeout: 20 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

firehoseStream, err := stateConf.WaitForState()
s, err := waitForKinesisFirehoseDeliveryStreamCreation(conn, sn)
if err != nil {
return fmt.Errorf(
"Error waiting for Kinesis Stream (%s) to become active: %s",
sn, err)
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) creation: %s", sn, err)
}

s := firehoseStream.(*firehose.DeliveryStreamDescription)
d.SetId(*s.DeliveryStreamARN)
d.SetId(aws.StringValue(s.DeliveryStreamARN))
d.Set("arn", s.DeliveryStreamARN)

if v, ok := d.GetOk("server_side_encryption"); ok && !isKinesisFirehoseDeliveryStreamOptionDisabled(v) {
_, err := conn.StartDeliveryStreamEncryption(&firehose.StartDeliveryStreamEncryptionInput{
DeliveryStreamName: aws.String(sn),
})
if err != nil {
return fmt.Errorf("error starting Kinesis Firehose Delivery Stream (%s) encryption: %s", sn, err)
}

if err := waitForKinesisFirehoseDeliveryStreamSSEEnabled(conn, sn); err != nil {
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) encryption to be enabled: %s", sn, err)
}
}

return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}

Expand Down Expand Up @@ -2240,7 +2274,7 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
err := resource.Retry(1*time.Minute, func() *resource.RetryError {
_, err := conn.UpdateDestination(updateInput)
if err != nil {
log.Printf("[DEBUG] Error creating Firehose Delivery Stream: %s", err)
log.Printf("[DEBUG] Error updating Firehose Delivery Stream: %s", err)

// Retry for IAM eventual consistency
if isAWSErr(err, firehose.ErrCodeInvalidArgumentException, "is not authorized to") {
Expand Down Expand Up @@ -2279,6 +2313,33 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
sn, err)
}

if d.HasChange("server_side_encryption") {
_, n := d.GetChange("server_side_encryption")
if isKinesisFirehoseDeliveryStreamOptionDisabled(n) {
_, err := conn.StopDeliveryStreamEncryption(&firehose.StopDeliveryStreamEncryptionInput{
DeliveryStreamName: aws.String(sn),
})
if err != nil {
return fmt.Errorf("error stopping Kinesis Firehose Delivery Stream (%s) encryption: %s", sn, err)
}

if err := waitForKinesisFirehoseDeliveryStreamSSEDisabled(conn, sn); err != nil {
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) encryption to be disabled: %s", sn, err)
}
} else {
_, err := conn.StartDeliveryStreamEncryption(&firehose.StartDeliveryStreamEncryptionInput{
DeliveryStreamName: aws.String(sn),
})
if err != nil {
return fmt.Errorf("error starting Kinesis Firehose Delivery Stream (%s) encryption: %s", sn, err)
}

if err := waitForKinesisFirehoseDeliveryStreamSSEEnabled(conn, sn); err != nil {
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) encryption to be enabled: %s", sn, err)
}
}
}

return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}

Expand Down Expand Up @@ -2319,42 +2380,69 @@ func resourceAwsKinesisFirehoseDeliveryStreamDelete(d *schema.ResourceData, meta
_, err := conn.DeleteDeliveryStream(&firehose.DeleteDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
})

if err != nil {
return fmt.Errorf("error deleting Kinesis Firehose Delivery Stream (%s): %s", sn, err)
}

if err := waitForKinesisFirehoseDeliveryStreamDeletion(conn, sn); err != nil {
return fmt.Errorf(
"Error waiting for Delivery Stream (%s) to be destroyed: %s",
sn, err)
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) deletion: %s", sn, err)
}

return nil
}

func firehoseStreamStateRefreshFunc(conn *firehose.Firehose, sn string) resource.StateRefreshFunc {
func firehoseDeliveryStreamStateRefreshFunc(conn *firehose.Firehose, sn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
describeOpts := &firehose.DescribeDeliveryStreamInput{
resp, err := conn.DescribeDeliveryStream(&firehose.DescribeDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
}
resp, err := conn.DescribeDeliveryStream(describeOpts)
})
if err != nil {
if isAWSErr(err, firehose.ErrCodeResourceNotFoundException, "") {
return 42, "DESTROYED", nil
return &firehose.DeliveryStreamDescription{}, firehoseDeliveryStreamStatusDeleted, nil
}
return nil, "failed", err
return nil, "", err
}

return resp.DeliveryStreamDescription, aws.StringValue(resp.DeliveryStreamDescription.DeliveryStreamStatus), nil
}
}

func firehoseDeliveryStreamSSEStateRefreshFunc(conn *firehose.Firehose, sn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
resp, err := conn.DescribeDeliveryStream(&firehose.DescribeDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
})
if err != nil {
return nil, "", err
}

return resp.DeliveryStreamDescription, *resp.DeliveryStreamDescription.DeliveryStreamStatus, nil
return resp.DeliveryStreamDescription, aws.StringValue(resp.DeliveryStreamDescription.DeliveryStreamEncryptionConfiguration.Status), nil
}
}

func waitForKinesisFirehoseDeliveryStreamCreation(conn *firehose.Firehose, deliveryStreamName string) (*firehose.DeliveryStreamDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamStatusCreating},
Target: []string{firehose.DeliveryStreamStatusActive},
Refresh: firehoseDeliveryStreamStateRefreshFunc(conn, deliveryStreamName),
Timeout: 20 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

v, err := stateConf.WaitForState()
if err != nil {
return nil, err
}

return v.(*firehose.DeliveryStreamDescription), nil
}

func waitForKinesisFirehoseDeliveryStreamDeletion(conn *firehose.Firehose, deliveryStreamName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{"DELETING"},
Target: []string{"DESTROYED"},
Refresh: firehoseStreamStateRefreshFunc(conn, deliveryStreamName),
Pending: []string{firehose.DeliveryStreamStatusDeleting},
Target: []string{firehoseDeliveryStreamStatusDeleted},
Refresh: firehoseDeliveryStreamStateRefreshFunc(conn, deliveryStreamName),
Timeout: 20 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
Expand All @@ -2364,3 +2452,49 @@ func waitForKinesisFirehoseDeliveryStreamDeletion(conn *firehose.Firehose, deliv

return err
}

func waitForKinesisFirehoseDeliveryStreamSSEEnabled(conn *firehose.Firehose, deliveryStreamName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamEncryptionStatusEnabling},
Target: []string{firehose.DeliveryStreamEncryptionStatusEnabled},
Refresh: firehoseDeliveryStreamSSEStateRefreshFunc(conn, deliveryStreamName),
Timeout: 10 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

_, err := stateConf.WaitForState()

return err
}

func waitForKinesisFirehoseDeliveryStreamSSEDisabled(conn *firehose.Firehose, deliveryStreamName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamEncryptionStatusDisabling},
Target: []string{firehose.DeliveryStreamEncryptionStatusDisabled},
Refresh: firehoseDeliveryStreamSSEStateRefreshFunc(conn, deliveryStreamName),
Timeout: 10 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

_, err := stateConf.WaitForState()

return err
}

func isKinesisFirehoseDeliveryStreamOptionDisabled(v interface{}) bool {
options := v.([]interface{})
if len(options) == 0 || options[0] == nil {
return true
}
m := options[0].(map[string]interface{})

var enabled bool

if v, ok := m["enabled"]; ok {
enabled = v.(bool)
}

return !enabled
}
Loading

0 comments on commit d096c88

Please sign in to comment.