From 6aeb241733462736314ce6c933df8c3f73866159 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Fri, 5 Jul 2024 09:59:18 +0200 Subject: [PATCH 01/24] Moving callable session from storage S3 to AWSClient and adding a marker interface for AccessKeyCredentials in the AWS context. --- adapta/security/clients/aws/_aws_client.py | 24 +++++++++++++------ .../security/clients/aws/_aws_credentials.py | 24 ++++++++++--------- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/adapta/security/clients/aws/_aws_client.py b/adapta/security/clients/aws/_aws_client.py index acab751a..90508cfd 100644 --- a/adapta/security/clients/aws/_aws_client.py +++ b/adapta/security/clients/aws/_aws_client.py @@ -33,17 +33,29 @@ class AwsClient(AuthenticationClient): AWS Credentials provider for various AWS resources. """ - def __init__(self, aws_credentials: Optional[AccessKeyCredentials] = None): - self._session = None + def __init__( + self, + aws_credentials: Optional[AccessKeyCredentials] = None, + session_callable: Optional[Callable[[], Session]] = None, + ): self._credentials = aws_credentials + self._session = None + self._session_callable = session_callable if session_callable is not None else None @property - def session(self): + def session(self) -> Optional[Session]: """ Returns configured session (if any) """ return self._session + @property + def session_callable(self) -> Optional[Callable[[], Session]]: + """ + Returns configured callable session (if any) + """ + return self._session_callable + @classmethod def from_base_client(cls, client: AuthenticationClient) -> Optional["AwsClient"]: """ @@ -97,10 +109,8 @@ def initialize_session(self, session_callable: Optional[Callable[[], Session]] = if self._session is not None: return self - if session_callable is None: - session_callable = self._default_aws_session - - self._session = session_callable() + self._session_callable = session_callable or self._session_callable or self._default_aws_session + self._session = self._session_callable() return self diff --git a/adapta/security/clients/aws/_aws_credentials.py b/adapta/security/clients/aws/_aws_credentials.py index 869727ce..aea9a62c 100644 --- a/adapta/security/clients/aws/_aws_credentials.py +++ b/adapta/security/clients/aws/_aws_credentials.py @@ -20,7 +20,19 @@ from typing import Optional -class AccessKeyCredentials(ABC): +class IConnectionDetails(ABC): + @property + @abstractmethod + def region(self) -> str: + """AWS region""" + + @property + @abstractmethod + def endpoint(self) -> Optional[str]: + """AWS custom endpoint""" + + +class AccessKeyCredentials(IConnectionDetails, ABC): """ Abstract class that represents credentials for AWS connections """ @@ -35,21 +47,11 @@ def access_key(self) -> str: def access_key_id(self) -> str: """AWS account access key id""" - @property - @abstractmethod - def region(self) -> str: - """AWS region""" - @property @abstractmethod def session_token(self) -> Optional[str]: """AWS session token""" - @property - @abstractmethod - def endpoint(self) -> Optional[str]: - """AWS custom endpoint""" - class EnvironmentAwsCredentials(AccessKeyCredentials): """ From d04009538d1f96e804e408a1ae222366523f1cd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Fri, 5 Jul 2024 11:00:08 +0200 Subject: [PATCH 02/24] Adding docstring for IConnectionDetails --- adapta/security/clients/aws/_aws_credentials.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/adapta/security/clients/aws/_aws_credentials.py 
b/adapta/security/clients/aws/_aws_credentials.py index aea9a62c..04391594 100644 --- a/adapta/security/clients/aws/_aws_credentials.py +++ b/adapta/security/clients/aws/_aws_credentials.py @@ -21,6 +21,10 @@ class IConnectionDetails(ABC): + """ + Abstract marker interface class that represents the connection details for AWS + """ + @property @abstractmethod def region(self) -> str: From b9432182b601ba04c59c655fde5312a57cd1213f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Mon, 8 Jul 2024 14:06:23 +0200 Subject: [PATCH 03/24] Adding missing S3Path annotation --- adapta/process_communication/_models.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/adapta/process_communication/_models.py b/adapta/process_communication/_models.py index e3b04590..6c483a2c 100644 --- a/adapta/process_communication/_models.py +++ b/adapta/process_communication/_models.py @@ -23,6 +23,7 @@ from adapta.storage.models import parse_data_path from adapta.storage.models.astra import AstraPath +from adapta.storage.models.aws import S3Path from adapta.storage.models.base import DataPath from adapta.storage.models.azure import AdlsGen2Path, WasbPath from adapta.storage.models.local import LocalPath @@ -52,7 +53,7 @@ def __post_init__(self): ), "Fields alias, data_path and data_format must have a value provided to instantiate a DataSocket." def parse_data_path( - self, candidates: Iterable[DataPath] = (AdlsGen2Path, LocalPath, WasbPath, AstraPath) + self, candidates: Iterable[DataPath] = (AdlsGen2Path, LocalPath, WasbPath, AstraPath, S3Path) ) -> Optional[DataPath]: """ Attempts to convert this socket's data path to one of the known DataPath types. From 91015cdefad717920295e9aab72f959d3b905bd0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Tue, 9 Jul 2024 11:19:23 +0200 Subject: [PATCH 04/24] Some more improvements --- adapta/storage/blob/s3_storage_client.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/adapta/storage/blob/s3_storage_client.py b/adapta/storage/blob/s3_storage_client.py index e84a8cfe..625008fa 100644 --- a/adapta/storage/blob/s3_storage_client.py +++ b/adapta/storage/blob/s3_storage_client.py @@ -88,6 +88,15 @@ def blob_exists(self, blob_path: DataPath) -> bool: :return: Boolean indicator of blob existence """ s3_path = cast_path(blob_path) + + if not (hasattr(s3_path, "bucket") and s3_path.bucket) or not (hasattr(s3_path, "path") and s3_path.path): + missing_attributes = [ + attr for attr in ["bucket", "path"] if not (hasattr(s3_path, attr) and getattr(s3_path, attr)) + ] + raise StorageClientError( + f"Blob path provided does not have the needed parameters: {', '.join(missing_attributes)}" + ) + try: self._s3_resource.meta.client.head_object(Bucket=s3_path.bucket, Key=s3_path.path) return True @@ -196,8 +205,8 @@ def download_blobs( os.makedirs(os.path.dirname(local_file_path), exist_ok=True) try: self._s3_resource.meta.client.download_file(s3_path.bucket, blob.key, local_file_path) - except ClientError as error: - raise StorageClientError(f"Error downloading blob: {error}") from error + except ClientError: + raise StorageClientError("Error downloading blob") def copy_blob(self, blob_path: DataPath, target_blob_path: DataPath, doze_period_ms: int = 0) -> None: """ From 2ce116942bb6caffad6f324be1530777a58a739c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Tue, 16 Jul 2024 15:27:09 +0200 Subject: [PATCH 05/24] Small bugfix after testing with Azure client and rebase fix --- 
adapta/security/clients/aws/_aws_client.py | 7 ++++++- adapta/storage/models/azure.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/adapta/security/clients/aws/_aws_client.py b/adapta/security/clients/aws/_aws_client.py index f9a2eb2d..203a3cca 100644 --- a/adapta/security/clients/aws/_aws_client.py +++ b/adapta/security/clients/aws/_aws_client.py @@ -33,7 +33,12 @@ class AwsClient(AuthenticationClient): AWS Credentials provider for various AWS resources. """ - def __init__(self, aws_credentials: Optional[AccessKeyCredentials] = None, allow_http: bool = False, session_callable: Optional[Callable[[], Session]] = None,): + def __init__( + self, + aws_credentials: Optional[AccessKeyCredentials] = None, + allow_http: bool = False, + session_callable: Optional[Callable[[], Session]] = None, + ): self._session = None self._credentials = aws_credentials self._allow_http = allow_http diff --git a/adapta/storage/models/azure.py b/adapta/storage/models/azure.py index 428f2254..92bb1758 100644 --- a/adapta/storage/models/azure.py +++ b/adapta/storage/models/azure.py @@ -34,7 +34,7 @@ def base_uri(self) -> str: @classmethod def from_uri(cls, url: str) -> "DataPath": assert url.startswith("https://") and ( - "dfs.core.windows.net" in url + "dfs.core.windows.net" in url or "blob.core.windows.net" in url ), "Invalid URL supplied. Please use the following format: https://.dfs.core.windows.net or https://.blob.core.windows.net" return cls( From f847a233502bb063a21cde3c75fbf09d30203dc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Tue, 16 Jul 2024 15:33:15 +0200 Subject: [PATCH 06/24] Addressing lint --- adapta/storage/blob/s3_storage_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adapta/storage/blob/s3_storage_client.py b/adapta/storage/blob/s3_storage_client.py index 79769e5d..ce29ade3 100644 --- a/adapta/storage/blob/s3_storage_client.py +++ b/adapta/storage/blob/s3_storage_client.py @@ -212,8 +212,8 @@ def download_blobs( os.makedirs(os.path.dirname(local_file_path), exist_ok=True) try: self._s3_resource.meta.client.download_file(s3_path.bucket, blob.key, local_file_path) - except ClientError: - raise StorageClientError("Error downloading blob") + except ClientError as exception: + raise StorageClientError("Error downloading blob") from exception def copy_blob(self, blob_path: DataPath, target_blob_path: DataPath, doze_period_ms: int = 0) -> None: """ From 85afee02aee133e63ee3ff8c0283504411abe78a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Wed, 17 Jul 2024 10:12:54 +0200 Subject: [PATCH 07/24] Fixing readme file --- adapta/storage/blob/README.md | 120 ++++++++++++++++++++++++++-------- 1 file changed, 92 insertions(+), 28 deletions(-) diff --git a/adapta/storage/blob/README.md b/adapta/storage/blob/README.md index a960f5d5..2dd3cbb2 100644 --- a/adapta/storage/blob/README.md +++ b/adapta/storage/blob/README.md @@ -53,32 +53,72 @@ cat /local/path/file_name ### AWS example ```python +import os from adapta.security.clients import AwsClient +from adapta.security.clients.aws._aws_credentials import EnvironmentAwsCredentials from adapta.storage.models.aws import S3Path from adapta.storage.blob.s3_storage_client import S3StorageClient from adapta.storage.models.format import DictJsonSerializationFormat -aws_client = AwsClient() -s3_path = S3Path.from_hdfs_path('s3a://bucket/path/to/my/table') +# Set up environment variables +os.environ["PROTEUS__AWS_ACCESS_KEY_ID"] = +os.environ["PROTEUS__AWS_SESSION_TOKEN"] = 
+os.environ["PROTEUS__AWS_SECRET_ACCESS_KEY"] = +os.environ["PROTEUS__AWS_REGION"] = "eu-central-1" +os.environ["PROTEUS__AWS_ENDPOINT"] = "http://example.com" -# init storage client -s3_client = S3StorageClient.create(base_client=aws_client) +# Create client +credentials = EnvironmentAwsCredentials() # Create AWS credentials +aws_client = AwsClient(credentials) + +# Initialize storage client +s3_client = S3StorageClient.create(auth=aws_client) + +# Setting blob S3 path +s3_path = S3Path.from_hdfs_path('s3a://bucket/folder/path_to_my_blob') + +# Save data to S3 path +data = { + 'Character': ['Homer Simpson', 'Michael Scott', 'Ron Swanson', 'Sheldon Cooper', 'Captain Jack Sparrow'], + 'Occupation': ['Nuclear Safety Inspector', 'Regional Manager', 'Parks and Recreation Director', 'Theoretical Physicist', 'Pirate Captain'], + 'Catchphrase': [ + 'D\'oh!', + 'I am the World\'s Best Boss.', + 'I\'m a simple man. I like pretty, dark-haired women and breakfast food.', + 'Bazinga!', + 'Why is the rum always gone?' + ] +} -# Save data to S3 s3_client.save_data_as_blob( - data={"data_value": "2"}, blob_path=s3_path, serialization_format=DictJsonSerializationFormat, overwrite=True + data=data, blob_path=s3_path, serialization_format=DictJsonSerializationFormat, overwrite=True ) -# read files from S3 -blobs = s3_client.read_blobs(s3_path, serialization_format=DictJsonSerializationFormat) +# Read blob from S3 path +blob_iterator = s3_client.read_blobs(s3_path, serialization_format=DictJsonSerializationFormat) + +# Print read blob +print(list(blob_iterator)) +# The blob's content will be displayed as follows: +# [{'Character': ['Boromir', 'Harry Potter', 'Sherlock Holmes', 'Tony Stark', 'Darth Vader'], 'Occupation': ['Professional succumber to temptation', 'Wizard', 'Detective', 'Iron Man', 'Sith Lord'], 'Catchphrase': ['One does not simply walk into Mordor.', 'Expecto Patronum!', 'Elementary, my dear Watson.', 'I am Iron Man.', 'I find your lack of faith disturbing.']}] + +# List all blobs for in the same folder +folder_s3_path = S3Path.from_hdfs_path('s3a://bucket/folder/') +folder_data_path_iterator = s3_client.list_blobs(blob_path=folder_s3_path) + +print(list(folder_data_path_iterator)) +# The list of blobs will resemble this format: +# [S3Path(bucket='bucket', path='folder/path_to_my_blob', protocol='s3'), S3Path(bucket='bucket', path='folder/other_blob', protocol='s3')] ``` #### Download a single blob: ```python +import os from adapta.security.clients import AwsClient +from adapta.security.clients.aws._aws_credentials import EnvironmentAwsCredentials from adapta.storage.models.aws import S3Path from adapta.storage.blob.s3_storage_client import S3StorageClient -from adapta.storage.models.format import DictJsonSerializationFormat +from adapta.storage.models.format import DataFrameParquetSerializationFormat # Set up environment variables os.environ["PROTEUS__AWS_ACCESS_KEY_ID"] = @@ -91,20 +131,12 @@ os.environ["PROTEUS__AWS_ENDPOINT"] = "http://example.com" credentials = EnvironmentAwsCredentials() aws_client = AwsClient(credentials) -# Target path for copy_blob -blob_path = "path/to/blob.file" "# It can be either a 'blob.file' or a 'folder/' -s3_path = S3Path.from_hdfs_path(blob_path) - -# Init storage client -s3_client = S3StorageClient.create(base_client=aws_client) - -# Save data to S3 -s3_client.save_data_as_blob( - data={"data_value": "very_important_data"}, blob_path=s3_path, serialization_format=DictJsonSerializationFormat, overwrite=True -) +# Initialize storage client +s3_client = 
S3StorageClient.create(auth=aws_client) -# List all blob files in an S3 storage path -blob_list = s3_client.list_blobs(s3_path) +# List all blob files in the target folder +s3_folder_path = S3Path.from_hdfs_path('s3a://bucket/folder/') +blob_list = s3_client.list_blobs(blob_path=s3_folder_path) for blob_details in blob_list: print(blob_details) @@ -112,8 +144,8 @@ for blob_details in blob_list: # {'Key': 'data/0-c309720b-3577-4211-b403-bbb55083e5c3-0.parquet', 'LastModified': datetime.datetime(2024, 6, 27, 14, 10, 28, 29000, tzinfo=tzutc()), 'ETag': '"29097d7d2d11d49fed28745a674af776"', 'Size': 2067, 'StorageClass': 'STANDARD', 'Owner': {'DisplayName': 'minio', 'ID': '02d61764'}} # {'Key': 'data/_delta_log/00000000000000000000.json', 'LastModified': datetime.datetime(2024, 6, 27, 13, 49, 2, 942000, tzinfo=tzutc()), 'ETag': '"29097d7d2d11d49fed28745a674af776"', 'Size': 2074, 'StorageClass': 'STANDARD', 'Owner': {'DisplayName': 'minio', 'ID': '02d61764'}} -# Read blobs from the S3 path -s3_path_parquet_file = S3Path.from_hdfs_path("path/to/blob.file.parquet") +# Read blobs from the S3 +s3_path_parquet_file = S3Path.from_hdfs_path("'s3a://bucket/path_to_blob_file.parquet") blobs = s3_client.read_blobs(s3_path_parquet_file, serialization_format=DataFrameParquetSerializationFormat) # Print blobs content print(list(blobs)) @@ -128,13 +160,45 @@ print(list(blobs)) # Downloads blobs from the S3 storage path to the provided local path, as if you were navigating within the S3 path. -s3_client.download_blobs(s3_path, local_path="/tmp/path/to/download") +s3_client.download_blobs(s3_folder_path, local_path="/tmp/path/to/download") # Upon executing the command 'ls /tmp/path/to/download' in your terminal, a list of files be visible: # 0-c309720b-3577-4211-b403-bbb55083e5c3-0.parquet _delta_log +``` + + +#### Copy a single blob to a different location: +```python +import os +from adapta.security.clients import AwsClient +from adapta.security.clients.aws._aws_credentials import EnvironmentAwsCredentials +from adapta.storage.models.aws import S3Path +from adapta.storage.blob.s3_storage_client import S3StorageClient + +# Set up environment variables +os.environ["PROTEUS__AWS_ACCESS_KEY_ID"] = +os.environ["PROTEUS__AWS_SESSION_TOKEN"] = +os.environ["PROTEUS__AWS_SECRET_ACCESS_KEY"] = +os.environ["PROTEUS__AWS_REGION"] = "eu-central-1" +os.environ["PROTEUS__AWS_ENDPOINT"] = "http://example.com" + +# Create client +credentials = EnvironmentAwsCredentials() +aws_client = AwsClient(credentials) + +# Initialize storage client +s3_client = S3StorageClient.create(auth=aws_client) # Copy blob from one location to another in S3 -target_blob_path='s3a://path/to/blob_copy/' -s3_target_blob_path = S3Path.from_hdfs_path(target_blob_path) -s3_client.copy_blob(blob_path=s3_path, target_blob_path=s3_target_blob_path, doze_period_ms=1000) # Time in ms between files being copied +s3_target_blob_path = S3Path.from_hdfs_path('s3a://bucket/path_to_blob_copy') +s3_client.copy_blob(blob_path=s3_path, target_blob_path=s3_target_blob_path) + +# List all blob files in the target folder +blob_list = s3_client.list_blobs(blob_path=s3_target_blob_path) +for blob_details in blob_list: + print(blob_details) + +# The list of blobs and its details will be printed out like in the original blob location: +# {'Key': 'data/0-c309720b-3577-4211-b403-bbb55083e5c3-0.parquet', 'LastModified': datetime.datetime(2024, 6, 27, 14, 10, 28, 29000, tzinfo=tzutc()), 'ETag': '"29097d7d2d11d49fed28745a674af776"', 'Size': 2067, 'StorageClass': 'STANDARD', 
'Owner': {'DisplayName': 'minio', 'ID': '02d61764'}} +# {'Key': 'data/_delta_log/00000000000000000000.json', 'LastModified': datetime.datetime(2024, 6, 27, 13, 49, 2, 942000, tzinfo=tzutc()), 'ETag': '"29097d7d2d11d49fed28745a674af776"', 'Size': 2074, 'StorageClass': 'STANDARD', 'Owner': {'DisplayName': 'minio', 'ID': '02d61764'}} ``` \ No newline at end of file From 7a17a795611cc771d84d13ae32bf184a9127acac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Wed, 17 Jul 2024 10:49:04 +0200 Subject: [PATCH 08/24] Small fixes --- adapta/storage/blob/s3_storage_client.py | 3 +++ adapta/storage/models/aws.py | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/adapta/storage/blob/s3_storage_client.py b/adapta/storage/blob/s3_storage_client.py index 5b16ca9c..1c2717f3 100644 --- a/adapta/storage/blob/s3_storage_client.py +++ b/adapta/storage/blob/s3_storage_client.py @@ -237,6 +237,9 @@ def copy_blob(self, blob_path: DataPath, target_blob_path: DataPath, doze_period source_s3_path = cast_path(blob_path) target_s3_path = cast_path(target_blob_path) + source_s3_path.path = source_s3_path.path.rstrip('/') + target_s3_path.path = target_s3_path.path.rstrip('/') + source_objects = self._s3_resource.Bucket(source_s3_path.bucket).objects.filter(Prefix=source_s3_path.path) for source_object in source_objects: diff --git a/adapta/storage/models/aws.py b/adapta/storage/models/aws.py index 3c1695fb..ebba9d5e 100644 --- a/adapta/storage/models/aws.py +++ b/adapta/storage/models/aws.py @@ -73,11 +73,15 @@ def from_hdfs_path(cls, hdfs_path: str) -> "S3Path": parsed_path = uri.path.split("/") return cls(bucket=uri.netloc, path="/".join(parsed_path[1:])) + def _check_path(self): + assert not self.path.startswith("/"), "Path should not start with /" + def to_hdfs_path(self) -> str: """ Converts the S3Path to an HDFS compatible path. :return: HDFS path """ + self._check_path() if not self.bucket or not self.path: raise ValueError("Bucket and path must be defined") @@ -88,6 +92,7 @@ def to_delta_rs_path(self) -> str: Converts the S3Path to a Delta Lake compatible path. 
:return: Delta Lake path """ + self._check_path() return f"s3a://{self.bucket}/{self.path}" def cast_path(blob_path: DataPath) -> S3Path: From 9a571ffb3318b9e5f5da29b9c737d33a5b056c09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Wed, 17 Jul 2024 10:52:01 +0200 Subject: [PATCH 09/24] Formatting --- adapta/storage/blob/s3_storage_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/adapta/storage/blob/s3_storage_client.py b/adapta/storage/blob/s3_storage_client.py index 1c2717f3..95876715 100644 --- a/adapta/storage/blob/s3_storage_client.py +++ b/adapta/storage/blob/s3_storage_client.py @@ -237,8 +237,8 @@ def copy_blob(self, blob_path: DataPath, target_blob_path: DataPath, doze_period source_s3_path = cast_path(blob_path) target_s3_path = cast_path(target_blob_path) - source_s3_path.path = source_s3_path.path.rstrip('/') - target_s3_path.path = target_s3_path.path.rstrip('/') + source_s3_path.path = source_s3_path.path.rstrip("/") + target_s3_path.path = target_s3_path.path.rstrip("/") source_objects = self._s3_resource.Bucket(source_s3_path.bucket).objects.filter(Prefix=source_s3_path.path) From 4ed076dc80ec6fa470d0991c312211619c5bc290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Wed, 17 Jul 2024 11:16:22 +0200 Subject: [PATCH 10/24] Bug fix now --- adapta/storage/models/aws.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/adapta/storage/models/aws.py b/adapta/storage/models/aws.py index ebba9d5e..84d66337 100644 --- a/adapta/storage/models/aws.py +++ b/adapta/storage/models/aws.py @@ -36,7 +36,7 @@ def to_uri(self) -> str: if not self.bucket or not self.path: raise ValueError("Bucket and path must be defined") - return f"s3://{self.bucket}/{self.path}" + return f"s3://{self.bucket.rstrip('/')}/{self.path}" def base_uri(self) -> str: """ @@ -46,7 +46,7 @@ def base_uri(self) -> str: if not self.bucket: raise ValueError("Bucket must be defined") - return f"https://{self.bucket}.s3.amazonaws.com" + return f"https://{self.bucket.rstrip('/')}.s3.amazonaws.com" @classmethod def from_uri(cls, url: str) -> "S3Path": @@ -70,7 +70,7 @@ def from_hdfs_path(cls, hdfs_path: str) -> "S3Path": """ assert hdfs_path.startswith("s3a://"), "HDFS S3 path should start with s3a://" uri = urlparse(hdfs_path) - parsed_path = uri.path.split("/") + parsed_path = uri.path.replace("//", "/").split("/") return cls(bucket=uri.netloc, path="/".join(parsed_path[1:])) def _check_path(self): @@ -85,7 +85,7 @@ def to_hdfs_path(self) -> str: if not self.bucket or not self.path: raise ValueError("Bucket and path must be defined") - return f"s3a://{self.bucket}/{self.path}" + return f"s3a://{self.bucket.rstrip('/')}/{self.path}" def to_delta_rs_path(self) -> str: """ @@ -93,7 +93,7 @@ :return: Delta Lake path """ self._check_path() - return f"s3a://{self.bucket}/{self.path}" + return f"s3a://{self.bucket.rstrip('/')}/{self.path}" def cast_path(blob_path: DataPath) -> S3Path: From bb012f8f942fcf40b7d8f6b3916ddb822849af4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Wed, 17 Jul 2024 15:04:41 +0200 Subject: [PATCH 11/24] Reducing scope in the branch --- adapta/process_communication/_models.py | 3 +- adapta/security/clients/aws/_aws_client.py | 23 ++-- adapta/storage/blob/README.md | 120 +++++---------------- adapta/storage/blob/s3_storage_client.py | 16 +-- adapta/storage/models/azure.py | 2 +- 5 files changed, 38 insertions(+), 126 deletions(-) diff --git a/adapta/process_communication/_models.py 
b/adapta/process_communication/_models.py index 6c483a2c..e3b04590 100644 --- a/adapta/process_communication/_models.py +++ b/adapta/process_communication/_models.py @@ -23,7 +23,6 @@ from adapta.storage.models import parse_data_path from adapta.storage.models.astra import AstraPath -from adapta.storage.models.aws import S3Path from adapta.storage.models.base import DataPath from adapta.storage.models.azure import AdlsGen2Path, WasbPath from adapta.storage.models.local import LocalPath @@ -53,7 +52,7 @@ def __post_init__(self): ), "Fields alias, data_path and data_format must have a value provided to instantiate a DataSocket." def parse_data_path( - self, candidates: Iterable[DataPath] = (AdlsGen2Path, LocalPath, WasbPath, AstraPath, S3Path) + self, candidates: Iterable[DataPath] = (AdlsGen2Path, LocalPath, WasbPath, AstraPath) ) -> Optional[DataPath]: """ Attempts to convert this socket's data path to one of the known DataPath types. diff --git a/adapta/security/clients/aws/_aws_client.py b/adapta/security/clients/aws/_aws_client.py index 203a3cca..564bedeb 100644 --- a/adapta/security/clients/aws/_aws_client.py +++ b/adapta/security/clients/aws/_aws_client.py @@ -33,31 +33,18 @@ class AwsClient(AuthenticationClient): AWS Credentials provider for various AWS resources. """ - def __init__( - self, - aws_credentials: Optional[AccessKeyCredentials] = None, - allow_http: bool = False, - session_callable: Optional[Callable[[], Session]] = None, - ): + def __init__(self, aws_credentials: Optional[AccessKeyCredentials] = None, allow_http: bool = False): self._session = None self._credentials = aws_credentials self._allow_http = allow_http - self._session_callable = session_callable if session_callable is not None else None @property - def session(self) -> Optional[Session]: + def session(self): """ Returns configured session (if any) """ return self._session - @property - def session_callable(self) -> Optional[Callable[[], Session]]: - """ - Returns configured callable session (if any) - """ - return self._session_callable - @classmethod def from_base_client(cls, client: AuthenticationClient) -> Optional["AwsClient"]: """ @@ -112,8 +99,10 @@ def initialize_session(self, session_callable: Optional[Callable[[], Session]] = if self._session is not None: return self - self._session_callable = session_callable or self._session_callable or self._default_aws_session - self._session = self._session_callable() + if session_callable is None: + session_callable = self._default_aws_session + + self._session = session_callable() return self diff --git a/adapta/storage/blob/README.md b/adapta/storage/blob/README.md index 2dd3cbb2..a960f5d5 100644 --- a/adapta/storage/blob/README.md +++ b/adapta/storage/blob/README.md @@ -53,72 +53,32 @@ cat /local/path/file_name ### AWS example ```python -import os from adapta.security.clients import AwsClient -from adapta.security.clients.aws._aws_credentials import EnvironmentAwsCredentials from adapta.storage.models.aws import S3Path from adapta.storage.blob.s3_storage_client import S3StorageClient from adapta.storage.models.format import DictJsonSerializationFormat -# Set up environment variables -os.environ["PROTEUS__AWS_ACCESS_KEY_ID"] = -os.environ["PROTEUS__AWS_SESSION_TOKEN"] = -os.environ["PROTEUS__AWS_SECRET_ACCESS_KEY"] = -os.environ["PROTEUS__AWS_REGION"] = "eu-central-1" -os.environ["PROTEUS__AWS_ENDPOINT"] = "http://example.com" +aws_client = AwsClient() +s3_path = S3Path.from_hdfs_path('s3a://bucket/path/to/my/table') -# Create client -credentials = 
EnvironmentAwsCredentials() # Create AWS credentials -aws_client = AwsClient(credentials) - -# Initialize storage client -s3_client = S3StorageClient.create(auth=aws_client) - -# Setting blob S3 path -s3_path = S3Path.from_hdfs_path('s3a://bucket/folder/path_to_my_blob') - -# Save data to S3 path -data = { - 'Character': ['Homer Simpson', 'Michael Scott', 'Ron Swanson', 'Sheldon Cooper', 'Captain Jack Sparrow'], - 'Occupation': ['Nuclear Safety Inspector', 'Regional Manager', 'Parks and Recreation Director', 'Theoretical Physicist', 'Pirate Captain'], - 'Catchphrase': [ - 'D\'oh!', - 'I am the World\'s Best Boss.', - 'I\'m a simple man. I like pretty, dark-haired women and breakfast food.', - 'Bazinga!', - 'Why is the rum always gone?' - ] -} +# init storage client +s3_client = S3StorageClient.create(base_client=aws_client) +# Save data to S3 s3_client.save_data_as_blob( - data=data, blob_path=s3_path, serialization_format=DictJsonSerializationFormat, overwrite=True + data={"data_value": "2"}, blob_path=s3_path, serialization_format=DictJsonSerializationFormat, overwrite=True ) -# Read blob from S3 path -blob_iterator = s3_client.read_blobs(s3_path, serialization_format=DictJsonSerializationFormat) - -# Print read blob -print(list(blob_iterator)) -# The blob's content will be displayed as follows: -# [{'Character': ['Homer Simpson', 'Michael Scott', 'Ron Swanson', 'Sheldon Cooper', 'Captain Jack Sparrow'], 'Occupation': ['Nuclear Safety Inspector', 'Regional Manager', 'Parks and Recreation Director', 'Theoretical Physicist', 'Pirate Captain'], 'Catchphrase': ["D'oh!", "I am the World's Best Boss.", "I'm a simple man. I like pretty, dark-haired women and breakfast food.", 'Bazinga!', 'Why is the rum always gone?']}] - -# List all blobs in the same folder -folder_s3_path = S3Path.from_hdfs_path('s3a://bucket/folder/') -folder_data_path_iterator = s3_client.list_blobs(blob_path=folder_s3_path) - -print(list(folder_data_path_iterator)) -# The list of blobs will resemble this format: -# [S3Path(bucket='bucket', path='folder/path_to_my_blob', protocol='s3'), S3Path(bucket='bucket', path='folder/other_blob', protocol='s3')] +# read files from S3 +blobs = s3_client.read_blobs(s3_path, serialization_format=DictJsonSerializationFormat) ``` #### Download a single blob: ```python -import os from adapta.security.clients import AwsClient -from adapta.security.clients.aws._aws_credentials import EnvironmentAwsCredentials from adapta.storage.models.aws import S3Path from adapta.storage.blob.s3_storage_client import S3StorageClient -from adapta.storage.models.format import DataFrameParquetSerializationFormat +from adapta.storage.models.format import DictJsonSerializationFormat # Set up environment variables os.environ["PROTEUS__AWS_ACCESS_KEY_ID"] = os.environ["PROTEUS__AWS_SESSION_TOKEN"] = os.environ["PROTEUS__AWS_SECRET_ACCESS_KEY"] = os.environ["PROTEUS__AWS_REGION"] = "eu-central-1" os.environ["PROTEUS__AWS_ENDPOINT"] = "http://example.com" # Create client credentials = EnvironmentAwsCredentials() aws_client = AwsClient(credentials) -# Initialize storage client -s3_client = S3StorageClient.create(auth=aws_client) +# Target path for copy_blob +blob_path = "path/to/blob.file" "# It can be either a 'blob.file' or a 'folder/' +s3_path = S3Path.from_hdfs_path(blob_path) + +# Init storage client +s3_client = S3StorageClient.create(base_client=aws_client) + +# Save data to S3 +s3_client.save_data_as_blob( + data={"data_value": "very_important_data"}, blob_path=s3_path, serialization_format=DictJsonSerializationFormat, overwrite=True +) -# List all blob files in the target folder -s3_folder_path = S3Path.from_hdfs_path('s3a://bucket/folder/') -blob_list = s3_client.list_blobs(blob_path=s3_folder_path) +# List all blob files in an S3 storage path +blob_list = s3_client.list_blobs(s3_path) for blob_details in blob_list: print(blob_details) @@ -144,8 +112,8 @@ for blob_details in blob_list: # {'Key': 'data/0-c309720b-3577-4211-b403-bbb55083e5c3-0.parquet', 'LastModified': datetime.datetime(2024, 6, 27, 14, 10, 28, 29000, tzinfo=tzutc()), 'ETag': '"29097d7d2d11d49fed28745a674af776"', 'Size': 2067, 'StorageClass': 'STANDARD', 'Owner': {'DisplayName': 'minio', 'ID': '02d61764'}} # {'Key': 'data/_delta_log/00000000000000000000.json', 'LastModified': datetime.datetime(2024, 6, 27, 13, 49, 2, 942000, tzinfo=tzutc()), 'ETag': '"29097d7d2d11d49fed28745a674af776"', 'Size': 2074, 'StorageClass': 'STANDARD', 'Owner': {'DisplayName': 'minio', 'ID': '02d61764'}} -# Read blobs from S3 -s3_path_parquet_file = S3Path.from_hdfs_path("s3a://bucket/path_to_blob_file.parquet") +# Read blobs from the S3 path +s3_path_parquet_file = S3Path.from_hdfs_path("path/to/blob.file.parquet") blobs = s3_client.read_blobs(s3_path_parquet_file, serialization_format=DataFrameParquetSerializationFormat) # Print blobs content print(list(blobs)) @@ -160,45 +128,13 @@ print(list(blobs)) # Downloads blobs from the S3 storage path to the provided local path, as if you were navigating within the S3 path. -s3_client.download_blobs(s3_folder_path, local_path="/tmp/path/to/download") +s3_client.download_blobs(s3_path, local_path="/tmp/path/to/download") # Upon executing the command 'ls /tmp/path/to/download' in your terminal, a list of files will be visible: # 0-c309720b-3577-4211-b403-bbb55083e5c3-0.parquet _delta_log -``` - - -#### Copy a single blob to a different location: -```python -import os -from adapta.security.clients import AwsClient -from adapta.security.clients.aws._aws_credentials import EnvironmentAwsCredentials -from adapta.storage.models.aws import S3Path -from adapta.storage.blob.s3_storage_client import S3StorageClient - -# Set up environment variables -os.environ["PROTEUS__AWS_ACCESS_KEY_ID"] = -os.environ["PROTEUS__AWS_SESSION_TOKEN"] = -os.environ["PROTEUS__AWS_SECRET_ACCESS_KEY"] = -os.environ["PROTEUS__AWS_REGION"] = "eu-central-1" -os.environ["PROTEUS__AWS_ENDPOINT"] = "http://example.com" - -# Create client -credentials = EnvironmentAwsCredentials() -aws_client = AwsClient(credentials) - -# Initialize storage client -s3_client = S3StorageClient.create(auth=aws_client) # Copy blob from one location to another in S3 -s3_target_blob_path = S3Path.from_hdfs_path('s3a://bucket/path_to_blob_copy') -s3_client.copy_blob(blob_path=S3Path.from_hdfs_path('s3a://bucket/folder/'), target_blob_path=s3_target_blob_path) - -# List all blob files in the target folder -blob_list = s3_client.list_blobs(blob_path=s3_target_blob_path) -for blob_details in blob_list: - print(blob_details) - -# The list of blobs and their details will be printed out like in the original blob location: -# {'Key': 'data/0-c309720b-3577-4211-b403-bbb55083e5c3-0.parquet', 'LastModified': datetime.datetime(2024, 6, 27, 14, 10, 28, 29000, tzinfo=tzutc()), 'ETag': '"29097d7d2d11d49fed28745a674af776"', 'Size': 2067, 'StorageClass': 'STANDARD', 'Owner': {'DisplayName': 'minio', 'ID': '02d61764'}} -# {'Key': 'data/_delta_log/00000000000000000000.json', 'LastModified': datetime.datetime(2024, 6, 27, 13, 49, 2, 942000, tzinfo=tzutc()), 'ETag': '"29097d7d2d11d49fed28745a674af776"', 'Size': 2074, 'StorageClass': 'STANDARD', 'Owner': {'DisplayName': 'minio', 'ID': '02d61764'}} +target_blob_path='s3a://path/to/blob_copy/' +s3_target_blob_path = 
S3Path.from_hdfs_path(target_blob_path) +s3_client.copy_blob(blob_path=s3_path, target_blob_path=s3_target_blob_path, doze_period_ms=1000) # Time in ms between files being copied ``` \ No newline at end of file diff --git a/adapta/storage/blob/s3_storage_client.py b/adapta/storage/blob/s3_storage_client.py index 95876715..ebd06c98 100644 --- a/adapta/storage/blob/s3_storage_client.py +++ b/adapta/storage/blob/s3_storage_client.py @@ -95,15 +95,6 @@ def blob_exists(self, blob_path: DataPath) -> bool: :return: Boolean indicator of blob existence """ s3_path = cast_path(blob_path) - - if not (hasattr(s3_path, "bucket") and s3_path.bucket) or not (hasattr(s3_path, "path") and s3_path.path): - missing_attributes = [ - attr for attr in ["bucket", "path"] if not (hasattr(s3_path, attr) and getattr(s3_path, attr)) - ] - raise StorageClientError( - f"Blob path provided does not have the needed parameters: {', '.join(missing_attributes)}" - ) - try: self._s3_resource.meta.client.head_object(Bucket=s3_path.bucket, Key=s3_path.path) return True @@ -223,8 +214,8 @@ def download_blobs( os.makedirs(os.path.dirname(local_file_path), exist_ok=True) try: self._s3_resource.meta.client.download_file(s3_path.bucket, blob.key, local_file_path) - except ClientError as exception: - raise StorageClientError("Error downloading blob") from exception + except ClientError as error: + raise StorageClientError(f"Error downloading blob: {error}") from error def copy_blob(self, blob_path: DataPath, target_blob_path: DataPath, doze_period_ms: int = 0) -> None: """ @@ -237,9 +228,6 @@ def copy_blob(self, blob_path: DataPath, target_blob_path: DataPath, doze_period source_s3_path = cast_path(blob_path) target_s3_path = cast_path(target_blob_path) - source_s3_path.path = source_s3_path.path.rstrip("/") - target_s3_path.path = target_s3_path.path.rstrip("/") - source_objects = self._s3_resource.Bucket(source_s3_path.bucket).objects.filter(Prefix=source_s3_path.path) for source_object in source_objects: diff --git a/adapta/storage/models/azure.py b/adapta/storage/models/azure.py index 92bb1758..428f2254 100644 --- a/adapta/storage/models/azure.py +++ b/adapta/storage/models/azure.py @@ -34,7 +34,7 @@ def base_uri(self) -> str: @classmethod def from_uri(cls, url: str) -> "DataPath": assert url.startswith("https://") and ( - "dfs.core.windows.net" in url or "blob.core.windows.net" in url + "dfs.core.windows.net" in url ), "Invalid URL supplied. 
Please use the following format: https://.dfs.core.windows.net or https://.blob.core.windows.net" return cls( From 2fa69e8b8080d925c65c4f669c6f6bdf7cca865f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Wed, 17 Jul 2024 15:11:13 +0200 Subject: [PATCH 12/24] Missed a file --- .../security/clients/aws/_aws_credentials.py | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/adapta/security/clients/aws/_aws_credentials.py b/adapta/security/clients/aws/_aws_credentials.py index 04391594..869727ce 100644 --- a/adapta/security/clients/aws/_aws_credentials.py +++ b/adapta/security/clients/aws/_aws_credentials.py @@ -20,23 +20,7 @@ from typing import Optional -class IConnectionDetails(ABC): - """ - Abstract marker interface class that represents the connection details for AWS - """ - - @property - @abstractmethod - def region(self) -> str: - """AWS region""" - - @property - @abstractmethod - def endpoint(self) -> Optional[str]: - """AWS custom endpoint""" - - -class AccessKeyCredentials(IConnectionDetails, ABC): +class AccessKeyCredentials(ABC): """ Abstract class that represents credentials for AWS connections """ @@ -51,11 +35,21 @@ def access_key(self) -> str: def access_key_id(self) -> str: """AWS account access key id""" + @property + @abstractmethod + def region(self) -> str: + """AWS region""" + @property @abstractmethod def session_token(self) -> Optional[str]: """AWS session token""" + @property + @abstractmethod + def endpoint(self) -> Optional[str]: + """AWS custom endpoint""" + class EnvironmentAwsCredentials(AccessKeyCredentials): """ From def765b6aa197d781d8fa6d5a9709e660602d526 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Wed, 17 Jul 2024 16:20:54 +0200 Subject: [PATCH 13/24] Added unit tests --- adapta/storage/models/aws.py | 2 +- tests/test_s3_storage_client.py | 58 +++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/adapta/storage/models/aws.py b/adapta/storage/models/aws.py index 84d66337..cdff6230 100644 --- a/adapta/storage/models/aws.py +++ b/adapta/storage/models/aws.py @@ -36,7 +36,7 @@ def to_uri(self) -> str: if not self.bucket or not self.path: raise ValueError("Bucket and path must be defined") - return f"s3://{self.bucket.rstrip('/')}/{self.path}" + return f"s3a://{self.bucket.rstrip('/')}/{self.path}" def base_uri(self) -> str: """ diff --git a/tests/test_s3_storage_client.py b/tests/test_s3_storage_client.py index 62925be8..54eaaac9 100644 --- a/tests/test_s3_storage_client.py +++ b/tests/test_s3_storage_client.py @@ -18,12 +18,70 @@ from unittest.mock import patch, MagicMock +def test_to_uri(): + path = S3Path(bucket="bucket", path="nested/key") + assert path.to_uri() == "s3a://bucket/nested/key" + + +def test_to_uri_malformed_bucket(): + path = S3Path(bucket="bucket/", path="nested/key") + assert path.to_uri() == "s3a://bucket/nested/key" + + +def test_base_uri(): + path = S3Path(bucket="bucket", path="nested/key") + assert path.base_uri() == "https://bucket.s3.amazonaws.com" + + +def test_base_uri_malformed_bucket(): + path = S3Path(bucket="bucket/", path="nested/key") + assert path.base_uri() == "https://bucket.s3.amazonaws.com" + + def test_from_hdfs_path(): path = S3Path.from_hdfs_path("s3a://bucket/nested/key") assert path.bucket == "bucket" assert path.path == "nested/key" +def test_from_hdfs_path_malformed_bucktet(): + malformed_path = S3Path.from_hdfs_path("s3a://bucket//nested/key") + different_malformed_path = 
S3Path.from_hdfs_path("s3a://bucket//nested//key") + assert different_malformed_path == malformed_path == S3Path(bucket="bucket", path="nested/key") + + +def test_to_uri(): + bucket_name = "bucket" + path = "nested/key" + path_instance = S3Path(bucket=bucket_name, path=path) + assert path_instance.to_uri() == f"s3a://{bucket_name}/{path}" + + +def test_to_uri_malformed_bucket(): + bucket_name = "bucket/" + path = "nested/key" + path_instance = S3Path(bucket=bucket_name, path=path) + assert path_instance.to_uri() == f"s3a://bucket/nested/key" + + +def test_to_delta_rs_path(): + bucket_name = "bucket" + path = "nested/key" + path_instance = S3Path(bucket=bucket_name, path=path) + assert path_instance.to_delta_rs_path() == f"s3a://bucket/nested/key" + + +def test_to_delta_rs_bucket_malformed(): + bucket_name = "bucket/" + path = "nested/key" + path_instance = S3Path(bucket=bucket_name, path=path) + assert path_instance.to_delta_rs_path() == f"s3a://bucket/nested/key" + + +def test_to_uri_malformed_url(): + path = S3Path.from_hdfs_path("s3a://bucket//nested/key") + + def test_to_hdfs_path(): path = S3Path("bucket", "nested/key").to_hdfs_path() assert path == "s3a://bucket/nested/key" From 5a459dac4669fb80780bda42e8851de44f17e44e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Wed, 17 Jul 2024 16:38:17 +0200 Subject: [PATCH 14/24] Removing duplicates --- tests/test_s3_storage_client.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/tests/test_s3_storage_client.py b/tests/test_s3_storage_client.py index 54eaaac9..92deaed1 100644 --- a/tests/test_s3_storage_client.py +++ b/tests/test_s3_storage_client.py @@ -15,18 +15,7 @@ from adapta.storage.blob.s3_storage_client import S3StorageClient from adapta.storage.models.aws import S3Path -from unittest.mock import patch, MagicMock - - -def test_to_uri(): - path = S3Path(bucket="bucket", path="nested/key") - assert path.to_uri() == "s3a://bucket/nested/key" - - -def test_to_uri_malformed_bucket(): - path = S3Path(bucket="bucket/", path="nested/key") - assert path.to_uri() == "s3a://bucket/nested/key" - +from unittest.mock import patch def test_base_uri(): path = S3Path(bucket="bucket", path="nested/key") @@ -44,7 +33,7 @@ def test_from_hdfs_path(): assert path.path == "nested/key" -def test_from_hdfs_path_malformed_bucktet(): +def test_from_hdfs_path_malformed_bucket(): malformed_path = S3Path.from_hdfs_path("s3a://bucket//nested/key") different_malformed_path = S3Path.from_hdfs_path("s3a://bucket//nested//key") assert different_malformed_path == malformed_path == S3Path(bucket="bucket", path="nested/key") From f48d1ead20d1c7322adb63ad7de844070b22731d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Thu, 18 Jul 2024 09:41:13 +0200 Subject: [PATCH 15/24] Formatting --- tests/test_s3_storage_client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_s3_storage_client.py b/tests/test_s3_storage_client.py index 92deaed1..62c17d1f 100644 --- a/tests/test_s3_storage_client.py +++ b/tests/test_s3_storage_client.py @@ -17,6 +17,7 @@ from adapta.storage.models.aws import S3Path from unittest.mock import patch + def test_base_uri(): path = S3Path(bucket="bucket", path="nested/key") assert path.base_uri() == "https://bucket.s3.amazonaws.com" From 2c3d77e0e995d5b39bb87edb6209632403fc1a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Thu, 18 Jul 2024 09:52:04 +0200 Subject: [PATCH 16/24] Improving some unittests --- tests/test_s3_storage_client.py | 10 
+++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test_s3_storage_client.py b/tests/test_s3_storage_client.py index 62c17d1f..9d188077 100644 --- a/tests/test_s3_storage_client.py +++ b/tests/test_s3_storage_client.py @@ -23,7 +23,7 @@ def test_base_uri(): assert path.base_uri() == "https://bucket.s3.amazonaws.com" -def test_base_uri_malformed_bucket(): +def test_base_uri_with_malformed_bucket_path(): path = S3Path(bucket="bucket/", path="nested/key") assert path.base_uri() == "https://bucket.s3.amazonaws.com" @@ -34,7 +34,7 @@ def test_from_hdfs_path(): assert path.path == "nested/key" -def test_from_hdfs_path_malformed_bucket(): +def test_from_hdfs_path_with_empty_path_segments(): malformed_path = S3Path.from_hdfs_path("s3a://bucket//nested/key") different_malformed_path = S3Path.from_hdfs_path("s3a://bucket//nested//key") assert different_malformed_path == malformed_path == S3Path(bucket="bucket", path="nested/key") @@ -47,7 +47,7 @@ def test_to_uri(): assert path_instance.to_uri() == f"s3a://{bucket_name}/{path}" -def test_to_uri_malformed_bucket(): +def test_to_uri_malformed_bucket_path(): bucket_name = "bucket/" path = "nested/key" path_instance = S3Path(bucket=bucket_name, path=path) @@ -61,14 +61,14 @@ def test_to_delta_rs_path(): assert path_instance.to_delta_rs_path() == f"s3a://bucket/nested/key" -def test_to_delta_rs_bucket_malformed(): +def test_to_delta_rs_malformed_bucket_path(): bucket_name = "bucket/" path = "nested/key" path_instance = S3Path(bucket=bucket_name, path=path) assert path_instance.to_delta_rs_path() == f"s3a://bucket/nested/key" -def test_to_uri_malformed_url(): +def test_to_uri_with_empty_path_segment(): path = S3Path.from_hdfs_path("s3a://bucket//nested/key") From cbf15e9e7b14a169873cfe6abc83a39a2df21bc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Mon, 29 Jul 2024 13:42:32 +0200 Subject: [PATCH 17/24] Addressing PR comments --- adapta/ml/mlflow/_client.py | 2 +- adapta/storage/models/aws.py | 36 ++++++++++++--------- tests/test_s3_storage_client.py | 57 ++++++++++++++++----------------- 3 files changed, 50 insertions(+), 45 deletions(-) diff --git a/adapta/ml/mlflow/_client.py b/adapta/ml/mlflow/_client.py index 01cd30cd..e59047d9 100644 --- a/adapta/ml/mlflow/_client.py +++ b/adapta/ml/mlflow/_client.py @@ -129,7 +129,7 @@ def load_model_by_uri(model_uri: str) -> PyFuncModel: - ``/Users/me/path/to/local/model`` - ``relative/path/to/local/model`` - - ``s3://my_bucket/path/to/model`` + - ``s3a://my_bucket/path/to/model`` - ``runs://run-relative/path/to/model`` - ``models://`` - ``models://`` diff --git a/adapta/storage/models/aws.py b/adapta/storage/models/aws.py index cdff6230..69de9448 100644 --- a/adapta/storage/models/aws.py +++ b/adapta/storage/models/aws.py @@ -16,6 +16,8 @@ # limitations under the License. # +import re + from dataclasses import dataclass from urllib.parse import urlparse @@ -33,10 +35,10 @@ def to_uri(self) -> str: Converts the S3Path to a URI. 
:return: URI path """ - if not self.bucket or not self.path: - raise ValueError("Bucket and path must be defined") + if not self.bucket: + raise ValueError("Bucket must be defined") - return f"s3a://{self.bucket.rstrip('/')}/{self.path}" + return f"s3a://{self.bucket}/{self.path}" def base_uri(self) -> str: """ @@ -46,7 +48,7 @@ def base_uri(self) -> str: if not self.bucket: raise ValueError("Bucket must be defined") - return f"https://{self.bucket.rstrip('/')}.s3.amazonaws.com" + return f"https://{self.bucket}.s3.amazonaws.com" @classmethod def from_uri(cls, url: str) -> "S3Path": @@ -62,6 +64,18 @@ def from_uri(cls, url: str) -> "S3Path": path: str protocol: str = DataProtocols.S3.value + def __post_init__(self): + if not self.bucket: + raise ValueError("Bucket must be defined") + + path_regex = r"^(?![0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$)[a-z0-9]([a-z0-9\-]{1,61}[a-z0-9])?(\/(?!.*(\/\/|\\))([^\/].{0,1022}\/?)?)?$" + + s3_path_without_prefix = f"{self.bucket}/{self.path}" + match = re.match(path_regex, s3_path_without_prefix) + + if not match: + raise ValueError(f"Invalid S3Path provided, must comply with : {path_regex}") + @classmethod def from_hdfs_path(cls, hdfs_path: str) -> "S3Path": """ @@ -70,30 +84,22 @@ def from_hdfs_path(cls, hdfs_path: str) -> "S3Path": """ assert hdfs_path.startswith("s3a://"), "HDFS S3 path should start with s3a://" uri = urlparse(hdfs_path) - parsed_path = uri.path.replace("//", "/").split("/") + parsed_path = uri.path.split("/") return cls(bucket=uri.netloc, path="/".join(parsed_path[1:])) - def _check_path(self): - assert not self.path.startswith("/"), "Path should not start with /" - def to_hdfs_path(self) -> str: """ Converts the S3Path to an HDFS compatible path. :return: HDFS path """ - self._check_path() - if not self.bucket or not self.path: - raise ValueError("Bucket and path must be defined") - - return f"s3a://{self.bucket.rstrip('/')}/{self.path}" + return f"s3a://{self.bucket}/{self.path}" def to_delta_rs_path(self) -> str: """ Converts the S3Path to a Delta Lake compatible path. :return: Delta Lake path """ - self._check_path() - return f"s3a://{self.bucket.rstrip('/')}/{self.path}" + return f"s3a://{self.bucket}/{self.path}" def cast_path(blob_path: DataPath) -> S3Path: diff --git a/tests/test_s3_storage_client.py b/tests/test_s3_storage_client.py index 9d188077..fc0c88dc 100644 --- a/tests/test_s3_storage_client.py +++ b/tests/test_s3_storage_client.py @@ -12,19 +12,42 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +import pytest from adapta.storage.blob.s3_storage_client import S3StorageClient from adapta.storage.models.aws import S3Path from unittest.mock import patch -def test_base_uri(): - path = S3Path(bucket="bucket", path="nested/key") - assert path.base_uri() == "https://bucket.s3.amazonaws.com" +def test_valid_s3_datapath(): + malformed_s3_datapaths = [ + lambda: S3Path(bucket="bucket", path=""), + lambda: S3Path(bucket="bucket", path="path"), + lambda: S3Path(bucket="bucket", path="path/"), + lambda: S3Path(bucket="bucket", path="path/path_segment"), + lambda: S3Path(bucket="bucket", path="path/path_segment/path_segment"), + ] + + for new_s3_data_path in malformed_s3_datapaths: + new_s3_data_path() + + +def test_invalid_s3_datapath(): + malformed_s3_datapaths = [ + lambda: S3Path(bucket="/bucket/", path="path"), + lambda: S3Path(bucket="/bucket", path="path"), + lambda: S3Path(bucket="bucket", path="/path"), + lambda: S3Path(bucket="bucket", path="/path//path"), + lambda: S3Path(bucket="bucket", path="/path/path//path"), + ] + for new_s3_data_path in malformed_s3_datapaths: + with pytest.raises(ValueError, match=r"Invalid S3Path provided, must comply with : .*"): + new_s3_data_path() -def test_base_uri_with_malformed_bucket_path(): - path = S3Path(bucket="bucket/", path="nested/key") + +def test_base_uri(): + path = S3Path(bucket="bucket", path="nested/key") assert path.base_uri() == "https://bucket.s3.amazonaws.com" @@ -34,12 +57,6 @@ def test_from_hdfs_path(): assert path.path == "nested/key" -def test_from_hdfs_path_with_empty_path_segments(): - malformed_path = S3Path.from_hdfs_path("s3a://bucket//nested/key") - different_malformed_path = S3Path.from_hdfs_path("s3a://bucket//nested//key") - assert different_malformed_path == malformed_path == S3Path(bucket="bucket", path="nested/key") - - def test_to_uri(): bucket_name = "bucket" path = "nested/key" @@ -47,13 +64,6 @@ def test_to_uri(): assert path_instance.to_uri() == f"s3a://{bucket_name}/{path}" -def test_to_uri_malformed_bucket_path(): - bucket_name = "bucket/" - path = "nested/key" - path_instance = S3Path(bucket=bucket_name, path=path) - assert path_instance.to_uri() == f"s3a://bucket/nested/key" - - def test_to_delta_rs_path(): bucket_name = "bucket" path = "nested/key" @@ -61,17 +71,6 @@ def test_to_delta_rs_path(): assert path_instance.to_delta_rs_path() == f"s3a://bucket/nested/key" -def test_to_delta_rs_malformed_bucket_path(): - bucket_name = "bucket/" - path = "nested/key" - path_instance = S3Path(bucket=bucket_name, path=path) - assert path_instance.to_delta_rs_path() == f"s3a://bucket/nested/key" - - -def test_to_uri_with_empty_path_segment(): - path = S3Path.from_hdfs_path("s3a://bucket//nested/key") - - def test_to_hdfs_path(): path = S3Path("bucket", "nested/key").to_hdfs_path() assert path == "s3a://bucket/nested/key" From 820f8a0cc17fab0e45d144803b11894b3a8563dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Mon, 29 Jul 2024 13:49:13 +0200 Subject: [PATCH 18/24] Removing no redundant checks --- adapta/storage/models/aws.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/adapta/storage/models/aws.py b/adapta/storage/models/aws.py index 69de9448..1250f52e 100644 --- a/adapta/storage/models/aws.py +++ b/adapta/storage/models/aws.py @@ -35,9 +35,6 @@ def to_uri(self) -> str: Converts the S3Path to a URI. 
:return: URI path """ - if not self.bucket: - raise ValueError("Bucket must be defined") - return f"s3://{self.bucket}/{self.path}" def base_uri(self) -> str: """ Returns the base URI of the S3Path. :return: URI path """ - if not self.bucket: - raise ValueError("Bucket must be defined") - return f"https://{self.bucket}.s3.amazonaws.com" @classmethod From 8c3bcea9a19fe34b45c1ecf39e5a6c498d58b331 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Mon, 29 Jul 2024 13:51:00 +0200 Subject: [PATCH 19/24] Small improvement to the unit tests --- tests/test_s3_storage_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_s3_storage_client.py b/tests/test_s3_storage_client.py index fc0c88dc..a63cb523 100644 --- a/tests/test_s3_storage_client.py +++ b/tests/test_s3_storage_client.py @@ -37,8 +37,8 @@ def test_invalid_s3_datapath(): lambda: S3Path(bucket="/bucket/", path="path"), lambda: S3Path(bucket="/bucket", path="path"), lambda: S3Path(bucket="bucket", path="/path"), - lambda: S3Path(bucket="bucket", path="/path//path"), - lambda: S3Path(bucket="bucket", path="/path/path//path"), + lambda: S3Path(bucket="bucket", path="path//path_segment"), + lambda: S3Path(bucket="bucket", path="path/path_segment//path_segment"), ] for new_s3_data_path in malformed_s3_datapaths: From 3c0746bcb7444d08ce7cfe5f1d68314a3a976365 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Mon, 29 Jul 2024 13:55:59 +0200 Subject: [PATCH 20/24] Small improvement to the unit tests naming descriptions --- tests/test_s3_storage_client.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tests/test_s3_storage_client.py b/tests/test_s3_storage_client.py index a63cb523..7cdf798e 100644 --- a/tests/test_s3_storage_client.py +++ b/tests/test_s3_storage_client.py @@ -20,7 +20,7 @@ def test_valid_s3_datapath(): - malformed_s3_datapaths = [ + valid_s3_datapaths = [ lambda: S3Path(bucket="bucket", path=""), lambda: S3Path(bucket="bucket", path="path"), lambda: S3Path(bucket="bucket", path="path/"), @@ -28,8 +28,11 @@ def test_valid_s3_datapath(): lambda: S3Path(bucket="bucket", path="path/path_segment/path_segment"), ] - for new_s3_data_path in malformed_s3_datapaths: - new_s3_data_path() + for s3_data_path in valid_s3_datapaths: + try: + s3_data_path() + except Exception as e: + pytest.fail(f"S3Path creation raised an exception: {e}") def test_invalid_s3_datapath(): @@ -41,9 +44,9 @@ def test_invalid_s3_datapath(): lambda: S3Path(bucket="bucket", path="path/path_segment//path_segment"), ] - for new_s3_data_path in malformed_s3_datapaths: + for s3_data_path in malformed_s3_datapaths: with pytest.raises(ValueError, match=r"Invalid S3Path provided, must comply with : .*"): - new_s3_data_path() + s3_data_path() def test_base_uri(): From c5770be1b786e5b14ecb37ccb0a9e54f0ba95ce5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Mon, 29 Jul 2024 14:03:56 +0200 Subject: [PATCH 21/24] Let's not change this --- adapta/storage/models/aws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapta/storage/models/aws.py b/adapta/storage/models/aws.py index 1250f52e..017c1eda 100644 --- a/adapta/storage/models/aws.py +++ b/adapta/storage/models/aws.py @@ -35,7 +35,7 @@ def to_uri(self) -> str: Converts the S3Path to a URI. 
:return: URI path """ - return f"s3a://{self.bucket}/{self.path}" + return f"s3://{self.bucket}/{self.path}" def base_uri(self) -> str: """ From 0f71abccad44d2f4d0d70c448f8a848ba5f47398 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Costa?= Date: Mon, 29 Jul 2024 14:03:56 +0200 Subject: [PATCH 22/24] Let's not change this --- adapta/storage/models/aws.py | 2 +- tests/test_s3_storage_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/adapta/storage/models/aws.py b/adapta/storage/models/aws.py index 1250f52e..017c1eda 100644 --- a/adapta/storage/models/aws.py +++ b/adapta/storage/models/aws.py @@ -35,7 +35,7 @@ def to_uri(self) -> str: Converts the S3Path to a URI. :return: URI path """ - return f"s3a://{self.bucket}/{self.path}" + return f"s3://{self.bucket}/{self.path}" def base_uri(self) -> str: """ diff --git a/tests/test_s3_storage_client.py b/tests/test_s3_storage_client.py index 7cdf798e..b1144495 100644 --- a/tests/test_s3_storage_client.py +++ b/tests/test_s3_storage_client.py @@ -64,7 +64,7 @@ def test_to_uri(): bucket_name = "bucket" path = "nested/key" path_instance = S3Path(bucket=bucket_name, path=path) - assert path_instance.to_uri() == f"s3a://{bucket_name}/{path}" + assert path_instance.to_uri() == f"s3://{bucket_name}/{path}" def test_to_delta_rs_path(): From 2183be5d3128d54a76c30c58cae28b17c79480cf Mon Sep 17 00:00:00 2001 From: Andre Costa Date: Mon, 28 Oct 2024 14:34:04 +0000 Subject: [PATCH 23/24] Addressing PR comments --- adapta/storage/models/aws.py | 9 +++------ tests/test_s3_storage_client.py | 5 +---- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/adapta/storage/models/aws.py b/adapta/storage/models/aws.py index 017c1eda..d03b75e0 100644 --- a/adapta/storage/models/aws.py +++ b/adapta/storage/models/aws.py @@ -62,13 +62,10 @@ def __post_init__(self): if not self.bucket: raise ValueError("Bucket must be defined") - path_regex = r"^(?![0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$)[a-z0-9]([a-z0-9\-]{1,61}[a-z0-9])?(\/(?!.*(\/\/|\\))([^\/].{0,1022}\/?)?)?$" + path_regex = r"//" - s3_path_without_prefix = f"{self.bucket}/{self.path}" - match = re.match(path_regex, s3_path_without_prefix) - - if not match: - raise ValueError(f"Invalid S3Path provided, must comply with : {path_regex}") + if re.search(path_regex, self.path): + raise ValueError(f"Invalid S3Path provided: path should not contain consecutive slashes (//)") @classmethod def from_hdfs_path(cls, hdfs_path: str) -> "S3Path": diff --git a/tests/test_s3_storage_client.py b/tests/test_s3_storage_client.py index b1144495..2ff4a55b 100644 --- a/tests/test_s3_storage_client.py +++ b/tests/test_s3_storage_client.py @@ -37,15 +37,12 @@ def test_valid_s3_datapath(): def test_invalid_s3_datapath(): malformed_s3_datapaths = [ - lambda: S3Path(bucket="/bucket/", path="path"), - lambda: S3Path(bucket="/bucket", path="path"), - lambda: S3Path(bucket="bucket", path="/path"), lambda: S3Path(bucket="bucket", path="path//path_segment"), lambda: S3Path(bucket="bucket", path="path/path_segment//path_segment"), ] for s3_data_path in malformed_s3_datapaths: - with pytest.raises(ValueError, match=r"Invalid S3Path provided, must comply with : .*"): + with pytest.raises(ValueError, match=r"Invalid S3Path provided: .*"): s3_data_path() From fb9ba51e452cd22700e7e677775fcd054ff152d3 Mon Sep 17 00:00:00 2001 From: Andre Costa Date: Mon, 28 Oct 2024 14:46:34 +0000 Subject: [PATCH 24/24] Fix Linting --- adapta/storage/models/aws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/adapta/storage/models/aws.py b/adapta/storage/models/aws.py index d03b75e0..d6264db7 100644 --- a/adapta/storage/models/aws.py +++ b/adapta/storage/models/aws.py @@ -65,7 +65,7 @@ def __post_init__(self): path_regex = r"//" if re.search(path_regex, self.path): - raise ValueError(f"Invalid S3Path provided: path should not contain consecutive slashes (//)") + raise ValueError("Invalid S3Path provided: path should not contain consecutive slashes (//)") @classmethod def from_hdfs_path(cls, hdfs_path: str) -> "S3Path":