From bcf414e83ac7be5120cf4b6053868cbddadafae3 Mon Sep 17 00:00:00 2001 From: drfaust92 Date: Tue, 8 Aug 2023 14:31:23 +0300 Subject: [PATCH 1/2] add hudi support --- internal/service/glue/crawler.go | 84 ++++++++++++++++++- internal/service/glue/crawler_test.go | 99 +++++++++++++++++++++++ website/docs/r/glue_crawler.html.markdown | 8 ++ 3 files changed, 190 insertions(+), 1 deletion(-) diff --git a/internal/service/glue/crawler.go b/internal/service/glue/crawler.go index e5fba168eb4..94c760f1684 100644 --- a/internal/service/glue/crawler.go +++ b/internal/service/glue/crawler.go @@ -29,7 +29,7 @@ import ( ) func targets() []string { - return []string{"s3_target", "dynamodb_target", "mongodb_target", "jdbc_target", "catalog_target", "delta_target", "iceberg_target"} + return []string{"s3_target", "dynamodb_target", "mongodb_target", "jdbc_target", "catalog_target", "delta_target", "iceberg_target", "hudi_target"} } // @SDKResource("aws_glue_crawler", name="Crawler") @@ -161,6 +161,35 @@ func ResourceCrawler() *schema.Resource { }, }, }, + "hudi_target": { + Type: schema.TypeList, + Optional: true, + MinItems: 1, + AtLeastOneOf: targets(), + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "connection_name": { + Type: schema.TypeString, + Optional: true, + }, + "exclusions": { + Type: schema.TypeList, + Optional: true, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + "maximum_traversal_depth": { + Type: schema.TypeInt, + Required: true, + ValidateFunc: validation.IntBetween(1, 20), + }, + "paths": { + Type: schema.TypeSet, + Required: true, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + }, + }, + }, "iceberg_target": { Type: schema.TypeList, Optional: true, @@ -520,6 +549,10 @@ func resourceCrawlerRead(ctx context.Context, d *schema.ResourceData, meta inter return sdkdiag.AppendErrorf(diags, "setting delta_target: %s", err) } + if err := d.Set("hudi_target", flattenHudiTargets(crawler.Targets.HudiTargets)); err != nil { + return sdkdiag.AppendErrorf(diags, "setting hudi_target: %s", err) + } + if err := d.Set("iceberg_target", flattenIcebergTargets(crawler.Targets.IcebergTargets)); err != nil { return sdkdiag.AppendErrorf(diags, "setting iceberg_target: %s", err) } @@ -772,6 +805,10 @@ func expandCrawlerTargets(d *schema.ResourceData) *glue.CrawlerTargets { crawlerTargets.DeltaTargets = expandDeltaTargets(v.([]interface{})) } + if v, ok := d.GetOk("hudi_target"); ok { + crawlerTargets.HudiTargets = expandHudiTargets(v.([]interface{})) + } + if v, ok := d.GetOk("iceberg_target"); ok { crawlerTargets.IcebergTargets = expandIcebergTargets(v.([]interface{})) } @@ -960,6 +997,36 @@ func expandDeltaTarget(cfg map[string]interface{}) *glue.DeltaTarget { return target } +func expandHudiTargets(targets []interface{}) []*glue.HudiTarget { + if len(targets) < 1 { + return []*glue.HudiTarget{} + } + + perms := make([]*glue.HudiTarget, len(targets)) + for i, rawCfg := range targets { + cfg := rawCfg.(map[string]interface{}) + perms[i] = expandHudiTarget(cfg) + } + return perms +} + +func expandHudiTarget(cfg map[string]interface{}) *glue.HudiTarget { + target := &glue.HudiTarget{ + Paths: flex.ExpandStringSet(cfg["paths"].(*schema.Set)), + MaximumTraversalDepth: aws.Int64(int64(cfg["maximum_traversal_depth"].(int))), + } + + if v, ok := cfg["exclusions"]; ok { + target.Exclusions = flex.ExpandStringList(v.([]interface{})) + } + + if v, ok := cfg["connection_name"].(string); ok { + target.ConnectionName = aws.String(v) + } + + return target +} + func expandIcebergTargets(targets []interface{}) []*glue.IcebergTarget { if len(targets) < 1 { return []*glue.IcebergTarget{} @@ -1085,6 +1152,21 @@ func flattenDeltaTargets(deltaTargets []*glue.DeltaTarget) []map[string]interfac return result } +func flattenHudiTargets(hudiTargets []*glue.HudiTarget) []map[string]interface{} { + result := make([]map[string]interface{}, 0) + + for _, hudiTarget := range hudiTargets { + attrs := make(map[string]interface{}) + attrs["connection_name"] = aws.StringValue(hudiTarget.ConnectionName) + attrs["maximum_traversal_depth"] = aws.Int64Value(hudiTarget.MaximumTraversalDepth) + attrs["paths"] = flex.FlattenStringSet(hudiTarget.Paths) + attrs["exclusions"] = flex.FlattenStringList(hudiTarget.Exclusions) + + result = append(result, attrs) + } + return result +} + func flattenIcebergTargets(icebergTargets []*glue.IcebergTarget) []map[string]interface{} { result := make([]map[string]interface{}, 0) diff --git a/internal/service/glue/crawler_test.go b/internal/service/glue/crawler_test.go index 9f3e6bf6769..ca1b364d7da 100644 --- a/internal/service/glue/crawler_test.go +++ b/internal/service/glue/crawler_test.go @@ -591,6 +591,51 @@ func TestAccGlueCrawler_deltaTarget(t *testing.T) { }) } +func TestAccGlueCrawler_hudiTarget(t *testing.T) { + ctx := acctest.Context(t) + var crawler glue.Crawler + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_glue_crawler.test" + + connectionUrl := fmt.Sprintf("mongodb://%s:27017/testdatabase", acctest.RandomDomainName()) + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(ctx, t) }, + ErrorCheck: acctest.ErrorCheck(t, glue.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: testAccCheckCrawlerDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccCrawlerConfig_hudiTarget(rName, connectionUrl, "s3://table1", 1), + Check: resource.ComposeTestCheckFunc( + testAccCheckCrawlerExists(ctx, resourceName, &crawler), + resource.TestCheckResourceAttr(resourceName, "hudi_target.#", "1"), + resource.TestCheckResourceAttr(resourceName, "hudi_target.0.connection_name", rName), + resource.TestCheckResourceAttr(resourceName, "hudi_target.0.maximum_traversal_depth", "1"), + resource.TestCheckResourceAttr(resourceName, "hudi_target.0.paths.#", "1"), + resource.TestCheckTypeSetElemAttr(resourceName, "hudi_target.0.paths.*", "s3://table1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccCrawlerConfig_hudiTarget(rName, connectionUrl, "s3://table2", 2), + Check: resource.ComposeTestCheckFunc( + testAccCheckCrawlerExists(ctx, resourceName, &crawler), + resource.TestCheckResourceAttr(resourceName, "hudi_target.#", "1"), + resource.TestCheckResourceAttr(resourceName, "hudi_target.0.connection_name", rName), + resource.TestCheckResourceAttr(resourceName, "hudi_target.0.maximum_traversal_depth", "2"), + resource.TestCheckResourceAttr(resourceName, "hudi_target.0.paths.#", "1"), + resource.TestCheckTypeSetElemAttr(resourceName, "hudi_target.0.paths.*", "s3://table2"), + ), + }, + }, + }) +} + func TestAccGlueCrawler_icebergTarget(t *testing.T) { ctx := acctest.Context(t) var crawler glue.Crawler @@ -3075,6 +3120,60 @@ resource "aws_glue_crawler" "test" { `, rName, connectionUrl, tableName, createNativeDeltaTable)) } +func testAccCrawlerConfig_hudiTarget(rName, connectionUrl, tableName string, depth int) string { + return acctest.ConfigCompose(testAccCrawlerConfig_base(rName), acctest.ConfigVPCWithSubnets(rName, 2), fmt.Sprintf(` +resource "aws_security_group" "test" { + name = %[1]q + vpc_id = aws_vpc.test.id + + ingress { + from_port = 1 + protocol = "tcp" + self = true + to_port = 65535 + } + + tags = { + Name = %[1]q + } +} + +resource "aws_glue_catalog_database" "test" { + name = %[1]q +} + +resource "aws_glue_connection" "test" { + connection_properties = { + JDBC_ENFORCE_SSL = false + } + + connection_type = "NETWORK" + + name = %[1]q + + physical_connection_requirements { + availability_zone = aws_subnet.test[0].availability_zone + security_group_id_list = [aws_security_group.test.id] + subnet_id = aws_subnet.test[0].id + } +} + +resource "aws_glue_crawler" "test" { + depends_on = [aws_iam_role_policy_attachment.test-AWSGlueServiceRole] + + database_name = aws_glue_catalog_database.test.name + name = %[1]q + role = aws_iam_role.test.name + + hudi_target { + connection_name = aws_glue_connection.test.name + paths = [%[3]q] + maximum_traversal_depth = %[4]d + } +} +`, rName, connectionUrl, tableName, depth)) +} + func testAccCrawlerConfig_icebergTarget(rName, connectionUrl, tableName string, depth int) string { return acctest.ConfigCompose(testAccCrawlerConfig_base(rName), acctest.ConfigVPCWithSubnets(rName, 2), fmt.Sprintf(` resource "aws_security_group" "test" { diff --git a/website/docs/r/glue_crawler.html.markdown b/website/docs/r/glue_crawler.html.markdown index 96c15303b8f..cf78d190f06 100644 --- a/website/docs/r/glue_crawler.html.markdown +++ b/website/docs/r/glue_crawler.html.markdown @@ -143,6 +143,7 @@ This argument supports the following arguments: * `jdbc_target` (Optional) List of nested JBDC target arguments. See [JDBC Target](#jdbc-target) below. * `s3_target` (Optional) List nested Amazon S3 target arguments. See [S3 Target](#s3-target) below. * `mongodb_target` (Optional) List nested MongoDB target arguments. See [MongoDB Target](#mongodb-target) below. +* `hudi_target` (Optional) List nested Hudi target arguments. See [Iceberg Target](#hudi-target) below. * `iceberg_target` (Optional) List nested Iceberg target arguments. See [Iceberg Target](#iceberg-target) below. * `schedule` (Optional) A cron expression used to specify the schedule. For more information, see [Time-Based Schedules for Jobs and Crawlers](https://docs.aws.amazon.com/glue/latest/dg/monitor-data-warehouse-schedule.html). For example, to run something every day at 12:15 UTC, you would specify: `cron(15 12 * * ? *)`. * `schema_change_policy` (Optional) Policy for the crawler's update and deletion behavior. See [Schema Change Policy](#schema-change-policy) below. @@ -193,6 +194,13 @@ This argument supports the following arguments: * `path` - (Required) The path of the Amazon DocumentDB or MongoDB target (database/collection). * `scan_all` - (Optional) Indicates whether to scan all the records, or to sample rows from the table. Scanning all the records can take a long time when the table is not a high throughput table. Default value is `true`. +### Hudi Target + +* `connection_name` - (Optional) The name of the connection to use to connect to the Hudi target. +* `paths` - (Required) One or more Amazon S3 paths that contains Hudi metadata folders as s3://bucket/prefix. +* `exclusions` - (Optional) A list of glob patterns used to exclude from the crawl. +* `maximum_traversal_depth` - (Required) The maximum depth of Amazon S3 paths that the crawler can traverse to discover the Hudi metadata folder in your Amazon S3 path. Used to limit the crawler run time. Valid values are between `1` and `20`. + ### Iceberg Target * `connection_name` - (Optional) The name of the connection to use to connect to the Iceberg target. From 8bcce0ea1aafdda412241acba223d149df24df26 Mon Sep 17 00:00:00 2001 From: drfaust92 Date: Tue, 8 Aug 2023 14:33:03 +0300 Subject: [PATCH 2/2] changelog --- .changelog/32898.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/32898.txt diff --git a/.changelog/32898.txt b/.changelog/32898.txt new file mode 100644 index 00000000000..bbd7bc58125 --- /dev/null +++ b/.changelog/32898.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/aws_glue_crawler: Add `hudi_target` argument +``` \ No newline at end of file