Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Glue streaming job update #23275

Merged
3 changes: 3 additions & 0 deletions .changelog/23275.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_glue_job: Add support for [streaming jobs](https://docs.aws.amazon.com/glue/latest/dg/add-job-streaming.html) by removing the default value for the `timeout` argument and marking it as Computed
```
17 changes: 12 additions & 5 deletions internal/service/glue/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,10 @@ func ResourceJob() *schema.Resource {
"tags": tftags.TagsSchema(),
"tags_all": tftags.TagsSchemaComputed(),
"timeout": {
Type: schema.TypeInt,
Optional: true,
Default: 2880,
Type: schema.TypeInt,
Optional: true,
Computed: true,
ValidateFunc: validation.IntAtLeast(1),
ewbankkit marked this conversation as resolved.
Show resolved Hide resolved
},
"security_configuration": {
Type: schema.TypeString,
Expand Down Expand Up @@ -172,7 +173,10 @@ func resourceJobCreate(d *schema.ResourceData, meta interface{}) error {
Name: aws.String(name),
Role: aws.String(d.Get("role_arn").(string)),
Tags: Tags(tags.IgnoreAWS()),
Timeout: aws.Int64(int64(d.Get("timeout").(int))),
}

if v, ok := d.GetOk("timeout"); ok {
input.Timeout = aws.Int64(int64(v.(int)))
}

if v, ok := d.GetOk("max_capacity"); ok {
Expand Down Expand Up @@ -334,7 +338,10 @@ func resourceJobUpdate(d *schema.ResourceData, meta interface{}) error {
jobUpdate := &glue.JobUpdate{
Command: expandGlueJobCommand(d.Get("command").([]interface{})),
Role: aws.String(d.Get("role_arn").(string)),
Timeout: aws.Int64(int64(d.Get("timeout").(int))),
}

if v, ok := d.GetOk("timeout"); ok {
jobUpdate.Timeout = aws.Int64(int64(v.(int)))
}

if v, ok := d.GetOk("number_of_workers"); ok {
Expand Down
90 changes: 90 additions & 0 deletions internal/service/glue/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,43 @@ func TestAccGlueJob_basic(t *testing.T) {
})
}

func TestAccGlueJob_basicStreaming(t *testing.T) {
var job glue.Job

rName := fmt.Sprintf("tf-acc-test-%s", sdkacctest.RandString(5))
resourceName := "aws_glue_job.test"
roleResourceName := "aws_iam_role.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, glue.EndpointsID),
Providers: acctest.Providers,
CheckDestroy: testAccCheckJobDestroy,
Steps: []resource.TestStep{
{
Config: testAccJobConfig_RequiredStreaming(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckJobExists(resourceName, &job),
acctest.CheckResourceAttrRegionalARN(resourceName, "arn", "glue", fmt.Sprintf("job/%s", rName)),
resource.TestCheckResourceAttr(resourceName, "command.#", "1"),
resource.TestCheckResourceAttr(resourceName, "command.0.name", "gluestreaming"),
resource.TestCheckResourceAttr(resourceName, "command.0.script_location", "testscriptlocation"),
resource.TestCheckResourceAttr(resourceName, "default_arguments.%", "0"),
resource.TestCheckResourceAttr(resourceName, "non_overridable_arguments.%", "0"),
resource.TestCheckResourceAttr(resourceName, "name", rName),
resource.TestCheckResourceAttrPair(resourceName, "role_arn", roleResourceName, "arn"),
resource.TestCheckResourceAttr(resourceName, "tags.%", "0"),
resource.TestCheckResourceAttr(resourceName, "timeout", "0"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}
func TestAccGlueJob_command(t *testing.T) {
var job glue.Job

Expand Down Expand Up @@ -412,6 +449,40 @@ func TestAccGlueJob_tags(t *testing.T) {
})
}

func TestAccGlueJob_streamingTimeout(t *testing.T) {
var job glue.Job

rName := fmt.Sprintf("tf-acc-test-%s", sdkacctest.RandString(5))
resourceName := "aws_glue_job.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, glue.EndpointsID),
Providers: acctest.Providers,
CheckDestroy: testAccCheckJobDestroy,
Steps: []resource.TestStep{
{
Config: testAccJobConfig_Timeout(rName, 1),
Check: resource.ComposeTestCheckFunc(
testAccCheckJobExists(resourceName, &job),
resource.TestCheckResourceAttr(resourceName, "timeout", "1"),
),
},
{
Config: testAccJobConfig_Timeout(rName, 2),
Check: resource.ComposeTestCheckFunc(
testAccCheckJobExists(resourceName, &job),
resource.TestCheckResourceAttr(resourceName, "timeout", "2"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}
func TestAccGlueJob_timeout(t *testing.T) {
var job glue.Job

Expand Down Expand Up @@ -943,6 +1014,25 @@ resource "aws_glue_job" "test" {
`, testAccJobConfig_Base(rName), rName)
}

func testAccJobConfig_RequiredStreaming(rName string) string {
return fmt.Sprintf(`
%s

resource "aws_glue_job" "test" {
max_capacity = 10
name = "%s"
role_arn = aws_iam_role.test.arn

command {
name = "gluestreaming"
script_location = "testscriptlocation"
}

depends_on = [aws_iam_role_policy_attachment.test]
}
`, testAccJobConfig_Base(rName), rName)
}

func testAccJobTags1Config(rName, tagKey1, tagValue1 string) string {
return testAccJobConfig_Base(rName) + fmt.Sprintf(`
resource "aws_glue_job" "test" {
Expand Down
18 changes: 16 additions & 2 deletions website/docs/r/glue_job.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ resource "aws_glue_job" "example" {
}
```

### Streaming Job

```terraform
resource "aws_glue_job" "example" {
name = "example streaming job"
role_arn = aws_iam_role.example.arn

command {
name = "gluestreaming"
script_location = "s3://${aws_s3_bucket.example.bucket}/example.script"
}
}
```

### Enabling CloudWatch Logs and Metrics

```terraform
Expand Down Expand Up @@ -82,14 +96,14 @@ The following arguments are supported:
* `notification_property` - (Optional) Notification property of the job. Defined below.
* `role_arn` – (Required) The ARN of the IAM role associated with this job.
* `tags` - (Optional) Key-value map of resource tags. If configured with a provider [`default_tags` configuration block](/docs/providers/aws/index.html#default_tags-configuration-block) present, tags with matching keys will overwrite those defined at the provider-level.
* `timeout` – (Optional) The job timeout in minutes. The default is 2880 minutes (48 hours).
* `timeout` – (Optional) The job timeout in minutes. The default is 2880 minutes (48 hours) for `glueetl` and `pythonshell` jobs, and null (unlimted) for `gluestreaming` jobs.
* `security_configuration` - (Optional) The name of the Security Configuration to be associated with the job.
* `worker_type` - (Optional) The type of predefined worker that is allocated when a job runs. Accepts a value of Standard, G.1X, or G.2X.
* `number_of_workers` - (Optional) The number of workers of a defined workerType that are allocated when a job runs.

### command Argument Reference

* `name` - (Optional) The name of the job command. Defaults to `glueetl`. Use `pythonshell` for Python Shell Job Type, `max_capacity` needs to be set if `pythonshell` is chosen.
* `name` - (Optional) The name of the job command. Defaults to `glueetl`. Use `pythonshell` for Python Shell Job Type, or `gluestreaming` for Streaming Job Type. `max_capacity` needs to be set if `pythonshell` is chosen.
* `script_location` - (Required) Specifies the S3 path to a script that executes a job.
* `python_version` - (Optional) The Python version being used to execute a Python shell job. Allowed values are 2 or 3.

Expand Down