Skip to content

Commit

Permalink
Merge pull request #32898 from DrFaust92/glue-crawler-hudi
Browse files Browse the repository at this point in the history
r/glue_crawler - add hudi target support
  • Loading branch information
ewbankkit authored Aug 8, 2023
2 parents 9dba0eb + 8bcce0e commit 5abea1a
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .changelog/32898.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_glue_crawler: Add `hudi_target` argument
```
84 changes: 83 additions & 1 deletion internal/service/glue/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

func targets() []string {
return []string{"s3_target", "dynamodb_target", "mongodb_target", "jdbc_target", "catalog_target", "delta_target", "iceberg_target"}
return []string{"s3_target", "dynamodb_target", "mongodb_target", "jdbc_target", "catalog_target", "delta_target", "iceberg_target", "hudi_target"}
}

// @SDKResource("aws_glue_crawler", name="Crawler")
Expand Down Expand Up @@ -161,6 +161,35 @@ func ResourceCrawler() *schema.Resource {
},
},
},
"hudi_target": {
Type: schema.TypeList,
Optional: true,
MinItems: 1,
AtLeastOneOf: targets(),
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"connection_name": {
Type: schema.TypeString,
Optional: true,
},
"exclusions": {
Type: schema.TypeList,
Optional: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"maximum_traversal_depth": {
Type: schema.TypeInt,
Required: true,
ValidateFunc: validation.IntBetween(1, 20),
},
"paths": {
Type: schema.TypeSet,
Required: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
},
},
},
"iceberg_target": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -520,6 +549,10 @@ func resourceCrawlerRead(ctx context.Context, d *schema.ResourceData, meta inter
return sdkdiag.AppendErrorf(diags, "setting delta_target: %s", err)
}

if err := d.Set("hudi_target", flattenHudiTargets(crawler.Targets.HudiTargets)); err != nil {
return sdkdiag.AppendErrorf(diags, "setting hudi_target: %s", err)
}

if err := d.Set("iceberg_target", flattenIcebergTargets(crawler.Targets.IcebergTargets)); err != nil {
return sdkdiag.AppendErrorf(diags, "setting iceberg_target: %s", err)
}
Expand Down Expand Up @@ -772,6 +805,10 @@ func expandCrawlerTargets(d *schema.ResourceData) *glue.CrawlerTargets {
crawlerTargets.DeltaTargets = expandDeltaTargets(v.([]interface{}))
}

if v, ok := d.GetOk("hudi_target"); ok {
crawlerTargets.HudiTargets = expandHudiTargets(v.([]interface{}))
}

if v, ok := d.GetOk("iceberg_target"); ok {
crawlerTargets.IcebergTargets = expandIcebergTargets(v.([]interface{}))
}
Expand Down Expand Up @@ -960,6 +997,36 @@ func expandDeltaTarget(cfg map[string]interface{}) *glue.DeltaTarget {
return target
}

func expandHudiTargets(targets []interface{}) []*glue.HudiTarget {
if len(targets) < 1 {
return []*glue.HudiTarget{}
}

perms := make([]*glue.HudiTarget, len(targets))
for i, rawCfg := range targets {
cfg := rawCfg.(map[string]interface{})
perms[i] = expandHudiTarget(cfg)
}
return perms
}

func expandHudiTarget(cfg map[string]interface{}) *glue.HudiTarget {
target := &glue.HudiTarget{
Paths: flex.ExpandStringSet(cfg["paths"].(*schema.Set)),
MaximumTraversalDepth: aws.Int64(int64(cfg["maximum_traversal_depth"].(int))),
}

if v, ok := cfg["exclusions"]; ok {
target.Exclusions = flex.ExpandStringList(v.([]interface{}))
}

if v, ok := cfg["connection_name"].(string); ok {
target.ConnectionName = aws.String(v)
}

return target
}

func expandIcebergTargets(targets []interface{}) []*glue.IcebergTarget {
if len(targets) < 1 {
return []*glue.IcebergTarget{}
Expand Down Expand Up @@ -1085,6 +1152,21 @@ func flattenDeltaTargets(deltaTargets []*glue.DeltaTarget) []map[string]interfac
return result
}

func flattenHudiTargets(hudiTargets []*glue.HudiTarget) []map[string]interface{} {
result := make([]map[string]interface{}, 0)

for _, hudiTarget := range hudiTargets {
attrs := make(map[string]interface{})
attrs["connection_name"] = aws.StringValue(hudiTarget.ConnectionName)
attrs["maximum_traversal_depth"] = aws.Int64Value(hudiTarget.MaximumTraversalDepth)
attrs["paths"] = flex.FlattenStringSet(hudiTarget.Paths)
attrs["exclusions"] = flex.FlattenStringList(hudiTarget.Exclusions)

result = append(result, attrs)
}
return result
}

func flattenIcebergTargets(icebergTargets []*glue.IcebergTarget) []map[string]interface{} {
result := make([]map[string]interface{}, 0)

Expand Down
99 changes: 99 additions & 0 deletions internal/service/glue/crawler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,51 @@ func TestAccGlueCrawler_deltaTarget(t *testing.T) {
})
}

func TestAccGlueCrawler_hudiTarget(t *testing.T) {
ctx := acctest.Context(t)
var crawler glue.Crawler
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_glue_crawler.test"

connectionUrl := fmt.Sprintf("mongodb://%s:27017/testdatabase", acctest.RandomDomainName())

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, glue.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckCrawlerDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccCrawlerConfig_hudiTarget(rName, connectionUrl, "s3://table1", 1),
Check: resource.ComposeTestCheckFunc(
testAccCheckCrawlerExists(ctx, resourceName, &crawler),
resource.TestCheckResourceAttr(resourceName, "hudi_target.#", "1"),
resource.TestCheckResourceAttr(resourceName, "hudi_target.0.connection_name", rName),
resource.TestCheckResourceAttr(resourceName, "hudi_target.0.maximum_traversal_depth", "1"),
resource.TestCheckResourceAttr(resourceName, "hudi_target.0.paths.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "hudi_target.0.paths.*", "s3://table1"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
{
Config: testAccCrawlerConfig_hudiTarget(rName, connectionUrl, "s3://table2", 2),
Check: resource.ComposeTestCheckFunc(
testAccCheckCrawlerExists(ctx, resourceName, &crawler),
resource.TestCheckResourceAttr(resourceName, "hudi_target.#", "1"),
resource.TestCheckResourceAttr(resourceName, "hudi_target.0.connection_name", rName),
resource.TestCheckResourceAttr(resourceName, "hudi_target.0.maximum_traversal_depth", "2"),
resource.TestCheckResourceAttr(resourceName, "hudi_target.0.paths.#", "1"),
resource.TestCheckTypeSetElemAttr(resourceName, "hudi_target.0.paths.*", "s3://table2"),
),
},
},
})
}

func TestAccGlueCrawler_icebergTarget(t *testing.T) {
ctx := acctest.Context(t)
var crawler glue.Crawler
Expand Down Expand Up @@ -3075,6 +3120,60 @@ resource "aws_glue_crawler" "test" {
`, rName, connectionUrl, tableName, createNativeDeltaTable))
}

func testAccCrawlerConfig_hudiTarget(rName, connectionUrl, tableName string, depth int) string {
return acctest.ConfigCompose(testAccCrawlerConfig_base(rName), acctest.ConfigVPCWithSubnets(rName, 2), fmt.Sprintf(`
resource "aws_security_group" "test" {
name = %[1]q
vpc_id = aws_vpc.test.id
ingress {
from_port = 1
protocol = "tcp"
self = true
to_port = 65535
}
tags = {
Name = %[1]q
}
}
resource "aws_glue_catalog_database" "test" {
name = %[1]q
}
resource "aws_glue_connection" "test" {
connection_properties = {
JDBC_ENFORCE_SSL = false
}
connection_type = "NETWORK"
name = %[1]q
physical_connection_requirements {
availability_zone = aws_subnet.test[0].availability_zone
security_group_id_list = [aws_security_group.test.id]
subnet_id = aws_subnet.test[0].id
}
}
resource "aws_glue_crawler" "test" {
depends_on = [aws_iam_role_policy_attachment.test-AWSGlueServiceRole]
database_name = aws_glue_catalog_database.test.name
name = %[1]q
role = aws_iam_role.test.name
hudi_target {
connection_name = aws_glue_connection.test.name
paths = [%[3]q]
maximum_traversal_depth = %[4]d
}
}
`, rName, connectionUrl, tableName, depth))
}

func testAccCrawlerConfig_icebergTarget(rName, connectionUrl, tableName string, depth int) string {
return acctest.ConfigCompose(testAccCrawlerConfig_base(rName), acctest.ConfigVPCWithSubnets(rName, 2), fmt.Sprintf(`
resource "aws_security_group" "test" {
Expand Down
8 changes: 8 additions & 0 deletions website/docs/r/glue_crawler.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ This argument supports the following arguments:
* `jdbc_target` (Optional) List of nested JBDC target arguments. See [JDBC Target](#jdbc-target) below.
* `s3_target` (Optional) List nested Amazon S3 target arguments. See [S3 Target](#s3-target) below.
* `mongodb_target` (Optional) List nested MongoDB target arguments. See [MongoDB Target](#mongodb-target) below.
* `hudi_target` (Optional) List nested Hudi target arguments. See [Iceberg Target](#hudi-target) below.
* `iceberg_target` (Optional) List nested Iceberg target arguments. See [Iceberg Target](#iceberg-target) below.
* `schedule` (Optional) A cron expression used to specify the schedule. For more information, see [Time-Based Schedules for Jobs and Crawlers](https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html). For example, to run something every day at 12:15 UTC, you would specify: `cron(15 12 * * ? *)`.
* `schema_change_policy` (Optional) Policy for the crawler's update and deletion behavior. See [Schema Change Policy](#schema-change-policy) below.
Expand Down Expand Up @@ -193,6 +194,13 @@ This argument supports the following arguments:
* `path` - (Required) The path of the Amazon DocumentDB or MongoDB target (database/collection).
* `scan_all` - (Optional) Indicates whether to scan all the records, or to sample rows from the table. Scanning all the records can take a long time when the table is not a high throughput table. Default value is `true`.

### Hudi Target

* `connection_name` - (Optional) The name of the connection to use to connect to the Hudi target.
* `paths` - (Required) One or more Amazon S3 paths that contains Hudi metadata folders as s3://bucket/prefix.
* `exclusions` - (Optional) A list of glob patterns used to exclude from the crawl.
* `maximum_traversal_depth` - (Required) The maximum depth of Amazon S3 paths that the crawler can traverse to discover the Hudi metadata folder in your Amazon S3 path. Used to limit the crawler run time. Valid values are between `1` and `20`.

### Iceberg Target

* `connection_name` - (Optional) The name of the connection to use to connect to the Iceberg target.
Expand Down

0 comments on commit 5abea1a

Please sign in to comment.