From 91e3fd33df77d44c08821d69d03010030904af75 Mon Sep 17 00:00:00 2001 From: Angie Pinilla Date: Wed, 23 Feb 2022 01:11:34 -0500 Subject: [PATCH 1/2] r/dms_replication_task: set cdc_start_position to computed; refactor --- internal/service/dms/consts.go | 7 + internal/service/dms/find.go | 49 ++++ internal/service/dms/replication_task.go | 236 ++++++------------ internal/service/dms/replication_task_test.go | 100 +++++--- internal/service/dms/status.go | 16 ++ internal/service/dms/wait.go | 48 ++++ 6 files changed, 267 insertions(+), 189 deletions(-) diff --git a/internal/service/dms/consts.go b/internal/service/dms/consts.go index 72b9908c4356..0d9e5cd7933e 100644 --- a/internal/service/dms/consts.go +++ b/internal/service/dms/consts.go @@ -2,6 +2,13 @@ package dms const ( endpointStatusDeleting = "deleting" + + replicationTaskStatusCreating = "creating" + replicationTaskStatusDeleting = "deleting" + replicationTaskStatusFailed = "failed" + replicationTaskStatusModifying = "modifying" + replicationTaskStatusReady = "ready" + replicationTaskStatusStopped = "stopped" ) const ( diff --git a/internal/service/dms/find.go b/internal/service/dms/find.go index d907df20dacb..f34479a84c72 100644 --- a/internal/service/dms/find.go +++ b/internal/service/dms/find.go @@ -41,3 +41,52 @@ func FindEndpointByID(conn *dms.DatabaseMigrationService, id string) (*dms.Endpo return output.Endpoints[0], nil } + +func FindReplicationTaskByID(conn *dms.DatabaseMigrationService, id string) (*dms.ReplicationTask, error) { + input := &dms.DescribeReplicationTasksInput{ + Filters: []*dms.Filter{ + { + Name: aws.String("replication-task-id"), + Values: []*string{aws.String(id)}, // Must use d.Id() to work with import. + }, + }, + } + + var results []*dms.ReplicationTask + + err := conn.DescribeReplicationTasksPages(input, func(page *dms.DescribeReplicationTasksOutput, lastPage bool) bool { + if page == nil { + return !lastPage + } + + for _, task := range page.ReplicationTasks { + if task == nil { + continue + } + results = append(results, task) + } + + return !lastPage + }) + + if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) { + return nil, &resource.NotFoundError{ + LastError: err, + LastRequest: input, + } + } + + if err != nil { + return nil, err + } + + if len(results) == 0 { + return nil, tfresource.NewEmptyResultError(input) + } + + if count := len(results); count > 1 { + return nil, tfresource.NewTooManyResultsError(count, input) + } + + return results[0], nil +} diff --git a/internal/service/dms/replication_task.go b/internal/service/dms/replication_task.go index c631b9245c14..1f4ee9d884cb 100644 --- a/internal/service/dms/replication_task.go +++ b/internal/service/dms/replication_task.go @@ -8,13 +8,13 @@ import ( "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" "github.com/hashicorp/terraform-provider-aws/internal/conns" tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" "github.com/hashicorp/terraform-provider-aws/internal/verify" ) @@ -33,6 +33,7 @@ func ResourceReplicationTask() *schema.Resource { "cdc_start_position": { Type: schema.TypeString, Optional: true, + Computed: true, ConflictsWith: []string{"cdc_start_time"}, }, "cdc_start_time": { @@ -103,10 +104,12 @@ func resourceReplicationTaskCreate(d *schema.ResourceData, meta interface{}) err defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig tags := defaultTagsConfig.MergeTags(tftags.New(d.Get("tags").(map[string]interface{}))) + taskId := d.Get("replication_task_id").(string) + request := &dms.CreateReplicationTaskInput{ MigrationType: aws.String(d.Get("migration_type").(string)), ReplicationInstanceArn: aws.String(d.Get("replication_instance_arn").(string)), - ReplicationTaskIdentifier: aws.String(d.Get("replication_task_id").(string)), + ReplicationTaskIdentifier: aws.String(taskId), SourceEndpointArn: aws.String(d.Get("source_endpoint_arn").(string)), TableMappings: aws.String(d.Get("table_mappings").(string)), Tags: Tags(tags.IgnoreAWS()), @@ -133,25 +136,13 @@ func resourceReplicationTaskCreate(d *schema.ResourceData, meta interface{}) err _, err := conn.CreateReplicationTask(request) if err != nil { - return err + return fmt.Errorf("error creating DMS Replication Task (%s): %w", taskId, err) } - taskId := d.Get("replication_task_id").(string) d.SetId(taskId) - stateConf := &resource.StateChangeConf{ - Pending: []string{"creating"}, - Target: []string{"ready"}, - Refresh: resourceReplicationTaskStateRefreshFunc(d, meta), - Timeout: d.Timeout(schema.TimeoutCreate), - MinTimeout: 10 * time.Second, - Delay: 30 * time.Second, // Wait 30 secs before starting - } - - // Wait, catching any errors - _, err = stateConf.WaitForState() - if err != nil { - return err + if err := waitReplicationTaskReady(conn, d.Id(), d.Timeout(schema.TimeoutCreate)); err != nil { + return fmt.Errorf("error waiting for DMS Replication Task (%s) to become available: %w", d.Id(), err) } return resourceReplicationTaskRead(d, meta) @@ -162,28 +153,38 @@ func resourceReplicationTaskRead(d *schema.ResourceData, meta interface{}) error defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig ignoreTagsConfig := meta.(*conns.AWSClient).IgnoreTagsConfig - response, err := conn.DescribeReplicationTasks(&dms.DescribeReplicationTasksInput{ - Filters: []*dms.Filter{ - { - Name: aws.String("replication-task-id"), - Values: []*string{aws.String(d.Id())}, // Must use d.Id() to work with import. - }, - }, - }) + task, err := FindReplicationTaskByID(conn, d.Id()) + + if !d.IsNewResource() && tfresource.NotFound(err) { + log.Printf("[WARN] DMS Replication Task (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil + } + if err != nil { - if dmserr, ok := err.(awserr.Error); ok && dmserr.Code() == "ResourceNotFoundFault" { - log.Printf("[DEBUG] DMS Replication Task %q Not Found", d.Id()) - d.SetId("") - return nil - } - return err + return fmt.Errorf("error reading DMS Replication Task (%s): %w", d.Id(), err) + } + + if task == nil { + return fmt.Errorf("error reading DMS Replication Task (%s): empty output", d.Id()) } - err = resourceReplicationTaskSetState(d, response.ReplicationTasks[0]) + d.Set("cdc_start_position", task.CdcStartPosition) + d.Set("migration_type", task.MigrationType) + d.Set("replication_instance_arn", task.ReplicationInstanceArn) + d.Set("replication_task_arn", task.ReplicationTaskArn) + d.Set("replication_task_id", task.ReplicationTaskIdentifier) + d.Set("source_endpoint_arn", task.SourceEndpointArn) + d.Set("table_mappings", task.TableMappings) + d.Set("target_endpoint_arn", task.TargetEndpointArn) + + settings, err := dmsReplicationTaskRemoveReadOnlySettings(aws.StringValue(task.ReplicationTaskSettings)) if err != nil { return err } + d.Set("replication_task_settings", settings) + tags, err := ListTags(conn, d.Get("replication_task_arn").(string)) if err != nil { @@ -207,163 +208,86 @@ func resourceReplicationTaskRead(d *schema.ResourceData, meta interface{}) error func resourceReplicationTaskUpdate(d *schema.ResourceData, meta interface{}) error { conn := meta.(*conns.AWSClient).DMSConn - request := &dms.ModifyReplicationTaskInput{ - ReplicationTaskArn: aws.String(d.Get("replication_task_arn").(string)), - } - hasChanges := false - - if d.HasChange("cdc_start_position") { - request.CdcStartPosition = aws.String(d.Get("cdc_start_position").(string)) - hasChanges = true - } - - if d.HasChange("cdc_start_time") { - seconds, err := strconv.ParseInt(d.Get("cdc_start_time").(string), 10, 64) - if err != nil { - return fmt.Errorf("DMS update replication task. Invalid CRC Unix timestamp: %s", err) + if d.HasChangesExcept("tags", "tags_all") { + input := &dms.ModifyReplicationTaskInput{ + ReplicationTaskArn: aws.String(d.Get("replication_task_arn").(string)), } - request.CdcStartTime = aws.Time(time.Unix(seconds, 0)) - hasChanges = true - } - if d.HasChange("migration_type") { - request.MigrationType = aws.String(d.Get("migration_type").(string)) - hasChanges = true - } + if d.HasChange("cdc_start_position") { + input.CdcStartPosition = aws.String(d.Get("cdc_start_position").(string)) + } - if d.HasChange("replication_task_settings") { - request.ReplicationTaskSettings = aws.String(d.Get("replication_task_settings").(string)) - hasChanges = true - } + if d.HasChange("cdc_start_time") { + seconds, err := strconv.ParseInt(d.Get("cdc_start_time").(string), 10, 64) + if err != nil { + return fmt.Errorf("DMS update replication task. Invalid CRC Unix timestamp: %s", err) + } + input.CdcStartTime = aws.Time(time.Unix(seconds, 0)) + } - if d.HasChange("table_mappings") { - request.TableMappings = aws.String(d.Get("table_mappings").(string)) - hasChanges = true - } + if d.HasChange("migration_type") { + input.MigrationType = aws.String(d.Get("migration_type").(string)) + } - if d.HasChange("tags_all") { - arn := d.Get("replication_task_arn").(string) - o, n := d.GetChange("tags_all") + if d.HasChange("replication_task_settings") { + input.ReplicationTaskSettings = aws.String(d.Get("replication_task_settings").(string)) + } - if err := UpdateTags(conn, arn, o, n); err != nil { - return fmt.Errorf("error updating DMS Replication Task (%s) tags: %s", arn, err) + if d.HasChange("table_mappings") { + input.TableMappings = aws.String(d.Get("table_mappings").(string)) } - } - if hasChanges { - log.Println("[DEBUG] DMS update replication task:", request) + log.Println("[DEBUG] DMS update replication task:", input) - _, err := conn.ModifyReplicationTask(request) + _, err := conn.ModifyReplicationTask(input) if err != nil { - return err + return fmt.Errorf("error updating DMS Replication Task (%s): %w", d.Id(), err) } - stateConf := &resource.StateChangeConf{ - Pending: []string{"modifying"}, - Target: []string{"ready", "stopped", "failed"}, - Refresh: resourceReplicationTaskStateRefreshFunc(d, meta), - Timeout: d.Timeout(schema.TimeoutCreate), - MinTimeout: 10 * time.Second, - Delay: 30 * time.Second, // Wait 30 secs before starting + if err := waitReplicationTaskModified(conn, d.Id(), d.Timeout(schema.TimeoutUpdate)); err != nil { + return fmt.Errorf("error waiting for DMS Replication Task (%s) update: %s", d.Id(), err) } + } - // Wait, catching any errors - _, err = stateConf.WaitForState() - if err != nil { - return err - } + if d.HasChange("tags_all") { + arn := d.Get("replication_task_arn").(string) + o, n := d.GetChange("tags_all") - return resourceReplicationTaskRead(d, meta) + if err := UpdateTags(conn, arn, o, n); err != nil { + return fmt.Errorf("error updating DMS Replication Task (%s) tags: %s", arn, err) + } } - return nil + return resourceReplicationTaskRead(d, meta) } func resourceReplicationTaskDelete(d *schema.ResourceData, meta interface{}) error { conn := meta.(*conns.AWSClient).DMSConn - request := &dms.DeleteReplicationTaskInput{ + input := &dms.DeleteReplicationTaskInput{ ReplicationTaskArn: aws.String(d.Get("replication_task_arn").(string)), } - log.Printf("[DEBUG] DMS delete replication task: %#v", request) + log.Printf("[DEBUG] DMS delete replication task: %#v", input) - _, err := conn.DeleteReplicationTask(request) - if err != nil { - if dmserr, ok := err.(awserr.Error); ok && dmserr.Code() == "ResourceNotFoundFault" { - log.Printf("[DEBUG] DMS Replication Task %q Not Found", d.Id()) - return nil - } - return err - } + _, err := conn.DeleteReplicationTask(input) - stateConf := &resource.StateChangeConf{ - Pending: []string{"deleting"}, - Target: []string{}, - Refresh: resourceReplicationTaskStateRefreshFunc(d, meta), - Timeout: d.Timeout(schema.TimeoutCreate), - MinTimeout: 10 * time.Second, - Delay: 30 * time.Second, // Wait 30 secs before starting + if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) { + return nil } - // Wait, catching any errors - _, err = stateConf.WaitForState() - - return err -} - -func resourceReplicationTaskSetState(d *schema.ResourceData, task *dms.ReplicationTask) error { - d.SetId(aws.StringValue(task.ReplicationTaskIdentifier)) - - d.Set("cdc_start_position", task.CdcStartPosition) - d.Set("migration_type", task.MigrationType) - d.Set("replication_instance_arn", task.ReplicationInstanceArn) - d.Set("replication_task_arn", task.ReplicationTaskArn) - d.Set("replication_task_id", task.ReplicationTaskIdentifier) - d.Set("source_endpoint_arn", task.SourceEndpointArn) - d.Set("table_mappings", task.TableMappings) - d.Set("target_endpoint_arn", task.TargetEndpointArn) - - settings, err := dmsReplicationTaskRemoveReadOnlySettings(*task.ReplicationTaskSettings) if err != nil { - return err + return fmt.Errorf("error deleting DMS Replication Task (%s): %w", d.Id(), err) } - d.Set("replication_task_settings", settings) - - return nil -} - -func resourceReplicationTaskStateRefreshFunc( - d *schema.ResourceData, meta interface{}) resource.StateRefreshFunc { - return func() (interface{}, string, error) { - conn := meta.(*conns.AWSClient).DMSConn - - v, err := conn.DescribeReplicationTasks(&dms.DescribeReplicationTasksInput{ - Filters: []*dms.Filter{ - { - Name: aws.String("replication-task-id"), - Values: []*string{aws.String(d.Id())}, // Must use d.Id() to work with import. - }, - }, - }) - if err != nil { - if dmserr, ok := err.(awserr.Error); ok && dmserr.Code() == "ResourceNotFoundFault" { - return nil, "", nil - } - log.Printf("Error on retrieving DMS Replication Task when waiting: %s", err) - return nil, "", err - } - if v == nil { - return nil, "", nil - } - - if v.ReplicationTasks != nil { - log.Printf("[DEBUG] DMS Replication Task status for instance %s: %s", d.Id(), *v.ReplicationTasks[0].Status) + if err := waitReplicationTaskDeleted(conn, d.Id(), d.Timeout(schema.TimeoutDelete)); err != nil { + if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) { + return nil } - - return v, *v.ReplicationTasks[0].Status, nil + return fmt.Errorf("error waiting for DMS Replication Task (%s) to be deleted: %w", d.Id(), err) } + + return nil } func dmsReplicationTaskRemoveReadOnlySettings(settings string) (*string, error) { diff --git a/internal/service/dms/replication_task_test.go b/internal/service/dms/replication_task_test.go index b460421fab70..541bdec427a9 100644 --- a/internal/service/dms/replication_task_test.go +++ b/internal/service/dms/replication_task_test.go @@ -4,14 +4,14 @@ import ( "fmt" "testing" - "github.com/aws/aws-sdk-go/aws" dms "github.com/aws/aws-sdk-go/service/databasemigrationservice" - "github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr" sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" "github.com/hashicorp/terraform-provider-aws/internal/acctest" "github.com/hashicorp/terraform-provider-aws/internal/conns" + tfdms "github.com/hashicorp/terraform-provider-aws/internal/service/dms" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" ) func TestAccDMSReplicationTask_basic(t *testing.T) { @@ -31,12 +31,12 @@ func TestAccDMSReplicationTask_basic(t *testing.T) { PreCheck: func() { acctest.PreCheck(t) }, ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID), Providers: acctest.Providers, - CheckDestroy: dmsReplicationTaskDestroy, + CheckDestroy: testAccCheckReplicationTaskDestroy, Steps: []resource.TestStep{ { Config: dmsReplicationTaskConfig(rName, tags), Check: resource.ComposeTestCheckFunc( - checkDmsReplicationTaskExists(resourceName), + testAccCheckReplicationTaskExists(resourceName), resource.TestCheckResourceAttrSet(resourceName, "replication_task_arn"), ), }, @@ -53,14 +53,40 @@ func TestAccDMSReplicationTask_basic(t *testing.T) { { Config: dmsReplicationTaskConfig(rName, updatedTags), Check: resource.ComposeTestCheckFunc( - checkDmsReplicationTaskExists(resourceName), + testAccCheckReplicationTaskExists(resourceName), ), }, }, }) } -func checkDmsReplicationTaskExists(n string) resource.TestCheckFunc { +func TestAccDMSReplicationTask_cdcStartPosition(t *testing.T) { + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_dms_replication_task.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t) }, + ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckReplicationTaskDestroy, + Steps: []resource.TestStep{ + { + Config: dmsReplicationTaskConfig_CdcStartPosition(rName, "mysql-bin-changelog.000024:373"), + Check: resource.ComposeTestCheckFunc( + testAccCheckReplicationTaskExists(resourceName), + resource.TestCheckResourceAttr(resourceName, "cdc_start_position", "mysql-bin-changelog.000024:373"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func testAccCheckReplicationTaskExists(n string) resource.TestCheckFunc { return func(s *terraform.State) error { rs, ok := s.RootModule().Resources[n] if !ok { @@ -72,60 +98,45 @@ func checkDmsReplicationTaskExists(n string) resource.TestCheckFunc { } conn := acctest.Provider.Meta().(*conns.AWSClient).DMSConn - resp, err := conn.DescribeReplicationTasks(&dms.DescribeReplicationTasksInput{ - Filters: []*dms.Filter{ - { - Name: aws.String("replication-task-id"), - Values: []*string{aws.String(rs.Primary.ID)}, - }, - }, - }) + + _, err := tfdms.FindReplicationTaskByID(conn, rs.Primary.ID) if err != nil { return err } - if resp.ReplicationTasks == nil { - return fmt.Errorf("DMS replication task error: %v", err) - } return nil } } -func dmsReplicationTaskDestroy(s *terraform.State) error { +func testAccCheckReplicationTaskDestroy(s *terraform.State) error { for _, rs := range s.RootModule().Resources { if rs.Type != "aws_dms_replication_task" { continue } conn := acctest.Provider.Meta().(*conns.AWSClient).DMSConn - resp, err := conn.DescribeReplicationTasks(&dms.DescribeReplicationTasksInput{ - Filters: []*dms.Filter{ - { - Name: aws.String("replication-task-id"), - Values: []*string{aws.String(rs.Primary.ID)}, - }, - }, - }) - if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) { + _, err := tfdms.FindReplicationTaskByID(conn, rs.Primary.ID) + + if tfresource.NotFound(err) { continue } if err != nil { - return fmt.Errorf("error reading DMS Replication Task (%s): %w", rs.Primary.ID, err) + return err } - if resp != nil && len(resp.ReplicationTasks) > 0 { - return fmt.Errorf("DMS replication task still exists: %v", err) - } + return fmt.Errorf("DMS replication task (%s) still exists", rs.Primary.ID) } return nil } -func dmsReplicationTaskConfig(rName, tags string) string { - return acctest.ConfigCompose(acctest.ConfigAvailableAZsNoOptIn(), fmt.Sprintf(` +func dmsReplicationTaskConfigBase(rName string) string { + return acctest.ConfigCompose( + acctest.ConfigAvailableAZsNoOptIn(), + fmt.Sprintf(` data "aws_partition" "current" {} data "aws_region" "current" {} @@ -195,7 +206,13 @@ resource "aws_dms_replication_instance" "test" { publicly_accessible = false replication_subnet_group_id = aws_dms_replication_subnet_group.test.replication_subnet_group_id } +`, rName)) +} +func dmsReplicationTaskConfig(rName, tags string) string { + return acctest.ConfigCompose( + dmsReplicationTaskConfigBase(rName), + fmt.Sprintf(` resource "aws_dms_replication_task" "test" { migration_type = "full-load" replication_instance_arn = aws_dms_replication_instance.test.replication_instance_arn @@ -213,3 +230,20 @@ resource "aws_dms_replication_task" "test" { } `, rName, tags)) } + +func dmsReplicationTaskConfig_CdcStartPosition(rName, cdcStartPosition string) string { + return acctest.ConfigCompose( + dmsReplicationTaskConfigBase(rName), + fmt.Sprintf(` +resource "aws_dms_replication_task" "test" { + cdc_start_position = %[1]q + migration_type = "cdc" + replication_instance_arn = aws_dms_replication_instance.test.replication_instance_arn + replication_task_id = %[2]q + replication_task_settings = "{\"BeforeImageSettings\":null,\"FailTaskWhenCleanTaskResourceFailed\":false,\"ChangeProcessingDdlHandlingPolicy\":{\"HandleSourceTableAltered\":true,\"HandleSourceTableDropped\":true,\"HandleSourceTableTruncated\":true},\"ChangeProcessingTuning\":{\"BatchApplyMemoryLimit\":500,\"BatchApplyPreserveTransaction\":true,\"BatchApplyTimeoutMax\":30,\"BatchApplyTimeoutMin\":1,\"BatchSplitSize\":0,\"CommitTimeout\":1,\"MemoryKeepTime\":60,\"MemoryLimitTotal\":1024,\"MinTransactionSize\":1000,\"StatementCacheSize\":50},\"CharacterSetSettings\":null,\"ControlTablesSettings\":{\"ControlSchema\":\"\",\"FullLoadExceptionTableEnabled\":false,\"HistoryTableEnabled\":false,\"HistoryTimeslotInMinutes\":5,\"StatusTableEnabled\":false,\"SuspendedTablesTableEnabled\":false},\"ErrorBehavior\":{\"ApplyErrorDeletePolicy\":\"IGNORE_RECORD\",\"ApplyErrorEscalationCount\":0,\"ApplyErrorEscalationPolicy\":\"LOG_ERROR\",\"ApplyErrorFailOnTruncationDdl\":false,\"ApplyErrorInsertPolicy\":\"LOG_ERROR\",\"ApplyErrorUpdatePolicy\":\"LOG_ERROR\",\"DataErrorEscalationCount\":0,\"DataErrorEscalationPolicy\":\"SUSPEND_TABLE\",\"DataErrorPolicy\":\"LOG_ERROR\",\"DataTruncationErrorPolicy\":\"LOG_ERROR\",\"FailOnNoTablesCaptured\":false,\"FailOnTransactionConsistencyBreached\":false,\"FullLoadIgnoreConflicts\":true,\"RecoverableErrorCount\":-1,\"RecoverableErrorInterval\":5,\"RecoverableErrorStopRetryAfterThrottlingMax\":false,\"RecoverableErrorThrottling\":true,\"RecoverableErrorThrottlingMax\":1800,\"TableErrorEscalationCount\":0,\"TableErrorEscalationPolicy\":\"STOP_TASK\",\"TableErrorPolicy\":\"SUSPEND_TABLE\"},\"FullLoadSettings\":{\"CommitRate\":10000,\"CreatePkAfterFullLoad\":false,\"MaxFullLoadSubTasks\":8,\"StopTaskCachedChangesApplied\":false,\"StopTaskCachedChangesNotApplied\":false,\"TargetTablePrepMode\":\"DROP_AND_CREATE\",\"TransactionConsistencyTimeout\":600},\"Logging\":{\"EnableLogging\":false,\"LogComponents\":[{\"Id\":\"TRANSFORMATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_UNLOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"IO\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_LOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"PERFORMANCE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_CAPTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SORTER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"REST_SERVER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"VALIDATOR_EXT\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_APPLY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TASK_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TABLES_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"METADATA_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_FACTORY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMON\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"ADDONS\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"DATA_STRUCTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMUNICATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_TRANSFER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"}]},\"LoopbackPreventionSettings\":null,\"PostProcessingRules\":null,\"StreamBufferSettings\":{\"CtrlStreamBufferSizeInMB\":5,\"StreamBufferCount\":3,\"StreamBufferSizeInMB\":8},\"TargetMetadata\":{\"BatchApplyEnabled\":false,\"FullLobMode\":false,\"InlineLobMaxSize\":0,\"LimitedSizeLobMode\":true,\"LoadMaxFileSize\":0,\"LobChunkSize\":0,\"LobMaxSize\":32,\"ParallelApplyBufferSize\":0,\"ParallelApplyQueuesPerThread\":0,\"ParallelApplyThreads\":0,\"ParallelLoadBufferSize\":0,\"ParallelLoadQueuesPerThread\":0,\"ParallelLoadThreads\":0,\"SupportLobs\":true,\"TargetSchema\":\"\",\"TaskRecoveryTableEnabled\":false},\"TTSettings\":{\"EnableTT\":false,\"TTRecordSettings\":null,\"TTS3Settings\":null}}" + source_endpoint_arn = aws_dms_endpoint.source.endpoint_arn + table_mappings = "{\"rules\":[{\"rule-type\":\"selection\",\"rule-id\":\"1\",\"rule-name\":\"1\",\"object-locator\":{\"schema-name\":\"%%\",\"table-name\":\"%%\"},\"rule-action\":\"include\"}]}" + target_endpoint_arn = aws_dms_endpoint.target.endpoint_arn +} +`, cdcStartPosition, rName)) +} diff --git a/internal/service/dms/status.go b/internal/service/dms/status.go index d4b881e639f7..9a1d48525a5f 100644 --- a/internal/service/dms/status.go +++ b/internal/service/dms/status.go @@ -22,3 +22,19 @@ func statusEndpoint(conn *dms.DatabaseMigrationService, id string) resource.Stat return output, aws.StringValue(output.Status), nil } } + +func statusReplicationTask(conn *dms.DatabaseMigrationService, id string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + output, err := FindReplicationTaskByID(conn, id) + + if tfresource.NotFound(err) { + return nil, "", nil + } + + if err != nil { + return nil, "", err + } + + return output, aws.StringValue(output.Status), nil + } +} diff --git a/internal/service/dms/wait.go b/internal/service/dms/wait.go index c53522e53999..3cbb941e6144 100644 --- a/internal/service/dms/wait.go +++ b/internal/service/dms/wait.go @@ -27,3 +27,51 @@ func waitEndpointDeleted(conn *dms.DatabaseMigrationService, id string) (*dms.En return nil, err } + +func waitReplicationTaskDeleted(conn *dms.DatabaseMigrationService, id string, timeout time.Duration) error { + stateConf := &resource.StateChangeConf{ + Pending: []string{replicationTaskStatusDeleting}, + Target: []string{}, + Refresh: statusReplicationTask(conn, id), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, // Wait 30 secs before starting + } + + // Wait, catching any errors + _, err := stateConf.WaitForState() + + return err +} + +func waitReplicationTaskModified(conn *dms.DatabaseMigrationService, id string, timeout time.Duration) error { + stateConf := &resource.StateChangeConf{ + Pending: []string{replicationTaskStatusModifying}, + Target: []string{replicationTaskStatusReady, replicationTaskStatusStopped, replicationTaskStatusFailed}, + Refresh: statusReplicationTask(conn, id), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, // Wait 30 secs before starting + } + + // Wait, catching any errors + _, err := stateConf.WaitForState() + + return err +} + +func waitReplicationTaskReady(conn *dms.DatabaseMigrationService, id string, timeout time.Duration) error { + stateConf := &resource.StateChangeConf{ + Pending: []string{replicationTaskStatusCreating}, + Target: []string{replicationTaskStatusReady}, + Refresh: statusReplicationTask(conn, id), + Timeout: timeout, + MinTimeout: 10 * time.Second, + Delay: 30 * time.Second, // Wait 30 secs before starting + } + + // Wait, catching any errors + _, err := stateConf.WaitForState() + + return err +} From e8eea9e8507e087460dd1252b21855bef761aefb Mon Sep 17 00:00:00 2001 From: Angie Pinilla Date: Wed, 23 Feb 2022 01:23:17 -0500 Subject: [PATCH 2/2] Update CHANGELOG for #23328 --- .changelog/23328.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/23328.txt diff --git a/.changelog/23328.txt b/.changelog/23328.txt new file mode 100644 index 000000000000..c491537d29ce --- /dev/null +++ b/.changelog/23328.txt @@ -0,0 +1,3 @@ +```release-note:bug +resource/aws_dms_replication_task: Allow `cdc_start_position` to be computed +``` \ No newline at end of file