-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
✨ Expose multipart upload #63
Changes from 16 commits
f95ac9e
1fb6277
8508c0c
f3c5791
c177c3f
d2b0376
98e2e4c
08acb53
a9aeb28
581297e
c2fb0ca
b2fe544
fd42d3a
b37a4ac
393a662
a80a3ae
fa6fbce
60bc124
90cff10
1f4e918
4ca5480
f092cce
8aad099
d9bcb74
19d02f0
7600bf4
d7f19eb
dd17c44
237775a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
import asyncio | ||
import json | ||
import math | ||
from pathlib import Path | ||
from typing import Any, Iterator, List, Optional, Tuple, Union | ||
|
||
import aiohttp | ||
from aiohttp import ClientSession | ||
from osparc_client import ( | ||
BodyCompleteMultipartUploadV0FilesFileIdCompletePost, | ||
ClientFile, | ||
ClientFileUploadSchema, | ||
) | ||
from osparc_client import FilesApi as _FilesApi | ||
from osparc_client import FileUploadCompletionBody, FileUploadLinks, UploadedPart | ||
from tqdm.asyncio import tqdm_asyncio | ||
|
||
from . import ApiClient, File | ||
from ._http_client import HttpClient | ||
from ._utils import _file_chunk_generator | ||
from ._warnings_and_errors import aiohttp_error_handler_async | ||
|
||
|
||
class FilesApi(_FilesApi): | ||
"""Class for interacting with files""" | ||
|
||
def __init__(self, api_client: Optional[ApiClient] = None): | ||
"""Construct object | ||
|
||
Args: | ||
api_client (ApiClient, optinal): osparc.ApiClient object | ||
""" | ||
super().__init__(api_client) | ||
self._super = super(FilesApi, self) | ||
user: Optional[str] = self.api_client.configuration.username | ||
passwd: Optional[str] = self.api_client.configuration.password | ||
self._auth: Optional[aiohttp.BasicAuth] = ( | ||
aiohttp.BasicAuth(login=user, password=passwd) | ||
if (user is not None and passwd is not None) | ||
else None | ||
) | ||
|
||
def upload_file(self, file: Union[str, Path]): | ||
return asyncio.run(self.upload_file_async(file=file)) | ||
|
||
@aiohttp_error_handler_async | ||
async def upload_file_async(self, file: Union[str, Path]) -> File: | ||
if isinstance(file, str): | ||
file = Path(file) | ||
if not file.is_file(): | ||
raise RuntimeError(f"{file} is not a file") | ||
client_file: ClientFile = ClientFile( | ||
filename=file.name, filesize=file.stat().st_size | ||
) | ||
client_upload_schema: ClientFileUploadSchema = self._super.get_upload_links( | ||
client_file=client_file | ||
) | ||
chunk_size: int = client_upload_schema.upload_schema.chunk_size | ||
links: FileUploadLinks = client_upload_schema.upload_schema.links | ||
url_iter: Iterator[Tuple[int, str]] = enumerate( | ||
iter(client_upload_schema.upload_schema.urls), start=1 | ||
) | ||
if len(client_upload_schema.upload_schema.urls) < math.ceil( | ||
file.stat().st_size / chunk_size | ||
): | ||
raise RuntimeError( | ||
"Did not receive sufficient number of upload URLs from the server." | ||
) | ||
|
||
tasks: list = [] | ||
async with HttpClient( | ||
exc_req_typ="post", exc_url=links.abort_upload, exc_auth=self._auth | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you really like writing acronyms :-) ... do not be shy. You can write long names. what is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, sorry. This should already have been fixed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done now |
||
) as session: | ||
async for chunck, size in _file_chunk_generator(file, chunk_size): | ||
# following https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task | ||
index, url = next(url_iter) | ||
task = asyncio.create_task( | ||
self._upload_chunck( | ||
http_client=session, | ||
chunck=chunck, | ||
chunck_size=size, | ||
upload_link=url, | ||
index=index, | ||
) | ||
) | ||
tasks.append(task) | ||
|
||
uploaded_parts: List[UploadedPart] = await tqdm_asyncio.gather(*tasks) | ||
|
||
return await self._complete_multipart_upload( | ||
session, links.complete_upload, client_file, uploaded_parts | ||
) | ||
|
||
async def _complete_multipart_upload( | ||
self, | ||
http_client: ClientSession, | ||
complete_link: str, | ||
client_file: ClientFile, | ||
uploaded_parts: List[UploadedPart], | ||
) -> File: | ||
complete_payload = BodyCompleteMultipartUploadV0FilesFileIdCompletePost( | ||
client_file=client_file, | ||
uploaded_parts=FileUploadCompletionBody(parts=uploaded_parts), | ||
) | ||
async with http_client.post( | ||
complete_link, | ||
json=complete_payload.to_dict(), | ||
auth=self._auth, | ||
) as response: | ||
response.raise_for_status() | ||
payload: dict[str, Any] = await response.json() | ||
return File(**payload) | ||
|
||
async def _upload_chunck( | ||
self, | ||
http_client: ClientSession, | ||
chunck: bytes, | ||
chunck_size: int, | ||
upload_link: str, | ||
index: int, | ||
) -> UploadedPart: | ||
async with http_client.put( | ||
upload_link, data=chunck, headers={"Content-Length": f"{chunck_size}"} | ||
) as response: | ||
response.raise_for_status() | ||
assert response.headers # nosec | ||
assert "Etag" in response.headers # nosec | ||
etag: str = json.loads(response.headers["Etag"]) | ||
return UploadedPart(number=index, e_tag=etag) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
from typing import Optional | ||
|
||
import aiohttp | ||
import tenacity | ||
|
||
|
||
class HttpClient: | ||
"""Async http client context manager""" | ||
|
||
def __init__( | ||
self, | ||
exc_req_typ: Optional[str] = None, | ||
bisgaard-itis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
exc_url: Optional[str] = None, | ||
exc_auth: Optional[aiohttp.BasicAuth] = None, | ||
): | ||
self._client = aiohttp.ClientSession() | ||
self._exc_callback = getattr(self._client, exc_req_typ) if exc_req_typ else None | ||
self._exc_url = exc_url | ||
self._exc_auth = exc_auth | ||
|
||
async def __aenter__(self) -> aiohttp.ClientSession: | ||
return self._client | ||
|
||
async def __aexit__(self, exc_type, exc_value, traceback) -> None: | ||
if exc_value is None: | ||
await self._client.close() | ||
else: # exception raised: need to handle | ||
if self._exc_callback is not None: | ||
try: | ||
async for attempt in tenacity.AsyncRetrying( | ||
reraise=True, | ||
wait=tenacity.wait_fixed(1), | ||
stop=tenacity.stop_after_delay(10), | ||
retry=tenacity.retry_if_exception_type( | ||
aiohttp.ServerDisconnectedError | ||
), | ||
): | ||
with attempt: | ||
async with self._exc_callback( | ||
self._exc_url, auth=self._exc_auth | ||
) as response: | ||
response.raise_for_status() | ||
except Exception as err: | ||
await self._client.close() | ||
raise err from exc_value | ||
await self._client.close() | ||
raise exc_value |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,25 @@ | ||
import aiohttp | ||
|
||
|
||
class VisibleDeprecationWarning(UserWarning): | ||
"""Visible deprecation warning. | ||
|
||
Acknowledgement: Having this wrapper is borrowed from numpy | ||
""" | ||
|
||
|
||
class RequestError(Exception): | ||
"""For exceptions encountered when performing HTTP requests.""" | ||
|
||
|
||
def aiohttp_error_handler_async(method): | ||
"""Handle Aiohttp errors""" | ||
|
||
async def wrapper(*args, **kwargs): | ||
try: | ||
return await method(*args, **kwargs) | ||
except aiohttp.ClientResponseError as err: | ||
msg = f"HTTP status: {err.code}. {err.message}" | ||
raise RequestError(msg) from err | ||
|
||
return wrapper |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
import os | ||
|
||
import osparc | ||
import pytest | ||
|
||
|
||
@pytest.fixture | ||
def cfg() -> osparc.Configuration: | ||
"""Configuration | ||
|
||
Returns: | ||
osparc.Configuration: The Configuration | ||
""" | ||
cfg = osparc.Configuration( | ||
host=os.environ["OSPARC_API_HOST"], | ||
username=os.environ["OSPARC_API_KEY"], | ||
password=os.environ["OSPARC_API_SECRET"], | ||
) | ||
return cfg |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MINOR: In reality
client_upload_schema
is NOT a schema but data. Note that pydantic class builds the schema but the instance is a data object. For that reason I try avoiding the wordSchema
in the classes. This should be modified in the server as well.BTW, we try to follow a naming convention for request payloads and response bodies. We use the anem of the resource and then a verb at the end. For instance,
ProjectGet
would be the response body for aGET /project/{id}
resourceThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, concerning the schema thing I will address this among other small things I need to do on the API server: ITISFoundation/osparc-simcore#4641