diff --git a/python/aistore/sdk/object_file.py b/python/aistore/sdk/object_file.py index 4eaca82608f..a9a8b79e1ac 100644 --- a/python/aistore/sdk/object_file.py +++ b/python/aistore/sdk/object_file.py @@ -2,11 +2,11 @@ # Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. # -# pylint: disable=too-many-branches - from io import BufferedIOBase from typing import Iterator + import requests + from aistore.sdk.object_reader import ObjectReader from aistore.sdk.utils import get_logger @@ -84,17 +84,16 @@ def empty(self): class ObjectFile(BufferedIOBase): """ - A file-like object for reading object data, with support for both reading a fixed size - of data and reading until the end of the stream (EOF). It provides the ability to resume - and continue reading from the last known position in the event of a streaming error, such - as a `ChunkedEncodingError`. - - Data is fetched in chunks via the object reader and temporarily stored in an internal buffer. - The buffer is filled either to the required size or until EOF is reached. If a streaming - error occurs during this process, the object file automatically attempts to resume the buffer - filling process from the last known chunk position, up to a configurable number of resume - attempts (`max_resume`), raising a `ChunkedEncodingError` if the maximum number of attempts - is exceeded. + A file-like object for reading object data, with support for both reading a fixed size of data + and reading until the end of the stream (EOF). It provides the ability to resume and continue + reading from the last known position in the event of a ChunkedEncodingError. + + Data is fetched in chunks via the object reader iterator and temporarily stored in an internal + buffer. The buffer is filled either to the required size or until EOF is reached. If a + `ChunkedEncodingError` occurs during this process, ObjectFile catches and automatically attempts + to resume the buffer filling process from the last known chunk position. The number of resume + attempts is tracked across the entire object file, and if the total number of attempts exceeds + the configurable `max_resume`, a `ChunkedEncodingError` is raised. Once the buffer is adequately filled, the `read()` method reads and returns the requested amount of data from the buffer. @@ -111,6 +110,7 @@ def __init__(self, object_reader: ObjectReader, max_resume: int): self._closed = False self._buffer = SimpleBuffer() self._chunk_iterator = self._object_reader.iter_from_position(self._current_pos) + self._resume_total = 0 def close(self) -> None: """ @@ -178,8 +178,6 @@ def read(self, size=-1): if size == 0: return b"" - resume_attempts = 0 - while True: try: # Fill the buffer with the requested size or to EOF @@ -187,14 +185,14 @@ def read(self, size=-1): # If successfully filled the buffer, exit break except requests.exceptions.ChunkedEncodingError as err: - resume_attempts += 1 - if resume_attempts > self._max_resume: + self._resume_total += 1 + if self._resume_total > self._max_resume: logger.error("Max retries reached. Cannot resume read.") raise err logger.warning( "Chunked encoding error (%s), retrying %d/%d", err, - resume_attempts, + self._resume_total, self._max_resume, )