Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-1146 part b, add resource for emr_instance_fleet #14813

Merged
merged 54 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
71a9b3d
issue-1146 add support for instance fleet in EMR
c4po Aug 20, 2020
fe50822
remove nested schema definition
c4po Aug 21, 2020
18bbe33
verify return value
c4po Aug 21, 2020
94a7b2d
fix emr version in acc test
c4po Aug 21, 2020
1011f8a
acctest
c4po Aug 21, 2020
0064827
remove emr instance fleet resource
c4po Aug 22, 2020
4bccedc
fix acc test
c4po Aug 23, 2020
6d3b53f
fix ebs_config in instance_type_configs
c4po Aug 23, 2020
7569aae
add allocation_strategy in launch_specifications
c4po Aug 24, 2020
6af15b6
address comments
c4po Aug 26, 2020
53b40bd
address PR comments
c4po Aug 27, 2020
c087eae
change variable name
c4po Aug 27, 2020
8483246
add document for emr fleet launch specification
c4po Aug 27, 2020
da861dd
change on_demand_specification and spot_specification to option
c4po Aug 27, 2020
b48de20
issue-1146 part b, add resource for emr_instance_fleet
c4po Aug 24, 2020
e139435
rebase issue-1146-a
c4po Aug 27, 2020
2981c3e
fix test case for emr_instance_fleet
c4po Aug 27, 2020
6a678c8
fix test
c4po Aug 27, 2020
261e290
test
c4po Aug 27, 2020
97ab742
fix
c4po Aug 28, 2020
044e440
issue-1146 add support for instance fleet in EMR
c4po Aug 20, 2020
a4901f3
remove nested schema definition
c4po Aug 21, 2020
ccf6959
verify return value
c4po Aug 21, 2020
bdce68e
fix emr version in acc test
c4po Aug 21, 2020
baa19a6
acctest
c4po Aug 21, 2020
d8b683d
remove emr instance fleet resource
c4po Aug 22, 2020
3289282
fix acc test
c4po Aug 23, 2020
ea9f80b
fix ebs_config in instance_type_configs
c4po Aug 23, 2020
f6e5806
add allocation_strategy in launch_specifications
c4po Aug 24, 2020
f0b68ce
address comments
c4po Aug 26, 2020
64a4778
address PR comments
c4po Aug 27, 2020
d517781
change variable name
c4po Aug 27, 2020
d689a9e
add document for emr fleet launch specification
c4po Aug 27, 2020
e6e3da1
change on_demand_specification and spot_specification to option
c4po Aug 27, 2020
2cb4a12
issue with spotSpecification.AllocationStrategy and onDemandSpecifica…
c4po Aug 28, 2020
d4ea552
fix iam role in test case
c4po Aug 28, 2020
435b748
fix merge conflict
c4po Aug 29, 2020
69f3a8c
fix doc
c4po Aug 29, 2020
5afe90e
fix document
c4po Aug 29, 2020
5ee1946
merge
c4po Aug 30, 2020
bab327c
document
c4po Aug 30, 2020
142bdcc
fmt
c4po Aug 30, 2020
91f0ada
merge conflict
c4po Aug 31, 2020
816c26f
address comments
c4po Aug 31, 2020
bc6ad83
document
c4po Aug 31, 2020
78fedeb
document
c4po Aug 31, 2020
2600d10
fix
c4po Aug 31, 2020
f89ead5
change to task_instance_fleet and fix test case
c4po Aug 31, 2020
5b71e2f
change the document for task_instance_fleet
c4po Aug 31, 2020
0ada461
remove task_instance_fleet block
c4po Aug 31, 2020
e1ab908
fix acc test
c4po Aug 31, 2020
bec49bc
address comments
c4po Sep 1, 2020
f2edb03
fix
c4po Sep 1, 2020
12ebdcb
add TestAccAWSEMRInstanceFleet_disappears
c4po Sep 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ func Provider() *schema.Provider {
"aws_elb_attachment": resourceAwsElbAttachment(),
"aws_emr_cluster": resourceAwsEMRCluster(),
"aws_emr_instance_group": resourceAwsEMRInstanceGroup(),
"aws_emr_instance_fleet": resourceAwsEMRInstanceFleet(),
"aws_emr_managed_scaling_policy": resourceAwsEMRManagedScalingPolicy(),
"aws_emr_security_configuration": resourceAwsEMRSecurityConfiguration(),
"aws_flow_log": resourceAwsFlowLog(),
Expand Down
369 changes: 369 additions & 0 deletions aws/resource_aws_emr_instance_fleet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,369 @@
package aws

import (
"fmt"
"log"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/emr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
)

func resourceAwsEMRInstanceFleet() *schema.Resource {
return &schema.Resource{
c4po marked this conversation as resolved.
Show resolved Hide resolved
Create: resourceAwsEMRInstanceFleetCreate,
Read: resourceAwsEMRInstanceFleetRead,
Update: resourceAwsEMRInstanceFleetUpdate,
Delete: resourceAwsEMRInstanceFleetDelete,
Importer: &schema.ResourceImporter{
c4po marked this conversation as resolved.
Show resolved Hide resolved
State: func(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
idParts := strings.Split(d.Id(), "/")
if len(idParts) != 2 || idParts[0] == "" || idParts[1] == "" {
return nil, fmt.Errorf("Unexpected format of ID (%q), expected cluster-id/fleet-id", d.Id())
}
clusterID := idParts[0]
resourceID := idParts[1]
d.Set("cluster_id", clusterID)
d.SetId(resourceID)
return []*schema.ResourceData{d}, nil
},
},
Schema: map[string]*schema.Schema{
"cluster_id": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"instance_type_configs": {
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bid_price": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"bid_price_as_percentage_of_on_demand_price": {
Type: schema.TypeFloat,
Optional: true,
ForceNew: true,
Default: 100,
},
"configurations": {
Type: schema.TypeSet,
Optional: true,
ForceNew: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"classification": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"properties": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
Elem: schema.TypeString,
},
},
},
},
"ebs_config": {
Type: schema.TypeSet,
Optional: true,
Computed: true,
ForceNew: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"iops": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
},
"size": {
Type: schema.TypeInt,
Required: true,
ForceNew: true,
},
"type": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validateAwsEmrEbsVolumeType(),
},
"volumes_per_instance": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
Default: 1,
},
},
},
Set: resourceAwsEMRClusterEBSConfigHash,
},
"instance_type": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"weighted_capacity": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
Default: 1,
},
},
},
Set: resourceAwsEMRInstanceTypeConfigHash,
},
"launch_specifications": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"on_demand_specification": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MinItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"allocation_strategy": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice(emr.OnDemandProvisioningAllocationStrategy_Values(), false),
},
},
},
},
"spot_specification": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MinItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"allocation_strategy": {
Type: schema.TypeString,
ForceNew: true,
Required: true,
ValidateFunc: validation.StringInSlice(emr.SpotProvisioningAllocationStrategy_Values(), false),
},
"block_duration_minutes": {
Type: schema.TypeInt,
Optional: true,
ForceNew: true,
Default: 0,
},
"timeout_action": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice(emr.SpotProvisioningTimeoutAction_Values(), false),
},
"timeout_duration_minutes": {
Type: schema.TypeInt,
ForceNew: true,
Required: true,
},
},
},
},
},
},
},
"name": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
"target_on_demand_capacity": {
Type: schema.TypeInt,
Optional: true,
Default: 0,
},
"target_spot_capacity": {
Type: schema.TypeInt,
Optional: true,
Default: 0,
},
"provisioned_on_demand_capacity": {
Type: schema.TypeInt,
Computed: true,
},
"provisioned_spot_capacity": {
Type: schema.TypeInt,
Computed: true,
},
},
}
}

func resourceAwsEMRInstanceFleetCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn

addInstanceFleetInput := &emr.AddInstanceFleetInput{
ClusterId: aws.String(d.Get("cluster_id").(string)),
}

taskFleet := map[string]interface{}{
"name": d.Get("name"),
"target_on_demand_capacity": d.Get("target_on_demand_capacity"),
"target_spot_capacity": d.Get("target_spot_capacity"),
"instance_type_configs": d.Get("instance_type_configs"),
"launch_specifications": d.Get("launch_specifications"),
}
addInstanceFleetInput.InstanceFleet = readInstanceFleetConfig(taskFleet, emr.InstanceFleetTypeTask)

log.Printf("[DEBUG] Creating EMR instance fleet params: %s", addInstanceFleetInput)
resp, err := conn.AddInstanceFleet(addInstanceFleetInput)
if err != nil {
return fmt.Errorf("error adding EMR Instance Fleet: %w", err)
}

log.Printf("[DEBUG] Created EMR instance fleet finished: %#v", resp)
if resp == nil {
return fmt.Errorf("error creating instance fleet: no instance fleet returned")
}
d.SetId(*resp.InstanceFleetId)

return nil
}

func resourceAwsEMRInstanceFleetRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn
instanceFleets, err := fetchAllEMRInstanceFleets(conn, d.Get("cluster_id").(string))
if err != nil {
log.Printf("[DEBUG] EMR doesn't have any Instance Fleet ")
d.SetId("")
return nil
}

fleet := findInstanceFleetById(instanceFleets, d.Id())
if fleet == nil {
log.Printf("[DEBUG] EMR Instance Fleet (%s) not found, removing", d.Id())
d.SetId("")
return nil
}

taskFleet := flattenInstanceFleet(fleet)[0].(map[string]interface{})
d.Set("name", taskFleet["name"])
d.Set("target_on_demand_capacity", taskFleet["target_on_demand_capacity"])
d.Set("target_spot_capacity", taskFleet["target_spot_capacity"])
d.Set("instance_type_configs", taskFleet["instance_type_configs"])
d.Set("launch_specifications", taskFleet["launch_specifications"])
d.Set("provisioned_on_demand_capacity", taskFleet["provisioned_on_demand_capacity"])
d.Set("provisioned_spot_capacity", taskFleet["provisioned_spot_capacity"])
c4po marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

func findInstanceFleetById(instanceFleets []*emr.InstanceFleet, fleetId string) *emr.InstanceFleet {
for _, fleet := range instanceFleets {
if fleet != nil && aws.StringValue(fleet.Id) == fleetId {
return fleet
}
}
return nil
}

func resourceAwsEMRInstanceFleetUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).emrconn

log.Printf("[DEBUG] Modify EMR task fleet")

modifyConfig := &emr.InstanceFleetModifyConfig{
InstanceFleetId: aws.String(d.Id()),
}

if v, ok := d.GetOk("target_on_demand_capacity"); ok {
modifyConfig.TargetOnDemandCapacity = aws.Int64(int64(v.(int)))
} else {
modifyConfig.TargetOnDemandCapacity = aws.Int64(0)
}
if v, ok := d.GetOk("target_spot_capacity"); ok {
modifyConfig.TargetSpotCapacity = aws.Int64(int64(v.(int)))
} else {
modifyConfig.TargetSpotCapacity = aws.Int64(0)
}
c4po marked this conversation as resolved.
Show resolved Hide resolved

modifyInstanceFleetInput := &emr.ModifyInstanceFleetInput{
ClusterId: aws.String(d.Get("cluster_id").(string)),
InstanceFleet: modifyConfig,
}

_, err := conn.ModifyInstanceFleet(modifyInstanceFleetInput)
if err != nil {
return fmt.Errorf("error modifying EMR Instance Fleet (%s): %w", d.Id(), err)
}

stateConf := &resource.StateChangeConf{
Pending: []string{emr.InstanceFleetStateProvisioning, emr.InstanceFleetStateBootstrapping, emr.InstanceFleetStateResizing},
Target: []string{emr.InstanceFleetStateRunning},
Refresh: instanceFleetStateRefresh(conn, d.Get("cluster_id").(string), d.Id()),
Timeout: 75 * time.Minute,
Delay: 10 * time.Second,
MinTimeout: 30 * time.Second,
}

_, err = stateConf.WaitForState()
if err != nil {
return fmt.Errorf("error waiting for instance (%s) to terminate: %s", d.Id(), err)
}

return resourceAwsEMRInstanceFleetRead(d, meta)
}

func instanceFleetStateRefresh(conn *emr.EMR, clusterID, ifID string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {

instanceFleets, err := fetchAllEMRInstanceFleets(conn, clusterID)
if err != nil {
return nil, "Not Found", err
}

fleet := findInstanceFleetById(instanceFleets, ifID)
if fleet == nil {
return nil, "Not Found", err
}

if fleet.Status == nil || fleet.Status.State == nil {
log.Printf("[WARN] ERM Instance Fleet found, but without state")
return nil, "Undefined", fmt.Errorf("undefined EMR Cluster Instance Fleet state")
}

return fleet, *fleet.Status.State, nil
}
}

func resourceAwsEMRInstanceFleetDelete(d *schema.ResourceData, meta interface{}) error {
log.Printf("[WARN] AWS EMR Instance Fleet does not support DELETE; resizing cluster to zero before removing from state")
conn := meta.(*AWSClient).emrconn

clusterId := d.Get("cluster_id").(string)

modifyInstanceFleetInput := &emr.ModifyInstanceFleetInput{
ClusterId: aws.String(clusterId),
InstanceFleet: &emr.InstanceFleetModifyConfig{
InstanceFleetId: aws.String(d.Id()),
TargetOnDemandCapacity: aws.Int64(0),
TargetSpotCapacity: aws.Int64(0),
},
}

_, err := conn.ModifyInstanceFleet(modifyInstanceFleetInput)
if err != nil {
return fmt.Errorf("error deleting/modifying EMR Instance Fleet (%s): %w", d.Id(), err)
}

return nil
}
Loading