Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Kinesis Data Firehose server-side encryption #6523

Merged
merged 10 commits into from
Aug 30, 2019
205 changes: 170 additions & 35 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
"github.com/hashicorp/terraform/helper/validation"
)

const (
firehoseDeliveryStreamStatusDeleted = "DESTROYED"
)

func cloudWatchLoggingOptionsSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeSet,
Expand Down Expand Up @@ -594,8 +598,19 @@ func flattenProcessingConfiguration(pc *firehose.ProcessingConfiguration, roleAr

func flattenKinesisFirehoseDeliveryStream(d *schema.ResourceData, s *firehose.DeliveryStreamDescription) error {
d.Set("version_id", s.VersionId)
d.Set("arn", *s.DeliveryStreamARN)
d.Set("arn", s.DeliveryStreamARN)
d.Set("name", s.DeliveryStreamName)

sseOptions := map[string]interface{}{
"enabled": false,
}
if s.DeliveryStreamEncryptionConfiguration != nil && aws.StringValue(s.DeliveryStreamEncryptionConfiguration.Status) == firehose.DeliveryStreamEncryptionStatusEnabled {
sseOptions["enabled"] = true
}
if err := d.Set("server_side_encryption", []map[string]interface{}{sseOptions}); err != nil {
return fmt.Errorf("error setting server_side_encryption: %s", err)
}

if len(s.Destinations) > 0 {
destination := s.Destinations[0]
if destination.RedshiftDestinationDescription != nil {
Expand Down Expand Up @@ -681,11 +696,30 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {

"tags": tagsSchema(),

"server_side_encryption": {
Type: schema.TypeList,
Optional: true,
Computed: true,
Copy link
Contributor

@bflad bflad Aug 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing Computed: true on merge so Terraform always performs drift detection.

MaxItems: 1,
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
ConflictsWith: []string{"kinesis_source_configuration"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enabled": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
},
},
},

"kinesis_source_configuration": {
Type: schema.TypeList,
ForceNew: true,
Optional: true,
MaxItems: 1,
Type: schema.TypeList,
ForceNew: true,
Optional: true,
MaxItems: 1,
ConflictsWith: []string{"server_side_encryption"},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"kinesis_stream_arn": {
Expand Down Expand Up @@ -2138,26 +2172,27 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
return fmt.Errorf("error creating Kinesis Firehose Delivery Stream: %s", err)
}

stateConf := &resource.StateChangeConf{
Pending: []string{"CREATING"},
Target: []string{"ACTIVE"},
Refresh: firehoseStreamStateRefreshFunc(conn, sn),
Timeout: 20 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

firehoseStream, err := stateConf.WaitForState()
s, err := waitForKinesisFirehoseDeliveryStreamCreation(conn, sn)
if err != nil {
return fmt.Errorf(
"Error waiting for Kinesis Stream (%s) to become active: %s",
sn, err)
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) creation: %s", sn, err)
}

s := firehoseStream.(*firehose.DeliveryStreamDescription)
d.SetId(*s.DeliveryStreamARN)
d.SetId(aws.StringValue(s.DeliveryStreamARN))
d.Set("arn", s.DeliveryStreamARN)

if v, ok := d.GetOk("server_side_encryption"); ok && !isKinesisFirehoseDeliveryStreamOptionDisabled(v) {
_, err := conn.StartDeliveryStreamEncryption(&firehose.StartDeliveryStreamEncryptionInput{
DeliveryStreamName: aws.String(sn),
})
if err != nil {
return fmt.Errorf("error starting Kinesis Firehose Delivery Stream (%s) encryption: %s", sn, err)
}

if err := waitForKinesisFirehoseDeliveryStreamSSEEnabled(conn, sn); err != nil {
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) encryption to be enabled: %s", sn, err)
}
}

return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}

Expand Down Expand Up @@ -2240,7 +2275,7 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
err := resource.Retry(1*time.Minute, func() *resource.RetryError {
_, err := conn.UpdateDestination(updateInput)
if err != nil {
log.Printf("[DEBUG] Error creating Firehose Delivery Stream: %s", err)
log.Printf("[DEBUG] Error updating Firehose Delivery Stream: %s", err)

// Retry for IAM eventual consistency
if isAWSErr(err, firehose.ErrCodeInvalidArgumentException, "is not authorized to") {
Expand Down Expand Up @@ -2279,6 +2314,33 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
sn, err)
}

if d.HasChange("server_side_encryption") {
_, n := d.GetChange("server_side_encryption")
if isKinesisFirehoseDeliveryStreamOptionDisabled(n) {
_, err := conn.StopDeliveryStreamEncryption(&firehose.StopDeliveryStreamEncryptionInput{
DeliveryStreamName: aws.String(sn),
})
if err != nil {
return fmt.Errorf("error stopping Kinesis Firehose Delivery Stream (%s) encryption: %s", sn, err)
}

if err := waitForKinesisFirehoseDeliveryStreamSSEDisabled(conn, sn); err != nil {
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) encryption to be disabled: %s", sn, err)
}
} else {
_, err := conn.StartDeliveryStreamEncryption(&firehose.StartDeliveryStreamEncryptionInput{
DeliveryStreamName: aws.String(sn),
})
if err != nil {
return fmt.Errorf("error starting Kinesis Firehose Delivery Stream (%s) encryption: %s", sn, err)
}

if err := waitForKinesisFirehoseDeliveryStreamSSEEnabled(conn, sn); err != nil {
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) encryption to be enabled: %s", sn, err)
}
}
}

return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}

Expand Down Expand Up @@ -2319,42 +2381,69 @@ func resourceAwsKinesisFirehoseDeliveryStreamDelete(d *schema.ResourceData, meta
_, err := conn.DeleteDeliveryStream(&firehose.DeleteDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
})

if err != nil {
return fmt.Errorf("error deleting Kinesis Firehose Delivery Stream (%s): %s", sn, err)
}

if err := waitForKinesisFirehoseDeliveryStreamDeletion(conn, sn); err != nil {
return fmt.Errorf(
"Error waiting for Delivery Stream (%s) to be destroyed: %s",
sn, err)
return fmt.Errorf("error waiting for Kinesis Firehose Delivery Stream (%s) deletion: %s", sn, err)
}

return nil
}

func firehoseStreamStateRefreshFunc(conn *firehose.Firehose, sn string) resource.StateRefreshFunc {
func firehoseDeliveryStreamStateRefreshFunc(conn *firehose.Firehose, sn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
describeOpts := &firehose.DescribeDeliveryStreamInput{
resp, err := conn.DescribeDeliveryStream(&firehose.DescribeDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
}
resp, err := conn.DescribeDeliveryStream(describeOpts)
})
if err != nil {
if isAWSErr(err, firehose.ErrCodeResourceNotFoundException, "") {
return 42, "DESTROYED", nil
return &firehose.DeliveryStreamDescription{}, firehoseDeliveryStreamStatusDeleted, nil
}
return nil, "failed", err
return nil, "", err
}

return resp.DeliveryStreamDescription, aws.StringValue(resp.DeliveryStreamDescription.DeliveryStreamStatus), nil
}
}

func firehoseDeliveryStreamSSEStateRefreshFunc(conn *firehose.Firehose, sn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
resp, err := conn.DescribeDeliveryStream(&firehose.DescribeDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
})
if err != nil {
return nil, "", err
}

return resp.DeliveryStreamDescription, *resp.DeliveryStreamDescription.DeliveryStreamStatus, nil
return resp.DeliveryStreamDescription, aws.StringValue(resp.DeliveryStreamDescription.DeliveryStreamEncryptionConfiguration.Status), nil
}
}

func waitForKinesisFirehoseDeliveryStreamCreation(conn *firehose.Firehose, deliveryStreamName string) (*firehose.DeliveryStreamDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamStatusCreating},
Target: []string{firehose.DeliveryStreamStatusActive},
Refresh: firehoseDeliveryStreamStateRefreshFunc(conn, deliveryStreamName),
Timeout: 20 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

v, err := stateConf.WaitForState()
if err != nil {
return nil, err
}

return v.(*firehose.DeliveryStreamDescription), nil
}

func waitForKinesisFirehoseDeliveryStreamDeletion(conn *firehose.Firehose, deliveryStreamName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{"DELETING"},
Target: []string{"DESTROYED"},
Refresh: firehoseStreamStateRefreshFunc(conn, deliveryStreamName),
Pending: []string{firehose.DeliveryStreamStatusDeleting},
Target: []string{firehoseDeliveryStreamStatusDeleted},
Refresh: firehoseDeliveryStreamStateRefreshFunc(conn, deliveryStreamName),
Timeout: 20 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
Expand All @@ -2364,3 +2453,49 @@ func waitForKinesisFirehoseDeliveryStreamDeletion(conn *firehose.Firehose, deliv

return err
}

func waitForKinesisFirehoseDeliveryStreamSSEEnabled(conn *firehose.Firehose, deliveryStreamName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamEncryptionStatusEnabling},
Target: []string{firehose.DeliveryStreamEncryptionStatusEnabled},
Refresh: firehoseDeliveryStreamSSEStateRefreshFunc(conn, deliveryStreamName),
Timeout: 10 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

_, err := stateConf.WaitForState()

return err
}

func waitForKinesisFirehoseDeliveryStreamSSEDisabled(conn *firehose.Firehose, deliveryStreamName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{firehose.DeliveryStreamEncryptionStatusDisabling},
Target: []string{firehose.DeliveryStreamEncryptionStatusDisabled},
Refresh: firehoseDeliveryStreamSSEStateRefreshFunc(conn, deliveryStreamName),
Timeout: 10 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 3 * time.Second,
}

_, err := stateConf.WaitForState()

return err
}

func isKinesisFirehoseDeliveryStreamOptionDisabled(v interface{}) bool {
options := v.([]interface{})
if len(options) == 0 || options[0] == nil {
return true
}
m := options[0].(map[string]interface{})

var enabled bool

if v, ok := m["enabled"]; ok {
enabled = v.(bool)
}

return !enabled
}
Loading