Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Expose multipart upload #63

Merged
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f95ac9e
add new openapi.json
bisgaard-itis Aug 16, 2023
1fb6277
start adding wrapper for files api
bisgaard-itis Aug 16, 2023
8508c0c
first implementation of async fileupload
bisgaard-itis Aug 16, 2023
f3c5791
use complete/upload link directly
bisgaard-itis Aug 16, 2023
c177c3f
add initial test for uploading 10gb file
bisgaard-itis Aug 17, 2023
d2b0376
minor correction
bisgaard-itis Aug 17, 2023
98e2e4c
update openapi.json as well as upload fcn and test
bisgaard-itis Aug 17, 2023
08acb53
factor out completion request
bisgaard-itis Aug 17, 2023
a9aeb28
cleanup
bisgaard-itis Aug 17, 2023
581297e
add final exception handling to http client
bisgaard-itis Aug 18, 2023
c2fb0ca
update tests
bisgaard-itis Aug 18, 2023
b2fe544
skip test if not right osparc version
bisgaard-itis Aug 18, 2023
fd42d3a
tuple -> Tuple in hints to make py 3.6 happy
bisgaard-itis Aug 18, 2023
b37a4ac
list[] -> List[] in hints
bisgaard-itis Aug 18, 2023
393a662
change initialization
bisgaard-itis Aug 18, 2023
a80a3ae
minor correction
bisgaard-itis Aug 18, 2023
fa6fbce
start using httpx in order to also have sync client
bisgaard-itis Aug 23, 2023
60bc124
make pagination iterator sync for now
bisgaard-itis Aug 23, 2023
90cff10
several small changes to iterator and test
bisgaard-itis Aug 23, 2023
1f4e918
remove aiohttp error handler
bisgaard-itis Aug 23, 2023
4ca5480
small bug fix
bisgaard-itis Aug 24, 2023
f092cce
implement destination_folder input in download_file
bisgaard-itis Aug 24, 2023
8aad099
several small changes according to PR feedback
bisgaard-itis Aug 24, 2023
d9bcb74
list_jobs -> jobs
bisgaard-itis Aug 25, 2023
19d02f0
merge master into expose-multipart-upload
bisgaard-itis Aug 25, 2023
7600bf4
add junit test suite name
bisgaard-itis Aug 25, 2023
d7f19eb
use junit prefix instead
bisgaard-itis Aug 25, 2023
dd17c44
fix addopts
bisgaard-itis Aug 25, 2023
237775a
fix prefix
bisgaard-itis Aug 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
711 changes: 537 additions & 174 deletions api/openapi.json

Large diffs are not rendered by default.

15 changes: 6 additions & 9 deletions clients/python/client/osparc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
from typing import Tuple

import nest_asyncio
from osparc_client import ( # APIs; API client; models
ApiClient,
ApiException,
Expand All @@ -13,7 +14,6 @@
Configuration,
ErrorGet,
File,
FilesApi,
Groups,
HTTPValidationError,
Job,
Expand All @@ -22,10 +22,6 @@
JobMetadataUpdate,
JobOutputs,
JobStatus,
LimitOffsetPageFile,
LimitOffsetPageJob,
LimitOffsetPageSolver,
LimitOffsetPageStudy,
Links,
Meta,
MetaApi,
Expand All @@ -49,13 +45,18 @@
__version__,
)

from ._files_api import FilesApi
from ._info import openapi
from ._solvers_api import SolversApi
from ._utils import PaginationGenerator

nest_asyncio.apply() # allow to run coroutines via asyncio.run(coro)

__all__: Tuple[str, ...] = (
# imports from osparc_client
"__version__",
"FilesApi",
"PaginationGenerator",
"MetaApi",
"SolversApi",
"UsersApi",
Expand Down Expand Up @@ -86,14 +87,10 @@
"OnePageSolverPort",
"StudyPort",
"Study",
"LimitOffsetPageStudy",
"LimitOffsetPageFile",
"JobMetadataUpdate",
"LimitOffsetPageJob",
"Links",
"SolverPort",
"JobMetadata",
"LimitOffsetPageSolver",
"ErrorGet",
"OnePageStudyPort",
# imports from osparc
Expand Down
151 changes: 151 additions & 0 deletions clients/python/client/osparc/_files_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import asyncio
import json
import math
import random
import shutil
import string
from pathlib import Path
from typing import Any, Iterator, List, Optional, Tuple, Union

import httpx
from httpx import AsyncClient, Response
from osparc_client import (
BodyCompleteMultipartUploadV0FilesFileIdCompletePost,
ClientFile,
ClientFileUploadSchema,
)
from osparc_client import FilesApi as _FilesApi
from osparc_client import FileUploadCompletionBody, FileUploadLinks, UploadedPart
from tqdm.asyncio import tqdm_asyncio

from . import ApiClient, File
from ._http_client import AsyncHttpClient
from ._utils import _file_chunk_generator


class FilesApi(_FilesApi):
"""Class for interacting with files"""

def __init__(self, api_client: Optional[ApiClient] = None):
    """Construct object

    Args:
        api_client (ApiClient, optional): osparc.ApiClient object
    """
    super().__init__(api_client)
    # keep a bound proxy to the generated parent class so async methods
    # can delegate to it explicitly
    self._super = super(FilesApi, self)
    username: Optional[str] = self.api_client.configuration.username
    password: Optional[str] = self.api_client.configuration.password
    if username is not None and password is not None:
        self._auth: Optional[httpx.BasicAuth] = httpx.BasicAuth(
            username=username, password=password
        )
    else:
        self._auth = None

def download_file(
self, file_id: str, *, destination_folder: Optional[Path] = None
) -> str:
if destination_folder is not None and not destination_folder.is_dir():
raise RuntimeError(
f"destination_folder: {destination_folder} must be a directory"
)
downloaded_file: Path = Path(super().download_file(file_id))
if destination_folder is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not using https://docs.python.org/3/library/tempfile.html#tempfile.mkdtemp if `destination_folder` is not given?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The downloaded file is already located in the temporary directory by the automatically generated client. So that shouldn't be necessary.

dest_file: Path = destination_folder / downloaded_file.name
while dest_file.is_file():
new_name = (
downloaded_file.stem
+ "".join(random.choices(string.ascii_letters, k=8))
+ downloaded_file.suffix
)
dest_file = destination_folder / new_name
shutil.move(downloaded_file, dest_file)
downloaded_file = dest_file
return str(downloaded_file.resolve())

def upload_file(self, file: Union[str, Path]):
    """Blocking wrapper: upload *file* by running upload_file_async to completion."""
    coroutine = self.upload_file_async(file=file)
    return asyncio.run(coroutine)

async def upload_file_async(self, file: Union[str, Path]) -> File:
if isinstance(file, str):
file = Path(file)
if not file.is_file():
raise RuntimeError(f"{file} is not a file")
client_file: ClientFile = ClientFile(
filename=file.name, filesize=file.stat().st_size
)
client_upload_schema: ClientFileUploadSchema = self._super.get_upload_links(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINOR: In reality client_upload_schema is NOT a schema but data. Note that pydantic class builds the schema but the instance is a data object. For that reason I try avoiding the word Schema in the classes. This should be modified in the server as well.

BTW, we try to follow a naming convention for request payloads and response bodies. We use the anem of the resource and then a verb at the end. For instance, ProjectGet would be the response body for a GET /project/{id} resource

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, concerning the schema thing I will address this among other small things I need to do on the API server: ITISFoundation/osparc-simcore#4641

client_file=client_file
)
chunk_size: int = client_upload_schema.upload_schema.chunk_size
links: FileUploadLinks = client_upload_schema.upload_schema.links
url_iter: Iterator[Tuple[int, str]] = enumerate(
iter(client_upload_schema.upload_schema.urls), start=1
)
if len(client_upload_schema.upload_schema.urls) < math.ceil(
file.stat().st_size / chunk_size
):
raise RuntimeError(
"Did not receive sufficient number of upload URLs from the server."
)

tasks: list = []
async with AsyncHttpClient(
exc_req_typ="post", exc_url=links.abort_upload, exc_auth=self._auth
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you really like writing acronyms :-) ... do not be shy. You can write long names. what is exc here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, sorry. This should already have been fixed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done now

) as session:
async for chunck, size in _file_chunk_generator(file, chunk_size):
# following https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
index, url = next(url_iter)
task = asyncio.create_task(
self._upload_chunck(
http_client=session,
chunck=chunck,
chunck_size=size,
upload_link=url,
index=index,
)
)
tasks.append(task)

uploaded_parts: List[UploadedPart] = await tqdm_asyncio.gather(*tasks)

return await self._complete_multipart_upload(
session, links.complete_upload, client_file, uploaded_parts
)

async def _complete_multipart_upload(
    self,
    http_client: AsyncClient,
    complete_link: str,
    client_file: ClientFile,
    uploaded_parts: List[UploadedPart],
) -> File:
    """Finalize a multipart upload by POSTing the collected part etags.

    Args:
        http_client: client used to issue the completion request
        complete_link: server-provided "complete upload" url
        client_file: name/size metadata of the file that was uploaded
        uploaded_parts: part number and etag of every uploaded chunk

    Returns:
        File: the server-side representation of the uploaded file

    Raises:
        httpx.HTTPStatusError: if the completion request fails
    """
    complete_payload = BodyCompleteMultipartUploadV0FilesFileIdCompletePost(
        client_file=client_file,
        uploaded_parts=FileUploadCompletionBody(parts=uploaded_parts),
    )
    response: Response = await http_client.post(
        complete_link,
        json=complete_payload.to_dict(),
        auth=self._auth,
    )
    response.raise_for_status()
    # fix: the previous `dict[str, Any]` annotation subscripts the builtin,
    # which is inconsistent with this package's typing.List/Tuple convention
    # kept for older Python support; `Dict` is not imported here, so the
    # annotation is simply dropped.
    payload = response.json()
    return File(**payload)

async def _upload_chunck(
    self,
    http_client: AsyncClient,
    chunck: bytes,
    chunck_size: int,
    upload_link: str,
    index: int,
) -> UploadedPart:
    """PUT a single chunk (part) of a multipart upload to its presigned url.

    Args:
        http_client: client used to issue the PUT request
        chunck: raw bytes of this part
        chunck_size: number of bytes in ``chunck``
        upload_link: presigned url this part is uploaded to
        index: 1-based part number

    Returns:
        UploadedPart: part number plus the etag the server returned

    Raises:
        httpx.HTTPStatusError: if the PUT request fails
        RuntimeError: if the response is missing an Etag header
    """
    response: Response = await http_client.put(
        upload_link, content=chunck, headers={"Content-Length": f"{chunck_size}"}
    )
    response.raise_for_status()
    # fix: do not validate with `assert` — assertions are stripped when
    # running under `python -O`; raise explicitly instead
    if "Etag" not in response.headers:
        raise RuntimeError(
            f"Response to PUT {upload_link} did not contain an 'Etag' header"
        )
    # the Etag header arrives JSON-encoded (wrapped in quotes);
    # json.loads yields the bare etag string
    etag: str = json.loads(response.headers["Etag"])
    return UploadedPart(number=index, e_tag=etag)
49 changes: 49 additions & 0 deletions clients/python/client/osparc/_http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import Optional

import httpx
import tenacity


class AsyncHttpClient:
    """Async http client context manager.

    Yields a bare ``httpx.AsyncClient``. On a clean exit the client is simply
    closed. If the body raises and an exception request was configured at
    construction (e.g. an "abort upload" POST), that request is retried for a
    bounded time before the original exception is re-raised.
    """

    def __init__(
        self,
        exception_request_type: Optional[str] = None,
        exception_url: Optional[str] = None,
        exception_auth: Optional[httpx.BasicAuth] = None,
    ):
        self._client = httpx.AsyncClient()
        if exception_request_type:
            # bind the client method (e.g. self._client.post) up front
            self._exc_callback = getattr(self._client, exception_request_type)
        else:
            self._exc_callback = None
        self._exc_url = exception_url
        self._exc_auth = exception_auth

    async def __aenter__(self) -> httpx.AsyncClient:
        return self._client

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        # guard clause: nothing went wrong, just release the client
        if exc_value is None:
            await self._client.aclose()
            return
        # an exception escaped the body: run the configured cleanup request
        # (retrying transient network errors) before propagating it
        if self._exc_callback is not None:
            try:
                async for attempt in tenacity.AsyncRetrying(
                    reraise=True,
                    wait=tenacity.wait_fixed(1),
                    stop=tenacity.stop_after_delay(10),
                    retry=tenacity.retry_if_exception_type(httpx.RequestError),
                ):
                    with attempt:
                        response = await self._exc_callback(
                            self._exc_url, auth=self._exc_auth
                        )
                        response.raise_for_status()
            except Exception as err:
                # cleanup itself failed: close and surface both exceptions
                await self._client.aclose()
                raise err from exc_value
        await self._client.aclose()
        raise exc_value
22 changes: 16 additions & 6 deletions clients/python/client/osparc/_solvers_api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Optional

import httpx
from osparc_client import SolversApi as _SolversApi

from . import ApiClient
Expand All @@ -16,14 +17,19 @@ def __init__(self, api_client: Optional[ApiClient] = None):
api_client (ApiClient, optional): osparc.ApiClient object
"""
super().__init__(api_client)
user: Optional[str] = self.api_client.configuration.username
passwd: Optional[str] = self.api_client.configuration.password
self._auth: Optional[httpx.BasicAuth] = (
httpx.BasicAuth(username=user, password=passwd)
if (user is not None and passwd is not None)
else None
)

def get_jobs_page(self, solver_key: str, version: str) -> None:
"""Method only for internal use"""
raise NotImplementedError("This method is only for internal use")

def get_jobs(
self, solver_key: str, version: str, limit: int = 20, offset: int = 0
) -> PaginationGenerator:
def list_jobs(self, solver_key: str, version: str) -> PaginationGenerator:
"""Returns an iterator through which one can iterate over
all Jobs submitted to the solver

Expand All @@ -39,9 +45,13 @@ def get_jobs(
(its "length")
"""

def pagination_method(limit, offset):
def pagination_method():
return super(SolversApi, self).get_jobs_page(
solver_key=solver_key, version=version, limit=limit, offset=offset
solver_key=solver_key, version=version, limit=20, offset=0
)

return PaginationGenerator(pagination_method, limit, offset)
return PaginationGenerator(
pagination_method,
base_url=self.api_client.configuration.host,
auth=self._auth,
)
Loading