Fix #19160: Add Depth Support for Structured Containers #19288

Merged · 1 commit · Jan 9, 2025
ingestion/src/metadata/ingestion/source/storage/s3/metadata.py (56 additions, 9 deletions)
@@ -12,6 +12,7 @@
import json
import secrets
import traceback
from copy import deepcopy
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, Iterable, List, Optional, Tuple
@@ -336,6 +337,45 @@ def _generate_container_details(
            )
        return None

    def _generate_structured_containers_by_depth(
        self,
        bucket_response: S3BucketResponse,
        metadata_entry: MetadataEntry,
        parent: Optional[EntityReference] = None,
    ) -> Iterable[S3ContainerDetails]:
        try:
            prefix = self._get_sample_file_prefix(metadata_entry=metadata_entry)
            if prefix:
                response = self.s3_client.list_objects_v2(
                    Bucket=bucket_response.name, Prefix=prefix
                )
                # total depth is depth of prefix + depth of the metadata entry
                total_depth = metadata_entry.depth + len(prefix[:-1].split("/"))
                candidate_keys = {
                    "/".join(entry.get("Key").split("/")[:total_depth]) + "/"
                    for entry in response[S3_CLIENT_ROOT_RESPONSE]
                    if entry
                    and entry.get("Key")
                    and len(entry.get("Key").split("/")) > total_depth
                }
                for key in candidate_keys:
                    metadata_entry_copy = deepcopy(metadata_entry)
                    metadata_entry_copy.dataPath = key.strip(KEY_SEPARATOR)
                    structured_container: Optional[
                        S3ContainerDetails
                    ] = self._generate_container_details(
                        bucket_response=bucket_response,
                        metadata_entry=metadata_entry_copy,
                        parent=parent,
                    )
                    if structured_container:
                        yield structured_container
        except Exception as err:
            logger.warning(
                f"Error while generating structured containers by depth for {metadata_entry.dataPath} - {err}"
            )
            logger.debug(traceback.format_exc())

@@ -347,15 +387,22 @@ def _generate_structured_containers(
        self,
        bucket_response: S3BucketResponse,
            f"Extracting metadata from path {metadata_entry.dataPath.strip(KEY_SEPARATOR)} "
            f"and generating structured container"
        )
-        structured_container: Optional[
-            S3ContainerDetails
-        ] = self._generate_container_details(
-            bucket_response=bucket_response,
-            metadata_entry=metadata_entry,
-            parent=parent,
-        )
-        if structured_container:
-            yield structured_container
+        if metadata_entry.depth == 0:
+            structured_container: Optional[
+                S3ContainerDetails
+            ] = self._generate_container_details(
+                bucket_response=bucket_response,
+                metadata_entry=metadata_entry,
+                parent=parent,
+            )
+            if structured_container:
+                yield structured_container
+        else:
+            yield from self._generate_structured_containers_by_depth(
+                bucket_response=bucket_response,
+                metadata_entry=metadata_entry,
+                parent=parent,
+            )

    def is_valid_unstructured_file(self, accepted_extensions: List, key: str) -> bool:
        # Split the string into a list of values
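The key-truncation logic in `_generate_structured_containers_by_depth` can be sketched as a standalone helper. This is a minimal sketch with illustrative names (`candidate_container_keys` and the sample keys are not the connector's API):

```python
from typing import Iterable, Set

KEY_SEPARATOR = "/"

def candidate_container_keys(
    keys: Iterable[str], prefix: str, depth: int
) -> Set[str]:
    """Truncate object keys to ``depth`` levels below ``prefix``.

    The effective depth is the prefix's own depth plus the manifest
    ``depth``; keys that are not strictly deeper than that are skipped.
    """
    total_depth = depth + len(prefix.rstrip(KEY_SEPARATOR).split(KEY_SEPARATOR))
    return {
        KEY_SEPARATOR.join(key.split(KEY_SEPARATOR)[:total_depth])
        for key in keys
        if key and len(key.split(KEY_SEPARATOR)) > total_depth
    }

keys = [
    "athena_service/db_a/schema_a/table_a/date=01-01-2025/data.parquet",
    "athena_service/db_a/schema_a/table_b/date=01-01-2025/data.parquet",
]
# Two distinct table-level container prefixes are derived from the object keys.
print(sorted(candidate_container_keys(keys, "athena_service/", 3)))
```

Each truncated key then becomes the `dataPath` of a deep-copied metadata entry, which is why a single manifest entry fans out into one container per table-level prefix.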
@@ -57,6 +57,43 @@ Again, this information will be added on top of the inferred schema from the dat

{% /codeInfo %}

{% codeInfo srNumber=7 %}

**Automated Container Ingestion**: Registering all the data paths one by one can be a time-consuming job. To automate
structured container ingestion, you can instead provide the depth at which the data is available.

Let us understand this with an example. Suppose the following is the file hierarchy within your bucket:

```

# prefix/depth1/depth2/depth3
athena_service/my_database_a/my_schema_a/table_a/date=01-01-2025/data.parquet
athena_service/my_database_a/my_schema_a/table_a/date=02-01-2025/data.parquet
athena_service/my_database_a/my_schema_a/table_b/date=01-01-2025/data.parquet
athena_service/my_database_a/my_schema_a/table_b/date=02-01-2025/data.parquet

athena_service/my_database_b/my_schema_a/table_a/date=01-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_a/date=02-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_b/date=01-01-2025/data.parquet
athena_service/my_database_b/my_schema_a/table_b/date=02-01-2025/data.parquet

```

All the table folders that contain the actual data sit at depth 3, so when you specify `depth: 3` in the
manifest entry, all of the following paths get registered as containers in OpenMetadata with this single entry:

```
athena_service/my_database_a/my_schema_a/table_a
athena_service/my_database_a/my_schema_a/table_b
athena_service/my_database_b/my_schema_a/table_a
athena_service/my_database_b/my_schema_a/table_b
```

saving you the effort of adding 4 individual entries by using just 1.

{% /codeInfo %}
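As a minimal sketch (plain Python, assuming the example hierarchy above, with only one dated partition per table for brevity), the depth arithmetic works out as follows: the effective depth is the `dataPath` depth plus the manifest `depth`, and each object key is truncated to that many path segments:

```python
# Illustrative only: reproduce the depth-3 truncation for the example keys.
object_keys = [
    "athena_service/my_database_a/my_schema_a/table_a/date=01-01-2025/data.parquet",
    "athena_service/my_database_a/my_schema_a/table_b/date=01-01-2025/data.parquet",
    "athena_service/my_database_b/my_schema_a/table_a/date=01-01-2025/data.parquet",
    "athena_service/my_database_b/my_schema_a/table_b/date=01-01-2025/data.parquet",
]
depth = 3                                        # from the manifest entry
prefix_depth = len("athena_service".split("/"))  # depth of the dataPath itself
total = depth + prefix_depth
containers = sorted({"/".join(k.split("/")[:total]) for k in object_keys})
for container in containers:
    print(container)
```

This prints the four table-level container paths listed above.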


{% codeInfo srNumber=6 %}

**Unstructured Container**: OpenMetadata supports ingesting unstructured files such as images, PDFs, etc. We support fetching the file names, sizes, and tags associated with such files.
@@ -124,6 +161,14 @@ In case you want to ingest all unstructured files irrespective of their fil
Expand Down Expand Up @@ -124,6 +161,14 @@ In case you want to ingest all unstructured files with irrespective of their fil
]
}
```
```json {% srNumber=7 %}
{
  "dataPath": "athena_service",
  "structureFormat": "parquet",
  "isPartitioned": true,
  "depth": 3
}
```
```json {% srNumber=6 %}
{
  "dataPath": "path/to/solution.pdf"
}
```