From 5ae897a0d4e8185c1905f7958b69d3a58dd50926 Mon Sep 17 00:00:00 2001 From: Aaron Wilson Date: Wed, 25 Sep 2024 11:59:32 -0500 Subject: [PATCH] sdk/python: Refactor internal object classes for accessing and iterating over object content Signed-off-by: Aaron Wilson --- docs/python_sdk.md | 837 +++++++++++------- python/Makefile | 2 +- python/aistore/sdk/obj/content_iterator.py | 38 + python/aistore/sdk/obj/object.py | 64 +- python/aistore/sdk/obj/object_client.py | 72 ++ python/aistore/sdk/obj/object_file.py | 14 +- python/aistore/sdk/obj/object_reader.py | 106 ++- .../sdk/resilient-streaming-object-file.ipynb | 4 +- .../integration/sdk/test_object_file_ops.py | 2 +- python/tests/unit/sdk/obj/__init__.py | 0 .../unit/sdk/obj/test_content_iterator.py | 49 + .../tests/unit/sdk/{ => obj}/test_object.py | 54 +- .../sdk/{ => obj}/test_object_attributes.py | 0 .../tests/unit/sdk/obj/test_object_client.py | 80 ++ .../unit/sdk/{ => obj}/test_object_file.py | 31 +- .../tests/unit/sdk/obj/test_object_reader.py | 133 +++ python/tests/unit/sdk/test_object_reader.py | 124 --- 17 files changed, 1000 insertions(+), 610 deletions(-) create mode 100644 python/aistore/sdk/obj/content_iterator.py create mode 100644 python/aistore/sdk/obj/object_client.py create mode 100644 python/tests/unit/sdk/obj/__init__.py create mode 100644 python/tests/unit/sdk/obj/test_content_iterator.py rename python/tests/unit/sdk/{ => obj}/test_object.py (90%) rename python/tests/unit/sdk/{ => obj}/test_object_attributes.py (100%) create mode 100644 python/tests/unit/sdk/obj/test_object_client.py rename python/tests/unit/sdk/{ => obj}/test_object_file.py (86%) create mode 100644 python/tests/unit/sdk/obj/test_object_reader.py delete mode 100644 python/tests/unit/sdk/test_object_reader.py diff --git a/docs/python_sdk.md b/docs/python_sdk.md index 3a50580230..acb0b23f58 100644 --- a/docs/python_sdk.md +++ b/docs/python_sdk.md @@ -104,15 +104,6 @@ or see [https://github.com/NVIDIA/aistore/tree/main/python/aistore](https://gith * [get\_performance](#cluster.Cluster.get_performance) * [get\_uuid](#cluster.Cluster.get_uuid) * [etl](#etl) - * [Etl](#etl.Etl) - * [name](#etl.Etl.name) - * [init\_spec](#etl.Etl.init_spec) - * [init\_code](#etl.Etl.init_code) - * [view](#etl.Etl.view) - * [start](#etl.Etl.start) - * [stop](#etl.Etl.stop) - * [delete](#etl.Etl.delete) - * [validate\_etl\_name](#etl.Etl.validate_etl_name) * [job](#job) * [Job](#job.Job) * [job\_id](#job.Job.job_id) @@ -143,34 +134,60 @@ or see [https://github.com/NVIDIA/aistore/tree/main/python/aistore](https://gith * [from\_string](#multiobj.object_range.ObjectRange.from_string) * [multiobj.object\_template](#multiobj.object_template) * [ObjectTemplate](#multiobj.object_template.ObjectTemplate) -* [object](#object) - * [Object](#object.Object) - * [bucket](#object.Object.bucket) - * [name](#object.Object.name) - * [props](#object.Object.props) - * [head](#object.Object.head) - * [get](#object.Object.get) - * [as\_file](#object.Object.as_file) - * [get\_semantic\_url](#object.Object.get_semantic_url) - * [get\_url](#object.Object.get_url) - * [put\_content](#object.Object.put_content) - * [put\_file](#object.Object.put_file) - * [promote](#object.Object.promote) - * [delete](#object.Object.delete) - * [blob\_download](#object.Object.blob_download) - * [append\_content](#object.Object.append_content) - * [set\_custom\_props](#object.Object.set_custom_props) -* [object\_iterator](#object_iterator) - * [ObjectIterator](#object_iterator.ObjectIterator) -* 
[object\_reader](#object_reader) - * [ObjectReader](#object_reader.ObjectReader) - * [head](#object_reader.ObjectReader.head) - * [attributes](#object_reader.ObjectReader.attributes) - * [chunk\_size](#object_reader.ObjectReader.chunk_size) - * [read\_all](#object_reader.ObjectReader.read_all) - * [raw](#object_reader.ObjectReader.raw) - * [iter\_from\_position](#object_reader.ObjectReader.iter_from_position) - * [\_\_iter\_\_](#object_reader.ObjectReader.__iter__) +* [obj.object](#obj.object) + * [Object](#obj.object.Object) + * [bucket](#obj.object.Object.bucket) + * [name](#obj.object.Object.name) + * [props](#obj.object.Object.props) + * [head](#obj.object.Object.head) + * [get](#obj.object.Object.get) + * [get\_semantic\_url](#obj.object.Object.get_semantic_url) + * [get\_url](#obj.object.Object.get_url) + * [put\_content](#obj.object.Object.put_content) + * [put\_file](#obj.object.Object.put_file) + * [promote](#obj.object.Object.promote) + * [delete](#obj.object.Object.delete) + * [blob\_download](#obj.object.Object.blob_download) + * [append\_content](#obj.object.Object.append_content) + * [set\_custom\_props](#obj.object.Object.set_custom_props) +* [obj.object\_reader](#obj.object_reader) + * [ObjectReader](#obj.object_reader.ObjectReader) + * [head](#obj.object_reader.ObjectReader.head) + * [attributes](#obj.object_reader.ObjectReader.attributes) + * [read\_all](#obj.object_reader.ObjectReader.read_all) + * [raw](#obj.object_reader.ObjectReader.raw) + * [as\_file](#obj.object_reader.ObjectReader.as_file) + * [iter\_from\_position](#obj.object_reader.ObjectReader.iter_from_position) + * [\_\_iter\_\_](#obj.object_reader.ObjectReader.__iter__) +* [obj.object\_file](#obj.object_file) + * [SimpleBuffer](#obj.object_file.SimpleBuffer) + * [\_\_len\_\_](#obj.object_file.SimpleBuffer.__len__) + * [read](#obj.object_file.SimpleBuffer.read) + * [fill](#obj.object_file.SimpleBuffer.fill) + * [empty](#obj.object_file.SimpleBuffer.empty) + * [ObjectFile](#obj.object_file.ObjectFile) + * [close](#obj.object_file.ObjectFile.close) + * [tell](#obj.object_file.ObjectFile.tell) + * [readable](#obj.object_file.ObjectFile.readable) + * [seekable](#obj.object_file.ObjectFile.seekable) + * [read](#obj.object_file.ObjectFile.read) +* [obj.object\_props](#obj.object_props) + * [ObjectProps](#obj.object_props.ObjectProps) + * [bucket\_name](#obj.object_props.ObjectProps.bucket_name) + * [bucket\_provider](#obj.object_props.ObjectProps.bucket_provider) + * [name](#obj.object_props.ObjectProps.name) + * [location](#obj.object_props.ObjectProps.location) + * [mirror\_paths](#obj.object_props.ObjectProps.mirror_paths) + * [mirror\_copies](#obj.object_props.ObjectProps.mirror_copies) + * [present](#obj.object_props.ObjectProps.present) +* [obj.object\_attributes](#obj.object_attributes) + * [ObjectAttributes](#obj.object_attributes.ObjectAttributes) + * [size](#obj.object_attributes.ObjectAttributes.size) + * [checksum\_type](#obj.object_attributes.ObjectAttributes.checksum_type) + * [checksum\_value](#obj.object_attributes.ObjectAttributes.checksum_value) + * [access\_time](#obj.object_attributes.ObjectAttributes.access_time) + * [obj\_version](#obj.object_attributes.ObjectAttributes.obj_version) + * [custom\_metadata](#obj.object_attributes.ObjectAttributes.custom_metadata) @@ -1559,12 +1576,14 @@ AIStore client for managing buckets, objects, ETL jobs - `endpoint` _str_ - AIStore endpoint - `skip_verify` _bool, optional_ - If True, skip SSL certificate verification. Defaults to False. 
-- `ca_cert` _str, optional_ - Path to a CA certificate file for SSL verification. +- `ca_cert` _str, optional_ - Path to a CA certificate file for SSL verification. If not provided, the + 'AIS_CLIENT_CA' environment variable will be used. Defaults to None. - `timeout` _Union[float, Tuple[float, float], None], optional_ - Request timeout in seconds; a single float for both connect/read timeouts (e.g., 5.0), a tuple for separate connect/read timeouts (e.g., (3.0, 10.0)), or None to disable timeout. - `retry` _urllib3.Retry, optional_ - Retry configuration object from the urllib3 library. -- `token` _str, optional_ - Authorization token. +- `token` _str, optional_ - Authorization token. If not provided, the 'AIS_AUTHN_TOKEN' environment variable + will be used. Defaults to None. @@ -1882,163 +1901,6 @@ def get_uuid() -> str Returns: UUID of AIStore Cluster - - -## Class: Etl - -```python -class Etl() -``` - -A class containing ETL-related functions. - - - -### name - -```python -@property -def name() -> str -``` - -Name of the ETL - - - -### init\_spec - -```python -def init_spec(template: str, - communication_type: str = DEFAULT_ETL_COMM, - timeout: str = DEFAULT_ETL_TIMEOUT, - arg_type: str = "") -> str -``` - -Initializes ETL based on Kubernetes pod spec template. - -**Arguments**: - -- `template` _str_ - Kubernetes pod spec template - Existing templates can be found at `sdk.etl_templates` - For more information visit: https://github.com/NVIDIA/ais-etl/tree/master/transformers -- `communication_type` _str_ - Communication type of the ETL (options: hpull, hrev, hpush) -- `timeout` _str_ - Timeout of the ETL job (e.g. 5m for 5 minutes) - -**Returns**: - - Job ID string associated with this ETL - - - -### init\_code - -```python -def init_code(transform: Callable, - dependencies: List[str] = None, - preimported_modules: List[str] = None, - runtime: str = _get_default_runtime(), - communication_type: str = DEFAULT_ETL_COMM, - timeout: str = DEFAULT_ETL_TIMEOUT, - chunk_size: int = None, - arg_type: str = "") -> str -``` - -Initializes ETL based on the provided source code. - -**Arguments**: - -- `transform` _Callable_ - Transform function of the ETL -- `dependencies` _list[str]_ - Python dependencies to install -- `preimported_modules` _list[str]_ - Modules to import before running the transform function. This can - be necessary in cases where the modules used both attempt to import each other circularly -- `runtime` _str_ - [optional, default= V2 implementation of the current python version if supported, else - python3.8v2] Runtime environment of the ETL [choose from: python3.8v2, python3.10v2, python3.11v2] - (see ext/etl/runtime/all.go) -- `communication_type` _str_ - [optional, default="hpush"] Communication type of the ETL (options: hpull, hrev, - hpush, io) -- `timeout` _str_ - [optional, default="5m"] Timeout of the ETL job (e.g. 5m for 5 minutes) -- `chunk_size` _int_ - Chunk size in bytes if transform function in streaming data. - (whole object is read by default) -- `arg_type` _optional, str_ - The type of argument the runtime will provide the transform function. - The default value of "" will provide the raw bytes read from the object. - When used with hpull communication_type, setting this to "url" will provide the URL of the object. 
- -**Returns**: - - Job ID string associated with this ETL - - - -### view - -```python -def view() -> ETLDetails -``` - -View ETL details - -**Returns**: - -- `ETLDetails` - details of the ETL - - - -### start - -```python -def start() -``` - -Resumes a stopped ETL with given ETL name. - -Note: Deleted ETLs cannot be started. - - - -### stop - -```python -def stop() -``` - -Stops ETL. Stops (but does not delete) all the pods created by Kubernetes for this ETL and -terminates any transforms. - - - -### delete - -```python -def delete() -``` - -Delete ETL. Deletes pods created by Kubernetes for this ETL and specifications for this ETL -in Kubernetes. - -Note: Running ETLs cannot be deleted. - - - -### validate\_etl\_name - -```python -@staticmethod -def validate_etl_name(name: str) -``` - -Validate the ETL name based on specific criteria. - -**Arguments**: - -- `name` _str_ - The name of the ETL to validate. - - -**Raises**: - -- `ValueError` - If the name is too short (less than 6 characters), - too long (more than 32 characters), - or contains invalid characters (anything other than lowercase letters, digits, or hyphens). - ## Class: Job @@ -2228,31 +2090,26 @@ Start a job and return its ID. ### get\_within\_timeframe ```python -def get_within_timeframe(start_time: datetime.time, - end_time: datetime.time) -> List[JobSnapshot] +def get_within_timeframe(start_time: datetime.datetime, + end_time: datetime.datetime) -> List[JobSnapshot] ``` -Checks for jobs that started and finished within a specified timeframe +Checks for jobs that started and finished within a specified timeframe. **Arguments**: -- `start_time` _datetime.time_ - The start of the timeframe for monitoring jobs -- `end_time` _datetime.time_ - The end of the timeframe for monitoring jobs +- `start_time` _datetime.datetime_ - The start of the timeframe for monitoring jobs. +- `end_time` _datetime.datetime_ - The end of the timeframe for monitoring jobs. **Returns**: -- `list` - A list of jobs that have finished within the specified timeframe +- `List[JobSnapshot]` - A list of jobs that have finished within the specified timeframe. **Raises**: -- `requests.RequestException` - "There was an ambiguous exception that occurred while handling..." -- `requests.ConnectionError` - Connection error -- `requests.ConnectionTimeout` - Timed out connecting to AIStore -- `requests.ReadTimeout` - Timed out waiting response from AIStore -- `errors.Timeout` - Timeout while waiting for the job to finish -- `errors.JobInfoNotFound` - Raised when information on a job's status could not be found on the AIS cluster +- `JobInfoNotFound` - Raised when information on a job's status could not be found. @@ -2393,6 +2250,7 @@ NOTE: only Cloud buckets can be evicted. ```python def prefetch(blob_threshold: int = None, + num_workers: int = None, latest: bool = False, continue_on_error: bool = False) ``` @@ -2406,6 +2264,9 @@ NOTE: only Cloud buckets can be prefetched. - `continue_on_error` _bool, optional_ - Whether to continue if there is an error prefetching a single object - `blob_threshold` _int, optional_ - Utilize built-in blob-downloader for remote objects greater than the specified (threshold) size in bytes +- `num_workers` _int, optional_ - Number of concurrent workers (readers). Defaults to the number of target + mountpaths if omitted or zero. A value of -1 indicates no workers at all (i.e., single-threaded + execution). Any positive value will be adjusted not to exceed the number of target CPUs. 
**Raises**: @@ -2433,7 +2294,8 @@ def copy(to_bck: "Bucket", dry_run: bool = False, force: bool = False, latest: bool = False, - sync: bool = False) + sync: bool = False, + num_workers: int = None) ``` Copies a list or range of objects in a bucket @@ -2448,6 +2310,9 @@ Copies a list or range of objects in a bucket (see "limited coexistence" and xact/xreg/xreg.go) - `latest` _bool, optional_ - GET the latest object version from the associated remote bucket - `sync` _bool, optional_ - synchronize destination bucket with its remote (e.g., Cloud or remote AIS) source +- `num_workers` _int, optional_ - Number of concurrent workers (readers). Defaults to the number of target + mountpaths if omitted or zero. A value of -1 indicates no workers at all (i.e., single-threaded + execution). Any positive value will be adjusted not to exceed the number of target CPUs. **Raises**: @@ -2477,7 +2342,8 @@ def transform(to_bck: "Bucket", dry_run: bool = False, force: bool = False, latest: bool = False, - sync: bool = False) + sync: bool = False, + num_workers: int = None) ``` Performs ETL operation on a list or range of objects in a bucket, placing the results in the destination bucket @@ -2494,6 +2360,9 @@ Performs ETL operation on a list or range of objects in a bucket, placing the re (see "limited coexistence" and xact/xreg/xreg.go) - `latest` _bool, optional_ - GET the latest object version from the associated remote bucket - `sync` _bool, optional_ - synchronize destination bucket with its remote (e.g., Cloud or remote AIS) source +- `num_workers` _int, optional_ - Number of concurrent workers (readers). Defaults to the number of target + mountpaths if omitted or zero. A value of -1 indicates no workers at all (i.e., single-threaded + execution). Any positive value will be adjusted not to exceed the number of target CPUs. **Raises**: @@ -2621,7 +2490,7 @@ A collection of object names specified by a template in the bash brace expansion - `template` _str_ - A string template that defines the names of objects to include in the collection - + ## Class: Object @@ -2638,7 +2507,7 @@ A class representing an object of a bucket bound to a client. - `size` _int, optional_ - size of object in bytes - `props` _ObjectProps, optional_ - Properties of object - + ### bucket @@ -2649,7 +2518,7 @@ def bucket() Bucket containing this object. - + ### name @@ -2660,7 +2529,7 @@ def name() -> str Name of this object. - + ### props @@ -2671,7 +2540,7 @@ def props() -> ObjectProps Properties of this object. - + ### head @@ -2694,13 +2563,13 @@ Requests object properties and returns headers. Updates props. 
- `requests.ReadTimeout` - Timed out waiting response from AIStore - `requests.exceptions.HTTPError(404)` - The object does not exist - + ### get ```python -def get(archive_settings: ArchiveSettings = None, - blob_download_settings: BlobDownloadSettings = None, +def get(archive_config: ArchiveConfig = None, + blob_download_config: BlobDownloadConfig = None, chunk_size: int = DEFAULT_CHUNK_SIZE, etl_name: str = None, writer: BufferedWriter = None, @@ -2712,8 +2581,8 @@ Creates and returns an ObjectReader with access to object contents and optionall **Arguments**: -- `archive_settings` _ArchiveSettings, optional_ - Settings for archive extraction -- `blob_download_settings` _BlobDownloadSettings, optional_ - Settings for using blob download +- `archive_config` _ArchiveConfig, optional_ - Settings for archive extraction +- `blob_download_config` _BlobDownloadConfig, optional_ - Settings for using blob download - `chunk_size` _int, optional_ - chunk_size to use while reading from stream - `etl_name` _str, optional_ - Transforms an object based on ETL with etl_name - `writer` _BufferedWriter, optional_ - User-provided writer for writing content output @@ -2736,63 +2605,12 @@ Creates and returns an ObjectReader with access to object contents and optionall - `requests.ConnectionTimeout` - Timed out connecting to AIStore - `requests.ReadTimeout` - Timed out waiting response from AIStore - - -### as\_file - -```python -def as_file(max_resume: int = 5, - archive_settings: ArchiveSettings = None, - blob_download_settings: BlobDownloadSettings = None, - chunk_size: int = DEFAULT_CHUNK_SIZE, - etl_name: str = None, - writer: BufferedWriter = None, - latest: bool = False, - byte_range: str = None) -> ObjectFile -``` - -Creates an `ObjectFile` for reading object data in chunks with support for -resuming and retrying from the last known position in the case the object stream -is prematurely closed due to an unexpected error. - -**Arguments**: - -- `max_resume` _int, optional_ - If streaming object contents is interrupted, this - defines the maximum number of attempts to resume the connection before - raising an exception. -- `archive_settings` _ArchiveSettings, optional_ - Settings for archive extraction. -- `blob_download_settings` _BlobDownloadSettings, optional_ - Settings for using blob - download (e.g., chunk size, workers). -- `chunk_size` _int, optional_ - The size of chunks to use while reading from the stream. -- `etl_name` _str, optional_ - Name of the ETL (Extract, Transform, Load) transformation - to apply during the get operation. -- `writer` _BufferedWriter, optional_ - A writer for writing content output. User is - responsible for closing the writer. -- `latest` _bool, optional_ - Whether to get the latest version of the object from - a remote bucket (if applicable). -- `byte_range` _str, optional_ - Specify a byte range to fetch a segment of the object - (e.g., "bytes=0-499" for the first 500 bytes). - - -**Returns**: - -- `ObjectFile` - A file-like object that can be used to read the object content. - - -**Raises**: - -- `requests.RequestException` - An ambiguous exception occurred while handling the request. -- `requests.ConnectionError` - A connection error occurred. -- `requests.ConnectionTimeout` - The connection to AIStore timed out. -- `requests.ReadTimeout` - Waiting for a response from AIStore timed out. -- `requests.exceptions.HTTPError(404)` - The object does not exist. 
- - + ### get\_semantic\_url ```python -def get_semantic_url() +def get_semantic_url() -> str ``` Get the semantic URL to the object @@ -2801,12 +2619,12 @@ Get the semantic URL to the object Semantic URL to get object - + ### get\_url ```python -def get_url(archpath: str = "", etl_name: str = None) +def get_url(archpath: str = "", etl_name: str = None) -> str ``` Get the full url to the object including base url and any query parameters @@ -2822,12 +2640,12 @@ Get the full url to the object including base url and any query parameters Full URL to get object - + ### put\_content ```python -def put_content(content: bytes) -> Header +def put_content(content: bytes) -> Response ``` Puts bytes as an object to a bucket in AIS storage. @@ -2844,12 +2662,12 @@ Puts bytes as an object to a bucket in AIS storage. - `requests.ConnectionTimeout` - Timed out connecting to AIStore - `requests.ReadTimeout` - Timed out waiting response from AIStore - + ### put\_file ```python -def put_file(path: str = None) +def put_file(path: str = None) -> Response ``` Puts a local file as an object to a bucket in AIS storage. @@ -2867,7 +2685,7 @@ Puts a local file as an object to a bucket in AIS storage. - `requests.ReadTimeout` - Timed out waiting response from AIStore - `ValueError` - The path provided is not a valid file - + ### promote @@ -2908,12 +2726,12 @@ See more info here: https://aiatscale.org/blog/2022/03/17/promote - `requests.ReadTimeout` - Timed out waiting response from AIStore - `AISError` - Path does not exist on the AIS cluster storage - + ### delete ```python -def delete() +def delete() -> Response ``` Delete an object from a bucket. @@ -2931,7 +2749,7 @@ Delete an object from a bucket. - `requests.ReadTimeout` - Timed out waiting response from AIStore - `requests.exceptions.HTTPError(404)` - The object does not exist - + ### blob\_download @@ -2964,7 +2782,7 @@ Returns job ID that for the blob download operation. - `requests.exceptions.HTTPError` - Service unavailable - `requests.RequestException` - "There was an ambiguous exception that occurred while handling..." - + ### append\_content @@ -2996,13 +2814,13 @@ Append bytes as an object to a bucket in AIS storage. - `requests.ReadTimeout` - Timed out waiting response from AIStore - `requests.exceptions.HTTPError(404)` - The object does not exist - + ### set\_custom\_props ```python def set_custom_props(custom_metadata: Dict[str, str], - replace_existing: bool = False) + replace_existing: bool = False) -> Response ``` Set custom properties for the object. @@ -3012,32 +2830,23 @@ Set custom properties for the object. - `custom_metadata` _Dict[str, str]_ - Custom metadata key-value pairs. - `replace_existing` _bool, optional_ - Whether to replace existing metadata. Defaults to False. - + -## Class: ObjectIterator +## Class: ObjectReader ```python -class ObjectIterator() +class ObjectReader() ``` -Represents an iterable that will fetch all objects from a bucket, querying as needed with the specified function +Provide a way to read an object's contents and attributes, optionally iterating over a stream of content. **Arguments**: -- `list_objects` _Callable_ - Function returning a BucketList from an AIS cluster - - - -## Class: ObjectReader - -```python -class ObjectReader() -``` - -Represents the data returned by the API when getting an object, including access to the content stream and object -attributes. 
+- `object_client` _ObjectClient_ - Client for making requests to a specific object in AIS +- `chunk_size` _int, optional_ - Size of each data chunk to be fetched from the stream. + Defaults to DEFAULT_CHUNK_SIZE. - + ### head @@ -3049,9 +2858,9 @@ Make a head request to AIS to update and return only object attributes. **Returns**: - ObjectAttributes for this object + `ObjectAttributes` containing metadata for this object. - + ### attributes @@ -3064,54 +2873,69 @@ Object metadata attributes. **Returns**: -- `ObjectAttributes` - Parsed object attributes from the headers returned by AIS +- `ObjectAttributes` - Parsed object attributes from the headers returned by AIS. - + -### chunk\_size +### read\_all ```python -@property -def chunk_size() -> int +def read_all() -> bytes ``` -Chunk size. +Read all byte data directly from the object response without using a stream. + +This requires all object content to fit in memory at once and downloads all content before returning. **Returns**: -- `int` - Current chunk size for reading the object. +- `bytes` - Object content as bytes. - + -### read\_all +### raw ```python -def read_all() -> bytes +def raw() -> requests.Response ``` -Read all byte data directly from the object response without using a stream. - -This requires all object content to fit in memory at once and downloads all content before returning. +Return the raw byte stream of object content. **Returns**: -- `bytes` - Object content as bytes. +- `requests.Response` - Raw byte stream of the object content. - + -### raw +### as\_file ```python -def raw() -> requests.Response +def as_file(max_resume: Optional[int] = 5) -> ObjectFile ``` -Returns the raw byte stream of object content. +Create an `ObjectFile` for reading object data in chunks. `ObjectFile` supports +resuming and retrying from the last known position in the case the object stream +is prematurely closed due to an unexpected error. + +**Arguments**: + +- `max_resume` _int, optional_ - Maximum number of resume attempts in case of streaming failure. Defaults to 5. + **Returns**: -- `requests.Response` - Raw byte stream of the object content +- `ObjectFile` - A file-like object that can be used to read the object content. + - +**Raises**: + +- `requests.RequestException` - An ambiguous exception occurred while handling the request. +- `requests.ConnectionError` - A connection error occurred. +- `requests.ConnectionTimeout` - The connection to AIStore timed out. +- `requests.ReadTimeout` - Waiting for a response from AIStore timed out. +- `requests.exceptions.HTTPError(404)` - The object does not exist. + + ### iter\_from\_position @@ -3131,7 +2955,7 @@ and yield chunks of the stream content. - `Iterator[bytes]` - An iterator over each chunk of bytes in the object starting from the specific position. - + ### \_\_iter\_\_ @@ -3145,3 +2969,364 @@ Make a request to get a stream from the provided object and yield chunks of the - `Iterator[bytes]` - An iterator over each chunk of bytes in the object. + + +## Class: SimpleBuffer + +```python +class SimpleBuffer() +``` + +A buffer for efficiently handling streamed data with position tracking. + +It stores incoming chunks of data in a bytearray and tracks the current read position. +Once data is read, it is discarded from the buffer to free memory, ensuring efficient +usage. + + + +### \_\_len\_\_ + +```python +def __len__() +``` + +Return the number of unread bytes in the buffer. + +**Returns**: + +- `int` - The number of unread bytes remaining in the buffer. 
+ + + +### read + +```python +def read(size: int = -1) -> bytes +``` + +Read bytes from the buffer and advance the read position. + +**Arguments**: + +- `size` _int, optional_ - Number of bytes to read from the buffer. If -1, reads all + remaining bytes. + + +**Returns**: + +- `bytes` - The data read from the buffer. + + + +### fill + +```python +def fill(source: Iterator[bytes], size: int = -1) +``` + +Fill the buffer with data from the source, up to the specified size. + +**Arguments**: + +- `source` _Iterator[bytes]_ - The data source (chunks). +- `size` _int, optional_ - The target size to fill the buffer up to. Default is -1 for unlimited. + +**Returns**: + +- `int` - Number of bytes in the buffer. + + + +### empty + +```python +def empty() +``` + +Empty the buffer. + + + +## Class: ObjectFile + +```python +class ObjectFile(BufferedIOBase) +``` + +A file-like object for reading object data, with support for both reading a fixed size of data +and reading until the end of the stream (EOF). It provides the ability to resume and continue +reading from the last known position in the event of a ChunkedEncodingError. + +Data is fetched in chunks via the object reader iterator and temporarily stored in an internal +buffer. The buffer is filled either to the required size or until EOF is reached. If a +`ChunkedEncodingError` occurs during this process, ObjectFile catches and automatically attempts +to resume the buffer filling process from the last known chunk position. The number of resume +attempts is tracked across the entire object file, and if the total number of attempts exceeds +the configurable `max_resume`, a `ChunkedEncodingError` is raised. + +Once the buffer is adequately filled, the `read()` method reads and returns the requested amount +of data from the buffer. + +**Arguments**: + +- `content_iterator` _ContentIterator_ - An iterator that can fetch object data from AIS in chunks. +- `max_resume` _int_ - Maximum number of retry attempts in case of a streaming failure. + + + +### close + +```python +def close() -> None +``` + +Close the file and release resources. + +**Raises**: + +- `ValueError` - I/O operation on closed file. + + + +### tell + +```python +def tell() -> int +``` + +Return the current file position. + +**Returns**: + + The current file position. + + +**Raises**: + +- `ValueError` - I/O operation on closed file. + + + +### readable + +```python +def readable() -> bool +``` + +Return whether the file is readable. + +**Returns**: + + True if the file is readable, False otherwise. + + +**Raises**: + +- `ValueError` - I/O operation on closed file. + + + +### seekable + +```python +def seekable() -> bool +``` + +Return whether the file supports seeking. + +**Returns**: + + False since the file does not support seeking. + + + +### read + +```python +def read(size=-1) +``` + +Read bytes from the object, handling retries in case of stream errors. + +**Arguments**: + +- `size` _int, optional_ - Number of bytes to read. If -1, reads until the end of the stream. + + +**Returns**: + +- `bytes` - The data read from the object. + + + +## Class: ObjectProps + +```python +class ObjectProps(ObjectAttributes) +``` + +Represents the attributes parsed from the response headers returned from an API call to get an object. +Extends ObjectAtributes and is a superset of that class. 
+ +**Arguments**: + +- `response_headers` _CaseInsensitiveDict, optional_ - Response header dict containing object attributes + + + +### bucket\_name + +```python +@property +def bucket_name() +``` + +Name of object's bucket + + + +### bucket\_provider + +```python +@property +def bucket_provider() +``` + +Provider of object's bucket. + + + +### name + +```python +@property +def name() -> str +``` + +Name of the object. + + + +### location + +```python +@property +def location() -> str +``` + +Location of the object. + + + +### mirror\_paths + +```python +@property +def mirror_paths() -> List[str] +``` + +List of mirror paths. + + + +### mirror\_copies + +```python +@property +def mirror_copies() -> int +``` + +Number of mirror copies. + + + +### present + +```python +@property +def present() -> bool +``` + +True if object is present in cluster. + + + +## Class: ObjectAttributes + +```python +class ObjectAttributes() +``` + +Represents the attributes parsed from the response headers returned from an API call to get an object. + +**Arguments**: + +- `response_headers` _CaseInsensitiveDict_ - Response header dict containing object attributes + + + +### size + +```python +@property +def size() -> int +``` + +Size of object content. + + + +### checksum\_type + +```python +@property +def checksum_type() -> str +``` + +Type of checksum, e.g. xxhash or md5. + + + +### checksum\_value + +```python +@property +def checksum_value() -> str +``` + +Checksum value. + + + +### access\_time + +```python +@property +def access_time() -> str +``` + +Time this object was accessed. + + + +### obj\_version + +```python +@property +def obj_version() -> str +``` + +Object version. + + + +### custom\_metadata + +```python +@property +def custom_metadata() -> Dict[str, str] +``` + +Dictionary of custom metadata. + diff --git a/python/Makefile b/python/Makefile index b5e5556cce..3282d5b4de 100644 --- a/python/Makefile +++ b/python/Makefile @@ -108,7 +108,7 @@ generate-sdk-docs: @ pydoc-markdown -I ./aistore/sdk \ -m authn.authn_client -m authn.cluster_manager -m authn.role_manager -m authn.token_manager -m authn.user_manager -m authn.access_attr \ -m bucket -m client -m cluster -m etl -m job -m multiobj.object_group -m multiobj.object_names -m multiobj.object_range -m multiobj.object_template \ - -m object -m object_iterator -m object_reader \ + -m obj.object -m obj.object_reader -m obj.object_file -m obj.object_props -m obj.object_attributes \ '{ renderer: { type: markdown, descriptive_class_title: "Class: ", render_toc: true, render_toc_title: "", render_module_header: false, classdef_with_decorators: true } }' >> $(SDK_DOCFILE) @ sed -i -e 's/####/###/g' $(SDK_DOCFILE) diff --git a/python/aistore/sdk/obj/content_iterator.py b/python/aistore/sdk/obj/content_iterator.py new file mode 100644 index 0000000000..7892c5e26d --- /dev/null +++ b/python/aistore/sdk/obj/content_iterator.py @@ -0,0 +1,38 @@ +# +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +from typing import Iterator + +from aistore.sdk.obj.object_client import ObjectClient + + +# pylint: disable=too-few-public-methods +class ContentIterator: + """ + Provide an iterator to open an HTTP response stream and read chunks of object content. + + Args: + client (ObjectClient): Client for accessing contents of an individual object. + chunk_size (int): Size of each chunk of data yielded from the response stream. 
+ """ + + def __init__(self, client: ObjectClient, chunk_size: int): + self._client = client + self._chunk_size = chunk_size + + def iter_from_position(self, start_position: int) -> Iterator[bytes]: + """ + Make a request to get a stream from the provided object starting at a specific byte position + and yield chunks of the stream content. + + Args: + start_position (int): The byte position from which to start reading. + + Returns: + Iterator[bytes]: An iterator over each chunk of bytes in the object starting from the specific position + """ + stream = self._client.get(stream=True, start_position=start_position) + try: + yield from stream.iter_content(chunk_size=self._chunk_size) + finally: + stream.close() diff --git a/python/aistore/sdk/obj/object.py b/python/aistore/sdk/obj/object.py index 56d48190b7..6fc923815e 100644 --- a/python/aistore/sdk/obj/object.py +++ b/python/aistore/sdk/obj/object.py @@ -33,7 +33,7 @@ HEADER_OBJECT_BLOB_WORKERS, HEADER_OBJECT_BLOB_CHUNK_SIZE, ) -from aistore.sdk.obj.object_file import ObjectFile +from aistore.sdk.obj.object_client import ObjectClient from aistore.sdk.obj.object_reader import ObjectReader from aistore.sdk.types import ( ActionMsg, @@ -165,71 +165,21 @@ def get( # https://www.rfc-editor.org/rfc/rfc7233#section-2.1 headers = {HEADER_RANGE: byte_range} - obj_reader = ObjectReader( - client=self._client, + obj_client = ObjectClient( + request_client=self._client, path=self._object_path, params=params, headers=headers, + ) + + obj_reader = ObjectReader( + object_client=obj_client, chunk_size=chunk_size, ) if writer: writer.writelines(obj_reader) return obj_reader - def as_file( - self, - max_resume: int = 5, - archive_settings: ArchiveConfig = None, - blob_download_settings: BlobDownloadConfig = None, - chunk_size: int = DEFAULT_CHUNK_SIZE, - etl_name: str = None, - writer: BufferedWriter = None, - latest: bool = False, - byte_range: str = None, - ) -> ObjectFile: # pylint: disable=too-many-arguments - """ - Creates an `ObjectFile` for reading object data in chunks with support for - resuming and retrying from the last known position in the case the object stream - is prematurely closed due to an unexpected error. - - Args: - max_resume (int, optional): If streaming object contents is interrupted, this - defines the maximum number of attempts to resume the connection before - raising an exception. - archive_settings (ArchiveConfig, optional): Settings for archive extraction. - blob_download_settings (BlobDownloadConfig, optional): Settings for using blob - download (e.g., chunk size, workers). - chunk_size (int, optional): The size of chunks to use while reading from the stream. - etl_name (str, optional): Name of the ETL (Extract, Transform, Load) transformation - to apply during the get operation. - writer (BufferedWriter, optional): A writer for writing content output. User is - responsible for closing the writer. - latest (bool, optional): Whether to get the latest version of the object from - a remote bucket (if applicable). - byte_range (str, optional): Specify a byte range to fetch a segment of the object - (e.g., "bytes=0-499" for the first 500 bytes). - - Returns: - ObjectFile: A file-like object that can be used to read the object content. - - Raises: - requests.RequestException: An ambiguous exception occurred while handling the request. - requests.ConnectionError: A connection error occurred. - requests.ConnectionTimeout: The connection to AIStore timed out. - requests.ReadTimeout: Waiting for a response from AIStore timed out. 
- requests.exceptions.HTTPError(404): The object does not exist. - """ - object_reader = self.get( - archive_config=archive_settings, - blob_download_config=blob_download_settings, - chunk_size=chunk_size, - etl_name=etl_name, - writer=writer, - latest=latest, - byte_range=byte_range, - ) - return ObjectFile(object_reader, max_resume=max_resume) - def get_semantic_url(self) -> str: """ Get the semantic URL to the object diff --git a/python/aistore/sdk/obj/object_client.py b/python/aistore/sdk/obj/object_client.py new file mode 100644 index 0000000000..fe34b1cf79 --- /dev/null +++ b/python/aistore/sdk/obj/object_client.py @@ -0,0 +1,72 @@ +# +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +from typing import Optional, Dict, List + +import requests + +from aistore.sdk.const import HTTP_METHOD_GET, HTTP_METHOD_HEAD, HEADER_RANGE +from aistore.sdk.obj.object_attributes import ObjectAttributes +from aistore.sdk.request_client import RequestClient + + +class ObjectClient: + """ + ObjectClient is a simple wrapper around a given RequestClient that makes requests to an individual object. + + Args: + request_client (RequestClient): The RequestClient used to make HTTP requests + path (str): URL Path to the object + params (List[str]): Query parameters for the request + headers (Optional[Dict[str, str]]): HTTP request headers + """ + + def __init__( + self, + request_client: RequestClient, + path: str, + params: List[str], + headers: Optional[Dict[str, str]] = None, + ): + self._request_client = request_client + self._request_path = path + self._request_params = params + self._request_headers = headers + + def get(self, stream: bool, start_position: int) -> requests.Response: + """ + Make a request to AIS to get the object content, optionally starting at a specific byte position. + + Args: + stream (bool): If True, stream the response content. + start_position (int): The byte position to start reading from. + + Returns: + requests.Response: The response object from the request. + """ + headers = self._request_headers.copy() if self._request_headers else {} + if start_position != 0: + headers[HEADER_RANGE] = f"bytes={start_position}-" + + resp = self._request_client.request( + HTTP_METHOD_GET, + path=self._request_path, + params=self._request_params, + stream=stream, + headers=headers, + ) + resp.raise_for_status() + return resp + + def head(self) -> ObjectAttributes: + """ + Make a head request to AIS to update and return only object attributes. + + Returns: + `ObjectAttributes` containing metadata for this object. + + """ + resp = self._request_client.request( + HTTP_METHOD_HEAD, path=self._request_path, params=self._request_params + ) + return ObjectAttributes(resp.headers) diff --git a/python/aistore/sdk/obj/object_file.py b/python/aistore/sdk/obj/object_file.py index ad752c6cb3..815d33484c 100644 --- a/python/aistore/sdk/obj/object_file.py +++ b/python/aistore/sdk/obj/object_file.py @@ -7,7 +7,7 @@ import requests -from aistore.sdk.obj.object_reader import ObjectReader +from aistore.sdk.obj.content_iterator import ContentIterator from aistore.sdk.utils import get_logger logger = get_logger(__name__) @@ -100,17 +100,19 @@ class ObjectFile(BufferedIOBase): of data from the buffer. Args: - object_reader (ObjectReader): The object reader used to fetch object data in chunks. + content_iterator (ContentIterator): An iterator that can fetch object data from AIS in chunks. max_resume (int): Maximum number of retry attempts in case of a streaming failure. 
""" - def __init__(self, object_reader: ObjectReader, max_resume: int): - self._object_reader = object_reader + def __init__(self, content_iterator: ContentIterator, max_resume: int): + self._content_iterator = content_iterator self._max_resume = max_resume self._current_pos = 0 self._closed = False self._buffer = SimpleBuffer() - self._chunk_iterator = self._object_reader.iter_from_position(self._current_pos) + self._chunk_iterator = self._content_iterator.iter_from_position( + self._current_pos + ) self._resume_total = 0 def close(self) -> None: @@ -198,7 +200,7 @@ def read(self, size=-1): ) # Reset the chunk iterator for resuming the stream - self._chunk_iterator = self._object_reader.iter_from_position( + self._chunk_iterator = self._content_iterator.iter_from_position( self._current_pos + len(self._buffer) ) diff --git a/python/aistore/sdk/obj/object_reader.py b/python/aistore/sdk/obj/object_reader.py index a434379b90..6dc88e1ae6 100644 --- a/python/aistore/sdk/obj/object_reader.py +++ b/python/aistore/sdk/obj/object_reader.py @@ -2,34 +2,33 @@ # Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. # -from typing import Iterator, List, Dict, Optional +from typing import Iterator, Optional import requests -from aistore.sdk.request_client import RequestClient -from aistore.sdk.const import DEFAULT_CHUNK_SIZE, HTTP_METHOD_GET, HTTP_METHOD_HEAD +from aistore.sdk.obj.content_iterator import ContentIterator +from aistore.sdk.obj.object_client import ObjectClient +from aistore.sdk.obj.object_file import ObjectFile +from aistore.sdk.const import DEFAULT_CHUNK_SIZE from aistore.sdk.obj.object_attributes import ObjectAttributes class ObjectReader: """ - Represents the data returned by the API when getting an object, including access to the content stream and object - attributes. + Provide a way to read an object's contents and attributes, optionally iterating over a stream of content. + + Args: + object_client (ObjectClient): Client for making requests to a specific object in AIS + chunk_size (int, optional): Size of each data chunk to be fetched from the stream. + Defaults to DEFAULT_CHUNK_SIZE. """ - # pylint: disable=too-many-arguments def __init__( self, - client: RequestClient, - path: str, - params: List[str], - headers: Optional[Dict[str, str]] = None, + object_client: ObjectClient, chunk_size: int = DEFAULT_CHUNK_SIZE, ): - self._request_client = client - self._request_path = path - self._request_params = params - self._request_headers = headers - self._chunk_size = chunk_size + self._object_client = object_client + self._content_iterator = ContentIterator(self._object_client, chunk_size) self._attributes = None def head(self) -> ObjectAttributes: @@ -37,39 +36,26 @@ def head(self) -> ObjectAttributes: Make a head request to AIS to update and return only object attributes. Returns: - ObjectAttributes for this object - + `ObjectAttributes` containing metadata for this object. """ - resp = self._request_client.request( - HTTP_METHOD_HEAD, path=self._request_path, params=self._request_params - ) - return ObjectAttributes(resp.headers) + self._attributes = self._object_client.head() + return self._attributes def _make_request( - self, stream: bool = True, start_position: Optional[int] = None + self, stream: bool = True, start_position: int = 0 ) -> requests.Response: """ - Make a request to AIS to get the object content, optionally starting at a specific byte position. + Use the object client to get a response from AIS and update the reader's object attributes. 
Args: - stream: Whether to stream the response. - start_position: The byte position to start reading from, if specified. + stream (bool, optional): If True, use the `requests` library `stream` option to stream the response content. + Defaults to True. + start_position (int, optional): The byte position to start reading from. Defaults to 0. Returns: The response object from the request. """ - headers = self._request_headers.copy() if self._request_headers else {} - if start_position is not None and start_position != 0: - headers["Range"] = f"bytes={start_position}-" - - resp = self._request_client.request( - HTTP_METHOD_GET, - path=self._request_path, - params=self._request_params, - stream=stream, - headers=headers, - ) - resp.raise_for_status() + resp = self._object_client.get(stream=stream, start_position=start_position) self._attributes = ObjectAttributes(resp.headers) return resp @@ -79,22 +65,12 @@ def attributes(self) -> ObjectAttributes: Object metadata attributes. Returns: - ObjectAttributes: Parsed object attributes from the headers returned by AIS + ObjectAttributes: Parsed object attributes from the headers returned by AIS. """ if not self._attributes: self._attributes = self.head() return self._attributes - @property - def chunk_size(self) -> int: - """ - Chunk size. - - Returns: - int: Current chunk size for reading the object. - """ - return self._chunk_size - def read_all(self) -> bytes: """ Read all byte data directly from the object response without using a stream. @@ -108,13 +84,37 @@ def read_all(self) -> bytes: def raw(self) -> requests.Response: """ - Returns the raw byte stream of object content. + Return the raw byte stream of object content. Returns: - requests.Response: Raw byte stream of the object content + requests.Response: Raw byte stream of the object content. """ return self._make_request(stream=True).raw + def as_file( + self, + max_resume: Optional[int] = 5, + ) -> ObjectFile: + """ + Create an `ObjectFile` for reading object data in chunks. `ObjectFile` supports + resuming and retrying from the last known position in the case the object stream + is prematurely closed due to an unexpected error. + + Args: + max_resume (int, optional): Maximum number of resume attempts in case of streaming failure. Defaults to 5. + + Returns: + ObjectFile: A file-like object that can be used to read the object content. + + Raises: + requests.RequestException: An ambiguous exception occurred while handling the request. + requests.ConnectionError: A connection error occurred. + requests.ConnectionTimeout: The connection to AIStore timed out. + requests.ReadTimeout: Waiting for a response from AIStore timed out. + requests.exceptions.HTTPError(404): The object does not exist. + """ + return ObjectFile(self._content_iterator, max_resume=max_resume) + def iter_from_position(self, start_position: int = 0) -> Iterator[bytes]: """ Make a request to get a stream from the provided object starting at a specific byte position @@ -126,11 +126,7 @@ def iter_from_position(self, start_position: int = 0) -> Iterator[bytes]: Returns: Iterator[bytes]: An iterator over each chunk of bytes in the object starting from the specific position. 
""" - stream = self._make_request(stream=True, start_position=start_position) - try: - yield from stream.iter_content(chunk_size=self.chunk_size) - finally: - stream.close() + return self._content_iterator.iter_from_position(start_position) def __iter__(self) -> Iterator[bytes]: """ diff --git a/python/examples/sdk/resilient-streaming-object-file.ipynb b/python/examples/sdk/resilient-streaming-object-file.ipynb index 6ea592916e..b894ed52ec 100644 --- a/python/examples/sdk/resilient-streaming-object-file.ipynb +++ b/python/examples/sdk/resilient-streaming-object-file.ipynb @@ -37,7 +37,7 @@ "\n", "AIS_ENDPOINT = \"http://localhost:8080\"\n", "\n", - "# Define custom retry logic for requests to AIS. This will also be used when re-establishing streams (in the case of object.as_file()).\n", + "# Define custom retry logic for requests to AIS. This will also be used when re-establishing streams (in the case of object.get().as_file()).\n", "# If you want to retry in the case of total pod failure, be sure to force retries on specific HTTP response codes that are not typically retried\n", "# In particular, 400 and 404 are what you might see as the client attempts to redirect requests to an object on a missing target\n", "# The timing on each retry is determined by (backoff_factor * 2^retry_count) -- here the last and longest retry waits 512 seconds\n", @@ -92,7 +92,7 @@ "# Step 3: Open the Object File & Read\n", "\n", "# Step 3a: Stream the object file and use tarfile.open to extract\n", - "with client.bucket(BUCKET_NAME).object(OBJECT_NAME).as_file(max_resume=3) as file_obj:\n", + "with client.bucket(BUCKET_NAME).object(OBJECT_NAME).get().as_file(max_resume=3) as file_obj:\n", " with tarfile.open(fileobj=file_obj, mode='r|*') as tar:\n", " if not os.path.exists(EXTRACT_PATH):\n", " os.makedirs(EXTRACT_PATH)\n", diff --git a/python/tests/integration/sdk/test_object_file_ops.py b/python/tests/integration/sdk/test_object_file_ops.py index d22b07c76e..cc3ae295d0 100644 --- a/python/tests/integration/sdk/test_object_file_ops.py +++ b/python/tests/integration/sdk/test_object_file_ops.py @@ -56,7 +56,7 @@ def tearDownClass(cls): os.remove(cls.TAR_FILE_PATH) def setUp(self): - self.file_obj = self.bucket.object(self.OBJECT_NAME).as_file() + self.file_obj = self.bucket.object(self.OBJECT_NAME).get().as_file() def tearDown(self): # Close the file object if it's still open diff --git a/python/tests/unit/sdk/obj/__init__.py b/python/tests/unit/sdk/obj/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/tests/unit/sdk/obj/test_content_iterator.py b/python/tests/unit/sdk/obj/test_content_iterator.py new file mode 100644 index 0000000000..8ef296b38a --- /dev/null +++ b/python/tests/unit/sdk/obj/test_content_iterator.py @@ -0,0 +1,49 @@ +import unittest +from unittest.mock import Mock +from aistore.sdk.obj.object_client import ObjectClient +from aistore.sdk.const import DEFAULT_CHUNK_SIZE +from aistore.sdk.obj.content_iterator import ContentIterator +from tests.utils import test_cases + +byte_chunks = [b"chunk1", b"chunk2", b"chunk3"] + + +class TestContentIterator(unittest.TestCase): + def setUp(self): + self.mock_client = Mock(spec=ObjectClient) + self.iterator = ContentIterator(self.mock_client, DEFAULT_CHUNK_SIZE) + + @test_cases(None, 1234) + def test_iter_from_position(self, chunk_size): + mock_stream = Mock() + mock_stream.iter_content.return_value = byte_chunks + self.mock_client.get.return_value = mock_stream + + if chunk_size: + self.iterator = 
ContentIterator(self.mock_client, chunk_size=chunk_size) + + start_position = 100 + res = list(self.iterator.iter_from_position(start_position)) + + self.assertEqual(byte_chunks, res) + self.mock_client.get.assert_called_with( + stream=True, start_position=start_position + ) + if chunk_size: + mock_stream.iter_content.assert_called_once_with(chunk_size=chunk_size) + else: + mock_stream.iter_content.assert_called_once_with( + chunk_size=DEFAULT_CHUNK_SIZE + ) + + mock_stream.close.assert_called_once() + + def test_iter_from_position_exception_handling(self): + mock_stream = Mock() + mock_stream.iter_content.side_effect = Exception("Stream error") + self.mock_client.get.return_value = mock_stream + + with self.assertRaises(Exception): + list(self.iterator.iter_from_position(0)) + + mock_stream.close.assert_called_once() diff --git a/python/tests/unit/sdk/test_object.py b/python/tests/unit/sdk/obj/test_object.py similarity index 90% rename from python/tests/unit/sdk/test_object.py rename to python/tests/unit/sdk/obj/test_object.py index 6dd69bf0c3..b2f9c0e4b7 100644 --- a/python/tests/unit/sdk/test_object.py +++ b/python/tests/unit/sdk/obj/test_object.py @@ -35,6 +35,7 @@ AIS_PRESENT, ) from aistore.sdk.obj.object import Object +from aistore.sdk.obj.object_client import ObjectClient from aistore.sdk.obj.object_reader import ObjectReader from aistore.sdk.archive_config import ArchiveMode, ArchiveConfig from aistore.sdk.obj.object_props import ObjectProps @@ -51,7 +52,7 @@ REQUEST_PATH = f"{URL_PATH_OBJECTS}/{BCK_NAME}/{OBJ_NAME}" -# pylint: disable=unused-variable, too-many-locals, too-many-public-methods +# pylint: disable=unused-variable, too-many-locals, too-many-public-methods, no-value-for-parameter class TestObject(unittest.TestCase): def setUp(self) -> None: self.mock_client = Mock() @@ -109,28 +110,35 @@ def test_get_archregex(self): archive_config = ArchiveConfig(regex=regex, mode=mode) self.get_exec_assert(archive_config=archive_config) - def get_exec_assert(self, **kwargs): - with patch( - "aistore.sdk.obj.object.ObjectReader", return_value=Mock(spec=ObjectReader) - ) as mock_obj_reader: - res = self.object.get(**kwargs) - - blob_config = kwargs.get("blob_download_config", BlobDownloadConfig()) - initial_headers = kwargs.get("expected_headers", {}) - expected_headers = self.get_expected_headers(initial_headers, blob_config) - - expected_chunk_size = kwargs.get("chunk_size", DEFAULT_CHUNK_SIZE) - - self.assertIsInstance(res, ObjectReader) - mock_obj_reader.assert_called_with( - client=self.mock_client, - path=REQUEST_PATH, - params=self.expected_params, - headers=expected_headers, - chunk_size=expected_chunk_size, - ) - if "writer" in kwargs: - self.mock_writer.writelines.assert_called_with(res) + @patch("aistore.sdk.obj.object.ObjectReader") + @patch("aistore.sdk.obj.object.ObjectClient") + def get_exec_assert(self, mock_obj_client, mock_obj_reader, **kwargs): + mock_obj_client_instance = Mock(spec=ObjectClient) + mock_obj_client.return_value = mock_obj_client_instance + mock_obj_reader.return_value = Mock(spec=ObjectReader) + + res = self.object.get(**kwargs) + + blob_config = kwargs.get("blob_download_config", BlobDownloadConfig()) + initial_headers = kwargs.get("expected_headers", {}) + expected_headers = self.get_expected_headers(initial_headers, blob_config) + + expected_chunk_size = kwargs.get("chunk_size", DEFAULT_CHUNK_SIZE) + + self.assertIsInstance(res, ObjectReader) + + mock_obj_client.assert_called_with( + request_client=self.mock_client, + path=REQUEST_PATH, + 
params=self.expected_params, + headers=expected_headers, + ) + mock_obj_reader.assert_called_with( + object_client=mock_obj_client_instance, + chunk_size=expected_chunk_size, + ) + if "writer" in kwargs: + self.mock_writer.writelines.assert_called_with(res) @staticmethod def get_expected_headers(initial_headers, blob_config): diff --git a/python/tests/unit/sdk/test_object_attributes.py b/python/tests/unit/sdk/obj/test_object_attributes.py similarity index 100% rename from python/tests/unit/sdk/test_object_attributes.py rename to python/tests/unit/sdk/obj/test_object_attributes.py diff --git a/python/tests/unit/sdk/obj/test_object_client.py b/python/tests/unit/sdk/obj/test_object_client.py new file mode 100644 index 0000000000..af1a5afee8 --- /dev/null +++ b/python/tests/unit/sdk/obj/test_object_client.py @@ -0,0 +1,80 @@ +import unittest +from unittest.mock import Mock, patch + +import requests + +from aistore.sdk.const import HTTP_METHOD_HEAD, HTTP_METHOD_GET, HEADER_RANGE +from aistore.sdk.request_client import RequestClient +from aistore.sdk.obj.object_client import ObjectClient + + +class TestObjectClient(unittest.TestCase): + def setUp(self) -> None: + self.request_client = Mock(spec=RequestClient) + self.path = "/test/path" + self.params = ["param1", "param2"] + self.headers = {"header1": "req1", "header2": "req2"} + self.response_headers = {"attr1": "resp1", "attr2": "resp2"} + self.object_client = ObjectClient( + self.request_client, self.path, self.params, self.headers + ) + + def test_get(self): + self.get_exec_assert( + self.object_client, + stream=True, + start_position=0, + expected_headers=self.headers, + ) + + def test_get_no_headers(self): + object_client = ObjectClient(self.request_client, self.path, self.params) + self.get_exec_assert( + object_client, stream=True, start_position=0, expected_headers={} + ) + + def test_get_with_options(self): + # Test with stream=False, start position set + mock_response = Mock(spec=requests.Response, headers=self.response_headers) + self.request_client.request.return_value = mock_response + # Any int + start_position = 4 + self.headers[HEADER_RANGE] = f"bytes={start_position}-" + + res = self.object_client.get(stream=False, start_position=start_position) + + self.assert_get(expected_stream=False, expected_headers=self.headers) + self.assertEqual(res, mock_response) + mock_response.raise_for_status.assert_called_once() + + def get_exec_assert(self, object_client, stream, start_position, expected_headers): + mock_response = Mock(spec=requests.Response) + self.request_client.request.return_value = mock_response + + res = object_client.get(stream=stream, start_position=start_position) + + self.assert_get(stream, expected_headers) + self.assertEqual(res, mock_response) + mock_response.raise_for_status.assert_called_once() + + def assert_get(self, expected_stream, expected_headers): + self.request_client.request.assert_called_once_with( + HTTP_METHOD_GET, + path=self.path, + params=self.params, + stream=expected_stream, + headers=expected_headers, + ) + + @patch("aistore.sdk.obj.object_client.ObjectAttributes", autospec=True) + def test_head(self, mock_attr): + mock_response = Mock(spec=requests.Response, headers=self.response_headers) + self.request_client.request.return_value = mock_response + + res = self.object_client.head() + + self.assertEqual(mock_attr.return_value, res) + self.request_client.request.assert_called_once_with( + HTTP_METHOD_HEAD, path=self.path, params=self.params + ) + mock_attr.assert_called_with(self.response_headers) 
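For context, a minimal usage sketch of the read path after this refactor (ObjectClient → ContentIterator → ObjectReader → ObjectFile). The endpoint, bucket, and object names below are illustrative placeholders, and a reachable AIS cluster is assumed; this is not part of the patch itself.

```python
from aistore.sdk import Client

# Placeholder endpoint/bucket/object names -- adjust to a real cluster before running.
client = Client("http://localhost:8080")
obj = client.bucket("my-bucket").object("my-object.tar")

reader = obj.get()       # ObjectReader wrapping an ObjectClient and a ContentIterator
attrs = reader.head()    # ObjectAttributes from a HEAD request
print(attrs.size)

for chunk in reader:     # chunked streaming via ContentIterator
    print(len(chunk))

# Resilient, file-like access now hangs off the reader instead of Object.as_file()
with obj.get().as_file(max_resume=3) as f:
    first_kb = f.read(1024)
```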
diff --git a/python/tests/unit/sdk/test_object_file.py b/python/tests/unit/sdk/obj/test_object_file.py
similarity index 86%
rename from python/tests/unit/sdk/test_object_file.py
rename to python/tests/unit/sdk/obj/test_object_file.py
index 817b1903e4..64649f963b 100644
--- a/python/tests/unit/sdk/test_object_file.py
+++ b/python/tests/unit/sdk/obj/test_object_file.py
@@ -1,19 +1,20 @@
 #
 # Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
 #
-
-# pylint: disable=protected-access
-
 import io
 import os
 import unittest
 from unittest import mock
 
 from requests.exceptions import ChunkedEncodingError
+
+from aistore.sdk.obj.content_iterator import ContentIterator
 from aistore.sdk.obj.object_file import ObjectFile
-from aistore.sdk.obj.object_reader import ObjectReader
 from aistore.sdk.const import DEFAULT_CHUNK_SIZE
 
+# pylint: disable=too-few-public-methods
+
+
 class BadObjectStream(io.BytesIO):
     """
     Simulates a stream that fails with ChunkedEncodingError intermittently every `fail_on_read`
@@ -33,20 +34,20 @@ def read(self, size=-1):
         return super().read(size)
 
 
-class BadObjectReader(ObjectReader):
+class BadContentIterator(ContentIterator):
     """
-    Simulates an ObjectReader that streams data using BadObjectStream that fails with ChunksEncoding
+    Simulates a ContentIterator that streams data using BadObjectStream that fails with ChunkedEncoding
     error every `fail_on_read` chunks read.
 
-    This class extends `ObjectReader` and the chunk size (DEFAULT_CHUNK_SIZE) is inherited from the
-    parent class `ObjectReader`.
+    This class extends `ContentIterator`, passing the chunk size (DEFAULT_CHUNK_SIZE) through to the
+    parent class constructor.
 
     The streaming starts from a specific position (`start_position`), allowing the object to resume
     reading from that point if necessary.
 
""" def __init__(self, data=None, fail_on_read=2): - super().__init__(client=mock.Mock(), path="", params=[]) + super().__init__(client=mock.Mock(), chunk_size=DEFAULT_CHUNK_SIZE) self.data = data self.fail_on_read = fail_on_read @@ -58,7 +59,7 @@ def iterator(): self.data[start_position:], fail_on_read=self.fail_on_read ) while True: - chunk = stream.read(self.chunk_size) + chunk = stream.read(self._chunk_size) if not chunk: break yield chunk @@ -73,7 +74,7 @@ class TestObjectFile(unittest.TestCase): def test_buffer_usage(self): """Test ObjectFile uses buffer correctly, only fetching new chunks when needed.""" data = os.urandom(DEFAULT_CHUNK_SIZE * 2) - mock_reader = BadObjectReader(data=data, fail_on_read=0) + mock_reader = BadContentIterator(data=data, fail_on_read=0) object_file = ObjectFile(mock_reader, max_resume=0) # Mock `next` to track how many times we fetch new data @@ -108,7 +109,7 @@ def test_buffer_usage(self): def test_read_all_fails_after_max_retries(self): """Test that ObjectFile gives up after exceeding max retry attempts for read-all.""" data = os.urandom(DEFAULT_CHUNK_SIZE * 4) - mock_reader = BadObjectReader(data=data, fail_on_read=2) + mock_reader = BadContentIterator(data=data, fail_on_read=2) object_file = ObjectFile(mock_reader, max_resume=2) # Test that the read fails after 2 retry attempts @@ -118,7 +119,7 @@ def test_read_all_fails_after_max_retries(self): def test_read_fixed_fails_after_max_retries(self): """Test that ObjectFile gives up after exceeding max retry attempts for fixed-size reads.""" data = os.urandom(DEFAULT_CHUNK_SIZE * 4) - mock_reader = BadObjectReader(data=data, fail_on_read=2) + mock_reader = BadContentIterator(data=data, fail_on_read=2) object_file = ObjectFile(mock_reader, max_resume=2) # Test that the read fails after 2 retry attempts for a fixed-size read @@ -128,7 +129,7 @@ def test_read_fixed_fails_after_max_retries(self): def test_read_all_success_after_retries(self): """Test that ObjectFile retries and succeeds after intermittent ChunkedEncodingError for read-all.""" data = os.urandom(DEFAULT_CHUNK_SIZE * 4) - mock_reader = BadObjectReader(data=data, fail_on_read=2) + mock_reader = BadContentIterator(data=data, fail_on_read=2) object_file = ObjectFile(mock_reader, max_resume=4) result = object_file.read() @@ -139,7 +140,7 @@ def test_read_all_success_after_retries(self): def test_read_fixed_success_after_retries(self): """Test that ObjectFile retries and succeeds for fixed-size reads after intermittent ChunkedEncodingError.""" data = os.urandom(DEFAULT_CHUNK_SIZE * 4) - mock_reader = BadObjectReader(data=data, fail_on_read=2) + mock_reader = BadContentIterator(data=data, fail_on_read=2) object_file = ObjectFile(mock_reader, max_resume=4) # Read the first half of the data and check the position diff --git a/python/tests/unit/sdk/obj/test_object_reader.py b/python/tests/unit/sdk/obj/test_object_reader.py new file mode 100644 index 0000000000..d16ac4f0d0 --- /dev/null +++ b/python/tests/unit/sdk/obj/test_object_reader.py @@ -0,0 +1,133 @@ +import unittest +from unittest.mock import patch, Mock +import requests + +from aistore.sdk.obj.content_iterator import ContentIterator +from aistore.sdk.obj.object_file import ObjectFile +from aistore.sdk.obj.object_reader import ObjectReader +from aistore.sdk.obj.object_attributes import ObjectAttributes + + +class TestObjectReader(unittest.TestCase): + def setUp(self): + self.object_client = Mock() + self.chunk_size = 1024 + self.object_reader = ObjectReader(self.object_client, self.chunk_size) + 
self.response_headers = {"attr1": "resp1", "attr2": "resp2"} + + def test_head(self): + mock_attr = Mock() + self.object_client.head.return_value = mock_attr + + res = self.object_reader.head() + + # Attributes should be returned and the property updated + self.assertEqual(res, mock_attr) + self.assertEqual(mock_attr, self.object_reader.attributes) + self.object_client.head.assert_called_once() + + def test_attributes_property(self): + mock_attr = Mock() + self.object_client.head.return_value = mock_attr + + attr = self.object_reader.attributes + + # Attributes should be returned and the property updated + self.assertEqual(attr, mock_attr) + # If we access attributes again, no new call to the client + attr = self.object_reader.attributes + self.assertEqual(attr, mock_attr) + self.object_client.head.assert_called_once() + + @patch("aistore.sdk.obj.object_reader.ObjectAttributes", autospec=True) + def test_read_all(self, mock_attr): + # Should return the response content and update the attributes + chunk1 = b"chunk1" + chunk2 = b"chunk2" + mock_response = Mock( + spec=requests.Response, + content=chunk1 + chunk2, + headers=self.response_headers, + ) + self.object_client.get.return_value = mock_response + + content = self.object_reader.read_all() + + # Assert the result, the call to object client + self.assertEqual(chunk1 + chunk2, content) + self.object_client.get.assert_called_with(stream=False, start_position=0) + # Assert attributes parsed and updated + self.assertIsInstance(self.object_reader.attributes, ObjectAttributes) + mock_attr.assert_called_with(self.response_headers) + + @patch("aistore.sdk.obj.object_reader.ObjectAttributes", autospec=True) + def test_raw(self, mock_attr): + mock_response = Mock( + spec=requests.Response, raw=b"bytestream", headers=self.response_headers + ) + self.object_client.get.return_value = mock_response + + raw_stream = self.object_reader.raw() + + # Assert the result, the call to object client + self.assertEqual(mock_response.raw, raw_stream) + self.object_client.get.assert_called_with(stream=True, start_position=0) + # Assert attributes parsed and updated + self.assertIsInstance(self.object_reader.attributes, ObjectAttributes) + mock_attr.assert_called_with(self.response_headers) + + @patch("aistore.sdk.obj.object_reader.ContentIterator") + def test_iter(self, mock_cont_iter_class): + mock_cont_iter, iterable_bytes = self.setup_mock_iterator(mock_cont_iter_class) + + res = iter(self.object_reader) + + mock_cont_iter.iter_from_position.assert_called_with(0) + self.assertEqual(iterable_bytes, res) + + @patch("aistore.sdk.obj.object_reader.ContentIterator") + def test_iter_start_position(self, mock_cont_iter_class): + mock_cont_iter, iterable_bytes = self.setup_mock_iterator(mock_cont_iter_class) + start_position = 2048 + + res = self.object_reader.iter_from_position(start_position) + + mock_cont_iter.iter_from_position.assert_called_with(start_position) + self.assertEqual(iterable_bytes, res) + + def setup_mock_iterator(self, mock_cont_iter_class): + # We patch the class, so use it to create a new instance of a mock content iterator + mock_cont_iter = Mock() + iterable_bytes = iter(b"test") + mock_cont_iter.iter_from_position.return_value = iterable_bytes + mock_cont_iter_class.return_value = mock_cont_iter + # Re-create to use the patched ContentIterator in constructor + self.object_reader = ObjectReader(self.object_client) + return mock_cont_iter, iterable_bytes + + @patch("aistore.sdk.obj.object_reader.ObjectFile", autospec=True) + def 
test_as_file(self, mock_obj_file):
+        # Returns an object file with the default resume count
+        res = self.object_reader.as_file()
+        self.assertIsInstance(res, ObjectFile)
+        mock_obj_file.assert_called_once()
+        # Get the arguments passed to the mock
+        args, kwargs = mock_obj_file.call_args
+        # For now just check that we provided a content iterator
+        self.assertIsInstance(args[0], ContentIterator)
+        # Check the max_resume argument
+        self.assertEqual(kwargs.get("max_resume"), 5)
+
+    @patch("aistore.sdk.obj.object_reader.ObjectFile", autospec=True)
+    def test_as_file_max_resume(self, mock_obj_file):
+        max_resume = 12
+        # Returns an object file with the provided max_resume value
+        res = self.object_reader.as_file(max_resume=max_resume)
+        self.assertIsInstance(res, ObjectFile)
+        mock_obj_file.assert_called_once()
+        # Get the arguments passed to the mock
+        args, kwargs = mock_obj_file.call_args
+        # For now just check that we provided a content iterator
+        self.assertIsInstance(args[0], ContentIterator)
+        # Check the max_resume argument
+        self.assertEqual(kwargs.get("max_resume"), max_resume)
diff --git a/python/tests/unit/sdk/test_object_reader.py b/python/tests/unit/sdk/test_object_reader.py
deleted file mode 100644
index 84610993c0..0000000000
--- a/python/tests/unit/sdk/test_object_reader.py
+++ /dev/null
@@ -1,124 +0,0 @@
-import unittest
-from unittest.mock import MagicMock, patch, Mock
-import requests
-
-from aistore.sdk.obj.object_reader import ObjectReader
-from aistore.sdk.request_client import RequestClient
-from aistore.sdk.const import HTTP_METHOD_GET, HTTP_METHOD_HEAD
-from aistore.sdk.obj.object_attributes import ObjectAttributes
-
-
-class TestObjectReader(unittest.TestCase):
-    def setUp(self):
-        self.client = MagicMock(spec=RequestClient)
-        self.path = "/test/path"
-        self.params = ["param1", "param2"]
-        self.headers = {"header1": "req1", "header2": "req2"}
-        self.response_headers = {"attr1": "resp1", "attr2": "resp2"}
-        self.chunk_size = 1024
-        self.object_reader = ObjectReader(
-            self.client, self.path, self.params, self.headers, self.chunk_size
-        )
-
-    @patch("aistore.sdk.obj.object_reader.ObjectAttributes", autospec=True)
-    def test_attributes_head(self, mock_attr):
-        mock_response = Mock(spec=requests.Response, headers=self.response_headers)
-        self.client.request.return_value = mock_response
-
-        res = self.object_reader.attributes
-
-        self.assertEqual(mock_attr.return_value, res)
-        self.client.request.assert_called_once_with(
-            HTTP_METHOD_HEAD, path=self.path, params=self.params
-        )
-        mock_attr.assert_called_with(self.response_headers)
-
-    @patch("aistore.sdk.obj.object_reader.ObjectAttributes", autospec=True)
-    def test_read_all(self, mock_attr):
-        chunk1 = b"chunk1"
-        chunk2 = b"chunk2"
-        mock_response = Mock(
-            spec=requests.Response,
-            content=chunk1 + chunk2,
-            headers=self.response_headers,
-        )
-        self.client.request.return_value = mock_response
-
-        content = self.object_reader.read_all()
-
-        self.assertEqual(chunk1 + chunk2, content)
-        self.assert_make_request(mock_attr, stream=False)
-
-    @patch("aistore.sdk.obj.object_reader.ObjectAttributes", autospec=True)
-    def test_raw(self, mock_attr):
-        mock_response = Mock(
-            spec=requests.Response, raw=b"bytestream", headers=self.response_headers
-        )
-        self.client.request.return_value = mock_response
-
-        raw_stream = self.object_reader.raw()
-
-        self.assertEqual(mock_response.raw, raw_stream)
-        self.assert_make_request(mock_attr, stream=True)
-
-    @patch("aistore.sdk.obj.object_reader.ObjectAttributes", autospec=True)
-    def 
test_iter(self, mock_attr): - expected_chunks = [b"chunk1", b"chunk2"] - mock_response = Mock(spec=requests.Response, headers=self.response_headers) - mock_response.iter_content.return_value = expected_chunks - self.client.request.return_value = mock_response - - chunks = list(self.object_reader) - - mock_response.iter_content.assert_called_once_with(chunk_size=self.chunk_size) - mock_response.close.assert_called_once() - self.assertEqual(expected_chunks, chunks) - self.assert_make_request(mock_attr, stream=True) - - @patch("aistore.sdk.obj.object_reader.ObjectAttributes", autospec=True) - def test_iter_from_position(self, mock_attr): - expected_chunks = [b"chunk1", b"chunk2"] - mock_response = Mock(spec=requests.Response, headers=self.response_headers) - mock_response.iter_content.return_value = expected_chunks - self.client.request.return_value = mock_response - - start_position = 2048 - chunks = list( - self.object_reader.iter_from_position(start_position=start_position) - ) - - self.assertEqual(chunks, expected_chunks) - self.assert_make_request_with_range( - mock_attr, stream=True, start_position=start_position - ) - - self.client.request.reset_mock() - start_position = 0 - chunks = list( - self.object_reader.iter_from_position(start_position=start_position) - ) - - self.assertEqual(chunks, expected_chunks) - self.assert_make_request(mock_attr, stream=True) - - def assert_make_request(self, mock_attr, stream): - self.client.request.assert_called_once_with( - HTTP_METHOD_GET, - path=self.path, - params=self.params, - stream=stream, - headers=self.headers, - ) - self.assertIsInstance(self.object_reader.attributes, ObjectAttributes) - mock_attr.assert_called_with(self.response_headers) - - def assert_make_request_with_range(self, mock_attr, stream, start_position): - self.client.request.assert_called_once_with( - HTTP_METHOD_GET, - path=self.path, - params=self.params, - stream=stream, - headers={**self.headers, "Range": f"bytes={start_position}-"}, - ) - self.assertIsInstance(self.object_reader.attributes, ObjectAttributes) - mock_attr.assert_called_with(self.response_headers)
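For completeness, here is a small sketch of the resilient file-like read path that sits on top of the refactored classes covered by the new tests. It is a usage sketch under assumed endpoint, bucket, and object names; `max_resume` bounds how many times the underlying stream may be re-established after a `ChunkedEncodingError` before the read fails.

```python
from aistore.sdk import Client

# Assumed endpoint and names -- illustration only.
client = Client("http://localhost:8080")
obj = client.bucket("my-bucket").object("my-object")

# as_file() wraps the reader's ContentIterator in an ObjectFile (max_resume defaults to 5).
obj_file = obj.get().as_file(max_resume=4)
try:
    header = obj_file.read(1024)  # fixed-size read from the buffered stream
    rest = obj_file.read()        # read the remainder of the object
    print(obj_file.tell(), len(header) + len(rest))
finally:
    obj_file.close()
```
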