Skip to content

Commit

Permalink
Merge commit '6ab260e1e5ace4a802f78da379b2c3ab61094e2a' into HEAD
Browse files Browse the repository at this point in the history
# Conflicts:
#	internal/service/dms/replication_task_test.go
  • Loading branch information
ewbankkit committed Dec 18, 2023
2 parents bc2419c + 6ab260e commit fa60946
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .changelog/31917.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_dms_replication_task: allow cdc_start_time parameter to use RFC3339 formatted date additionally to a UNIX timestamp.
```
29 changes: 20 additions & 9 deletions internal/service/dms/replication_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func ResourceReplicationTask() *schema.Resource {
ConflictsWith: []string{"cdc_start_time"},
},
"cdc_start_time": {
Type: schema.TypeString,
Optional: true,
// Requires a Unix timestamp in seconds. Example 1484346880
Type: schema.TypeString,
Optional: true,
ValidateFunc: verify.ValidStringDateOrPositiveInt,
ConflictsWith: []string{"cdc_start_position"},
},
"migration_type": {
Expand Down Expand Up @@ -135,11 +135,19 @@ func resourceReplicationTaskCreate(ctx context.Context, d *schema.ResourceData,
}

if v, ok := d.GetOk("cdc_start_time"); ok {
seconds, err := strconv.ParseInt(v.(string), 10, 64)
// Check if input is RFC3339 date string or UNIX timestamp.
dateTime, err := time.Parse(time.RFC3339, v.(string))

if err != nil {
return sdkdiag.AppendErrorf(diags, "DMS create replication task. Invalid CDC Unix timestamp: %s", err)
// Not a valid RF3339 date, checking if this is a UNIX timestamp.
seconds, err := strconv.ParseInt(v.(string), 10, 64)
if err != nil {
return sdkdiag.AppendErrorf(diags, "DMS create replication task. Invalid Unix timestamp given for cdc_start_time parameter: %s", err)
}
request.CdcStartTime = aws.Time(time.Unix(seconds, 0))
} else {
request.CdcStartTime = aws.Time(dateTime)
}
request.CdcStartTime = aws.Time(time.Unix(seconds, 0))
}

if v, ok := d.GetOk("replication_task_settings"); ok {
Expand Down Expand Up @@ -224,11 +232,14 @@ func resourceReplicationTaskUpdate(ctx context.Context, d *schema.ResourceData,
}

if d.HasChange("cdc_start_time") {
seconds, err := strconv.ParseInt(d.Get("cdc_start_time").(string), 10, 64)
// Parse the RFC3339 date string into a time.Time object
dateTime, err := time.Parse(time.RFC3339, d.Get("cdc_start_time").(string))

if err != nil {
return sdkdiag.AppendErrorf(diags, "DMS update replication task. Invalid CRC Unix timestamp: %s", err)
return sdkdiag.AppendErrorf(diags, "DMS update replication task. Invalid cdc_start_time value: %s", err)
}
input.CdcStartTime = aws.Time(time.Unix(seconds, 0))

input.CdcStartTime = aws.Time(dateTime)
}

if d.HasChange("replication_task_settings") {
Expand Down
84 changes: 84 additions & 0 deletions internal/service/dms/replication_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ package dms_test
import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/YakDriver/regexache"
dms "github.com/aws/aws-sdk-go/service/databasemigrationservice"
Expand Down Expand Up @@ -284,6 +287,70 @@ func TestAccDMSReplicationTask_disappears(t *testing.T) {
})
}

func TestAccDMSReplicationTask_cdcStartTime_rfc3339_date(t *testing.T) {
ctx := acctest.Context(t)
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_dms_replication_task.test"

currentTime := time.Now().UTC()
rfc3339Time := currentTime.Format(time.RFC3339)
awsDmsExpectedOutput := strings.TrimRight(rfc3339Time, "Z") // AWS API drop "Z" part.

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckReplicationTaskDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccReplicationTaskConfig_cdcStartTime(rName, rfc3339Time),
Check: resource.ComposeTestCheckFunc(
testAccCheckReplicationTaskExists(ctx, resourceName),
resource.TestCheckResourceAttr(resourceName, "cdc_start_position", awsDmsExpectedOutput),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerifyIgnore: []string{"start_replication_task"},
},
},
})
}

func TestAccDMSReplicationTask_cdcStartTime_unix_timestamp(t *testing.T) {
ctx := acctest.Context(t)
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_dms_replication_task.test"

currentTime := time.Now().UTC()
rfc3339Time := currentTime.Format(time.RFC3339)
awsDmsExpectedOutput := strings.TrimRight(rfc3339Time, "Z") // AWS API drop "Z" part.
dateTime, _ := time.Parse(time.RFC3339, rfc3339Time)
unixDateTime := strconv.Itoa(int(dateTime.Unix()))

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckReplicationTaskDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccReplicationTaskConfig_cdcStartTime(rName, unixDateTime),
Check: resource.ComposeTestCheckFunc(
testAccCheckReplicationTaskExists(ctx, resourceName),
resource.TestCheckResourceAttr(resourceName, "cdc_start_position", awsDmsExpectedOutput),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerifyIgnore: []string{"start_replication_task"},
},
},
})
}

func testAccCheckReplicationTaskExists(ctx context.Context, n string) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
Expand Down Expand Up @@ -847,3 +914,20 @@ resource "aws_dms_replication_task" "test" {
}
`, rName))
}

func testAccReplicationTaskConfig_cdcStartTime(rName, cdcStartPosition string) string {
return acctest.ConfigCompose(
replicationTaskConfigBase(rName),
fmt.Sprintf(`
resource "aws_dms_replication_task" "test" {
cdc_start_time = %[1]q
migration_type = "cdc"
replication_instance_arn = aws_dms_replication_instance.test.replication_instance_arn
replication_task_id = %[2]q
replication_task_settings = "{\"BeforeImageSettings\":null,\"FailTaskWhenCleanTaskResourceFailed\":false,\"ChangeProcessingDdlHandlingPolicy\":{\"HandleSourceTableAltered\":true,\"HandleSourceTableDropped\":true,\"HandleSourceTableTruncated\":true},\"ChangeProcessingTuning\":{\"BatchApplyMemoryLimit\":500,\"BatchApplyPreserveTransaction\":true,\"BatchApplyTimeoutMax\":30,\"BatchApplyTimeoutMin\":1,\"BatchSplitSize\":0,\"CommitTimeout\":1,\"MemoryKeepTime\":60,\"MemoryLimitTotal\":1024,\"MinTransactionSize\":1000,\"StatementCacheSize\":50},\"CharacterSetSettings\":null,\"ControlTablesSettings\":{\"ControlSchema\":\"\",\"FullLoadExceptionTableEnabled\":false,\"HistoryTableEnabled\":false,\"HistoryTimeslotInMinutes\":5,\"StatusTableEnabled\":false,\"SuspendedTablesTableEnabled\":false},\"ErrorBehavior\":{\"ApplyErrorDeletePolicy\":\"IGNORE_RECORD\",\"ApplyErrorEscalationCount\":0,\"ApplyErrorEscalationPolicy\":\"LOG_ERROR\",\"ApplyErrorFailOnTruncationDdl\":false,\"ApplyErrorInsertPolicy\":\"LOG_ERROR\",\"ApplyErrorUpdatePolicy\":\"LOG_ERROR\",\"DataErrorEscalationCount\":0,\"DataErrorEscalationPolicy\":\"SUSPEND_TABLE\",\"DataErrorPolicy\":\"LOG_ERROR\",\"DataTruncationErrorPolicy\":\"LOG_ERROR\",\"EventErrorPolicy\":\"IGNORE\",\"FailOnNoTablesCaptured\":false,\"FailOnTransactionConsistencyBreached\":false,\"FullLoadIgnoreConflicts\":true,\"RecoverableErrorCount\":-1,\"RecoverableErrorInterval\":5,\"RecoverableErrorStopRetryAfterThrottlingMax\":false,\"RecoverableErrorThrottling\":true,\"RecoverableErrorThrottlingMax\":1800,\"TableErrorEscalationCount\":0,\"TableErrorEscalationPolicy\":\"STOP_TASK\",\"TableErrorPolicy\":\"SUSPEND_TABLE\"},\"FullLoadSettings\":{\"CommitRate\":10000,\"CreatePkAfterFullLoad\":false,\"MaxFullLoadSubTasks\":8,\"StopTaskCachedChangesApplied\":false,\"StopTaskCachedChangesNotApplied\":false,\"TargetTablePrepMode\":\"DROP_AND_CREATE\",\"TransactionConsistencyTimeout\":600},\"Logging\":{\"EnableLogging\":false,\"LogComponents\":[{\"Id\":\"TRANSFORMATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_UNLOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"IO\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_LOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"PERFORMANCE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_CAPTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SORTER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"REST_SERVER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"VALIDATOR_EXT\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_APPLY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TASK_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TABLES_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"METADATA_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_FACTORY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMON\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"ADDONS\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"DATA_STRUCTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMUNICATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_TRANSFER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"}]},\"LoopbackPreventionSettings\":null,\"PostProcessingRules\":null,\"StreamBufferSettings\":{\"CtrlStreamBufferSizeInMB\":5,\"StreamBufferCount\":3,\"StreamBufferSizeInMB\":8},\"TargetMetadata\":{\"BatchApplyEnabled\":false,\"FullLobMode\":false,\"InlineLobMaxSize\":0,\"LimitedSizeLobMode\":true,\"LoadMaxFileSize\":0,\"LobChunkSize\":0,\"LobMaxSize\":32,\"ParallelApplyBufferSize\":0,\"ParallelApplyQueuesPerThread\":0,\"ParallelApplyThreads\":0,\"ParallelLoadBufferSize\":0,\"ParallelLoadQueuesPerThread\":0,\"ParallelLoadThreads\":0,\"SupportLobs\":true,\"TargetSchema\":\"\",\"TaskRecoveryTableEnabled\":false},\"TTSettings\":{\"EnableTT\":false,\"TTRecordSettings\":null,\"TTS3Settings\":null}}"
source_endpoint_arn = aws_dms_endpoint.source.endpoint_arn
table_mappings = "{\"rules\":[{\"rule-type\":\"selection\",\"rule-id\":\"1\",\"rule-name\":\"1\",\"object-locator\":{\"schema-name\":\"%%\",\"table-name\":\"%%\"},\"rule-action\":\"include\"}]}"
target_endpoint_arn = aws_dms_endpoint.target.endpoint_arn
}
`, cdcStartPosition, rName))
}
10 changes: 7 additions & 3 deletions website/docs/r/dms_replication_task.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Provides a DMS (Data Migration Service) replication task resource. DMS replicati
```terraform
# Create a new replication task
resource "aws_dms_replication_task" "test" {
cdc_start_time = 1484346880
cdc_start_time = "1993-05-21T05:50:00Z"
migration_type = "full-load"
replication_instance_arn = aws_dms_replication_instance.test-dms-replication-instance-tf.replication_instance_arn
replication_task_id = "test-dms-replication-task-tf"
Expand All @@ -37,8 +37,12 @@ resource "aws_dms_replication_task" "test" {

This resource supports the following arguments:

* `cdc_start_position` - (Optional, Conflicts with `cdc_start_time`) Indicates when you want a change data capture (CDC) operation to start. The value can be in date, checkpoint, or LSN/SCN format depending on the source engine. For more information, see [Determining a CDC native start point](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Task.CDC.html#CHAP_Task.CDC.StartPoint.Native).
* `cdc_start_time` - (Optional, Conflicts with `cdc_start_position`) The Unix timestamp integer for the start of the Change Data Capture (CDC) operation.
* `cdc_start_position` - (Optional, Conflicts with `cdc_start_time`) Indicates when you want a change data capture (CDC)
operation to start. The value can be a RFC3339 formatted date, a checkpoint, or a LSN/SCN format depending on the
source engine. For more information,
see [Determining a CDC native start point](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Task.CDC.html#CHAP_Task.CDC.StartPoint.Native).
* `cdc_start_time` - (Optional, Conflicts with `cdc_start_position`) RFC3339 formatted date string or UNIX timestamp for
the start of the Change Data Capture (CDC) operation.
* `migration_type` - (Required) The migration type. Can be one of `full-load | cdc | full-load-and-cdc`.
* `replication_instance_arn` - (Required) The Amazon Resource Name (ARN) of the replication instance.
* `replication_task_id` - (Required) The replication task identifier.
Expand Down

0 comments on commit fa60946

Please sign in to comment.