Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r/glue_crawler - add hudi target support #32898

Merged
merged 2 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/32898.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_glue_crawler: Add `hudi_target` argument
```
84 changes: 83 additions & 1 deletion internal/service/glue/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

// targets returns the names of every crawler target block in the schema.
// The list is used as the AtLeastOneOf set on each target attribute, so it
// must include every target type, "hudi_target" among them.
func targets() []string {
	return []string{"s3_target", "dynamodb_target", "mongodb_target", "jdbc_target", "catalog_target", "delta_target", "iceberg_target", "hudi_target"}
}

// @SDKResource("aws_glue_crawler", name="Crawler")
Expand Down Expand Up @@ -161,6 +161,35 @@ func ResourceCrawler() *schema.Resource {
},
},
},
"hudi_target": {
Type: schema.TypeList,
Optional: true,
MinItems: 1,
AtLeastOneOf: targets(),
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"connection_name": {
Type: schema.TypeString,
Optional: true,
},
"exclusions": {
Type: schema.TypeList,
Optional: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"maximum_traversal_depth": {
Type: schema.TypeInt,
Required: true,
ValidateFunc: validation.IntBetween(1, 20),
},
"paths": {
Type: schema.TypeSet,
Required: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
},
},
},
"iceberg_target": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -520,6 +549,10 @@ func resourceCrawlerRead(ctx context.Context, d *schema.ResourceData, meta inter
return sdkdiag.AppendErrorf(diags, "setting delta_target: %s", err)
}

if err := d.Set("hudi_target", flattenHudiTargets(crawler.Targets.HudiTargets)); err != nil {
return sdkdiag.AppendErrorf(diags, "setting hudi_target: %s", err)
}

if err := d.Set("iceberg_target", flattenIcebergTargets(crawler.Targets.IcebergTargets)); err != nil {
return sdkdiag.AppendErrorf(diags, "setting iceberg_target: %s", err)
}
Expand Down Expand Up @@ -772,6 +805,10 @@ func expandCrawlerTargets(d *schema.ResourceData) *glue.CrawlerTargets {
crawlerTargets.DeltaTargets = expandDeltaTargets(v.([]interface{}))
}

if v, ok := d.GetOk("hudi_target"); ok {
crawlerTargets.HudiTargets = expandHudiTargets(v.([]interface{}))
}

if v, ok := d.GetOk("iceberg_target"); ok {
crawlerTargets.IcebergTargets = expandIcebergTargets(v.([]interface{}))
}
Expand Down Expand Up @@ -960,6 +997,36 @@ func expandDeltaTarget(cfg map[string]interface{}) *glue.DeltaTarget {
return target
}

// expandHudiTargets converts the raw hudi_target list into Glue API
// HudiTarget values, one per element and in the same order. An empty input
// yields an empty, non-nil slice. Each element must be a
// map[string]interface{}; the type assertion panics otherwise.
func expandHudiTargets(targets []interface{}) []*glue.HudiTarget {
	if len(targets) == 0 {
		return []*glue.HudiTarget{}
	}

	hudiTargets := make([]*glue.HudiTarget, 0, len(targets))
	for _, raw := range targets {
		hudiTargets = append(hudiTargets, expandHudiTarget(raw.(map[string]interface{})))
	}
	return hudiTargets
}

// expandHudiTarget converts a single hudi_target configuration block into a
// Glue API HudiTarget.
//
// cfg must hold "paths" as a *schema.Set and "maximum_traversal_depth" as an
// int; both are asserted unconditionally and panic on a type mismatch.
// "exclusions" and "connection_name" are optional and are only copied when
// present with the expected type.
func expandHudiTarget(cfg map[string]interface{}) *glue.HudiTarget {
	target := &glue.HudiTarget{
		Paths:                 flex.ExpandStringSet(cfg["paths"].(*schema.Set)),
		MaximumTraversalDepth: aws.Int64(int64(cfg["maximum_traversal_depth"].(int))),
	}

	if v, ok := cfg["exclusions"].([]interface{}); ok {
		target.Exclusions = flex.ExpandStringList(v)
	}

	// An unset optional string arrives as "", for which the type assertion
	// still succeeds. Skip it so an empty connection name is never sent.
	if v, ok := cfg["connection_name"].(string); ok && v != "" {
		target.ConnectionName = aws.String(v)
	}

	return target
}

func expandIcebergTargets(targets []interface{}) []*glue.IcebergTarget {
if len(targets) < 1 {
return []*glue.IcebergTarget{}
Expand Down Expand Up @@ -1085,6 +1152,21 @@ func flattenDeltaTargets(deltaTargets []*glue.DeltaTarget) []map[string]interfac
return result
}

// flattenHudiTargets converts Glue API HudiTarget values into the list-of-maps
// form stored under the hudi_target attribute. Every element produces one map
// with the keys connection_name, exclusions, maximum_traversal_depth and
// paths. The result is always non-nil, even when hudiTargets is empty.
func flattenHudiTargets(hudiTargets []*glue.HudiTarget) []map[string]interface{} {
	flattened := make([]map[string]interface{}, 0, len(hudiTargets))

	for _, t := range hudiTargets {
		flattened = append(flattened, map[string]interface{}{
			"connection_name":         aws.StringValue(t.ConnectionName),
			"exclusions":              flex.FlattenStringList(t.Exclusions),
			"maximum_traversal_depth": aws.Int64Value(t.MaximumTraversalDepth),
			"paths":                   flex.FlattenStringSet(t.Paths),
		})
	}
	return flattened
}

func flattenIcebergTargets(icebergTargets []*glue.IcebergTarget) []map[string]interface{} {
result := make([]map[string]interface{}, 0)

Expand Down
99 changes: 99 additions & 0 deletions internal/service/glue/crawler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,51 @@ func TestAccGlueCrawler_deltaTarget(t *testing.T) {
})
}

// TestAccGlueCrawler_hudiTarget exercises the hudi_target block in three
// steps: apply with one path and depth 1, import the resource and verify its
// state, then apply again with a different path and depth 2.
func TestAccGlueCrawler_hudiTarget(t *testing.T) {
	const resourceName = "aws_glue_crawler.test"

	var crawler glue.Crawler
	ctx := acctest.Context(t)
	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
	connectionUrl := fmt.Sprintf("mongodb://%s:27017/testdatabase", acctest.RandomDomainName())

	// checkHudiTarget builds the assertions shared by the create and update
	// steps; only the expected path and traversal depth differ between them.
	checkHudiTarget := func(path, depth string) resource.TestCheckFunc {
		return resource.ComposeTestCheckFunc(
			testAccCheckCrawlerExists(ctx, resourceName, &crawler),
			resource.TestCheckResourceAttr(resourceName, "hudi_target.#", "1"),
			resource.TestCheckResourceAttr(resourceName, "hudi_target.0.connection_name", rName),
			resource.TestCheckResourceAttr(resourceName, "hudi_target.0.maximum_traversal_depth", depth),
			resource.TestCheckResourceAttr(resourceName, "hudi_target.0.paths.#", "1"),
			resource.TestCheckTypeSetElemAttr(resourceName, "hudi_target.0.paths.*", path),
		)
	}

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:                 func() { acctest.PreCheck(ctx, t) },
		ErrorCheck:               acctest.ErrorCheck(t, glue.EndpointsID),
		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
		CheckDestroy:             testAccCheckCrawlerDestroy(ctx),
		Steps: []resource.TestStep{
			{
				Config: testAccCrawlerConfig_hudiTarget(rName, connectionUrl, "s3://table1", 1),
				Check:  checkHudiTarget("s3://table1", "1"),
			},
			{
				ResourceName:      resourceName,
				ImportState:       true,
				ImportStateVerify: true,
			},
			{
				Config: testAccCrawlerConfig_hudiTarget(rName, connectionUrl, "s3://table2", 2),
				Check:  checkHudiTarget("s3://table2", "2"),
			},
		},
	})
}

func TestAccGlueCrawler_icebergTarget(t *testing.T) {
ctx := acctest.Context(t)
var crawler glue.Crawler
Expand Down Expand Up @@ -3075,6 +3120,60 @@ resource "aws_glue_crawler" "test" {
`, rName, connectionUrl, tableName, createNativeDeltaTable))
}

// testAccCrawlerConfig_hudiTarget returns a Terraform configuration for a Glue
// crawler with a single hudi_target block. The result is composed from
// testAccCrawlerConfig_base(rName), acctest.ConfigVPCWithSubnets(rName, 2) and
// the template below, which declares a security group, a catalog database, a
// NETWORK-type Glue connection and the crawler itself.
//
// rName names the security group, database, connection and crawler. tableName
// is rendered as the only entry of hudi_target.paths, and depth as
// maximum_traversal_depth.
//
// NOTE(review): connectionUrl is passed to fmt.Sprintf, but the template
// contains no %[2] verb, so it has no effect on the generated configuration.
func testAccCrawlerConfig_hudiTarget(rName, connectionUrl, tableName string, depth int) string {
return acctest.ConfigCompose(testAccCrawlerConfig_base(rName), acctest.ConfigVPCWithSubnets(rName, 2), fmt.Sprintf(`
resource "aws_security_group" "test" {
name = %[1]q
vpc_id = aws_vpc.test.id

ingress {
from_port = 1
protocol = "tcp"
self = true
to_port = 65535
}

tags = {
Name = %[1]q
}
}

resource "aws_glue_catalog_database" "test" {
name = %[1]q
}

resource "aws_glue_connection" "test" {
connection_properties = {
JDBC_ENFORCE_SSL = false
}

connection_type = "NETWORK"

name = %[1]q

physical_connection_requirements {
availability_zone = aws_subnet.test[0].availability_zone
security_group_id_list = [aws_security_group.test.id]
subnet_id = aws_subnet.test[0].id
}
}

resource "aws_glue_crawler" "test" {
depends_on = [aws_iam_role_policy_attachment.test-AWSGlueServiceRole]

database_name = aws_glue_catalog_database.test.name
name = %[1]q
role = aws_iam_role.test.name

hudi_target {
connection_name = aws_glue_connection.test.name
paths = [%[3]q]
maximum_traversal_depth = %[4]d
}
}
`, rName, connectionUrl, tableName, depth))
}

func testAccCrawlerConfig_icebergTarget(rName, connectionUrl, tableName string, depth int) string {
return acctest.ConfigCompose(testAccCrawlerConfig_base(rName), acctest.ConfigVPCWithSubnets(rName, 2), fmt.Sprintf(`
resource "aws_security_group" "test" {
Expand Down
8 changes: 8 additions & 0 deletions website/docs/r/glue_crawler.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ This argument supports the following arguments:
* `jdbc_target` (Optional) List of nested JDBC target arguments. See [JDBC Target](#jdbc-target) below.
* `s3_target` (Optional) List nested Amazon S3 target arguments. See [S3 Target](#s3-target) below.
* `mongodb_target` (Optional) List nested MongoDB target arguments. See [MongoDB Target](#mongodb-target) below.
* `hudi_target` (Optional) List of nested Hudi target arguments. See [Hudi Target](#hudi-target) below.
* `iceberg_target` (Optional) List nested Iceberg target arguments. See [Iceberg Target](#iceberg-target) below.
* `schedule` (Optional) A cron expression used to specify the schedule. For more information, see [Time-Based Schedules for Jobs and Crawlers](https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html). For example, to run something every day at 12:15 UTC, you would specify: `cron(15 12 * * ? *)`.
* `schema_change_policy` (Optional) Policy for the crawler's update and deletion behavior. See [Schema Change Policy](#schema-change-policy) below.
Expand Down Expand Up @@ -193,6 +194,13 @@ This argument supports the following arguments:
* `path` - (Required) The path of the Amazon DocumentDB or MongoDB target (database/collection).
* `scan_all` - (Optional) Indicates whether to scan all the records, or to sample rows from the table. Scanning all the records can take a long time when the table is not a high throughput table. Default value is `true`.

### Hudi Target

* `connection_name` - (Optional) The name of the connection to use to connect to the Hudi target.
* `paths` - (Required) One or more Amazon S3 paths that contain Hudi metadata folders, specified as `s3://bucket/prefix`.
* `exclusions` - (Optional) A list of glob patterns used to exclude from the crawl.
* `maximum_traversal_depth` - (Required) The maximum depth of Amazon S3 paths that the crawler can traverse to discover the Hudi metadata folder in your Amazon S3 path. Used to limit the crawler run time. Valid values are between `1` and `20`.

### Iceberg Target

* `connection_name` - (Optional) The name of the connection to use to connect to the Iceberg target.
Expand Down