Commit
sdk/python: Refactor internal object classes for accessing and iterating over object content

Signed-off-by: Aaron Wilson <[email protected]>
aaronnw committed Sep 26, 2024
1 parent 79543fd commit 5ae897a
Showing 17 changed files with 1,000 additions and 610 deletions.
837 changes: 511 additions & 326 deletions docs/python_sdk.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion python/Makefile
@@ -108,7 +108,7 @@ generate-sdk-docs:
@ pydoc-markdown -I ./aistore/sdk \
-m authn.authn_client -m authn.cluster_manager -m authn.role_manager -m authn.token_manager -m authn.user_manager -m authn.access_attr \
-m bucket -m client -m cluster -m etl -m job -m multiobj.object_group -m multiobj.object_names -m multiobj.object_range -m multiobj.object_template \
-m object -m object_iterator -m object_reader \
-m obj.object -m obj.object_reader -m obj.object_file -m obj.object_props -m obj.object_attributes \
'{ renderer: { type: markdown, descriptive_class_title: "Class: ", render_toc: true, render_toc_title: "", render_module_header: false, classdef_with_decorators: true } }' >> $(SDK_DOCFILE)
@ sed -i -e 's/####/###/g' $(SDK_DOCFILE)

38 changes: 38 additions & 0 deletions python/aistore/sdk/obj/content_iterator.py
@@ -0,0 +1,38 @@
#
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
from typing import Iterator

from aistore.sdk.obj.object_client import ObjectClient


# pylint: disable=too-few-public-methods
class ContentIterator:
"""
Provide an iterator to open an HTTP response stream and read chunks of object content.
Args:
client (ObjectClient): Client for accessing contents of an individual object.
chunk_size (int): Size of each chunk of data yielded from the response stream.
"""

def __init__(self, client: ObjectClient, chunk_size: int):
self._client = client
self._chunk_size = chunk_size

def iter_from_position(self, start_position: int) -> Iterator[bytes]:
"""
Make a request to get a stream from the provided object starting at a specific byte position
and yield chunks of the stream content.
Args:
start_position (int): The byte position from which to start reading.
Returns:
Iterator[bytes]: An iterator over each chunk of bytes in the object, starting from the specified position.
"""
stream = self._client.get(stream=True, start_position=start_position)
try:
yield from stream.iter_content(chunk_size=self._chunk_size)
finally:
stream.close()
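
The following is a minimal, cluster-free sketch (not part of this commit) of how ContentIterator drives the client it is given. The fake classes below only mimic the two things ContentIterator relies on — ObjectClient.get(stream=True, start_position=...) and the returned stream's iter_content()/close() — and are purely illustrative.

from aistore.sdk.obj.content_iterator import ContentIterator


class _FakeStream:
    """Mimics the streaming requests.Response returned by ObjectClient.get()."""

    def __init__(self, data: bytes):
        self._data = data

    def iter_content(self, chunk_size: int):
        for i in range(0, len(self._data), chunk_size):
            yield self._data[i : i + chunk_size]

    def close(self):
        pass


class _FakeObjectClient:
    """Stand-in for ObjectClient that serves an in-memory byte slice."""

    def __init__(self, data: bytes):
        self._data = data

    def get(self, stream: bool, start_position: int):
        return _FakeStream(self._data[start_position:])


data = b"x" * 100
iterator = ContentIterator(client=_FakeObjectClient(data), chunk_size=32)

assert b"".join(iterator.iter_from_position(0)) == data        # stream the whole object
assert b"".join(iterator.iter_from_position(40)) == data[40:]  # resume from byte 40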
64 changes: 7 additions & 57 deletions python/aistore/sdk/obj/object.py
@@ -33,7 +33,7 @@
HEADER_OBJECT_BLOB_WORKERS,
HEADER_OBJECT_BLOB_CHUNK_SIZE,
)
from aistore.sdk.obj.object_file import ObjectFile
from aistore.sdk.obj.object_client import ObjectClient
from aistore.sdk.obj.object_reader import ObjectReader
from aistore.sdk.types import (
ActionMsg,
@@ -165,71 +165,21 @@ def get(
# https://www.rfc-editor.org/rfc/rfc7233#section-2.1
headers = {HEADER_RANGE: byte_range}

obj_reader = ObjectReader(
client=self._client,
obj_client = ObjectClient(
request_client=self._client,
path=self._object_path,
params=params,
headers=headers,
)

obj_reader = ObjectReader(
object_client=obj_client,
chunk_size=chunk_size,
)
if writer:
writer.writelines(obj_reader)
return obj_reader

def as_file(
self,
max_resume: int = 5,
archive_settings: ArchiveConfig = None,
blob_download_settings: BlobDownloadConfig = None,
chunk_size: int = DEFAULT_CHUNK_SIZE,
etl_name: str = None,
writer: BufferedWriter = None,
latest: bool = False,
byte_range: str = None,
) -> ObjectFile: # pylint: disable=too-many-arguments
"""
Creates an `ObjectFile` for reading object data in chunks with support for
resuming and retrying from the last known position in the case the object stream
is prematurely closed due to an unexpected error.
Args:
max_resume (int, optional): If streaming object contents is interrupted, this
defines the maximum number of attempts to resume the connection before
raising an exception.
archive_settings (ArchiveConfig, optional): Settings for archive extraction.
blob_download_settings (BlobDownloadConfig, optional): Settings for using blob
download (e.g., chunk size, workers).
chunk_size (int, optional): The size of chunks to use while reading from the stream.
etl_name (str, optional): Name of the ETL (Extract, Transform, Load) transformation
to apply during the get operation.
writer (BufferedWriter, optional): A writer for writing content output. User is
responsible for closing the writer.
latest (bool, optional): Whether to get the latest version of the object from
a remote bucket (if applicable).
byte_range (str, optional): Specify a byte range to fetch a segment of the object
(e.g., "bytes=0-499" for the first 500 bytes).
Returns:
ObjectFile: A file-like object that can be used to read the object content.
Raises:
requests.RequestException: An ambiguous exception occurred while handling the request.
requests.ConnectionError: A connection error occurred.
requests.ConnectionTimeout: The connection to AIStore timed out.
requests.ReadTimeout: Waiting for a response from AIStore timed out.
requests.exceptions.HTTPError(404): The object does not exist.
"""
object_reader = self.get(
archive_config=archive_settings,
blob_download_config=blob_download_settings,
chunk_size=chunk_size,
etl_name=etl_name,
writer=writer,
latest=latest,
byte_range=byte_range,
)
return ObjectFile(object_reader, max_resume=max_resume)

def get_semantic_url(self) -> str:
"""
Get the semantic URL to the object
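
For context, the caller-facing flow is unchanged even though get() now routes through ObjectClient, while as_file() is removed from Object in this commit. Below is a hedged sketch, assuming the SDK's top-level Client/bucket/object API and ObjectReader.read_all(), neither of which appears in this diff; the endpoint, bucket, and object names are placeholders.

from aistore.sdk import Client

client = Client("http://localhost:8080")              # placeholder endpoint
obj = client.bucket("my-bucket").object("my-object")  # placeholder names

# get() still returns an ObjectReader; internally it is now backed by an ObjectClient.
reader = obj.get(chunk_size=64 * 1024)
data = reader.read_all()  # read_all() assumed from the existing ObjectReader API

# Chunked iteration also keeps working, since ObjectReader remains iterable
# (note writer.writelines(obj_reader) in the hunk above).
total = 0
for chunk in obj.get(chunk_size=64 * 1024):
    total += len(chunk)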
72 changes: 72 additions & 0 deletions python/aistore/sdk/obj/object_client.py
@@ -0,0 +1,72 @@
#
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
from typing import Optional, Dict, List

import requests

from aistore.sdk.const import HTTP_METHOD_GET, HTTP_METHOD_HEAD, HEADER_RANGE
from aistore.sdk.obj.object_attributes import ObjectAttributes
from aistore.sdk.request_client import RequestClient


class ObjectClient:
"""
ObjectClient is a simple wrapper around a given RequestClient that makes requests to an individual object.
Args:
request_client (RequestClient): The RequestClient used to make HTTP requests
path (str): URL Path to the object
params (List[str]): Query parameters for the request
headers (Optional[Dict[str, str]]): HTTP request headers
"""

def __init__(
self,
request_client: RequestClient,
path: str,
params: List[str],
headers: Optional[Dict[str, str]] = None,
):
self._request_client = request_client
self._request_path = path
self._request_params = params
self._request_headers = headers

def get(self, stream: bool, start_position: int) -> requests.Response:
"""
Make a request to AIS to get the object content, optionally starting at a specific byte position.
Args:
stream (bool): If True, stream the response content.
start_position (int): The byte position to start reading from.
Returns:
requests.Response: The response object from the request.
"""
headers = self._request_headers.copy() if self._request_headers else {}
if start_position != 0:
headers[HEADER_RANGE] = f"bytes={start_position}-"

resp = self._request_client.request(
HTTP_METHOD_GET,
path=self._request_path,
params=self._request_params,
stream=stream,
headers=headers,
)
resp.raise_for_status()
return resp

def head(self) -> ObjectAttributes:
"""
Make a HEAD request to AIS to fetch and return only the object attributes.
Returns:
`ObjectAttributes` containing metadata for this object.
"""
resp = self._request_client.request(
HTTP_METHOD_HEAD, path=self._request_path, params=self._request_params
)
return ObjectAttributes(resp.headers)
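
A small, cluster-free check (not part of this commit) of the Range-header behavior shown above: a recording stand-in for RequestClient captures what ObjectClient sends. It assumes HEADER_RANGE resolves to the standard Range header; the object path and params are placeholders.

from aistore.sdk.obj.object_client import ObjectClient


class _RecordingResponse:
    """Bare-bones response so ObjectClient.get() can call raise_for_status()."""

    def raise_for_status(self):
        pass


class _RecordingRequestClient:
    """Stand-in for RequestClient that records the headers ObjectClient sends."""

    def __init__(self):
        self.last_headers = None

    def request(self, method, path=None, params=None, stream=False, headers=None):
        self.last_headers = headers
        return _RecordingResponse()


rc = _RecordingRequestClient()
obj_client = ObjectClient(request_client=rc, path="v1/objects/bck/obj", params=[])

obj_client.get(stream=True, start_position=0)
assert "Range" not in rc.last_headers             # no Range header when starting at 0

obj_client.get(stream=True, start_position=1024)
assert rc.last_headers["Range"] == "bytes=1024-"  # resumed reads use an open-ended Range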
14 changes: 8 additions & 6 deletions python/aistore/sdk/obj/object_file.py
@@ -7,7 +7,7 @@

import requests

from aistore.sdk.obj.object_reader import ObjectReader
from aistore.sdk.obj.content_iterator import ContentIterator
from aistore.sdk.utils import get_logger

logger = get_logger(__name__)
@@ -100,17 +100,19 @@ class ObjectFile(BufferedIOBase):
of data from the buffer.
Args:
object_reader (ObjectReader): The object reader used to fetch object data in chunks.
content_iterator (ContentIterator): An iterator that can fetch object data from AIS in chunks.
max_resume (int): Maximum number of retry attempts in case of a streaming failure.
"""

def __init__(self, object_reader: ObjectReader, max_resume: int):
self._object_reader = object_reader
def __init__(self, content_iterator: ContentIterator, max_resume: int):
self._content_iterator = content_iterator
self._max_resume = max_resume
self._current_pos = 0
self._closed = False
self._buffer = SimpleBuffer()
self._chunk_iterator = self._object_reader.iter_from_position(self._current_pos)
self._chunk_iterator = self._content_iterator.iter_from_position(
self._current_pos
)
self._resume_total = 0

def close(self) -> None:
@@ -198,7 +200,7 @@ def read(self, size=-1):
)

# Reset the chunk iterator for resuming the stream
self._chunk_iterator = self._object_reader.iter_from_position(
self._chunk_iterator = self._content_iterator.iter_from_position(
self._current_pos + len(self._buffer)
)

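
Reusing the same kind of in-memory stand-ins as the ContentIterator sketch earlier, here is a self-contained example (not part of this commit) of the new ObjectFile constructor, which now takes a ContentIterator instead of an ObjectReader; it runs without a cluster.

from aistore.sdk.obj.content_iterator import ContentIterator
from aistore.sdk.obj.object_file import ObjectFile


class _ChunkStream:
    """Mimics the streaming requests.Response returned by ObjectClient.get()."""

    def __init__(self, data: bytes):
        self._data = data

    def iter_content(self, chunk_size):
        for i in range(0, len(self._data), chunk_size):
            yield self._data[i : i + chunk_size]

    def close(self):
        pass


class _StubObjectClient:
    """Serves bytes from memory, honoring start_position like the real ObjectClient."""

    def __init__(self, data: bytes):
        self._data = data

    def get(self, stream, start_position):
        return _ChunkStream(self._data[start_position:])


data = b"0123456789" * 10
iterator = ContentIterator(client=_StubObjectClient(data), chunk_size=16)
obj_file = ObjectFile(content_iterator=iterator, max_resume=3)

first = obj_file.read(25)  # file-like read of the first 25 bytes
rest = obj_file.read()     # read the remainder of the object
assert first + rest == data
obj_file.close()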