-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
✨ Expose multipart upload #63
Changes from 23 commits
f95ac9e
1fb6277
8508c0c
f3c5791
c177c3f
d2b0376
98e2e4c
08acb53
a9aeb28
581297e
c2fb0ca
b2fe544
fd42d3a
b37a4ac
393a662
a80a3ae
fa6fbce
60bc124
90cff10
1f4e918
4ca5480
f092cce
8aad099
d9bcb74
19d02f0
7600bf4
d7f19eb
dd17c44
237775a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
import asyncio | ||
import json | ||
import math | ||
import random | ||
import shutil | ||
import string | ||
from pathlib import Path | ||
from typing import Any, Iterator, List, Optional, Tuple, Union | ||
|
||
import httpx | ||
from httpx import AsyncClient, Response | ||
from osparc_client import ( | ||
BodyCompleteMultipartUploadV0FilesFileIdCompletePost, | ||
ClientFile, | ||
ClientFileUploadSchema, | ||
) | ||
from osparc_client import FilesApi as _FilesApi | ||
from osparc_client import FileUploadCompletionBody, FileUploadLinks, UploadedPart | ||
from tqdm.asyncio import tqdm_asyncio | ||
|
||
from . import ApiClient, File | ||
from ._http_client import AsyncHttpClient | ||
from ._utils import _file_chunk_generator | ||
|
||
|
||
class FilesApi(_FilesApi): | ||
"""Class for interacting with files""" | ||
|
||
def __init__(self, api_client: Optional[ApiClient] = None): | ||
"""Construct object | ||
|
||
Args: | ||
api_client (ApiClient, optinal): osparc.ApiClient object | ||
""" | ||
super().__init__(api_client) | ||
self._super = super(FilesApi, self) | ||
user: Optional[str] = self.api_client.configuration.username | ||
passwd: Optional[str] = self.api_client.configuration.password | ||
self._auth: Optional[httpx.BasicAuth] = ( | ||
httpx.BasicAuth(username=user, password=passwd) | ||
if (user is not None and passwd is not None) | ||
else None | ||
) | ||
|
||
def download_file( | ||
self, file_id: str, *, destination_folder: Optional[Path] = None | ||
) -> str: | ||
if destination_folder is not None and not destination_folder.is_dir(): | ||
raise RuntimeError( | ||
f"destination_folder: {destination_folder} must be a directory" | ||
) | ||
downloaded_file: Path = Path(super().download_file(file_id)) | ||
if destination_folder is not None: | ||
dest_file: Path = destination_folder / downloaded_file.name | ||
while dest_file.is_file(): | ||
new_name = ( | ||
downloaded_file.stem | ||
+ "".join(random.choices(string.ascii_letters, k=8)) | ||
+ downloaded_file.suffix | ||
) | ||
dest_file = destination_folder / new_name | ||
shutil.move(downloaded_file, dest_file) | ||
downloaded_file = dest_file | ||
return str(downloaded_file.resolve()) | ||
|
||
def upload_file(self, file: Union[str, Path]): | ||
return asyncio.run(self.upload_file_async(file=file)) | ||
|
||
async def upload_file_async(self, file: Union[str, Path]) -> File: | ||
if isinstance(file, str): | ||
file = Path(file) | ||
if not file.is_file(): | ||
raise RuntimeError(f"{file} is not a file") | ||
client_file: ClientFile = ClientFile( | ||
filename=file.name, filesize=file.stat().st_size | ||
) | ||
client_upload_schema: ClientFileUploadSchema = self._super.get_upload_links( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. MINOR: In reality BTW, we try to follow a naming convention for request payloads and response bodies. We use the anem of the resource and then a verb at the end. For instance, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, concerning the schema thing I will address this among other small things I need to do on the API server: ITISFoundation/osparc-simcore#4641 |
||
client_file=client_file | ||
) | ||
chunk_size: int = client_upload_schema.upload_schema.chunk_size | ||
links: FileUploadLinks = client_upload_schema.upload_schema.links | ||
url_iter: Iterator[Tuple[int, str]] = enumerate( | ||
iter(client_upload_schema.upload_schema.urls), start=1 | ||
) | ||
if len(client_upload_schema.upload_schema.urls) < math.ceil( | ||
file.stat().st_size / chunk_size | ||
): | ||
raise RuntimeError( | ||
"Did not receive sufficient number of upload URLs from the server." | ||
) | ||
|
||
tasks: list = [] | ||
async with AsyncHttpClient( | ||
exc_req_typ="post", exc_url=links.abort_upload, exc_auth=self._auth | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you really like writing acronyms :-) ... do not be shy. You can write long names. what is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, sorry. This should already have been fixed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done now |
||
) as session: | ||
async for chunck, size in _file_chunk_generator(file, chunk_size): | ||
# following https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task | ||
index, url = next(url_iter) | ||
task = asyncio.create_task( | ||
self._upload_chunck( | ||
http_client=session, | ||
chunck=chunck, | ||
chunck_size=size, | ||
upload_link=url, | ||
index=index, | ||
) | ||
) | ||
tasks.append(task) | ||
|
||
uploaded_parts: List[UploadedPart] = await tqdm_asyncio.gather(*tasks) | ||
|
||
return await self._complete_multipart_upload( | ||
session, links.complete_upload, client_file, uploaded_parts | ||
) | ||
|
||
async def _complete_multipart_upload( | ||
self, | ||
http_client: AsyncClient, | ||
complete_link: str, | ||
client_file: ClientFile, | ||
uploaded_parts: List[UploadedPart], | ||
) -> File: | ||
complete_payload = BodyCompleteMultipartUploadV0FilesFileIdCompletePost( | ||
client_file=client_file, | ||
uploaded_parts=FileUploadCompletionBody(parts=uploaded_parts), | ||
) | ||
response: Response = await http_client.post( | ||
complete_link, | ||
json=complete_payload.to_dict(), | ||
auth=self._auth, | ||
) | ||
response.raise_for_status() | ||
payload: dict[str, Any] = response.json() | ||
return File(**payload) | ||
|
||
async def _upload_chunck( | ||
self, | ||
http_client: AsyncClient, | ||
chunck: bytes, | ||
chunck_size: int, | ||
upload_link: str, | ||
index: int, | ||
) -> UploadedPart: | ||
response: Response = await http_client.put( | ||
upload_link, content=chunck, headers={"Content-Length": f"{chunck_size}"} | ||
) | ||
response.raise_for_status() | ||
assert response.headers # nosec | ||
assert "Etag" in response.headers # nosec | ||
etag: str = json.loads(response.headers["Etag"]) | ||
return UploadedPart(number=index, e_tag=etag) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from typing import Optional | ||
|
||
import httpx | ||
import tenacity | ||
|
||
|
||
class AsyncHttpClient: | ||
"""Async http client context manager""" | ||
|
||
def __init__( | ||
self, | ||
exception_request_type: Optional[str] = None, | ||
exception_url: Optional[str] = None, | ||
exception_auth: Optional[httpx.BasicAuth] = None, | ||
): | ||
self._client = httpx.AsyncClient() | ||
self._exc_callback = ( | ||
getattr(self._client, exception_request_type) | ||
if exception_request_type | ||
else None | ||
) | ||
self._exc_url = exception_url | ||
self._exc_auth = exception_auth | ||
|
||
async def __aenter__(self) -> httpx.AsyncClient: | ||
return self._client | ||
|
||
async def __aexit__(self, exc_type, exc_value, traceback) -> None: | ||
if exc_value is None: | ||
await self._client.aclose() | ||
else: # exception raised: need to handle | ||
if self._exc_callback is not None: | ||
try: | ||
async for attempt in tenacity.AsyncRetrying( | ||
reraise=True, | ||
wait=tenacity.wait_fixed(1), | ||
stop=tenacity.stop_after_delay(10), | ||
retry=tenacity.retry_if_exception_type(httpx.RequestError), | ||
): | ||
with attempt: | ||
response = await self._exc_callback( | ||
self._exc_url, auth=self._exc_auth | ||
) | ||
response.raise_for_status() | ||
except Exception as err: | ||
await self._client.aclose() | ||
raise err from exc_value | ||
await self._client.aclose() | ||
raise exc_value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not using https://docs.python.org/3/library/tempfile.html#tempfile.mkdtemp if `destination_folder is not given?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The downloaded file is already located in the temporary directory by the automatically generated client. So that shouldn't be necessary.