Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r/aws_mq_broker: enable cross-region data replication #35990

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/35990.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_mq_broker: Add `data_replication_mode` and `data_replication_primary_broker_arn` arguments, enabling support for cross-region data replication
```
49 changes: 49 additions & 0 deletions internal/service/mq/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,28 @@ func resourceBroker() *schema.Resource {
},
},
},
"data_replication_mode": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ValidateDiagFunc: enum.Validate[types.DataReplicationMode](),
DiffSuppressFunc: func(k, o, n string, d *schema.ResourceData) bool {
// Suppress differences when the configured data replication mode
// matches a non-empty, pending replication mode. This scenario
// can exist when the mode has been set, but the broker has not
// yet been rebooted.
if n != "" && n == d.Get("pending_data_replication_mode").(string) {
return true
}
return false
},
},
"data_replication_primary_broker_arn": {
Type: schema.TypeString,
Optional: true,
ForceNew: true, // Can only be set on Create
ValidateFunc: verify.ValidARN,
},
"deployment_mode": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -269,6 +291,10 @@ func resourceBroker() *schema.Resource {
},
},
},
"pending_data_replication_mode": {
Type: schema.TypeString,
Computed: true,
},
"publicly_accessible": {
Type: schema.TypeBool,
Optional: true,
Expand Down Expand Up @@ -392,6 +418,12 @@ func resourceBrokerCreate(ctx context.Context, d *schema.ResourceData, meta inte
if v, ok := d.GetOk("deployment_mode"); ok {
input.DeploymentMode = types.DeploymentMode(v.(string))
}
if v, ok := d.GetOk("data_replication_mode"); ok {
input.DataReplicationMode = types.DataReplicationMode(v.(string))
}
if v, ok := d.GetOk("data_replication_primary_broker_arn"); ok {
input.DataReplicationPrimaryBrokerArn = aws.String(v.(string))
}
if v, ok := d.GetOk("encryption_options"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.EncryptionOptions = expandEncryptionOptions(d.Get("encryption_options").([]interface{}))
}
Expand Down Expand Up @@ -451,11 +483,13 @@ func resourceBrokerRead(ctx context.Context, d *schema.ResourceData, meta interf
d.Set("authentication_strategy", output.AuthenticationStrategy)
d.Set("auto_minor_version_upgrade", output.AutoMinorVersionUpgrade)
d.Set("broker_name", output.BrokerName)
d.Set("data_replication_mode", output.DataReplicationMode)
d.Set("deployment_mode", output.DeploymentMode)
d.Set("engine_type", output.EngineType)
d.Set("engine_version", output.EngineVersion)
d.Set("host_instance_type", output.HostInstanceType)
d.Set("instances", flattenBrokerInstances(output.BrokerInstances))
d.Set("pending_data_replication_mode", output.PendingDataReplicationMode)
d.Set("publicly_accessible", output.PubliclyAccessible)
d.Set("security_groups", output.SecurityGroups)
d.Set("storage_type", output.StorageType)
Expand Down Expand Up @@ -600,6 +634,21 @@ func resourceBrokerUpdate(ctx context.Context, d *schema.ResourceData, meta inte
requiresReboot = true
}

if d.HasChange("data_replication_mode") {
input := &mq.UpdateBrokerInput{
BrokerId: aws.String(d.Id()),
DataReplicationMode: types.DataReplicationMode(d.Get("data_replication_mode").(string)),
}

_, err := conn.UpdateBroker(ctx, input)

if err != nil {
return sdkdiag.AppendErrorf(diags, "updating MQ Broker (%s) data replication mode: %s", d.Id(), err)
}

requiresReboot = true
}

if d.Get("apply_immediately").(bool) && requiresReboot {
_, err := conn.RebootBroker(ctx, &mq.RebootBrokerInput{
BrokerId: aws.String(d.Id()),
Expand Down
208 changes: 208 additions & 0 deletions internal/service/mq/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/YakDriver/regexache"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/mq"
"github.com/aws/aws-sdk-go-v2/service/mq/types"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
Expand Down Expand Up @@ -1369,6 +1371,72 @@ func TestAccMQBroker_ldap(t *testing.T) {
})
}

func TestAccMQBroker_dataReplicationMode(t *testing.T) {
ctx := acctest.Context(t)
if testing.Short() {
t.Skip("skipping long-running test in short mode")
}

var broker mq.DescribeBrokerOutput
var brokerAlternate mq.DescribeBrokerOutput
var providers []*schema.Provider
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_mq_broker.test"
primaryBrokerResourceName := "aws_mq_broker.primary"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() {
acctest.PreCheck(ctx, t)
acctest.PreCheckMultipleRegion(t, 2)
acctest.PreCheckPartitionHasService(t, names.MQEndpointID)
testAccPreCheck(ctx, t)
},
ErrorCheck: acctest.ErrorCheck(t, names.MQServiceID),
ProtoV5ProviderFactories: acctest.ProtoV5FactoriesPlusProvidersAlternate(ctx, t, &providers),
CheckDestroy: testAccCheckBrokerDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccBrokerConfig_dataReplicationMode(rName, testAccBrokerVersionNewer, string(types.DataReplicationModeCrdr)),
Check: resource.ComposeTestCheckFunc(
testAccCheckBrokerExists(ctx, resourceName, &broker),
testAccCheckBrokerExistsWithProvider(ctx, primaryBrokerResourceName, &brokerAlternate, acctest.RegionProviderFunc(acctest.AlternateRegion(), &providers)),
resource.TestCheckResourceAttr(resourceName, "broker_name", rName),
resource.TestCheckResourceAttr(resourceName, "deployment_mode", string(types.DeploymentModeActiveStandbyMultiAz)),
// data_replication_mode is not returned until after reboot
resource.TestCheckResourceAttr(resourceName, "data_replication_mode", ""),
resource.TestCheckResourceAttr(resourceName, "pending_data_replication_mode", string(types.DataReplicationModeCrdr)),
resource.TestCheckResourceAttrPair(resourceName, "data_replication_primary_broker_arn", primaryBrokerResourceName, "arn"),
),
},
{
Config: testAccBrokerConfig_dataReplicationMode(rName, testAccBrokerVersionNewer, string(types.DataReplicationModeCrdr)),
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"apply_immediately", "user", "data_replication_primary_broker_arn"},
},
{
// Preparation for destruction would require multiple configuration changes
// and applies to unpair brokers. Instead, complete the necessary update, reboot,
// and delete opreations on the primary cluster out-of-band to ensure remaining
// resources will be freed for clean up.
PreConfig: func() {
// In order to delete, replicated brokers must first be unpaired by setting
// data replication mode on the primary broker to "NONE".
testAccUnpairBrokerWithProvider(ctx, t, &brokerAlternate, acctest.RegionProviderFunc(acctest.AlternateRegion(), &providers))
// The primary broker must be deleted before replica broker. The direct
// dependency in the Terraform configuration would cause this to happen
// in the opposite order, so delete the primary out of band instead.
testAccDeleteBrokerWithProvider(ctx, t, &brokerAlternate, acctest.RegionProviderFunc(acctest.AlternateRegion(), &providers))
},
Config: testAccBrokerConfig_dataReplicationMode(rName, testAccBrokerVersionNewer, string(types.DataReplicationModeNone)),
PlanOnly: true,
ExpectNonEmptyPlan: true,
},
},
})
}

func testAccCheckBrokerDestroy(ctx context.Context) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := acctest.Provider.Meta().(*conns.AWSClient).MQClient(ctx)
Expand Down Expand Up @@ -1416,6 +1484,27 @@ func testAccCheckBrokerExists(ctx context.Context, n string, v *mq.DescribeBroke
}
}

func testAccCheckBrokerExistsWithProvider(ctx context.Context, n string, v *mq.DescribeBrokerOutput, providerF func() *schema.Provider) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
if !ok {
return fmt.Errorf("Not found: %s", n)
}

conn := providerF().Meta().(*conns.AWSClient).MQClient(ctx)

output, err := tfmq.FindBrokerByID(ctx, conn, rs.Primary.ID)

if err != nil {
return err
}

*v = *output

return nil
}
}

func testAccPreCheck(ctx context.Context, t *testing.T) {
conn := acctest.Provider.Meta().(*conns.AWSClient).MQClient(ctx)

Expand All @@ -1432,6 +1521,46 @@ func testAccPreCheck(ctx context.Context, t *testing.T) {
}
}

func testAccUnpairBrokerWithProvider(ctx context.Context, t *testing.T, broker *mq.DescribeBrokerOutput, providerF func() *schema.Provider) {
brokerID := aws.ToString(broker.BrokerId)
deadline := tfresource.NewDeadline(30 * time.Minute)
conn := providerF().Meta().(*conns.AWSClient).MQClient(ctx)

_, err := conn.UpdateBroker(ctx, &mq.UpdateBrokerInput{
BrokerId: aws.String(brokerID),
DataReplicationMode: types.DataReplicationModeNone,
})
if err != nil {
t.Fatalf("updating broker (%s): %s", brokerID, err)
}

_, err = conn.RebootBroker(ctx, &mq.RebootBrokerInput{BrokerId: aws.String(brokerID)})
if err != nil {
t.Fatalf("rebooting broker (%s): %s", brokerID, err)
}

_, err = tfmq.WaitBrokerRebooted(ctx, conn, brokerID, deadline.Remaining())
if err != nil {
t.Fatalf("waiting for broker (%s) reboot: %s", brokerID, err)
}
}

func testAccDeleteBrokerWithProvider(ctx context.Context, t *testing.T, broker *mq.DescribeBrokerOutput, providerF func() *schema.Provider) {
brokerID := aws.ToString(broker.BrokerId)
deadline := tfresource.NewDeadline(30 * time.Minute)
conn := providerF().Meta().(*conns.AWSClient).MQClient(ctx)

_, err := conn.DeleteBroker(ctx, &mq.DeleteBrokerInput{BrokerId: aws.String(brokerID)})
if err != nil {
t.Fatalf("deleting broker (%s): %s", brokerID, err)
}

_, err = tfmq.WaitBrokerDeleted(ctx, conn, brokerID, deadline.Remaining())
if err != nil {
t.Fatalf("waiting for broker (%s) deletion: %s", brokerID, err)
}
}

func testAccCheckBrokerNotRecreated(before, after *mq.DescribeBrokerOutput) resource.TestCheckFunc {
return func(s *terraform.State) error {
if before, after := aws.ToString(before.BrokerId), aws.ToString(after.BrokerId); before != after {
Expand Down Expand Up @@ -2231,3 +2360,82 @@ resource "aws_mq_broker" "test" {
}
`, rName, version, instanceType)
}

// testAccBrokerConfig_dataReplicationMode creates a primary and replica broker
// in different regions, linking the former using the data replication arguments
func testAccBrokerConfig_dataReplicationMode(rName, version, dataReplicationMode string) string {
return acctest.ConfigCompose(
acctest.ConfigMultipleRegionProvider(2),
fmt.Sprintf(`
resource "aws_security_group" "primary" {
provider = awsalternate

name = "%[1]s-primary"

tags = {
Name = "%[1]s-primary"
}
}

resource "aws_mq_broker" "primary" {
provider = awsalternate

apply_immediately = true
broker_name = "%[1]s-primary"
engine_type = "ActiveMQ"
engine_version = %[2]q
host_instance_type = "mq.m5.large"
security_groups = [aws_security_group.primary.id]
deployment_mode = "ACTIVE_STANDBY_MULTI_AZ"

logs {
general = true
}

user {
username = "Test"
password = "TestTest1234"
}
user {
username = "Test-ReplicationUser"
password = "TestTest1234"
replication_user = true
}
}

resource "aws_security_group" "test" {
name = %[1]q

tags = {
Name = %[1]q
}
}

resource "aws_mq_broker" "test" {
apply_immediately = true
broker_name = %[1]q
engine_type = "ActiveMQ"
engine_version = %[2]q
host_instance_type = "mq.m5.large"
security_groups = [aws_security_group.test.id]
deployment_mode = "ACTIVE_STANDBY_MULTI_AZ"

data_replication_mode = %[3]q
data_replication_primary_broker_arn = aws_mq_broker.primary.arn

logs {
general = true
}

user {
username = "Test"
password = "TestTest1234"
}
user {
username = "Test-ReplicationUser"
password = "TestTest1234"
replication_user = true
}
}
`, rName, version, dataReplicationMode))
}
3 changes: 3 additions & 0 deletions internal/service/mq/exports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ var (

FindBrokerByID = findBrokerByID
FindConfigurationByID = findConfigurationByID

WaitBrokerRebooted = waitBrokerRebooted
WaitBrokerDeleted = waitBrokerDeleted
)
Loading
Loading