feat(ingest/s3): Partition support #11083

Status: Merged (31 commits, merged Aug 22, 2024)
Changes shown below are from 18 of the 31 commits.

Commits:
1bb1626
Initial commit for s3 partition support
treff7es Aug 2, 2024
7774e5f
Merge branch 'master' into s3_partition_support
treff7es Aug 2, 2024
7db9079
black linting
treff7es Aug 2, 2024
e8d6f3b
Merge branch 'master' into s3_partition_support
treff7es Aug 5, 2024
45efdb7
Changing enum to check if doc build succeeds
treff7es Aug 5, 2024
664bcd7
Update to use the latest model
treff7es Aug 5, 2024
af6df2a
Update golden files
treff7es Aug 5, 2024
6f1de05
Update golden files
treff7es Aug 5, 2024
05a132b
Stabilizing s3 integration tests
treff7es Aug 5, 2024
34f629b
Updating golden again
treff7es Aug 5, 2024
c247299
Updating goldens again
treff7es Aug 5, 2024
35bc791
Fix formatting
treff7es Aug 5, 2024
8d20952
Merge branch 'master' into s3_partition_support
treff7es Aug 5, 2024
45ee511
- Adding option to disable partition aspect generation for backward c…
treff7es Aug 5, 2024
f51a3e9
Black formatting
treff7es Aug 5, 2024
20c54e2
Update doc
treff7es Aug 5, 2024
e401ee9
Merge branch 'master' into s3_partition_support
treff7es Aug 5, 2024
56e14e2
Fix typos
treff7es Aug 5, 2024
a4cbdb9
Addressing pr review comments
treff7es Aug 12, 2024
d5aee9d
Merge branch 'master' into s3_partition_support
treff7es Aug 12, 2024
b0c8799
Merge branch 'master' into s3_partition_support
treff7es Aug 13, 2024
4d6502a
Fix linter issues
treff7es Aug 13, 2024
cb56197
Update metadata-ingestion/docs/sources/s3/s3.md
treff7es Aug 21, 2024
ab36e00
Update metadata-ingestion/src/datahub/ingestion/source/data_lake_comm…
treff7es Aug 21, 2024
662fa52
Update metadata-ingestion/src/datahub/ingestion/source/data_lake_comm…
treff7es Aug 21, 2024
12d6150
Update metadata-ingestion/src/datahub/ingestion/source/s3/source.py
treff7es Aug 21, 2024
a07216e
Update metadata-ingestion/src/datahub/ingestion/source/s3/source.py
treff7es Aug 21, 2024
e8c4f74
Update metadata-ingestion/docs/sources/s3/s3.md
treff7es Aug 21, 2024
b929f6a
Addressing pr review comments
treff7es Aug 21, 2024
ee2b827
Merge branch 'master' into s3_partition_support
treff7es Aug 21, 2024
0ce1174
Update golden files
treff7es Aug 21, 2024
6 changes: 6 additions & 0 deletions metadata-ingestion/docs/sources/s3/s3.md
@@ -75,6 +75,11 @@ Path specs config to ingest folders `orders` and `returns` as datasets:
path_specs:
- include: s3://test-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
```
or with partition auto-detection:
```
path_specs:
- include: s3://test-bucket/{table}/
```

One can also use `include: s3://test-bucket/{table}/*/*/*.parquet` here; however, the format above is preferred because it allows declaring partitions explicitly.
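For illustration (editor's example, not part of the original doc): with the auto-detection spec above, a hypothetical bucket layout such as
```
s3://test-bucket/orders/year=2024/month=08/part-0001.parquet
s3://test-bucket/orders/year=2024/month=08/part-0002.parquet
```
would be ingested as a single dataset `orders` with partitions `year=2024` and `month=08`, because the partition folders follow the `key=value` convention.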

@@ -150,6 +155,7 @@ Above config has 3 path_specs and will ingest following datasets
s3://my-bucket/foo/tests/bar.avro # single file table
s3://my-bucket/foo/tests/*.* # multiple file level tables
s3://my-bucket/foo/tests/{table}/*.avro #table without partition
s3://my-bucket/foo/tests/{table}/ # table with partition auto-detection. Partitions can only be detected if the folders are in key=value format
s3://my-bucket/foo/tests/{table}/*/*.avro #table where partitions are not specified
s3://my-bucket/foo/tests/{table}/*.* # table where no partitions as well as data type specified
s3://my-bucket/{dept}/tests/{table}/*.avro # specifying keywords to be used in display name
@@ -1,6 +1,8 @@
import datetime
import logging
import os
import re
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union

import parse
@@ -28,6 +30,57 @@
"gzip",
]

java_to_python_mapping = {
"yyyy": "Y",
"MM": "m",
"dd": "d",
"HH": "H",
"mm": "M",
"ss": "S",
}


class SortKeyType(Enum):
STRING = "STRING"
INTEGER = "INTEGER"
FLOAT = "FLOAT"
DATETIME = "DATETIME"
DATE = "DATE"

def __str__(self):
return self.value


class SortKey(ConfigModel):
key: str = Field(
description="The key to sort on. This can be a compound key based on the path_spec variables."
)
type: SortKeyType = Field(
default=SortKeyType.STRING,
description="The date format to use when sorting. This is used to parse the date from the key. The format should follow the java [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) format.",
Review comment (Collaborator): Looks like the description needs to change here.
)

date_format: Optional[str] = Field(
default=None,
type=str,
description="The date format to use when sorting. This is used to parse the date from the key. The format should follow the java [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) format.",
)

@pydantic.validator("date_format", always=True)
def convert_date_format_to_python_format(cls, v: Optional[str]) -> Optional[str]:
if v is None:
return None
else:
for java_format, python_format in java_to_python_mapping.items():
v = v.replace(java_format, f"%{python_format}")
Review comment (Collaborator): Why do we accept date_format in the Java format if we convert it to the Python format here anyway, before actually using it?

Reply (treff7es, author): In the future, this should be supported across platforms (Java, Python, etc.), and I felt the Java one is more common.

return v
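For reference, the mapping above converts Java `SimpleDateFormat` tokens into Python `strptime`/`strftime` directives. Below is a minimal sketch of that substitution (editor's illustration, not part of the PR; `to_python_format` is a hypothetical stand-in for the validator above):

```python
# Editor's sketch: reproduces the substitution performed by
# convert_date_format_to_python_format using the java_to_python_mapping above.
java_to_python_mapping = {
    "yyyy": "Y",
    "MM": "m",
    "dd": "d",
    "HH": "H",
    "mm": "M",
    "ss": "S",
}


def to_python_format(java_format: str) -> str:
    # Replace each Java token with the corresponding "%X" Python directive.
    for java_token, python_token in java_to_python_mapping.items():
        java_format = java_format.replace(java_token, f"%{python_token}")
    return java_format


print(to_python_format("yyyy-MM-dd"))        # -> %Y-%m-%d
print(to_python_format("yyyy/MM/dd HH:mm"))  # -> %Y/%m/%d %H:%M
```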


class FolderTraversalMethod(Enum):
ALL = "ALL"
MIN_MAX = "MIN_MAX"
MAX = "MAX"


class PathSpec(ConfigModel):
class Config:
@@ -37,7 +90,7 @@ class Config:
description="Path to table. Name variable `{table}` is used to mark the folder with dataset. In absence of `{table}`, file level dataset will be created. Check below examples for more details."
)
exclude: Optional[List[str]] = Field(
default=None,
default=[],
description="list of paths in glob pattern which will be excluded while scanning for the datasets",
)
file_types: List[str] = Field(
@@ -55,6 +108,13 @@ class Config:
description="Display name of the dataset.Combination of named variables from include path and strings",
)

# This is not used yet, but will be used in the future to sort the partitions
sort_key: Optional[SortKey] = Field(
hidden_from_docs=True,
default=None,
description="Sort key to use when sorting the partitions. This is useful when the partitions are not sorted in the order of the data. The key can be a compound key based on the path_spec variables.",
)

enable_compression: bool = Field(
default=True,
description="Enable or disable processing compressed files. Currently .gz and .bz files are supported.",
@@ -70,8 +130,41 @@ class Config:
description="Allow double stars in the include path. This can affect performance significantly if enabled",
)

def allowed(self, path: str) -> bool:
autodetect_partitions: bool = Field(
default=True,
description="Autodetect partition(s) from the path. If set to true, it will autodetect partition key/value if the folder format is {partition_key}={partition_value} for example `year=2024`",
)

traversal_method: FolderTraversalMethod = Field(
default=FolderTraversalMethod.MAX,
description="Method to traverse the folder. ALL: Traverse all the folders, MIN_MAX: Traverse the folders by finding min and max value, MAX: Traverse the folder with max value",
)

include_hidden_folders: bool = Field(
default=False,
description="Include hidden folders in the traversal (folders starting with . or _",
)

def is_path_hidden(self, path: str) -> bool:
# Split the path into directories and filename
dirs, filename = os.path.split(path)

# Check the filename
if filename.startswith(".") or filename.startswith("_"):
return True

# Check each directory in the path
for dir in dirs.split(os.sep):
if dir.startswith(".") or dir.startswith("_"):
return True

return False
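# Editor's illustration (not part of the PR; assumes a POSIX os.sep of "/"):
#   is_path_hidden("s3://bucket/table/_temporary/part-0000.parquet") -> True
#   is_path_hidden("s3://bucket/table/year=2024/part-0000.parquet")  -> False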

def allowed(self, path: str, ignore_ext: bool = False) -> bool:
logger.debug(f"Checking file to inclusion: {path}")
if self.is_path_hidden(path) and not self.include_hidden_folders:
return False

if not pathlib.PurePath(path).globmatch(
self.glob_include, flags=pathlib.GLOBSTAR
):
@@ -86,16 +179,20 @@ def allowed(self, path: str) -> bool:
logger.debug(f"{path} is not excluded")
ext = os.path.splitext(path)[1].strip(".")

if (ext == "" and self.default_extension is None) and (
ext != "*" and ext not in self.file_types
):
return False
if not ignore_ext:
if (ext == "" and self.default_extension is None) and (
ext != "*" and ext not in self.file_types
):
return False

logger.debug(f"{path} had selected extension {ext}")
logger.debug(f"{path} allowed for dataset creation")
logger.debug(f"{path} had selected extension {ext}")
logger.debug(f"{path} allowed for dataset creation")
return True

def dir_allowed(self, path: str) -> bool:
if self.glob_include.endswith("**"):
return self.allowed(path, ignore_ext=True)

path_slash = path.count("/")
glob_slash = self.glob_include.count("/")
if path_slash > glob_slash:
@@ -126,13 +223,28 @@ def dir_allowed(self, path: str) -> bool:
@classmethod
def get_parsable_include(cls, include: str) -> str:
parsable_include = include
for i in range(parsable_include.count("*")):
parsable_include = parsable_include.replace("*", f"{{folder[{i}]}}", 1)
if parsable_include.endswith("/{table}/**"):
# Remove the last two characters to make it parsable if it ends with {table}/** which marks autodetect partition
parsable_include = parsable_include[:-2]
else:
# Replace all * with {folder[i]} to make it parsable
for i in range(parsable_include.count("*")):
parsable_include = parsable_include.replace("*", f"{{folder[{i}]}}", 1)
return parsable_include
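A rough illustration of the two branches above (editor's sketch, not part of the PR; the standalone `get_parsable_include` below simply mirrors the classmethod for demonstration):

```python
# Editor's sketch: a standalone re-implementation of the transformation done by
# PathSpec.get_parsable_include, for illustration only.
def get_parsable_include(include: str) -> str:
    if include.endswith("/{table}/**"):
        # Auto-detect form: drop the trailing "**" so the template ends at the table folder.
        return include[:-2]
    # Otherwise replace each "*" with an indexed {folder[i]} placeholder for the parse library.
    for i in range(include.count("*")):
        include = include.replace("*", f"{{folder[{i}]}}", 1)
    return include


print(get_parsable_include("s3://bucket/{table}/**"))
# -> s3://bucket/{table}/
print(get_parsable_include("s3://bucket/{table}/*/*/*.parquet"))
# -> s3://bucket/{table}/{folder[0]}/{folder[1]}/{folder[2]}.parquet
```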

def get_named_vars(self, path: str) -> Union[None, parse.Result, parse.Match]:
if self.autodetect_partitions and self.include.endswith("{table}/**"):
# If we have a partial path with ** at the end, we need to truncate the path to parse correctly
splits = len(self.include[: self.include.find("{table}/")].split("/"))
path = "/".join(path.split("/", splits)[:-1]) + "/"

return self.compiled_include.parse(path)

def get_folder_named_vars(
self, path: str
) -> Union[None, parse.Result, parse.Match]:
return self.compiled_folder_include.parse(path)

@pydantic.root_validator()
def validate_no_double_stars(cls, values: Dict) -> Dict:
if "include" not in values:
@@ -227,6 +339,96 @@ def compiled_include(self):
logger.debug(f"Setting compiled_include: {compiled_include}")
return compiled_include

@cached_property
def compiled_folder_include(self):
parsable_folder_include = PathSpec.get_parsable_include(self.include).rsplit(
"/", 1
)[0]
logger.debug(f"parsable_folder_include: {parsable_folder_include}")
compiled_folder_include = parse.compile(parsable_folder_include)
logger.debug(f"Setting compiled_folder_include: {compiled_folder_include}")
return compiled_folder_include

@cached_property
def extract_variable_names(self):
# Regular expression to find all substrings enclosed in {}
pattern = r"\{(.*?)\}"
# Find all matches
matches = re.findall(pattern, self.include.split("{table}/")[1])
return matches

def get_partition_from_path(self, path: str) -> Optional[List[Tuple[str, str]]]:
Review comment (Collaborator): Can you add some description and examples here, as to what input would generate what output?

partition_keys: List[Tuple[str, str]] = []
if self.include.find("{table}/"):
named_vars = self.get_named_vars(path)
if named_vars:
# If user has specified partition_key and partition_value in the path_spec then we use it to get partition keys
if (
"partition_key" in named_vars.named
and "partition_value" in named_vars.named
and len(named_vars.named["partition_key"])
== len(named_vars.named["partition_value"])
):
for key in named_vars.named["partition_key"]:
if key in named_vars.named["partition_value"]:
partition_keys.append(
(
named_vars.named["partition_key"][key],
named_vars.named["partition_value"][key],
)
)
return partition_keys
else:
logger.debug(
"Partition key or value not found. Falling back to another mechanism to get partition keys"
)

partition_vars = self.extract_variable_names
if partition_vars:
for partition_key in partition_vars:
[key, index] = partition_key.strip("]").split("[")

if key in named_vars.named:
if index and index in named_vars.named[key]:
partition_keys.append(
(partition_key, named_vars.named[key][index])
)
else:
partition_keys.append(
(partition_key, named_vars.named[partition_key])
)
return partition_keys

# If user has not specified partition_key and partition_value in the path_spec then we use the default mechanism to get partition keys
if len(self.include.split("{table}/")) == 2:
num_slash = len(self.include.split("{table}/")[0].split("/"))
partition = path.split("/", num_slash)[num_slash]
else:
return None
if partition.endswith("/"):
partition = partition[:-1]

# If partition is in the form of key=value we infer it from the path
if partition.find("=") != -1:
partition = partition.rsplit("/", 1)[0]
for partition_key in partition.split("/"):
if partition_key.find("=") != -1:
partition_keys.append(tuple(partition_key.split("=")))
else:
partition_split = partition.rsplit("/", 1)
if len(partition_split) == 1:
return None
partition = partition_split[0]
# If partition is in the form of /value1/value2/value3 we infer it from the path and assign partition_0, partition_1, partition_2 etc
num = 0
for partition_value in partition.split("/"):
partition_keys.append((f"partition_{num}", partition_value))
num += 1
return partition_keys

return None
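Per the review comment above, here is a hedged sketch of the intended input/output behaviour (editor's illustration based on the logic shown; bucket and table names are hypothetical):

```python
# Editor's sketch of expected inputs/outputs (not part of the PR); both cases
# assume a PathSpec with include="s3://bucket/{table}/**", i.e. partition
# auto-detection enabled.

# Hive-style key=value folders are returned as (key, value) pairs:
#   path: s3://bucket/orders/year=2024/month=08/data.parquet
#   ->    [("year", "2024"), ("month", "08")]

# Plain value folders fall back to positional partition names:
#   path: s3://bucket/orders/2024/08/data.parquet
#   ->    [("partition_0", "2024"), ("partition_1", "08")]
```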

@cached_property
def glob_include(self):
glob_include = re.sub(r"\{[^}]+\}", "*", self.include)
@@ -244,7 +446,20 @@ def validate_path_spec(cls, values: Dict) -> Dict[str, Any]:
)
return values

if values["include"] and values["autodetect_partitions"]:
include = values["include"]
if include.endswith("/"):
include = include[:-1]

if include.endswith("{table}"):
values["include"] = include + "/**"

include_ext = os.path.splitext(values["include"])[1].strip(".")
if not include_ext:
include_ext = (
"*" # if no extension is provided, we assume all files are allowed
)

if (
include_ext not in values["file_types"]
and include_ext != "*"
@@ -263,6 +478,44 @@ def _extract_table_name(self, named_vars: dict) -> str:
raise ValueError("path_spec.table_name is not set")
return self.table_name.format_map(named_vars)

def extract_datetime_partition(
Review comment (Collaborator): Is this used anywhere? Would be good to add unit tests for this too.

self, path: str, is_folder: bool = False
) -> Optional[datetime.datetime]:
if self.sort_key is None:
return None

if not self.sort_key.date_format and self.sort_key.type not in [
SortKeyType.DATETIME,
SortKeyType.DATE,
]:
return None

if is_folder:
parsed_vars = self.get_folder_named_vars(path)
else:
parsed_vars = self.get_named_vars(path)
if parsed_vars is None:
return None

partition_format = self.sort_key.key
datetime_format = self.sort_key.date_format
if datetime_format is None:
return None

for var_key in parsed_vars.named:
var = parsed_vars.named[var_key]
if isinstance(var, dict):
for key in var:
template_key = var_key + f"[{key}]"
partition_format = partition_format.replace(
f"{{{template_key}}}", var[key]
)
else:
partition_format = partition_format.replace(f"{{{var_key}}}", var)
return datetime.datetime.strptime(partition_format, datetime_format).replace(
tzinfo=datetime.timezone.utc
)
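Following up on the review comment above, a possible unit-test sketch (editor's addition, not part of the PR; the import path is inferred from the truncated file paths in the commit list and may differ, and the PathSpec constructor may require additional fields depending on validation):

```python
# Editor's unit-test sketch. Assumptions: the module path below matches the file
# touched by this PR, and a PathSpec can be constructed with only the fields shown.
import datetime

from datahub.ingestion.source.data_lake_common.path_spec import (
    PathSpec,
    SortKey,
    SortKeyType,
)


def test_extract_datetime_partition() -> None:
    path_spec = PathSpec(
        include="s3://my-bucket/{table}/{partition_key[0]}={partition[0]}/*.parquet",
        sort_key=SortKey(
            key="{partition[0]}",
            type=SortKeyType.DATETIME,
            date_format="yyyy-MM-dd",  # converted to %Y-%m-%d by the validator above
        ),
    )
    result = path_spec.extract_datetime_partition(
        "s3://my-bucket/orders/date=2024-08-05/file.parquet"
    )
    assert result == datetime.datetime(2024, 8, 5, tzinfo=datetime.timezone.utc)
```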

def extract_table_name_and_path(self, path: str) -> Tuple[str, str]:
parsed_vars = self.get_named_vars(path)
if parsed_vars is None or "table" not in parsed_vars.named:
10 changes: 10 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/s3/config.py
@@ -48,6 +48,11 @@ class DataLakeSourceConfig(
description="Whether or not to create tags in datahub from the s3 object",
)

get_all_partitions: bool = Field(
default=False,
description="Whether to list all partitions in the table, or only the latest",
)

# Whether to update the table schema when schema in files within the partitions are updated
_update_schema_on_partition_file_updates_deprecation = pydantic_field_deprecated(
"update_schema_on_partition_file_updates",
@@ -98,6 +103,11 @@ class DataLakeSourceConfig(
description="Whether to sort schema fields by fieldPath when inferring schemas.",
)

generate_partition_aspects: bool = Field(
default=True,
description="Whether to generate partition aspects for partitioned tables. On older servers for backward compatibility, this should be set to False. This flag will be removed in future versions.",
)
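For context, a hedged example of how these options might appear in an S3 ingestion recipe (editor's sketch, not part of the PR; only the option names come from this diff, while the bucket path and the rest of the recipe layout follow the standard S3 source docs and may need adjusting):

```yaml
# Editor's sketch of a recipe using the new partition options.
source:
  type: s3
  config:
    aws_config:
      aws_region: us-east-1
    path_specs:
      - include: "s3://my-bucket/data/{table}/"   # partition auto-detection
        traversal_method: MIN_MAX                 # visit the min and max partition folders
    get_all_partitions: false          # only the latest partition is listed
    generate_partition_aspects: true   # set to false for older DataHub servers
```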

def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config