From e0c005280a913b7880ff53ff0832fe02d7fa54ee Mon Sep 17 00:00:00 2001 From: Jared Baker Date: Tue, 20 Feb 2024 11:22:39 -0500 Subject: [PATCH] r/aws_mq_broker: enable cross-region data replication This change adds the `data_replication_mode` and `data_replication_primary_broker_arn` arguments, enabling support for cross-region data replication. See the AWS MQ documentation for configuration details. https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/crdr-for-active-mq.html ```console % make testacc PKG=mq TESTS=TestAccMQBroker_dataReplicationMode ==> Checking that code complies with gofmt requirements... TF_ACC=1 go test ./internal/service/mq/... -v -count 1 -parallel 20 -run='TestAccMQBroker_dataReplicationMode' -timeout 360m --- PASS: TestAccMQBroker_dataReplicationMode (511.53s) PASS ok github.com/hashicorp/terraform-provider-aws/internal/service/mq 518.736s ``` --- .changelog/35990.txt | 3 + internal/service/mq/broker.go | 49 ++++++ internal/service/mq/broker_test.go | 208 +++++++++++++++++++++++++ internal/service/mq/exports_test.go | 3 + website/docs/r/mq_broker.html.markdown | 55 +++++++ 5 files changed, 318 insertions(+) create mode 100644 .changelog/35990.txt diff --git a/.changelog/35990.txt b/.changelog/35990.txt new file mode 100644 index 00000000000..5ce17a8cc11 --- /dev/null +++ b/.changelog/35990.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/aws_mq_broker: Add `data_replication_mode` and `data_replication_primary_broker_arn` arguments, enabling support for cross-region data replication +``` diff --git a/internal/service/mq/broker.go b/internal/service/mq/broker.go index 72540713686..b84832dbf1e 100644 --- a/internal/service/mq/broker.go +++ b/internal/service/mq/broker.go @@ -104,6 +104,28 @@ func resourceBroker() *schema.Resource { }, }, }, + "data_replication_mode": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ValidateDiagFunc: enum.Validate[types.DataReplicationMode](), + DiffSuppressFunc: func(k, o, n string, d *schema.ResourceData) bool { + // Suppress differences when the configured data replication mode + // matches a non-empty, pending replication mode. This scenario + // can exist when the mode has been set, but the broker has not + // yet been rebooted. + if n != "" && n == d.Get("pending_data_replication_mode").(string) { + return true + } + return false + }, + }, + "data_replication_primary_broker_arn": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, // Can only be set on Create + ValidateFunc: verify.ValidARN, + }, "deployment_mode": { Type: schema.TypeString, Optional: true, @@ -269,6 +291,10 @@ func resourceBroker() *schema.Resource { }, }, }, + "pending_data_replication_mode": { + Type: schema.TypeString, + Computed: true, + }, "publicly_accessible": { Type: schema.TypeBool, Optional: true, @@ -392,6 +418,12 @@ func resourceBrokerCreate(ctx context.Context, d *schema.ResourceData, meta inte if v, ok := d.GetOk("deployment_mode"); ok { input.DeploymentMode = types.DeploymentMode(v.(string)) } + if v, ok := d.GetOk("data_replication_mode"); ok { + input.DataReplicationMode = types.DataReplicationMode(v.(string)) + } + if v, ok := d.GetOk("data_replication_primary_broker_arn"); ok { + input.DataReplicationPrimaryBrokerArn = aws.String(v.(string)) + } if v, ok := d.GetOk("encryption_options"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { input.EncryptionOptions = expandEncryptionOptions(d.Get("encryption_options").([]interface{})) } @@ -451,11 +483,13 @@ func resourceBrokerRead(ctx context.Context, d *schema.ResourceData, meta interf d.Set("authentication_strategy", output.AuthenticationStrategy) d.Set("auto_minor_version_upgrade", output.AutoMinorVersionUpgrade) d.Set("broker_name", output.BrokerName) + d.Set("data_replication_mode", output.DataReplicationMode) d.Set("deployment_mode", output.DeploymentMode) d.Set("engine_type", output.EngineType) d.Set("engine_version", output.EngineVersion) d.Set("host_instance_type", output.HostInstanceType) d.Set("instances", flattenBrokerInstances(output.BrokerInstances)) + d.Set("pending_data_replication_mode", output.PendingDataReplicationMode) d.Set("publicly_accessible", output.PubliclyAccessible) d.Set("security_groups", output.SecurityGroups) d.Set("storage_type", output.StorageType) @@ -600,6 +634,21 @@ func resourceBrokerUpdate(ctx context.Context, d *schema.ResourceData, meta inte requiresReboot = true } + if d.HasChange("data_replication_mode") { + input := &mq.UpdateBrokerInput{ + BrokerId: aws.String(d.Id()), + DataReplicationMode: types.DataReplicationMode(d.Get("data_replication_mode").(string)), + } + + _, err := conn.UpdateBroker(ctx, input) + + if err != nil { + return sdkdiag.AppendErrorf(diags, "updating MQ Broker (%s) data replication mode: %s", d.Id(), err) + } + + requiresReboot = true + } + if d.Get("apply_immediately").(bool) && requiresReboot { _, err := conn.RebootBroker(ctx, &mq.RebootBrokerInput{ BrokerId: aws.String(d.Id()), diff --git a/internal/service/mq/broker_test.go b/internal/service/mq/broker_test.go index ad8a7acda4e..34aff0ef368 100644 --- a/internal/service/mq/broker_test.go +++ b/internal/service/mq/broker_test.go @@ -8,10 +8,12 @@ import ( "fmt" "strings" "testing" + "time" "github.com/YakDriver/regexache" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/mq" + "github.com/aws/aws-sdk-go-v2/service/mq/types" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" @@ -1369,6 +1371,72 @@ func TestAccMQBroker_ldap(t *testing.T) { }) } +func TestAccMQBroker_dataReplicationMode(t *testing.T) { + ctx := acctest.Context(t) + if testing.Short() { + t.Skip("skipping long-running test in short mode") + } + + var broker mq.DescribeBrokerOutput + var brokerAlternate mq.DescribeBrokerOutput + var providers []*schema.Provider + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_mq_broker.test" + primaryBrokerResourceName := "aws_mq_broker.primary" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { + acctest.PreCheck(ctx, t) + acctest.PreCheckMultipleRegion(t, 2) + acctest.PreCheckPartitionHasService(t, names.MQEndpointID) + testAccPreCheck(ctx, t) + }, + ErrorCheck: acctest.ErrorCheck(t, names.MQServiceID), + ProtoV5ProviderFactories: acctest.ProtoV5FactoriesPlusProvidersAlternate(ctx, t, &providers), + CheckDestroy: testAccCheckBrokerDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccBrokerConfig_dataReplicationMode(rName, testAccBrokerVersionNewer, string(types.DataReplicationModeCrdr)), + Check: resource.ComposeTestCheckFunc( + testAccCheckBrokerExists(ctx, resourceName, &broker), + testAccCheckBrokerExistsWithProvider(ctx, primaryBrokerResourceName, &brokerAlternate, acctest.RegionProviderFunc(acctest.AlternateRegion(), &providers)), + resource.TestCheckResourceAttr(resourceName, "broker_name", rName), + resource.TestCheckResourceAttr(resourceName, "deployment_mode", string(types.DeploymentModeActiveStandbyMultiAz)), + // data_replication_mode is not returned until after reboot + resource.TestCheckResourceAttr(resourceName, "data_replication_mode", ""), + resource.TestCheckResourceAttr(resourceName, "pending_data_replication_mode", string(types.DataReplicationModeCrdr)), + resource.TestCheckResourceAttrPair(resourceName, "data_replication_primary_broker_arn", primaryBrokerResourceName, "arn"), + ), + }, + { + Config: testAccBrokerConfig_dataReplicationMode(rName, testAccBrokerVersionNewer, string(types.DataReplicationModeCrdr)), + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + ImportStateVerifyIgnore: []string{"apply_immediately", "user", "data_replication_primary_broker_arn"}, + }, + { + // Preparation for destruction would require multiple configuration changes + // and applies to unpair brokers. Instead, complete the necessary update, reboot, + // and delete opreations on the primary cluster out-of-band to ensure remaining + // resources will be freed for clean up. + PreConfig: func() { + // In order to delete, replicated brokers must first be unpaired by setting + // data replication mode on the primary broker to "NONE". + testAccUnpairBrokerWithProvider(ctx, t, &brokerAlternate, acctest.RegionProviderFunc(acctest.AlternateRegion(), &providers)) + // The primary broker must be deleted before replica broker. The direct + // dependency in the Terraform configuration would cause this to happen + // in the opposite order, so delete the primary out of band instead. + testAccDeleteBrokerWithProvider(ctx, t, &brokerAlternate, acctest.RegionProviderFunc(acctest.AlternateRegion(), &providers)) + }, + Config: testAccBrokerConfig_dataReplicationMode(rName, testAccBrokerVersionNewer, string(types.DataReplicationModeNone)), + PlanOnly: true, + ExpectNonEmptyPlan: true, + }, + }, + }) +} + func testAccCheckBrokerDestroy(ctx context.Context) resource.TestCheckFunc { return func(s *terraform.State) error { conn := acctest.Provider.Meta().(*conns.AWSClient).MQClient(ctx) @@ -1416,6 +1484,27 @@ func testAccCheckBrokerExists(ctx context.Context, n string, v *mq.DescribeBroke } } +func testAccCheckBrokerExistsWithProvider(ctx context.Context, n string, v *mq.DescribeBrokerOutput, providerF func() *schema.Provider) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + + conn := providerF().Meta().(*conns.AWSClient).MQClient(ctx) + + output, err := tfmq.FindBrokerByID(ctx, conn, rs.Primary.ID) + + if err != nil { + return err + } + + *v = *output + + return nil + } +} + func testAccPreCheck(ctx context.Context, t *testing.T) { conn := acctest.Provider.Meta().(*conns.AWSClient).MQClient(ctx) @@ -1432,6 +1521,46 @@ func testAccPreCheck(ctx context.Context, t *testing.T) { } } +func testAccUnpairBrokerWithProvider(ctx context.Context, t *testing.T, broker *mq.DescribeBrokerOutput, providerF func() *schema.Provider) { + brokerID := aws.ToString(broker.BrokerId) + deadline := tfresource.NewDeadline(30 * time.Minute) + conn := providerF().Meta().(*conns.AWSClient).MQClient(ctx) + + _, err := conn.UpdateBroker(ctx, &mq.UpdateBrokerInput{ + BrokerId: aws.String(brokerID), + DataReplicationMode: types.DataReplicationModeNone, + }) + if err != nil { + t.Fatalf("updating broker (%s): %s", brokerID, err) + } + + _, err = conn.RebootBroker(ctx, &mq.RebootBrokerInput{BrokerId: aws.String(brokerID)}) + if err != nil { + t.Fatalf("rebooting broker (%s): %s", brokerID, err) + } + + _, err = tfmq.WaitBrokerRebooted(ctx, conn, brokerID, deadline.Remaining()) + if err != nil { + t.Fatalf("waiting for broker (%s) reboot: %s", brokerID, err) + } +} + +func testAccDeleteBrokerWithProvider(ctx context.Context, t *testing.T, broker *mq.DescribeBrokerOutput, providerF func() *schema.Provider) { + brokerID := aws.ToString(broker.BrokerId) + deadline := tfresource.NewDeadline(30 * time.Minute) + conn := providerF().Meta().(*conns.AWSClient).MQClient(ctx) + + _, err := conn.DeleteBroker(ctx, &mq.DeleteBrokerInput{BrokerId: aws.String(brokerID)}) + if err != nil { + t.Fatalf("deleting broker (%s): %s", brokerID, err) + } + + _, err = tfmq.WaitBrokerDeleted(ctx, conn, brokerID, deadline.Remaining()) + if err != nil { + t.Fatalf("waiting for broker (%s) deletion: %s", brokerID, err) + } +} + func testAccCheckBrokerNotRecreated(before, after *mq.DescribeBrokerOutput) resource.TestCheckFunc { return func(s *terraform.State) error { if before, after := aws.ToString(before.BrokerId), aws.ToString(after.BrokerId); before != after { @@ -2231,3 +2360,82 @@ resource "aws_mq_broker" "test" { } `, rName, version, instanceType) } + +// testAccBrokerConfig_dataReplicationMode creates a primary and replica broker +// in different regions, linking the former using the data replication arguments +func testAccBrokerConfig_dataReplicationMode(rName, version, dataReplicationMode string) string { + return acctest.ConfigCompose( + acctest.ConfigMultipleRegionProvider(2), + fmt.Sprintf(` +resource "aws_security_group" "primary" { + provider = awsalternate + + name = "%[1]s-primary" + + tags = { + Name = "%[1]s-primary" + } +} + +resource "aws_mq_broker" "primary" { + provider = awsalternate + + apply_immediately = true + broker_name = "%[1]s-primary" + engine_type = "ActiveMQ" + engine_version = %[2]q + host_instance_type = "mq.m5.large" + security_groups = [aws_security_group.primary.id] + deployment_mode = "ACTIVE_STANDBY_MULTI_AZ" + + logs { + general = true + } + + user { + username = "Test" + password = "TestTest1234" + } + user { + username = "Test-ReplicationUser" + password = "TestTest1234" + replication_user = true + } +} + +resource "aws_security_group" "test" { + name = %[1]q + + tags = { + Name = %[1]q + } +} + +resource "aws_mq_broker" "test" { + apply_immediately = true + broker_name = %[1]q + engine_type = "ActiveMQ" + engine_version = %[2]q + host_instance_type = "mq.m5.large" + security_groups = [aws_security_group.test.id] + deployment_mode = "ACTIVE_STANDBY_MULTI_AZ" + + data_replication_mode = %[3]q + data_replication_primary_broker_arn = aws_mq_broker.primary.arn + + logs { + general = true + } + + user { + username = "Test" + password = "TestTest1234" + } + user { + username = "Test-ReplicationUser" + password = "TestTest1234" + replication_user = true + } +} +`, rName, version, dataReplicationMode)) +} diff --git a/internal/service/mq/exports_test.go b/internal/service/mq/exports_test.go index 515e63f19c5..6d71465acf5 100644 --- a/internal/service/mq/exports_test.go +++ b/internal/service/mq/exports_test.go @@ -10,4 +10,7 @@ var ( FindBrokerByID = findBrokerByID FindConfigurationByID = findConfigurationByID + + WaitBrokerRebooted = waitBrokerRebooted + WaitBrokerDeleted = waitBrokerDeleted ) diff --git a/website/docs/r/mq_broker.html.markdown b/website/docs/r/mq_broker.html.markdown index 03b5ed54451..11a47b658f6 100644 --- a/website/docs/r/mq_broker.html.markdown +++ b/website/docs/r/mq_broker.html.markdown @@ -69,6 +69,58 @@ resource "aws_mq_broker" "example" { } ``` +### Cross-Region Data Replication + +```terraform +resource "aws_mq_broker" "example_primary" { + # primary broker configured in an alternate region + provider = awsalternate + + apply_immediately = true + broker_name = "example_primary" + engine_type = "ActiveMQ" + engine_version = "5.17.6" + host_instance_type = "mq.m5.large" + security_groups = [aws_security_group.example_primary.id] + deployment_mode = "ACTIVE_STANDBY_MULTI_AZ" + + user { + username = "ExampleUser" + password = "MindTheGap" + } + user { + username = "ExampleReplicationUser" + password = "Example12345" + replication_user = true + } +} + +resource "aws_mq_broker" "example" { + apply_immediately = true + broker_name = "example" + engine_type = "ActiveMQ" + engine_version = "5.17.6" + host_instance_type = "mq.m5.large" + security_groups = [aws_security_group.example.id] + deployment_mode = "ACTIVE_STANDBY_MULTI_AZ" + + data_replication_mode = "CRDR" + data_replication_primary_broker_arn = aws_mq_broker.primary.arn + + user { + username = "ExampleUser" + password = "MindTheGap" + } + user { + username = "ExampleReplicationUser" + password = "Example12345" + replication_user = true + } +} +``` + +See the [AWS MQ documentation](https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/crdr-for-active-mq.html) on cross-region data replication for additional details. + ## Argument Reference The following arguments are required: @@ -85,6 +137,8 @@ The following arguments are optional: * `authentication_strategy` - (Optional) Authentication strategy used to secure the broker. Valid values are `simple` and `ldap`. `ldap` is not supported for `engine_type` `RabbitMQ`. * `auto_minor_version_upgrade` - (Optional) Whether to automatically upgrade to new minor versions of brokers as Amazon MQ makes releases available. * `configuration` - (Optional) Configuration block for broker configuration. Applies to `engine_type` of `ActiveMQ` and `RabbitMQ` only. Detailed below. +* `data_replication_mode` - (Optional) Defines whether this broker is a part of a data replication pair. Valid values are `CRDR` and `NONE`. +* `data_replication_primary_broker_arn` - (Optional) The Amazon Resource Name (ARN) of the primary broker that is used to replicate data from in a data replication pair, and is applied to the replica broker. Must be set when `data_replication_mode` is `CRDR`. * `deployment_mode` - (Optional) Deployment mode of the broker. Valid values are `SINGLE_INSTANCE`, `ACTIVE_STANDBY_MULTI_AZ`, and `CLUSTER_MULTI_AZ`. Default is `SINGLE_INSTANCE`. * `encryption_options` - (Optional) Configuration block containing encryption options. Detailed below. * `ldap_server_metadata` - (Optional) Configuration block for the LDAP server used to authenticate and authorize connections to the broker. Not supported for `engine_type` `RabbitMQ`. Detailed below. (Currently, AWS may not process changes to LDAP server metadata.) @@ -169,6 +223,7 @@ This resource exports the following attributes in addition to the arguments abov * `wss://broker-id.mq.us-west-2.amazonaws.com:61619` * For `RabbitMQ`: * `amqps://broker-id.mq.us-west-2.amazonaws.com:5671` +* `pending_data_replication_mode` - (Optional) The data replication mode that will be applied after reboot. * `tags_all` - A map of tags assigned to the resource, including those inherited from the provider [`default_tags` configuration block](https://registry.terraform.io/providers/hashicorp/aws/latest/docs#default_tags-configuration-block). ## Timeouts