Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Expose multipart upload #63

Merged
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f95ac9e
add new openapi.json
bisgaard-itis Aug 16, 2023
1fb6277
start adding wrapper for files api
bisgaard-itis Aug 16, 2023
8508c0c
first implementation of async fileupload
bisgaard-itis Aug 16, 2023
f3c5791
use complete/upload link directly
bisgaard-itis Aug 16, 2023
c177c3f
add initial test for uploading 10gb file
bisgaard-itis Aug 17, 2023
d2b0376
minor correction
bisgaard-itis Aug 17, 2023
98e2e4c
update openapi.json as well as upload fcn and test
bisgaard-itis Aug 17, 2023
08acb53
factor out completion request
bisgaard-itis Aug 17, 2023
a9aeb28
cleanup
bisgaard-itis Aug 17, 2023
581297e
add final exception handling to http client
bisgaard-itis Aug 18, 2023
c2fb0ca
update tests
bisgaard-itis Aug 18, 2023
b2fe544
skip test if not right osparc version
bisgaard-itis Aug 18, 2023
fd42d3a
tuple -> Tuple in hints to make py 3.6 happy
bisgaard-itis Aug 18, 2023
b37a4ac
list[] -> List[] in hints
bisgaard-itis Aug 18, 2023
393a662
change initialization
bisgaard-itis Aug 18, 2023
a80a3ae
minor correction
bisgaard-itis Aug 18, 2023
fa6fbce
start using httpx in order to also have sync client
bisgaard-itis Aug 23, 2023
60bc124
make pagination iterator sync for now
bisgaard-itis Aug 23, 2023
90cff10
several small changes to iterator and test
bisgaard-itis Aug 23, 2023
1f4e918
remove aiohttp error handler
bisgaard-itis Aug 23, 2023
4ca5480
small bug fix
bisgaard-itis Aug 24, 2023
f092cce
implement destination_folder input in download_file
bisgaard-itis Aug 24, 2023
8aad099
several small changes according to PR feedback
bisgaard-itis Aug 24, 2023
d9bcb74
list_jobs -> jobs
bisgaard-itis Aug 25, 2023
19d02f0
merge master into expose-multipart-upload
bisgaard-itis Aug 25, 2023
7600bf4
add junit test suite name
bisgaard-itis Aug 25, 2023
d7f19eb
use junit prefix instead
bisgaard-itis Aug 25, 2023
dd17c44
fix addopts
bisgaard-itis Aug 25, 2023
237775a
fix prefix
bisgaard-itis Aug 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
711 changes: 537 additions & 174 deletions api/openapi.json

Large diffs are not rendered by default.

15 changes: 6 additions & 9 deletions clients/python/client/osparc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
from typing import Tuple

import nest_asyncio
from osparc_client import ( # APIs; API client; models
ApiClient,
ApiException,
Expand All @@ -13,7 +14,6 @@
Configuration,
ErrorGet,
File,
FilesApi,
Groups,
HTTPValidationError,
Job,
Expand All @@ -22,10 +22,6 @@
JobMetadataUpdate,
JobOutputs,
JobStatus,
LimitOffsetPageFile,
LimitOffsetPageJob,
LimitOffsetPageSolver,
LimitOffsetPageStudy,
Links,
Meta,
MetaApi,
Expand All @@ -49,13 +45,18 @@
__version__,
)

from ._files_api import FilesApi
from ._info import openapi
from ._solvers_api import SolversApi
from ._utils import PaginationGenerator

nest_asyncio.apply() # allow to run coroutines via asyncio.run(coro)

__all__: Tuple[str, ...] = (
# imports from osparc_client
"__version__",
"FilesApi",
"PaginationGenerator",
"MetaApi",
"SolversApi",
"UsersApi",
Expand Down Expand Up @@ -86,14 +87,10 @@
"OnePageSolverPort",
"StudyPort",
"Study",
"LimitOffsetPageStudy",
"LimitOffsetPageFile",
"JobMetadataUpdate",
"LimitOffsetPageJob",
"Links",
"SolverPort",
"JobMetadata",
"LimitOffsetPageSolver",
"ErrorGet",
"OnePageStudyPort",
# imports from osparc
Expand Down
129 changes: 129 additions & 0 deletions clients/python/client/osparc/_files_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import asyncio
import json
import math
from pathlib import Path
from typing import Any, Iterator, List, Optional, Tuple, Union

import aiohttp
from aiohttp import ClientSession
from osparc_client import (
BodyCompleteMultipartUploadV0FilesFileIdCompletePost,
ClientFile,
ClientFileUploadSchema,
)
from osparc_client import FilesApi as _FilesApi
from osparc_client import FileUploadCompletionBody, FileUploadLinks, UploadedPart
from tqdm.asyncio import tqdm_asyncio

from . import ApiClient, File
from ._http_client import HttpClient
from ._utils import _file_chunk_generator
from ._warnings_and_errors import aiohttp_error_handler_async


class FilesApi(_FilesApi):
"""Class for interacting with files"""

def __init__(self, api_client: Optional[ApiClient] = None):
"""Construct object

Args:
api_client (ApiClient, optinal): osparc.ApiClient object
"""
super().__init__(api_client)
self._super = super(FilesApi, self)
user: Optional[str] = self.api_client.configuration.username
passwd: Optional[str] = self.api_client.configuration.password
self._auth: Optional[aiohttp.BasicAuth] = (
aiohttp.BasicAuth(login=user, password=passwd)
if (user is not None and passwd is not None)
else None
)

def upload_file(self, file: Union[str, Path]):
return asyncio.run(self.upload_file_async(file=file))

@aiohttp_error_handler_async
async def upload_file_async(self, file: Union[str, Path]) -> File:
if isinstance(file, str):
file = Path(file)
if not file.is_file():
raise RuntimeError(f"{file} is not a file")
client_file: ClientFile = ClientFile(
filename=file.name, filesize=file.stat().st_size
)
client_upload_schema: ClientFileUploadSchema = self._super.get_upload_links(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MINOR: In reality client_upload_schema is NOT a schema but data. Note that pydantic class builds the schema but the instance is a data object. For that reason I try avoiding the word Schema in the classes. This should be modified in the server as well.

BTW, we try to follow a naming convention for request payloads and response bodies. We use the anem of the resource and then a verb at the end. For instance, ProjectGet would be the response body for a GET /project/{id} resource

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, concerning the schema thing I will address this among other small things I need to do on the API server: ITISFoundation/osparc-simcore#4641

client_file=client_file
)
chunk_size: int = client_upload_schema.upload_schema.chunk_size
links: FileUploadLinks = client_upload_schema.upload_schema.links
url_iter: Iterator[Tuple[int, str]] = enumerate(
iter(client_upload_schema.upload_schema.urls), start=1
)
if len(client_upload_schema.upload_schema.urls) < math.ceil(
file.stat().st_size / chunk_size
):
raise RuntimeError(
"Did not receive sufficient number of upload URLs from the server."
)

tasks: list = []
async with HttpClient(
exc_req_typ="post", exc_url=links.abort_upload, exc_auth=self._auth
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you really like writing acronyms :-) ... do not be shy. You can write long names. what is exc here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, sorry. This should already have been fixed

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done now

) as session:
async for chunck, size in _file_chunk_generator(file, chunk_size):
# following https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
index, url = next(url_iter)
task = asyncio.create_task(
self._upload_chunck(
http_client=session,
chunck=chunck,
chunck_size=size,
upload_link=url,
index=index,
)
)
tasks.append(task)

uploaded_parts: List[UploadedPart] = await tqdm_asyncio.gather(*tasks)

return await self._complete_multipart_upload(
session, links.complete_upload, client_file, uploaded_parts
)

async def _complete_multipart_upload(
self,
http_client: ClientSession,
complete_link: str,
client_file: ClientFile,
uploaded_parts: List[UploadedPart],
) -> File:
complete_payload = BodyCompleteMultipartUploadV0FilesFileIdCompletePost(
client_file=client_file,
uploaded_parts=FileUploadCompletionBody(parts=uploaded_parts),
)
async with http_client.post(
complete_link,
json=complete_payload.to_dict(),
auth=self._auth,
) as response:
response.raise_for_status()
payload: dict[str, Any] = await response.json()
return File(**payload)

async def _upload_chunck(
self,
http_client: ClientSession,
chunck: bytes,
chunck_size: int,
upload_link: str,
index: int,
) -> UploadedPart:
async with http_client.put(
upload_link, data=chunck, headers={"Content-Length": f"{chunck_size}"}
) as response:
response.raise_for_status()
assert response.headers # nosec
assert "Etag" in response.headers # nosec
etag: str = json.loads(response.headers["Etag"])
return UploadedPart(number=index, e_tag=etag)
47 changes: 47 additions & 0 deletions clients/python/client/osparc/_http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Optional

import aiohttp
import tenacity


class HttpClient:
"""Async http client context manager"""

def __init__(
self,
exc_req_typ: Optional[str] = None,
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved
exc_url: Optional[str] = None,
exc_auth: Optional[aiohttp.BasicAuth] = None,
):
self._client = aiohttp.ClientSession()
self._exc_callback = getattr(self._client, exc_req_typ) if exc_req_typ else None
self._exc_url = exc_url
self._exc_auth = exc_auth

async def __aenter__(self) -> aiohttp.ClientSession:
return self._client

async def __aexit__(self, exc_type, exc_value, traceback) -> None:
if exc_value is None:
await self._client.close()
else: # exception raised: need to handle
if self._exc_callback is not None:
try:
async for attempt in tenacity.AsyncRetrying(
reraise=True,
wait=tenacity.wait_fixed(1),
stop=tenacity.stop_after_delay(10),
retry=tenacity.retry_if_exception_type(
aiohttp.ServerDisconnectedError
),
):
with attempt:
async with self._exc_callback(
self._exc_url, auth=self._exc_auth
) as response:
response.raise_for_status()
except Exception as err:
await self._client.close()
raise err from exc_value
await self._client.close()
raise exc_value
68 changes: 60 additions & 8 deletions clients/python/client/osparc/_utils.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
from typing import Callable, Generator, TypeVar, Union
import asyncio
from pathlib import Path
from typing import AsyncGenerator, Callable, Generator, Tuple, TypeVar, Union

from osparc_client import (
File,
Job,
LimitOffsetPageFile,
LimitOffsetPageJob,
LimitOffsetPageSolver,
LimitOffsetPageStudy,
PageFile,
PageJob,
PageSolver,
PageStudy,
Solver,
Study,
)

Page = Union[
LimitOffsetPageJob, LimitOffsetPageFile, LimitOffsetPageSolver, LimitOffsetPageStudy
]
Page = Union[PageJob, PageFile, PageSolver, PageStudy]
T = TypeVar("T", Job, File, Solver, Study)


Expand Down Expand Up @@ -59,3 +59,55 @@ def __iter__(self) -> Generator[T, None, None]:
self._offset += len(page.items)
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved
if self._offset >= page.total:
break

async def __aiter__(self) -> AsyncGenerator[T, None]:
"""Returns an async generator

Returns:
AsyncGenerator[T, None]: The async generator
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved
"""
if len(self) == 0:
return
while True:
page: Page = await _fcn_to_coro(
self._pagination_method, self._limit, self._offset
)
assert page.items is not None
assert isinstance(page.total, int)
for item in page.items:
yield item
self._offset += len(page.items)
if self._offset >= page.total:
break


async def _file_chunk_generator(
file: Path, chunk_size: int
) -> AsyncGenerator[Tuple[bytes, int], None]:
if not file.is_file():
raise RuntimeError(f"{file} must be a file")
if chunk_size <= 0:
raise RuntimeError(f"chunk_size={chunk_size} must be a positive int")
bytes_read: int = 0
file_size: int = file.stat().st_size
while bytes_read < file_size:
with open(file, "rb") as f:
f.seek(bytes_read)
nbytes = (
chunk_size
if (bytes_read + chunk_size <= file_size)
else (file_size - bytes_read)
)
assert nbytes > 0
chunk = await asyncio.get_event_loop().run_in_executor(None, f.read, nbytes)
yield chunk, nbytes
bytes_read += nbytes


S = TypeVar("S")


async def _fcn_to_coro(callback: Callable[..., S], *args) -> S:
"""Get a coroutine from a callback."""
result = await asyncio.get_event_loop().run_in_executor(None, callback, *args)
return result
20 changes: 20 additions & 0 deletions clients/python/client/osparc/_warnings_and_errors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
import aiohttp


class VisibleDeprecationWarning(UserWarning):
"""Visible deprecation warning.

Acknowledgement: Having this wrapper is borrowed from numpy
"""


class RequestError(Exception):
"""For exceptions encountered when performing HTTP requests."""


def aiohttp_error_handler_async(method):
"""Handle Aiohttp errors"""

async def wrapper(*args, **kwargs):
try:
return await method(*args, **kwargs)
except aiohttp.ClientResponseError as err:
msg = f"HTTP status: {err.code}. {err.message}"
raise RequestError(msg) from err

return wrapper
2 changes: 1 addition & 1 deletion clients/python/client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# prerequisite: setuptools
# http://pypi.python.org/pypi/setuptools

REQUIRES = [f"osparc_client=={VERSION}"]
REQUIRES = [f"osparc_client=={VERSION}", "aiohttp", "tqdm", "nest_asyncio"]

setup(
name=NAME,
Expand Down
19 changes: 19 additions & 0 deletions clients/python/test/e2e/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import os

import osparc
import pytest


@pytest.fixture
def cfg() -> osparc.Configuration:
"""Configuration

Returns:
osparc.Configuration: The Configuration
"""
cfg = osparc.Configuration(
host=os.environ["OSPARC_API_HOST"],
username=os.environ["OSPARC_API_KEY"],
password=os.environ["OSPARC_API_SECRET"],
)
return cfg
Loading