Skip to content

Commit

Permalink
Merge pull request hashicorp#17296 from hashicorp/elasticache-replica…
Browse files Browse the repository at this point in the history
…tion-group-scaling

resource/aws_elasticache_replication_group: Simplify scaling replicas in non-cluster mode
  • Loading branch information
gdavison authored Jan 27, 2021
2 parents 3eb2033 + dc3dc02 commit 5800943
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 167 deletions.
53 changes: 53 additions & 0 deletions aws/internal/service/elasticache/finder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,28 @@ func ReplicationGroupByID(conn *elasticache.ElastiCache, id string) (*elasticach
return result.ReplicationGroups[0], nil
}

// ReplicationGroupMemberClustersByID looks up the Replication Group identified by id
// and returns the Cache Clusters listed in its MemberClusters.
//
// Any error from the Replication Group lookup or from the Cache Cluster lookup is
// returned unchanged. When the lookups succeed but yield no Cache Clusters, a
// *resource.NotFoundError is returned.
func ReplicationGroupMemberClustersByID(conn *elasticache.ElastiCache, id string) ([]*elasticache.CacheCluster, error) {
	group, err := ReplicationGroupByID(conn, id)
	if err != nil {
		return nil, err
	}

	memberIDs := aws.StringValueSlice(group.MemberClusters)
	members, err := CacheClustersByID(conn, memberIDs)

	switch {
	case err != nil:
		// Pass through whatever the cluster lookup managed to collect.
		return members, err
	case len(members) == 0:
		return members, &resource.NotFoundError{
			Message: "No Member Clusters found",
		}
	}

	return members, nil
}

// CacheClusterByID retrieves an ElastiCache Cache Cluster by id.
func CacheClusterByID(conn *elasticache.ElastiCache, id string) (*elasticache.CacheCluster, error) {
input := &elasticache.DescribeCacheClustersInput{
Expand Down Expand Up @@ -72,3 +94,34 @@ func CacheCluster(conn *elasticache.ElastiCache, input *elasticache.DescribeCach

return result.CacheClusters[0], nil
}

// CacheClustersByID retrieves the ElastiCache Cache Clusters whose IDs appear in idList.
//
// The DescribeCacheClusters request is issued without an ID filter, so each page is
// matched locally against idList and paging stops as soon as every requested ID has
// been seen. Requested IDs that never show up are simply absent from the result.
// Order of the returned clusters is not guaranteed.
//
// An empty idList returns (nil, nil) without calling the API. If paging fails, the
// clusters collected so far are returned together with the error.
func CacheClustersByID(conn *elasticache.ElastiCache, idList []string) ([]*elasticache.CacheCluster, error) {
	// Nothing to look up: skip an unfiltered listing that could never match anything.
	if len(idList) == 0 {
		return nil, nil
	}

	// Set of requested IDs that have not been matched yet.
	pending := make(map[string]struct{}, len(idList))
	for _, id := range idList {
		pending[id] = struct{}{}
	}

	var results []*elasticache.CacheCluster

	input := &elasticache.DescribeCacheClustersInput{}
	err := conn.DescribeCacheClustersPages(input, func(page *elasticache.DescribeCacheClustersOutput, _ bool) bool {
		if page == nil {
			return true
		}

		for _, cluster := range page.CacheClusters {
			clusterID := aws.StringValue(cluster.CacheClusterId)
			if _, wanted := pending[clusterID]; !wanted {
				continue
			}

			results = append(results, cluster)
			delete(pending, clusterID)
			if len(pending) == 0 {
				break
			}
		}

		// Keep paging only while some requested IDs are still unmatched.
		return len(pending) != 0
	})

	return results, err
}
24 changes: 24 additions & 0 deletions aws/internal/service/elasticache/waiter/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,30 @@ func ReplicationGroupStatus(conn *elasticache.ElastiCache, replicationGroupID st
}
}

// ReplicationGroupMemberClustersStatus returns a resource.StateRefreshFunc that fetches the
// Replication Group's Member Clusters and reports a single combined status: the status of the
// first member that is not "available", or "available" when every member is.
//
// A not-found result from the lookup is reported as (nil, "", nil); any other error is
// returned as-is.
// NOTE: This function assumes that the intended end-state is to have all member clusters in "available" status.
func ReplicationGroupMemberClustersStatus(conn *elasticache.ElastiCache, replicationGroupID string) resource.StateRefreshFunc {
	return func() (interface{}, string, error) {
		members, err := finder.ReplicationGroupMemberClustersByID(conn, replicationGroupID)
		switch {
		case tfresource.NotFound(err):
			return nil, "", nil
		case err != nil:
			return nil, "", err
		}

		// The first member that is not yet available decides the reported status.
		for _, member := range members {
			if memberStatus := aws.StringValue(member.CacheClusterStatus); memberStatus != CacheClusterStatusAvailable {
				return members, memberStatus, nil
			}
		}

		return members, CacheClusterStatusAvailable, nil
	}
}

const (
CacheClusterStatusAvailable = "available"
CacheClusterStatusCreating = "creating"
Expand Down
22 changes: 22 additions & 0 deletions aws/internal/service/elasticache/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,28 @@ func ReplicationGroupDeleted(conn *elasticache.ElastiCache, replicationGroupID s
return nil, err
}

// ReplicationGroupMemberClustersAvailable waits, for at most timeout, until
// ReplicationGroupMemberClustersStatus reports "available" for the Replication Group's
// Member Clusters. The "creating", "deleting" and "modifying" statuses are treated as
// pending.
//
// It returns the value produced by the wait when that value is a slice of Cache Clusters
// (nil otherwise), along with the error from the wait.
func ReplicationGroupMemberClustersAvailable(conn *elasticache.ElastiCache, replicationGroupID string, timeout time.Duration) ([]*elasticache.CacheCluster, error) {
	pendingStatuses := []string{
		CacheClusterStatusCreating,
		CacheClusterStatusDeleting,
		CacheClusterStatusModifying,
	}

	waitConf := &resource.StateChangeConf{
		Pending:    pendingStatuses,
		Target:     []string{CacheClusterStatusAvailable},
		Refresh:    ReplicationGroupMemberClustersStatus(conn, replicationGroupID),
		Timeout:    timeout,
		MinTimeout: cacheClusterAvailableMinTimeout,
		Delay:      cacheClusterAvailableDelay,
	}

	raw, err := waitConf.WaitForState()

	// A failed assertion leaves members nil, which is exactly what is returned in that case.
	members, _ := raw.([]*elasticache.CacheCluster)
	return members, err
}

const (
CacheClusterCreatedTimeout = 40 * time.Minute
CacheClusterUpdatedTimeout = 80 * time.Minute
Expand Down
140 changes: 25 additions & 115 deletions aws/resource_aws_elasticache_replication_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,136 +756,46 @@ func elasticacheReplicationGroupModifyNumCacheClusters(conn *elasticache.ElastiC

var err error
if newNumberCacheClusters > oldNumberCacheClusters {
err = elasticacheReplicationGroupIncreaseNumCacheClusters(conn, d.Id(), oldNumberCacheClusters, newNumberCacheClusters, d.Timeout(schema.TimeoutUpdate))
} else {
err = elasticacheReplicationGroupReduceNumCacheClusters(conn, d.Id(), oldNumberCacheClusters, newNumberCacheClusters, d.Timeout(schema.TimeoutUpdate), d)
err = elasticacheReplicationGroupIncreaseNumCacheClusters(conn, d.Id(), newNumberCacheClusters, d.Timeout(schema.TimeoutUpdate))
} else if newNumberCacheClusters < oldNumberCacheClusters {
err = elasticacheReplicationGroupDecreaseNumCacheClusters(conn, d.Id(), newNumberCacheClusters, d.Timeout(schema.TimeoutUpdate))
}
return err
}

func elasticacheReplicationGroupIncreaseNumCacheClusters(conn *elasticache.ElastiCache, replicationGroupID string, o, n int, timeout time.Duration) error {
var addClusterIDs []string
for clusterID := o + 1; clusterID <= n; clusterID++ {
addClusterIDs = append(addClusterIDs, formatReplicationGroupClusterID(replicationGroupID, clusterID))
func elasticacheReplicationGroupIncreaseNumCacheClusters(conn *elasticache.ElastiCache, replicationGroupID string, newNumberCacheClusters int, timeout time.Duration) error {
input := &elasticache.IncreaseReplicaCountInput{
ApplyImmediately: aws.Bool(true),
NewReplicaCount: aws.Int64(int64(newNumberCacheClusters - 1)),
ReplicationGroupId: aws.String(replicationGroupID),
}

// Kick off all the Cache Cluster creations
for _, cacheClusterID := range addClusterIDs {
input := &elasticache.CreateCacheClusterInput{
CacheClusterId: aws.String(cacheClusterID),
ReplicationGroupId: aws.String(replicationGroupID),
}
_, err := createElasticacheCacheCluster(conn, input)
if err != nil {
// Future enhancement: we could retry creation with random ID on naming collision
// if isAWSErr(err, elasticache.ErrCodeCacheClusterAlreadyExistsFault, "") { ... }
return fmt.Errorf("error creating ElastiCache Cache Cluster (adding replica): %w", err)
}
_, err := conn.IncreaseReplicaCount(input)
if err != nil {
return fmt.Errorf("error adding ElastiCache Replication Group (%s) replicas: %w", replicationGroupID, err)
}

// Wait for all Cache Cluster creations
for _, cacheClusterID := range addClusterIDs {
_, err := waiter.CacheClusterAvailable(conn, cacheClusterID, timeout)
if err != nil {
return fmt.Errorf("error waiting for ElastiCache Cache Cluster (%s) to be created (adding replica): %w", cacheClusterID, err)
}
_, err = waiter.ReplicationGroupMemberClustersAvailable(conn, replicationGroupID, timeout)
if err != nil {
return fmt.Errorf("error waiting for ElastiCache Replication Group (%s) replica addition: %w", replicationGroupID, err)
}

return nil
}

func elasticacheReplicationGroupReduceNumCacheClusters(conn *elasticache.ElastiCache, replicationGroupID string, o, n int, timeout time.Duration, d *schema.ResourceData) error {
var removeClusterIDs []string
for clusterID := o; clusterID >= (n + 1); clusterID-- {
removeClusterIDs = append(removeClusterIDs, formatReplicationGroupClusterID(replicationGroupID, clusterID))
}

// Cannot reassign primary cluster ID while automatic failover is enabled
// If we temporarily disable automatic failover, ensure we re-enable it
reEnableAutomaticFailover := false

// Kick off all the Cache Cluster deletions
for _, cacheClusterID := range removeClusterIDs {
var finalSnapshotID = d.Get("final_snapshot_identifier").(string)
err := deleteElasticacheCacheCluster(conn, cacheClusterID, finalSnapshotID)
if err != nil {
// Future enhancement: we could retry deletion with random existing ID on missing name
// if isAWSErr(err, elasticache.ErrCodeCacheClusterNotFoundFault, "") { ... }
if !isAWSErr(err, elasticache.ErrCodeInvalidCacheClusterStateFault, "serving as primary") {
return fmt.Errorf("error deleting ElastiCache Cache Cluster (%s) (removing replica): %w", cacheClusterID, err)
}

// Use Replication Group MemberClusters to find a new primary cache cluster ID
// that is not in removeClusterIDs
newPrimaryClusterID := ""

rg, err := finder.ReplicationGroupByID(conn, replicationGroupID)
if err != nil {
return fmt.Errorf("error reading ElastiCache Replication Group (%s) to determine new primary: %w", replicationGroupID, err)
}

for _, memberClusterPtr := range rg.MemberClusters {
memberCluster := aws.StringValue(memberClusterPtr)
memberClusterInRemoveClusterIDs := false
for _, removeClusterID := range removeClusterIDs {
if memberCluster == removeClusterID {
memberClusterInRemoveClusterIDs = true
break
}
}
if !memberClusterInRemoveClusterIDs {
newPrimaryClusterID = memberCluster
break
}
}
if newPrimaryClusterID == "" {
return fmt.Errorf("error reading ElastiCache Replication Group (%s) to determine new primary: unable to assign new primary", replicationGroupID)
}

// Disable automatic failover if enabled
// Must be applied previous to trying to set new primary
// InvalidReplicationGroupState: Cannot manually promote a new master cache cluster while autofailover is enabled
if aws.StringValue(rg.AutomaticFailover) == elasticache.AutomaticFailoverStatusEnabled {
// Be kind and rewind
if d.Get("automatic_failover_enabled").(bool) {
reEnableAutomaticFailover = true
}

err = resourceAwsElasticacheReplicationGroupDisableAutomaticFailover(conn, replicationGroupID, timeout)
if err != nil {
return fmt.Errorf("error disabling Elasticache Replication Group (%s) automatic failover: %w", replicationGroupID, err)
}
}

// Set new primary
err = resourceAwsElasticacheReplicationGroupSetPrimaryClusterID(conn, replicationGroupID, newPrimaryClusterID, timeout)
if err != nil {
return fmt.Errorf("error changing Elasticache Replication Group (%s) primary cluster: %w", replicationGroupID, err)
}

// Finally retry deleting the cache cluster
var finalSnapshotID = d.Get("final_snapshot_identifier").(string)
err = deleteElasticacheCacheCluster(conn, cacheClusterID, finalSnapshotID)
if err != nil {
return fmt.Errorf("error deleting ElastiCache Cache Cluster (%s) (removing replica after setting new primary): %w", cacheClusterID, err)
}
}
func elasticacheReplicationGroupDecreaseNumCacheClusters(conn *elasticache.ElastiCache, replicationGroupID string, newNumberCacheClusters int, timeout time.Duration) error {
input := &elasticache.DecreaseReplicaCountInput{
ApplyImmediately: aws.Bool(true),
NewReplicaCount: aws.Int64(int64(newNumberCacheClusters - 1)),
ReplicationGroupId: aws.String(replicationGroupID),
}

// Wait for all Cache Cluster deletions
for _, cacheClusterID := range removeClusterIDs {
_, err := waiter.CacheClusterDeleted(conn, cacheClusterID, timeout)
if err != nil {
return fmt.Errorf("error waiting for ElastiCache Cache Cluster (%s) to be deleted (removing replica): %w", cacheClusterID, err)
}
_, err := conn.DecreaseReplicaCount(input)
if err != nil {
return fmt.Errorf("error removing ElastiCache Replication Group (%s) replicas: %w", replicationGroupID, err)
}

// Re-enable automatic failover if we needed to temporarily disable it
if reEnableAutomaticFailover {
err := resourceAwsElasticacheReplicationGroupEnableAutomaticFailover(conn, replicationGroupID, timeout)
if err != nil {
return fmt.Errorf("error re-enabling Elasticache Replication Group (%s) automatic failover: %w", replicationGroupID, err)
}
_, err = waiter.ReplicationGroupMemberClustersAvailable(conn, replicationGroupID, timeout)
if err != nil {
return fmt.Errorf("error waiting for ElastiCache Replication Group (%s) replica removal: %w", replicationGroupID, err)
}

return nil
Expand Down
Loading

0 comments on commit 5800943

Please sign in to comment.