From 0a60cab31271ad6bb62c849b9d7f17408a4d2f27 Mon Sep 17 00:00:00 2001 From: Anthony Wat Date: Sun, 8 Sep 2024 16:06:47 -0400 Subject: [PATCH] feat: Add Snowflake destination buffering hint settings for aws_kinesis_firehose_delivery_stream --- .changelog/39214.txt | 3 ++ internal/service/firehose/delivery_stream.go | 33 +++++++++++++++++-- .../service/firehose/delivery_stream_test.go | 23 +++++++++---- ...sis_firehose_delivery_stream.html.markdown | 18 ++++++---- 4 files changed, 61 insertions(+), 16 deletions(-) create mode 100644 .changelog/39214.txt diff --git a/.changelog/39214.txt b/.changelog/39214.txt new file mode 100644 index 00000000000..1ff66895e14 --- /dev/null +++ b/.changelog/39214.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/aws_kinesis_firehose_delivery_stream: Add `snowflake_configuration.buffering_internal` and `snowflake_configuration.buffering_size` arguments +``` \ No newline at end of file diff --git a/internal/service/firehose/delivery_stream.go b/internal/service/firehose/delivery_stream.go index dcc056e5fad..510eacfb766 100644 --- a/internal/service/firehose/delivery_stream.go +++ b/internal/service/firehose/delivery_stream.go @@ -1185,6 +1185,18 @@ func resourceDeliveryStream() *schema.Resource { Type: schema.TypeString, Required: true, }, + "buffering_interval": { + Type: schema.TypeInt, + Optional: true, + Default: 0, + ValidateFunc: validation.IntBetween(0, 900), + }, + "buffering_size": { + Type: schema.TypeInt, + Optional: true, + Default: 1, + ValidateFunc: validation.IntBetween(1, 128), + }, "cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(), "content_column_name": { Type: schema.TypeString, @@ -2728,7 +2740,11 @@ func expandAmazonOpenSearchServerlessDestinationUpdate(oss map[string]interface{ func expandSnowflakeDestinationConfiguration(tfMap map[string]interface{}) *types.SnowflakeDestinationConfiguration { roleARN := tfMap[names.AttrRoleARN].(string) apiObject := &types.SnowflakeDestinationConfiguration{ - AccountUrl: aws.String(tfMap["account_url"].(string)), + AccountUrl: aws.String(tfMap["account_url"].(string)), + BufferingHints: &types.SnowflakeBufferingHints{ + IntervalInSeconds: aws.Int32(int32(tfMap["buffering_interval"].(int))), + SizeInMBs: aws.Int32(int32(tfMap["buffering_size"].(int))), + }, Database: aws.String(tfMap[names.AttrDatabase].(string)), RetryOptions: expandSnowflakeRetryOptions(tfMap), RoleARN: aws.String(roleARN), @@ -2792,7 +2808,11 @@ func expandSnowflakeDestinationConfiguration(tfMap map[string]interface{}) *type func expandSnowflakeDestinationUpdate(tfMap map[string]interface{}) *types.SnowflakeDestinationUpdate { roleARN := tfMap[names.AttrRoleARN].(string) apiObject := &types.SnowflakeDestinationUpdate{ - AccountUrl: aws.String(tfMap["account_url"].(string)), + AccountUrl: aws.String(tfMap["account_url"].(string)), + BufferingHints: &types.SnowflakeBufferingHints{ + IntervalInSeconds: aws.Int32(int32(tfMap["buffering_interval"].(int))), + SizeInMBs: aws.Int32(int32(tfMap["buffering_size"].(int))), + }, Database: aws.String(tfMap[names.AttrDatabase].(string)), RetryOptions: expandSnowflakeRetryOptions(tfMap), RoleARN: aws.String(roleARN), @@ -3556,6 +3576,15 @@ func flattenSnowflakeDestinationDescription(apiObject *types.SnowflakeDestinatio "user": aws.ToString(apiObject.User), } + if v := apiObject.BufferingHints; v != nil { + if v.IntervalInSeconds != nil { + tfMap["buffering_interval"] = int(aws.ToInt32(v.IntervalInSeconds)) + } + if v.SizeInMBs != nil { + tfMap["buffering_size"] = int(aws.ToInt32(v.SizeInMBs)) + } + } + if apiObject.RetryOptions != nil { tfMap["retry_duration"] = int(aws.ToInt32(apiObject.RetryOptions.DurationInSeconds)) } diff --git a/internal/service/firehose/delivery_stream_test.go b/internal/service/firehose/delivery_stream_test.go index fdf8fe160e9..e84daf6228b 100644 --- a/internal/service/firehose/delivery_stream_test.go +++ b/internal/service/firehose/delivery_stream_test.go @@ -1126,6 +1126,8 @@ func TestAccFirehoseDeliveryStream_snowflakeUpdates(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "server_side_encryption.0.key_type", "AWS_OWNED_CMK"), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.#", acctest.Ct1), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.account_url", fmt.Sprintf("https://%s.snowflakecomputing.com", rName)), + resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_interval", acctest.Ct0), + resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_size", acctest.Ct1), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.#", acctest.Ct1), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.log_group_name", ""), @@ -1193,6 +1195,8 @@ func TestAccFirehoseDeliveryStream_snowflakeUpdates(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "server_side_encryption.0.key_type", "AWS_OWNED_CMK"), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.#", acctest.Ct1), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.account_url", fmt.Sprintf("https://%s.snowflakecomputing.com", rName)), + resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_interval", "900"), + resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_size", "128"), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.#", acctest.Ct1), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.log_group_name", ""), @@ -1267,6 +1271,8 @@ func TestAccFirehoseDeliveryStream_snowflakeUpdates(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "server_side_encryption.0.key_type", "AWS_OWNED_CMK"), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.#", acctest.Ct1), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.account_url", fmt.Sprintf("https://%s.snowflakecomputing.com", rName)), + resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_interval", acctest.Ct0), + resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_size", acctest.Ct1), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.#", acctest.Ct1), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse), resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.log_group_name", ""), @@ -3997,13 +4003,15 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { destination = "snowflake" snowflake_configuration { - account_url = "https://%[1]s.snowflakecomputing.com" - database = "test-db" - private_key = "%[2]s" - role_arn = aws_iam_role.firehose.arn - schema = "test-schema" - table = "test-table" - user = "test-usr" + account_url = "https://%[1]s.snowflakecomputing.com" + buffering_interval = 900 + buffering_size = 128 + database = "test-db" + private_key = "%[2]s" + role_arn = aws_iam_role.firehose.arn + schema = "test-schema" + table = "test-table" + user = "test-usr" s3_configuration { role_arn = aws_iam_role.firehose.arn @@ -4043,6 +4051,7 @@ resource "aws_kinesis_firehose_delivery_stream" "test" { } } } + `, rName, acctest.TLSPEMRemoveRSAPrivateKeyEncapsulationBoundaries(acctest.TLSPEMRemoveNewlines(privateKey)))) } diff --git a/website/docs/r/kinesis_firehose_delivery_stream.html.markdown b/website/docs/r/kinesis_firehose_delivery_stream.html.markdown index 5f9b587f4cb..70ad09fe556 100644 --- a/website/docs/r/kinesis_firehose_delivery_stream.html.markdown +++ b/website/docs/r/kinesis_firehose_delivery_stream.html.markdown @@ -600,13 +600,15 @@ resource "aws_kinesis_firehose_delivery_stream" "example_snowflake_destination" destination = "snowflake" snowflake_configuration { - account_url = "https://example.snowflakecomputing.com" - database = "example-db" - private_key = "..." - role_arn = aws_iam_role.firehose.arn - schema = "example-schema" - table = "example-table" - user = "example-usr" + account_url = "https://example.snowflakecomputing.com" + buffering_size = 15 + buffering_interval = 600 + database = "example-db" + private_key = "..." + role_arn = aws_iam_role.firehose.arn + schema = "example-schema" + table = "example-table" + user = "example-usr" s3_configuration { role_arn = aws_iam_role.firehose.arn @@ -796,6 +798,8 @@ The `http_endpoint_configuration` configuration block supports the following arg The `snowflake_configuration` configuration block supports the following arguments: * `account_url` - (Required) The URL of the Snowflake account. Format: https://[account_identifier].snowflakecomputing.com. +* `buffering_size` - (Optional) Buffer incoming data to the specified size, in MBs between 1 to 128, before delivering it to the destination. The default value is 1MB. +* `buffering_interval` - (Optional) Buffer incoming data for the specified period of time, in seconds between 0 to 900, before delivering it to the destination. The default value is 0s. * `private_key` - (Optional) The private key for authentication. This value is required if `secrets_manager_configuration` is not provided. * `key_passphrase` - (Optional) The passphrase for the private key. * `user` - (Optional) The user for authentication. This value is required if `secrets_manager_configuration` is not provided.