Skip to content

Commit

Permalink
sdk/python: object file max_resume per object file
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Koo <[email protected]>
  • Loading branch information
rkoo19 committed Sep 19, 2024
1 parent 4bc51f6 commit 3eaf228
Showing 1 changed file with 16 additions and 18 deletions.
34 changes: 16 additions & 18 deletions python/aistore/sdk/object_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#

# pylint: disable=too-many-branches

from io import BufferedIOBase
from typing import Iterator

import requests

from aistore.sdk.object_reader import ObjectReader
from aistore.sdk.utils import get_logger

Expand Down Expand Up @@ -84,17 +84,16 @@ def empty(self):

class ObjectFile(BufferedIOBase):
"""
A file-like object for reading object data, with support for both reading a fixed size
of data and reading until the end of the stream (EOF). It provides the ability to resume
and continue reading from the last known position in the event of a streaming error, such
as a `ChunkedEncodingError`.
Data is fetched in chunks via the object reader and temporarily stored in an internal buffer.
The buffer is filled either to the required size or until EOF is reached. If a streaming
error occurs during this process, the object file automatically attempts to resume the buffer
filling process from the last known chunk position, up to a configurable number of resume
attempts (`max_resume`), raising a `ChunkedEncodingError` if the maximum number of attempts
is exceeded.
A file-like object for reading object data, with support for both reading a fixed size of data
and reading until the end of the stream (EOF). It provides the ability to resume and continue
reading from the last known position in the event of a `ChunkedEncodingError`.
Data is fetched in chunks via the object reader iterator and temporarily stored in an internal
buffer. The buffer is filled either to the required size or until EOF is reached. If a
`ChunkedEncodingError` occurs during this process, ObjectFile catches it and automatically attempts
to resume the buffer filling process from the last known chunk position. The number of resume
attempts is tracked across the entire object file, and if the total number of attempts exceeds
the configurable `max_resume`, a `ChunkedEncodingError` is raised.
Once the buffer is adequately filled, the `read()` method reads and returns the requested amount
of data from the buffer.
Expand All @@ -111,6 +110,7 @@ def __init__(self, object_reader: ObjectReader, max_resume: int):
self._closed = False
self._buffer = SimpleBuffer()
self._chunk_iterator = self._object_reader.iter_from_position(self._current_pos)
self._resume_total = 0

def close(self) -> None:
"""
Expand Down Expand Up @@ -178,23 +178,21 @@ def read(self, size=-1):
if size == 0:
return b""

resume_attempts = 0

while True:
try:
# Fill the buffer with the requested size or to EOF
self._buffer.fill(self._chunk_iterator, size)
# If successfully filled the buffer, exit
break
except requests.exceptions.ChunkedEncodingError as err:
resume_attempts += 1
if resume_attempts > self._max_resume:
self._resume_total += 1
if self._resume_total > self._max_resume:
logger.error("Max retries reached. Cannot resume read.")
raise err
logger.warning(
"Chunked encoding error (%s), retrying %d/%d",
err,
resume_attempts,
self._resume_total,
self._max_resume,
)

Expand Down

0 comments on commit 3eaf228

Please sign in to comment.