From 15275d26e19214ed01dc21b221b707177ba9a792 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Wed, 8 Jan 2025 18:43:12 +0530 Subject: [PATCH] Fix #19160: Add Depth Support for Structured Containers --- .../ingestion/source/storage/s3/metadata.py | 65 ++++++++++++++++--- .../v1.6/connectors/storage/manifest.md | 45 +++++++++++++ .../v1.7/connectors/storage/manifest.md | 45 +++++++++++++ 3 files changed, 146 insertions(+), 9 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index e6bca41938f5..6220433d72b5 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -12,6 +12,7 @@ import json import secrets import traceback +from copy import deepcopy from datetime import datetime, timedelta from enum import Enum from typing import Dict, Iterable, List, Optional, Tuple @@ -336,6 +337,45 @@ def _generate_container_details( ) return None + def _generate_structured_containers_by_depth( + self, + bucket_response: S3BucketResponse, + metadata_entry: MetadataEntry, + parent: Optional[EntityReference] = None, + ) -> Iterable[S3ContainerDetails]: + try: + prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry) + if prefix: + response = self.s3_client.list_objects_v2( + Bucket=bucket_response.name, Prefix=prefix + ) + # total depth is depth of prefix + depth of the metadata entry + total_depth = metadata_entry.depth + len(prefix[:-1].split("/")) + candidate_keys = { + "/".join(entry.get("Key").split("/")[:total_depth]) + "/" + for entry in response[S3_CLIENT_ROOT_RESPONSE] + if entry + and entry.get("Key") + and len(entry.get("Key").split("/")) > total_depth + } + for key in candidate_keys: + metadata_entry_copy = deepcopy(metadata_entry) + metadata_entry_copy.dataPath = key.strip(KEY_SEPARATOR) + structured_container: Optional[ + S3ContainerDetails + ] = self._generate_container_details( + bucket_response=bucket_response, + metadata_entry=metadata_entry_copy, + parent=parent, + ) + if structured_container: + yield structured_container + except Exception as err: + logger.warning( + f"Error while generating structured containers by depth for {metadata_entry.dataPath} - {err}" + ) + logger.debug(traceback.format_exc()) + def _generate_structured_containers( self, bucket_response: S3BucketResponse, @@ -347,15 +387,22 @@ def _generate_structured_containers( f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} " f"and generating structured container" ) - structured_container: Optional[ - S3ContainerDetails - ] = self._generate_container_details( - bucket_response=bucket_response, - metadata_entry=metadata_entry, - parent=parent, - ) - if structured_container: - yield structured_container + if metadata_entry.depth == 0: + structured_container: Optional[ + S3ContainerDetails + ] = self._generate_container_details( + bucket_response=bucket_response, + metadata_entry=metadata_entry, + parent=parent, + ) + if structured_container: + yield structured_container + else: + yield from self._generate_structured_containers_by_depth( + bucket_response=bucket_response, + metadata_entry=metadata_entry, + parent=parent, + ) def is_valid_unstructured_file(self, accepted_extensions: List, key: str) -> bool: # Split the string into a list of values diff --git a/openmetadata-docs/content/partials/v1.6/connectors/storage/manifest.md b/openmetadata-docs/content/partials/v1.6/connectors/storage/manifest.md index 0f165a9a57fa..aae373ab44bb 100644 --- a/openmetadata-docs/content/partials/v1.6/connectors/storage/manifest.md +++ b/openmetadata-docs/content/partials/v1.6/connectors/storage/manifest.md @@ -57,6 +57,43 @@ Again, this information will be added on top of the inferred schema from the dat {% /codeInfo %} +{% codeInfo srNumber=7 %} + +**Automated Container Ingestion**: Registering all the data paths one by one can be a time consuming job, +to make the automated structure container ingestion you can provide the depth at which all the data is available. + +Let us understand this with the example, suppose following is the file hierarchy within my bucket. + +``` + +# prefix/depth1/depth2/depth3 +athena_service/my_database_a/my_schema_a/table_a/date=01-01-2025/data.parquet +athena_service/my_database_a/my_schema_a/table_a/date=02-01-2025/data.parquet +athena_service/my_database_a/my_schema_a/table_b/date=01-01-2025/data.parquet +athena_service/my_database_a/my_schema_a/table_b/date=02-01-2025/data.parquet + +athena_service/my_database_b/my_schema_a/table_a/date=01-01-2025/data.parquet +athena_service/my_database_b/my_schema_a/table_a/date=02-01-2025/data.parquet +athena_service/my_database_b/my_schema_a/table_b/date=01-01-2025/data.parquet +athena_service/my_database_b/my_schema_a/table_b/date=02-01-2025/data.parquet + +``` + +all my tables folders which contains the actual data are available at depth 3, hence when you specify the `depth: 3` in +manifest entry all following path will get registered as container in OpenMetadata with this single entry + +``` +athena_service/my_database_a/my_schema_a/table_a +athena_service/my_database_a/my_schema_a/table_b +athena_service/my_database_b/my_schema_a/table_a +athena_service/my_database_b/my_schema_a/table_b +``` + +saving efforts to add 4 individual entries compared to 1 + +{% /codeInfo %} + + {% codeInfo srNumber=6 %} **Unstructured Container**: OpenMetadata supports ingesting unstructured files like images, pdf's etc. We support fetching the file names, size and tags associates to such files. @@ -124,6 +161,14 @@ In case you want to ingest all unstructured files with irrespective of their fil ] } ``` +```json {% srNumber=7 %} + { + "dataPath": "athena_service", + "structureFormat": "parquet", + "isPartitioned": true, + "depth": 2 + } +``` ```json {% srNumber=6 %} { "dataPath": "path/to/solution.pdf", diff --git a/openmetadata-docs/content/partials/v1.7/connectors/storage/manifest.md b/openmetadata-docs/content/partials/v1.7/connectors/storage/manifest.md index 0f165a9a57fa..aae373ab44bb 100644 --- a/openmetadata-docs/content/partials/v1.7/connectors/storage/manifest.md +++ b/openmetadata-docs/content/partials/v1.7/connectors/storage/manifest.md @@ -57,6 +57,43 @@ Again, this information will be added on top of the inferred schema from the dat {% /codeInfo %} +{% codeInfo srNumber=7 %} + +**Automated Container Ingestion**: Registering all the data paths one by one can be a time consuming job, +to make the automated structure container ingestion you can provide the depth at which all the data is available. + +Let us understand this with the example, suppose following is the file hierarchy within my bucket. + +``` + +# prefix/depth1/depth2/depth3 +athena_service/my_database_a/my_schema_a/table_a/date=01-01-2025/data.parquet +athena_service/my_database_a/my_schema_a/table_a/date=02-01-2025/data.parquet +athena_service/my_database_a/my_schema_a/table_b/date=01-01-2025/data.parquet +athena_service/my_database_a/my_schema_a/table_b/date=02-01-2025/data.parquet + +athena_service/my_database_b/my_schema_a/table_a/date=01-01-2025/data.parquet +athena_service/my_database_b/my_schema_a/table_a/date=02-01-2025/data.parquet +athena_service/my_database_b/my_schema_a/table_b/date=01-01-2025/data.parquet +athena_service/my_database_b/my_schema_a/table_b/date=02-01-2025/data.parquet + +``` + +all my tables folders which contains the actual data are available at depth 3, hence when you specify the `depth: 3` in +manifest entry all following path will get registered as container in OpenMetadata with this single entry + +``` +athena_service/my_database_a/my_schema_a/table_a +athena_service/my_database_a/my_schema_a/table_b +athena_service/my_database_b/my_schema_a/table_a +athena_service/my_database_b/my_schema_a/table_b +``` + +saving efforts to add 4 individual entries compared to 1 + +{% /codeInfo %} + + {% codeInfo srNumber=6 %} **Unstructured Container**: OpenMetadata supports ingesting unstructured files like images, pdf's etc. We support fetching the file names, size and tags associates to such files. @@ -124,6 +161,14 @@ In case you want to ingest all unstructured files with irrespective of their fil ] } ``` +```json {% srNumber=7 %} + { + "dataPath": "athena_service", + "structureFormat": "parquet", + "isPartitioned": true, + "depth": 2 + } +``` ```json {% srNumber=6 %} { "dataPath": "path/to/solution.pdf",