Skip to content

Commit

Permalink
r/aws_mq_broker: enable cross-region data replication (#35990)
Browse files Browse the repository at this point in the history
This change adds the `data_replication_mode` and `data_replication_primary_broker_arn` arguments, enabling support for cross-region data replication. See the AWS MQ documentation for configuration details.

https://docs.aws.amazon.com/amazon-mq/latest/developer-guide/crdr-for-active-mq.html

```console
% make testacc PKG=mq TESTS=TestAccMQBroker_dataReplicationMode
==> Checking that code complies with gofmt requirements...
TF_ACC=1 go test ./internal/service/mq/... -v -count 1 -parallel 20 -run='TestAccMQBroker_dataReplicationMode'  -timeout 360m

--- PASS: TestAccMQBroker_dataReplicationMode (511.53s)
PASS
ok      github.com/hashicorp/terraform-provider-aws/internal/service/mq 518.736s
```
  • Loading branch information
jar-b authored Feb 29, 2024
1 parent 7db5ffa commit ef6ffcc
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/35990.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_mq_broker: Add `data_replication_mode` and `data_replication_primary_broker_arn` arguments, enabling support for cross-region data replication
```
49 changes: 49 additions & 0 deletions internal/service/mq/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,28 @@ func resourceBroker() *schema.Resource {
},
},
},
"data_replication_mode": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ValidateDiagFunc: enum.Validate[types.DataReplicationMode](),
DiffSuppressFunc: func(k, o, n string, d *schema.ResourceData) bool {
// Suppress differences when the configured data replication mode
// matches a non-empty, pending replication mode. This scenario
// can exist when the mode has been set, but the broker has not
// yet been rebooted.
if n != "" && n == d.Get("pending_data_replication_mode").(string) {
return true
}
return false
},
},
"data_replication_primary_broker_arn": {
Type: schema.TypeString,
Optional: true,
ForceNew: true, // Can only be set on Create
ValidateFunc: verify.ValidARN,
},
"deployment_mode": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -269,6 +291,10 @@ func resourceBroker() *schema.Resource {
},
},
},
"pending_data_replication_mode": {
Type: schema.TypeString,
Computed: true,
},
"publicly_accessible": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -392,6 +418,12 @@ func resourceBrokerCreate(ctx context.Context, d *schema.ResourceData, meta inte
if v, ok := d.GetOk("deployment_mode"); ok {
input.DeploymentMode = types.DeploymentMode(v.(string))
}
if v, ok := d.GetOk("data_replication_mode"); ok {
input.DataReplicationMode = types.DataReplicationMode(v.(string))
}
if v, ok := d.GetOk("data_replication_primary_broker_arn"); ok {
input.DataReplicationPrimaryBrokerArn = aws.String(v.(string))
}
if v, ok := d.GetOk("encryption_options"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.EncryptionOptions = expandEncryptionOptions(d.Get("encryption_options").([]interface{}))
}
Expand Down Expand Up @@ -451,11 +483,13 @@ func resourceBrokerRead(ctx context.Context, d *schema.ResourceData, meta interf
d.Set("authentication_strategy", output.AuthenticationStrategy)
d.Set("auto_minor_version_upgrade", output.AutoMinorVersionUpgrade)
d.Set("broker_name", output.BrokerName)
d.Set("data_replication_mode", output.DataReplicationMode)
d.Set("deployment_mode", output.DeploymentMode)
d.Set("engine_type", output.EngineType)
d.Set("engine_version", output.EngineVersion)
d.Set("host_instance_type", output.HostInstanceType)
d.Set("instances", flattenBrokerInstances(output.BrokerInstances))
d.Set("pending_data_replication_mode", output.PendingDataReplicationMode)
d.Set("publicly_accessible", output.PubliclyAccessible)
d.Set("security_groups", output.SecurityGroups)
d.Set("storage_type", output.StorageType)
Expand Down Expand Up @@ -600,6 +634,21 @@ func resourceBrokerUpdate(ctx context.Context, d *schema.ResourceData, meta inte
requiresReboot = true
}

if d.HasChange("data_replication_mode") {
input := &mq.UpdateBrokerInput{
BrokerId: aws.String(d.Id()),
DataReplicationMode: types.DataReplicationMode(d.Get("data_replication_mode").(string)),
}

_, err := conn.UpdateBroker(ctx, input)

if err != nil {
return sdkdiag.AppendErrorf(diags, "updating MQ Broker (%s) data replication mode: %s", d.Id(), err)
}

requiresReboot = true
}

if d.Get("apply_immediately").(bool) && requiresReboot {
_, err := conn.RebootBroker(ctx, &mq.RebootBrokerInput{
BrokerId: aws.String(d.Id()),
Expand Down
208 changes: 208 additions & 0 deletions internal/service/mq/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/YakDriver/regexache"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/mq"
"github.com/aws/aws-sdk-go-v2/service/mq/types"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -1369,6 +1371,72 @@ func TestAccMQBroker_ldap(t *testing.T) {
})
}

func TestAccMQBroker_dataReplicationMode(t *testing.T) {
ctx := acctest.Context(t)
if testing.Short() {
t.Skip("skipping long-running test in short mode")
}

var broker mq.DescribeBrokerOutput
var brokerAlternate mq.DescribeBrokerOutput
var providers []*schema.Provider
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_mq_broker.test"
primaryBrokerResourceName := "aws_mq_broker.primary"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() {
acctest.PreCheck(ctx, t)
acctest.PreCheckMultipleRegion(t, 2)
acctest.PreCheckPartitionHasService(t, names.MQEndpointID)
testAccPreCheck(ctx, t)
},
ErrorCheck: acctest.ErrorCheck(t, names.MQServiceID),
ProtoV5ProviderFactories: acctest.ProtoV5FactoriesPlusProvidersAlternate(ctx, t, &providers),
CheckDestroy: testAccCheckBrokerDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccBrokerConfig_dataReplicationMode(rName, testAccBrokerVersionNewer, string(types.DataReplicationModeCrdr)),
Check: resource.ComposeTestCheckFunc(
testAccCheckBrokerExists(ctx, resourceName, &broker),
testAccCheckBrokerExistsWithProvider(ctx, primaryBrokerResourceName, &brokerAlternate, acctest.RegionProviderFunc(acctest.AlternateRegion(), &providers)),
resource.TestCheckResourceAttr(resourceName, "broker_name", rName),
resource.TestCheckResourceAttr(resourceName, "deployment_mode", string(types.DeploymentModeActiveStandbyMultiAz)),
// data_replication_mode is not returned until after reboot
resource.TestCheckResourceAttr(resourceName, "data_replication_mode", ""),
resource.TestCheckResourceAttr(resourceName, "pending_data_replication_mode", string(types.DataReplicationModeCrdr)),
resource.TestCheckResourceAttrPair(resourceName, "data_replication_primary_broker_arn", primaryBrokerResourceName, "arn"),
),
},
{
Config: testAccBrokerConfig_dataReplicationMode(rName, testAccBrokerVersionNewer, string(types.DataReplicationModeCrdr)),
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"apply_immediately", "user", "data_replication_primary_broker_arn"},
},
{
// Preparation for destruction would require multiple configuration changes
// and applies to unpair brokers. Instead, complete the necessary update, reboot,
// and delete opreations on the primary cluster out-of-band to ensure remaining
// resources will be freed for clean up.
PreConfig: func() {
// In order to delete, replicated brokers must first be unpaired by setting
// data replication mode on the primary broker to "NONE".
testAccUnpairBrokerWithProvider(ctx, t, &brokerAlternate, acctest.RegionProviderFunc(acctest.AlternateRegion(), &providers))
// The primary broker must be deleted before replica broker. The direct
// dependency in the Terraform configuration would cause this to happen
// in the opposite order, so delete the primary out of band instead.
testAccDeleteBrokerWithProvider(ctx, t, &brokerAlternate, acctest.RegionProviderFunc(acctest.AlternateRegion(), &providers))
},
Config: testAccBrokerConfig_dataReplicationMode(rName, testAccBrokerVersionNewer, string(types.DataReplicationModeNone)),
PlanOnly: true,
ExpectNonEmptyPlan: true,
},
},
})
}

func testAccCheckBrokerDestroy(ctx context.Context) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := acctest.Provider.Meta().(*conns.AWSClient).MQClient(ctx)
Expand Down Expand Up @@ -1416,6 +1484,27 @@ func testAccCheckBrokerExists(ctx context.Context, n string, v *mq.DescribeBroke
}
}

func testAccCheckBrokerExistsWithProvider(ctx context.Context, n string, v *mq.DescribeBrokerOutput, providerF func() *schema.Provider) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}

conn := providerF().Meta().(*conns.AWSClient).MQClient(ctx)

output, err := tfmq.FindBrokerByID(ctx, conn, rs.Primary.ID)

if err != nil {
return err
}

*v = *output

return nil
}
}

func testAccPreCheck(ctx context.Context, t *testing.T) {
conn := acctest.Provider.Meta().(*conns.AWSClient).MQClient(ctx)

Expand All @@ -1432,6 +1521,46 @@ func testAccPreCheck(ctx context.Context, t *testing.T) {
}
}

func testAccUnpairBrokerWithProvider(ctx context.Context, t *testing.T, broker *mq.DescribeBrokerOutput, providerF func() *schema.Provider) {
brokerID := aws.ToString(broker.BrokerId)
deadline := tfresource.NewDeadline(30 * time.Minute)
conn := providerF().Meta().(*conns.AWSClient).MQClient(ctx)

_, err := conn.UpdateBroker(ctx, &mq.UpdateBrokerInput{
BrokerId: aws.String(brokerID),
DataReplicationMode: types.DataReplicationModeNone,
})
if err != nil {
t.Fatalf("updating broker (%s): %s", brokerID, err)
}

_, err = conn.RebootBroker(ctx, &mq.RebootBrokerInput{BrokerId: aws.String(brokerID)})
if err != nil {
t.Fatalf("rebooting broker (%s): %s", brokerID, err)
}

_, err = tfmq.WaitBrokerRebooted(ctx, conn, brokerID, deadline.Remaining())
if err != nil {
t.Fatalf("waiting for broker (%s) reboot: %s", brokerID, err)
}
}

func testAccDeleteBrokerWithProvider(ctx context.Context, t *testing.T, broker *mq.DescribeBrokerOutput, providerF func() *schema.Provider) {
brokerID := aws.ToString(broker.BrokerId)
deadline := tfresource.NewDeadline(30 * time.Minute)
conn := providerF().Meta().(*conns.AWSClient).MQClient(ctx)

_, err := conn.DeleteBroker(ctx, &mq.DeleteBrokerInput{BrokerId: aws.String(brokerID)})
if err != nil {
t.Fatalf("deleting broker (%s): %s", brokerID, err)
}

_, err = tfmq.WaitBrokerDeleted(ctx, conn, brokerID, deadline.Remaining())
if err != nil {
t.Fatalf("waiting for broker (%s) deletion: %s", brokerID, err)
}
}

func testAccCheckBrokerNotRecreated(before, after *mq.DescribeBrokerOutput) resource.TestCheckFunc {
return func(s *terraform.State) error {
if before, after := aws.ToString(before.BrokerId), aws.ToString(after.BrokerId); before != after {
Expand Down Expand Up @@ -2231,3 +2360,82 @@ resource "aws_mq_broker" "test" {
}
`, rName, version, instanceType)
}

// testAccBrokerConfig_dataReplicationMode creates a primary and replica broker
// in different regions, linking the former using the data replication arguments
func testAccBrokerConfig_dataReplicationMode(rName, version, dataReplicationMode string) string {
return acctest.ConfigCompose(
acctest.ConfigMultipleRegionProvider(2),
fmt.Sprintf(`
resource "aws_security_group" "primary" {
provider = awsalternate
name = "%[1]s-primary"
tags = {
Name = "%[1]s-primary"
}
}
resource "aws_mq_broker" "primary" {
provider = awsalternate
apply_immediately = true
broker_name = "%[1]s-primary"
engine_type = "ActiveMQ"
engine_version = %[2]q
host_instance_type = "mq.m5.large"
security_groups = [aws_security_group.primary.id]
deployment_mode = "ACTIVE_STANDBY_MULTI_AZ"
logs {
general = true
}
user {
username = "Test"
password = "TestTest1234"
}
user {
username = "Test-ReplicationUser"
password = "TestTest1234"
replication_user = true
}
}
resource "aws_security_group" "test" {
name = %[1]q
tags = {
Name = %[1]q
}
}
resource "aws_mq_broker" "test" {
apply_immediately = true
broker_name = %[1]q
engine_type = "ActiveMQ"
engine_version = %[2]q
host_instance_type = "mq.m5.large"
security_groups = [aws_security_group.test.id]
deployment_mode = "ACTIVE_STANDBY_MULTI_AZ"
data_replication_mode = %[3]q
data_replication_primary_broker_arn = aws_mq_broker.primary.arn
logs {
general = true
}
user {
username = "Test"
password = "TestTest1234"
}
user {
username = "Test-ReplicationUser"
password = "TestTest1234"
replication_user = true
}
}
`, rName, version, dataReplicationMode))
}
3 changes: 3 additions & 0 deletions internal/service/mq/exports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ var (

FindBrokerByID = findBrokerByID
FindConfigurationByID = findConfigurationByID

WaitBrokerRebooted = waitBrokerRebooted
WaitBrokerDeleted = waitBrokerDeleted
)
Loading

0 comments on commit ef6ffcc

Please sign in to comment.