-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest/s3): Partition support #11083
Changes from 18 commits
1bb1626
7774e5f
7db9079
e8d6f3b
45efdb7
664bcd7
af6df2a
6f1de05
05a132b
34f629b
c247299
35bc791
8d20952
45ee511
f51a3e9
20c54e2
e401ee9
56e14e2
a4cbdb9
d5aee9d
b0c8799
4d6502a
cb56197
ab36e00
662fa52
12d6150
a07216e
e8c4f74
b929f6a
ee2b827
0ce1174
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,8 @@ | ||
import datetime | ||
import logging | ||
import os | ||
import re | ||
from enum import Enum | ||
from typing import Any, Dict, List, Optional, Tuple, Union | ||
|
||
import parse | ||
|
@@ -28,6 +30,57 @@ | |
"gzip", | ||
] | ||
|
||
# Java SimpleDateFormat tokens mapped to the letter of the matching strftime
# directive. The "%" prefix is not stored here; it is added where the mapping
# is applied.
java_to_python_mapping = dict(
    yyyy="Y",
    MM="m",
    dd="d",
    HH="H",
    mm="M",
    ss="S",
)
|
||
|
||
class SortKeyType(Enum):
    """Value types a sort key can be declared as."""

    STRING = "STRING"
    INTEGER = "INTEGER"
    FLOAT = "FLOAT"
    DATETIME = "DATETIME"
    DATE = "DATE"

    def __str__(self) -> str:
        # Render as the bare member value instead of "SortKeyType.<NAME>".
        member_value = self.value
        return member_value
|
||
|
||
class SortKey(ConfigModel):
    # Describes the key partitions are sorted on, the type of that key, and
    # (for date-like keys) the format used to parse it.

    key: str = Field(
        description="The key to sort on. This can be a compound key based on the path_spec variables."
    )
    type: SortKeyType = Field(
        default=SortKeyType.STRING,
        # The previous text was a copy of the `date_format` description.
        description="The type of the sort key. One of STRING, INTEGER, FLOAT, DATETIME or DATE.",
    )

    date_format: Optional[str] = Field(
        default=None,
        type=str,
        description="The date format to use when sorting. This is used to parse the date from the key. The format should follow the java [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) format.",
    )

    @pydantic.validator("date_format", always=True)
    def convert_date_format_to_python_format(cls, v: Optional[str]) -> Optional[str]:
        """Translate Java SimpleDateFormat tokens in ``v`` into strftime directives.

        Only the tokens listed in ``java_to_python_mapping`` are rewritten
        (each becomes ``%`` plus the mapped letter); every other character is
        left untouched. ``None`` is passed through unchanged.
        """
        if v is None:
            return None

        # NOTE(review): a reviewer asked why the Java notation is accepted here;
        # the author's answer was that the format should eventually be shared
        # across platforms and the Java notation is the more common one.
        for java_format, python_format in java_to_python_mapping.items():
            v = v.replace(java_format, f"%{python_format}")
        return v
|
||
|
||
class FolderTraversalMethod(Enum):
    """Strategies for choosing which folders are traversed."""

    # Traverse all the folders.
    ALL = "ALL"
    # Traverse the folders by finding the min and max value.
    MIN_MAX = "MIN_MAX"
    # Traverse the folder with the max value.
    MAX = "MAX"
|
||
|
||
class PathSpec(ConfigModel): | ||
class Config: | ||
|
@@ -37,7 +90,7 @@ class Config: | |
description="Path to table. Name variable `{table}` is used to mark the folder with dataset. In absence of `{table}`, file level dataset will be created. Check below examples for more details." | ||
) | ||
# Glob patterns for paths that are skipped during scanning. The superseded
# `default=None` line from the previous revision is dropped: a keyword
# argument may only be passed once.
exclude: Optional[List[str]] = Field(
    default=[],
    description="list of paths in glob pattern which will be excluded while scanning for the datasets",
)
file_types: List[str] = Field( | ||
|
@@ -55,6 +108,13 @@ class Config: | |
description="Display name of the dataset.Combination of named variables from include path and strings", | ||
) | ||
|
||
# Optional sort configuration for partitions; kept out of the generated docs.
sort_key: Optional[SortKey] = Field(
    default=None,
    description="Sort key to use when sorting the partitions. This is useful when the partitions are not sorted in the order of the data. The key can be a compound key based on the path_spec variables.",
    hidden_from_docs=True,
)
|
||
enable_compression: bool = Field( | ||
default=True, | ||
description="Enable or disable processing compressed files. Currently .gz and .bz files are supported.", | ||
|
@@ -70,8 +130,41 @@ class Config: | |
description="Allow double stars in the include path. This can affect performance significantly if enabled", | ||
) | ||
|
||
def allowed(self, path: str) -> bool: | ||
# Whether partition key/value pairs are inferred from the folder names.
autodetect_partitions: bool = Field(
    default=True,
    description="Autodetect partition(s) from the path. If set to true, it will autodetect partition key/value if the folder format is {partition_key}={partition_value} for example `year=2024`",
)

# Which folders are visited while traversing.
traversal_method: FolderTraversalMethod = Field(
    default=FolderTraversalMethod.MAX,
    description="Method to traverse the folder. ALL: Traverse all the folders, MIN_MAX: Traverse the folders by finding min and max value, MAX: Traverse the folder with max value",
)

# Whether paths with a component starting with "." or "_" are kept.
include_hidden_folders: bool = Field(
    default=False,
    # The closing parenthesis was missing from this description.
    description="Include hidden folders in the traversal (folders starting with . or _)",
)
|
||
def is_path_hidden(self, path: str) -> bool:
    """Report whether any component of ``path`` starts with "." or "_".

    The final component (file name) and every directory component obtained by
    splitting the parent on ``os.sep`` are checked.
    """
    hidden_prefixes = (".", "_")
    parent, leaf = os.path.split(path)
    # File name first, then each directory above it.
    components = [leaf, *parent.split(os.sep)]
    return any(part.startswith(hidden_prefixes) for part in components)
|
||
def allowed(self, path: str, ignore_ext: bool = False) -> bool: | ||
logger.debug(f"Checking file to inclusion: {path}") | ||
if self.is_path_hidden(path) and not self.include_hidden_folders: | ||
return False | ||
treff7es marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if not pathlib.PurePath(path).globmatch( | ||
self.glob_include, flags=pathlib.GLOBSTAR | ||
): | ||
|
@@ -86,16 +179,20 @@ def allowed(self, path: str) -> bool: | |
logger.debug(f"{path} is not excluded") | ||
ext = os.path.splitext(path)[1].strip(".") | ||
|
||
if (ext == "" and self.default_extension is None) and ( | ||
ext != "*" and ext not in self.file_types | ||
): | ||
return False | ||
if not ignore_ext: | ||
if (ext == "" and self.default_extension is None) and ( | ||
ext != "*" and ext not in self.file_types | ||
): | ||
return False | ||
|
||
logger.debug(f"{path} had selected extension {ext}") | ||
logger.debug(f"{path} allowed for dataset creation") | ||
logger.debug(f"{path} had selected extension {ext}") | ||
logger.debug(f"{path} allowed for dataset creation") | ||
return True | ||
|
||
def dir_allowed(self, path: str) -> bool: | ||
if self.glob_include.endswith("**"): | ||
return self.allowed(path, ignore_ext=True) | ||
|
||
path_slash = path.count("/") | ||
glob_slash = self.glob_include.count("/") | ||
if path_slash > glob_slash: | ||
|
@@ -126,13 +223,28 @@ def dir_allowed(self, path: str) -> bool: | |
@classmethod | ||
def get_parsable_include(cls, include: str) -> str: | ||
parsable_include = include | ||
for i in range(parsable_include.count("*")): | ||
parsable_include = parsable_include.replace("*", f"{{folder[{i}]}}", 1) | ||
if parsable_include.endswith("/{table}/**"): | ||
# Remove the last two characters to make it parsable if it ends with {table}/** which marks autodetect partition | ||
parsable_include = parsable_include[:-2] | ||
else: | ||
# Replace all * with {folder[i]} to make it parsable | ||
for i in range(parsable_include.count("*")): | ||
parsable_include = parsable_include.replace("*", f"{{folder[{i}]}}", 1) | ||
return parsable_include | ||
|
||
def get_named_vars(self, path: str) -> Union[None, parse.Result, parse.Match]:
    """Parse ``path`` against the compiled include pattern.

    When partition autodetection is enabled and ``include`` ends with
    ``{table}/**``, the path is first shortened: it is split on "/" as many
    times as there are "/"-separated pieces before ``{table}/`` in
    ``include``, the final piece of that split is discarded, and the rest is
    re-joined with a trailing "/".
    """
    if self.autodetect_partitions and self.include.endswith("{table}/**"):
        table_prefix = self.include[: self.include.find("{table}/")]
        depth = len(table_prefix.split("/"))
        leading_parts = path.split("/", depth)[:-1]
        path = "/".join(leading_parts) + "/"

    return self.compiled_include.parse(path)
|
||
def get_folder_named_vars(
    self, path: str
) -> Union[None, parse.Result, parse.Match]:
    """Parse ``path`` against the folder-level compiled include pattern."""
    folder_pattern = self.compiled_folder_include
    return folder_pattern.parse(path)
|
||
@pydantic.root_validator() | ||
def validate_no_double_stars(cls, values: Dict) -> Dict: | ||
if "include" not in values: | ||
|
@@ -227,6 +339,96 @@ def compiled_include(self): | |
logger.debug(f"Setting compiled_include: {compiled_include}") | ||
return compiled_include | ||
|
||
@cached_property
def compiled_folder_include(self):
    """Compiled ``parse`` pattern for the include with its last "/" segment removed."""
    parsable_include = PathSpec.get_parsable_include(self.include)
    # Keep everything before the final "/" (the whole string if there is none).
    folder_template = parsable_include.rsplit("/", 1)[0]
    logger.debug(f"parsable_folder_include: {folder_template}")
    compiled_pattern = parse.compile(folder_template)
    logger.debug(f"Setting compiled_folder_include: {compiled_pattern}")
    return compiled_pattern
|
||
@cached_property
def extract_variable_names(self):
    """Return the ``{...}`` variable names that follow ``{table}/`` in ``include``.

    Only the segment directly after the first ``{table}/`` is inspected.
    An empty list is returned when ``include`` has no ``{table}/`` (this used
    to raise ``IndexError``).
    """
    include_parts = self.include.split("{table}/")
    if len(include_parts) < 2:
        return []
    # Non-greedy match of everything enclosed in a pair of braces.
    return re.findall(r"\{(.*?)\}", include_parts[1])
|
||
def get_partition_from_path(self, path: str) -> Optional[List[Tuple[str, str]]]:
    """Derive ``(partition_key, partition_value)`` pairs from ``path``.

    Nothing is derived (``None`` is returned) unless ``include`` contains
    ``{table}/``. Otherwise these strategies are tried in order:

    1. ``partition_key`` / ``partition_value`` variables parsed from the
       path: entries with the same index are paired up.
    2. Any other variables declared after ``{table}/`` in ``include``: each
       one is reported under its own name, e.g. ``("year", "2024")`` or
       ``("folder[0]", "x")``.
    3. ``key=value`` folders below the table folder.
    4. Plain folders below the table folder, named ``partition_0``,
       ``partition_1``, ...

    Examples with ``include = "s3://bucket/{table}/**"``::

        s3://bucket/orders/year=2024/month=01/f.parquet
            -> [("year", "2024"), ("month", "01")]
        s3://bucket/orders/2024/01/f.parquet
            -> [("partition_0", "2024"), ("partition_1", "01")]
        s3://bucket/orders/f.parquet
            -> None

    Strategies 3 and 4 also return ``None`` when ``{table}/`` occurs more than
    once in ``include`` or when ``path`` is too short to reach below the
    table folder.
    """
    # `str.find` returns -1 (truthy) when the marker is absent and 0 (falsy)
    # when it is at the start, so it cannot be used as a boolean test.
    if "{table}/" not in self.include:
        return None

    partition_keys: List[Tuple[str, str]] = []

    named_vars = self.get_named_vars(path)
    if named_vars:
        named = named_vars.named

        # Strategy 1: explicit partition_key / partition_value variables.
        if (
            "partition_key" in named
            and "partition_value" in named
            and len(named["partition_key"]) == len(named["partition_value"])
        ):
            for key in named["partition_key"]:
                if key in named["partition_value"]:
                    partition_keys.append(
                        (
                            named["partition_key"][key],
                            named["partition_value"][key],
                        )
                    )
            return partition_keys

        logger.debug(
            "Partition key or value not found. Falling back to another mechanism to get partition keys"
        )

        # Strategy 2: other variables that follow {table}/ in the include.
        partition_vars = self.extract_variable_names
        if partition_vars:
            for partition_key in partition_vars:
                # "folder[0]" -> ("folder", "0"); a plain "year" -> ("year", "").
                # `partition` never fails on a name without "[".
                key, _, index = partition_key.strip("]").partition("[")
                if key not in named:
                    continue
                value = named[key]
                if index:
                    # Indexed variables are stored as a mapping under `key`;
                    # skip the variable if that index was not captured.
                    if isinstance(value, dict) and index in value:
                        partition_keys.append((partition_key, value[index]))
                else:
                    partition_keys.append((partition_key, value))
            return partition_keys

    # Strategies 3 and 4: infer partitions from the folders below the table.
    include_parts = self.include.split("{table}/")
    if len(include_parts) != 2:
        return None
    num_slash = len(include_parts[0].split("/"))
    path_parts = path.split("/", num_slash)
    if len(path_parts) <= num_slash:
        # The path does not extend below the table folder.
        return None
    partition = path_parts[num_slash]
    if partition.endswith("/"):
        partition = partition[:-1]

    if "=" in partition:
        # key=value folders; the last "/" segment is dropped when there is one.
        partition = partition.rsplit("/", 1)[0]
        for partition_key in partition.split("/"):
            if "=" in partition_key:
                # Split once so a value containing "=" still yields a pair.
                key, value = partition_key.split("=", 1)
                partition_keys.append((key, value))
    else:
        partition_split = partition.rsplit("/", 1)
        if len(partition_split) == 1:
            # Nothing sits between the table folder and the last segment.
            return None
        # /value1/value2/... become partition_0, partition_1, ...
        for num, partition_value in enumerate(partition_split[0].split("/")):
            partition_keys.append((f"partition_{num}", partition_value))
    return partition_keys
|
||
@cached_property | ||
def glob_include(self): | ||
glob_include = re.sub(r"\{[^}]+\}", "*", self.include) | ||
|
@@ -244,7 +446,20 @@ def validate_path_spec(cls, values: Dict) -> Dict[str, Any]: | |
) | ||
return values | ||
|
||
if values["include"] and values["autodetect_partitions"]: | ||
include = values["include"] | ||
if include.endswith("/"): | ||
include = include[:-1] | ||
|
||
if include.endswith("{table}"): | ||
values["include"] = include + "/**" | ||
|
||
include_ext = os.path.splitext(values["include"])[1].strip(".") | ||
if not include_ext: | ||
include_ext = ( | ||
"*" # if no extension is provided, we assume all files are allowed | ||
) | ||
|
||
if ( | ||
include_ext not in values["file_types"] | ||
and include_ext != "*" | ||
|
@@ -263,6 +478,44 @@ def _extract_table_name(self, named_vars: dict) -> str: | |
raise ValueError("path_spec.table_name is not set") | ||
return self.table_name.format_map(named_vars) | ||
|
||
def extract_datetime_partition(
    self, path: str, is_folder: bool = False
) -> Optional[datetime.datetime]:
    """Build a UTC datetime for ``path`` from the configured sort key.

    The variables parsed from ``path`` are substituted into
    ``sort_key.key`` and the result is parsed with ``sort_key.date_format``.

    Args:
        path: Path to extract the variables from.
        is_folder: Parse with the folder-level pattern instead of the full
            include pattern.

    Returns:
        The parsed datetime with ``tzinfo`` set to UTC, or ``None`` when no
        sort key is configured, no date format is available, or ``path``
        does not match the pattern.

    Raises:
        ValueError: From ``datetime.strptime`` when the substituted key does
            not match the date format.
    """
    # NOTE(review): reviewers asked whether this method has callers yet and
    # requested unit tests for it.
    if self.sort_key is None:
        return None

    if not self.sort_key.date_format and self.sort_key.type not in [
        SortKeyType.DATETIME,
        SortKeyType.DATE,
    ]:
        return None

    if is_folder:
        parsed_vars = self.get_folder_named_vars(path)
    else:
        parsed_vars = self.get_named_vars(path)
    if parsed_vars is None:
        return None

    partition_format = self.sort_key.key
    datetime_format = self.sort_key.date_format
    if datetime_format is None:
        return None

    for var_key, var in parsed_vars.named.items():
        if isinstance(var, dict):
            # Indexed variables appear in the key template as "{name[index]}".
            for key, value in var.items():
                partition_format = partition_format.replace(
                    f"{{{var_key}[{key}]}}", value
                )
        else:
            # Strings are immutable: the result must be assigned back
            # (it used to be discarded, leaving "{name}" unreplaced).
            partition_format = partition_format.replace(f"{{{var_key}}}", var)

    return datetime.datetime.strptime(partition_format, datetime_format).replace(
        tzinfo=datetime.timezone.utc
    )
|
||
def extract_table_name_and_path(self, path: str) -> Tuple[str, str]: | ||
parsed_vars = self.get_named_vars(path) | ||
if parsed_vars is None or "table" not in parsed_vars.named: | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like description needs to change here.