Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark endpoints for streaming uploads #1371

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions connexion/apis/aiohttp_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,16 @@ def _add_operation_internal(self, method, path, operation):
)

@classmethod
async def get_request(cls, req):
async def get_request(cls, stream_upload, args, params):
"""Convert aiohttp request to connexion

:param req: instance of aiohttp.web.Request
:param stream_upload: flag indicating that the upload should be streamed
:param args: one-element tuple containing an instance of aiohttp.web.Request
:param params: unused
:return: connexion request instance
:rtype: ConnexionRequest
"""
req = args[0]
url = str(req.url)
logger.debug('Getting data and status code',
extra={'has_body': req.has_body, 'url': url})
Expand Down
6 changes: 3 additions & 3 deletions connexion/apis/flask_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ def _serialize_data(cls, data, mimetype):
return body, mimetype

@classmethod
def get_request(cls, *args, **params):
# type: (*Any, **Any) -> ConnexionRequest
def get_request(cls, stream_upload, args, params):
# type: (bool, tuple, dict) -> ConnexionRequest
"""Gets ConnexionRequest instance for the operation handler
result. Status Code and Headers for response. If only body
data is returned by the endpoint function, then the status
Expand All @@ -232,7 +232,7 @@ def get_request(cls, *args, **params):
headers=flask_request.headers,
form=flask_request.form,
query=flask_request.args,
body=flask_request.get_data(),
body=flask_request.stream if stream_upload else flask_request.get_data(),
json_getter=lambda: flask_request.get_json(silent=True),
files=flask_request.files,
path_params=params,
Expand Down
6 changes: 4 additions & 2 deletions connexion/decorators/coroutine_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import functools


def get_request_life_cycle_wrapper(function, api, mimetype):
def get_request_life_cycle_wrapper(function, api, stream_upload, mimetype):
"""
It is a wrapper used on `RequestResponseDecorator` class.
This function is located in a separate module because Python 2.7 doesn't
Expand All @@ -14,7 +14,9 @@ def get_request_life_cycle_wrapper(function, api, mimetype):
"""
@functools.wraps(function)
def wrapper(*args, **kwargs):
connexion_request = api.get_request(*args, **kwargs)
# Pass args and kwargs as a tuple/dict respectively so they don't
# interfere with the other parameters.
connexion_request = api.get_request(stream_upload, args, kwargs)
while asyncio.iscoroutine(connexion_request):
connexion_request = yield from connexion_request

Expand Down
9 changes: 6 additions & 3 deletions connexion/decorators/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ class RequestResponseDecorator(BaseDecorator):
framework specific object.
"""

def __init__(self, api, mimetype):
def __init__(self, api, stream_upload, mimetype):
self.api = api
self.stream_upload = stream_upload
self.mimetype = mimetype

def __call__(self, function):
Expand All @@ -39,12 +40,14 @@ def __call__(self, function):
"""
if has_coroutine(function, self.api):
from .coroutine_wrappers import get_request_life_cycle_wrapper
wrapper = get_request_life_cycle_wrapper(function, self.api, self.mimetype)
wrapper = get_request_life_cycle_wrapper(function, self.api, self.stream_upload, self.mimetype)

else: # pragma: 3 no cover
@functools.wraps(function)
def wrapper(*args, **kwargs):
request = self.api.get_request(*args, **kwargs)
# Pass args and kwargs as a tuple/dict respectively so they don't
# interfere with the other parameters.
request = self.api.get_request(self.stream_upload, args, kwargs)
response = function(request)
return self.api.get_response(response, self.mimetype, request)

Expand Down
6 changes: 6 additions & 0 deletions connexion/operations/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ def consumes(self):
def produces(self):
return self._produces

def _get_stream_upload_internal(self):
    """
    Return the raw ``x-stream-upload`` value declared on the request body.

    :return: the value stored under ``x-stream-upload`` in
        ``self.request_body``, or ``False`` when the request body is
        empty/missing or does not carry the extension.
    """
    if not self.request_body:
        return False
    return self.request_body.get('x-stream-upload', False)

def with_definitions(self, schema):
if self.components:
schema['schema']['components'] = self.components
Expand Down
26 changes: 25 additions & 1 deletion connexion/operations/secure.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import functools
import logging
import distutils

from ..decorators.decorator import RequestResponseDecorator

Expand Down Expand Up @@ -148,6 +149,29 @@ def security_decorator(self):
def get_mimetype(self):
return DEFAULT_MIMETYPE

def get_stream_upload(self):
    """
    Indicates whether this operation should allow streaming uploads
    to disk.

    The raw setting is obtained from ``_get_stream_upload_internal()``.
    A real boolean is returned unchanged. Any other value is converted to
    its lower-cased string form and compared against the truthy spellings
    ``y``, ``yes``, ``t``, ``true``, ``on`` and ``1``; everything else
    (including unrecognised values such as ``abc``) yields ``False``.

    :return: whether the request body should be streamed
    :rtype: bool
    """
    stream_upload = self._get_stream_upload_internal()
    if isinstance(stream_upload, bool):
        return stream_upload
    # Parsed locally instead of via ``distutils.util.strtobool``: a bare
    # ``import distutils`` does not load the ``util`` submodule (so the
    # attribute lookup can fail with an uncaught AttributeError), and
    # distutils is deprecated and no longer shipped with Python 3.12+.
    truthy_spellings = ('y', 'yes', 't', 'true', 'on', '1')
    return str(stream_upload).lower() in truthy_spellings

def _get_stream_upload_internal(self):
    """
    Hook supplying the raw streaming-upload setting for this operation.

    This default implementation always returns ``False``. Subclasses may
    override it to report whether the operation should allow streaming
    uploads to disk; ``get_stream_upload`` converts whatever is returned
    here into a boolean.
    """
    return False

@property
def _request_response_decorator(self):
"""
Expand All @@ -157,4 +181,4 @@ def _request_response_decorator(self):
object is returned.
:rtype: types.FunctionType
"""
return RequestResponseDecorator(self.api, self.get_mimetype())
return RequestResponseDecorator(self.api, self.get_stream_upload(), self.get_mimetype())
3 changes: 3 additions & 0 deletions connexion/operations/swagger2.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ def consumes(self):
def produces(self):
return self._produces

def _get_stream_upload_internal(self):
    """
    Return the raw ``x-stream-upload`` value from ``self.body_definition``,
    or ``False`` when the key is not present.
    """
    body_definition = self.body_definition
    return body_definition.get('x-stream-upload', False)

def get_path_parameter_types(self):
types = {}
path_parameters = (p for p in self.parameters if p["in"] == "path")
Expand Down
68 changes: 68 additions & 0 deletions docs/request.rst
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,71 @@ change the validation, you can override the defaults with:
app.add_api('api.yaml', ..., validator_map=validator_map)

See custom validator example in ``examples/enforcedefaults``.

Upload Streaming
----------------

Connexion typically reads an entire request into memory so that it can be validated.
Applications that upload large files, however, may want to stream them to disk
instead. This can be configured using the ``x-stream-upload`` vendor extension.

If you are using the OpenAPI 3.0 specification, ``x-stream-upload`` should be added
to the ``requestBody``. The ``content`` type should be a binary type such as
``application/octet-stream`` (``application/json`` will not work).
The ``schema`` type should be ``string``, and the schema ``format`` should be
``binary``:

.. code-block:: yaml

/streaming_upload_endpoint:
post:
operationId: api.streaming_upload
requestBody:
x-stream-upload: true
content:
application/octet-stream:
schema:
type: string
format: binary

For the OpenAPI 2.0 specification, ``x-stream-upload`` should be included in the
``body`` parameter:

.. code-block:: yaml

/streaming_upload_endpoint:
post:
operationId: api.streaming_upload
consumes:
- application/octet-stream
produces:
- application/json
parameters:
- name: body
in: body
required: true
x-stream-upload: true
schema:
type: string

In your application, the ``body`` parameter will be set to `flask.request.stream`_,
which you can use to write the data to disk in chunks:

.. code-block:: python

# api.py file
def streaming_upload(body):
    """Copy the streamed request body to ``output_file`` in 4096-byte chunks."""
    chunk_size = 4096
    with open('output_file', 'wb') as output_file:
        # Prime the loop with the first read; an empty chunk means the
        # stream is exhausted.
        chunk = body.read(chunk_size)
        while len(chunk) != 0:
            output_file.write(chunk)
            chunk = body.read(chunk_size)

.. warning:: No validation is performed on streamed requests. The application
should perform its own validation.

.. note:: Upload streaming is only supported for Flask.

.. _flask.request.stream: https://flask.palletsprojects.com/en/2.0.x/api/#flask.Request.stream
33 changes: 33 additions & 0 deletions tests/api/test_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,36 @@ def test_get_bad_default_response(simple_app):

resp = app_client.get('/v1.0/get_bad_default_response/202')
assert resp.status_code == 500


def test_streaming_upload_response(simple_app):
    """POSTing to ``/v1.0/test_streaming_upload`` must report a streamed body."""
    client = simple_app.app.test_client()
    request_headers = {'Content-Type': 'application/octet-stream'}
    response = client.post('/v1.0/test_streaming_upload',
                           data=json.dumps({}),
                           headers=request_headers)
    assert response.status_code == 200
    payload = json.loads(response.data.decode('utf-8'))
    assert payload['streaming']


def test_non_streaming_upload_response(simple_app):
    """POSTing to ``/v1.0/test_non_streaming_upload`` must not report a streamed body."""
    client = simple_app.app.test_client()
    request_headers = {'Content-Type': 'application/octet-stream'}
    response = client.post('/v1.0/test_non_streaming_upload',
                           data=json.dumps({}),
                           headers=request_headers)
    assert response.status_code == 200
    payload = json.loads(response.data.decode('utf-8'))
    assert not payload['streaming']


def test_invalid_streaming_upload_response(simple_app):
    """POSTing to ``/v1.0/test_invalid_streaming_upload`` must not report a streamed body."""
    client = simple_app.app.test_client()
    request_headers = {'Content-Type': 'application/octet-stream'}
    response = client.post('/v1.0/test_invalid_streaming_upload',
                           data=json.dumps({}),
                           headers=request_headers)
    assert response.status_code == 200
    payload = json.loads(response.data.decode('utf-8'))
    assert not payload['streaming']
17 changes: 17 additions & 0 deletions tests/fakeapi/hello/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,3 +602,20 @@ def get_date():

def get_uuid():
return {'value': uuid.UUID(hex='e7ff66d0-3ec2-4c4e-bed0-6e4723c24c51')}


def check_if_streaming(body):
    """
    Report whether ``body`` is the very same object as ``request.stream``.

    :return: a ``({'streaming': <bool>}, 200)`` tuple
    """
    return {'streaming': body is request.stream}, 200


def test_streaming_upload(body=None):
    """Handler for ``/test_streaming_upload``; delegates to ``check_if_streaming``."""
    result = check_if_streaming(body)
    return result


def test_non_streaming_upload(body=None):
    """Handler for ``/test_non_streaming_upload``; delegates to ``check_if_streaming``."""
    result = check_if_streaming(body)
    return result


def test_invalid_streaming_upload(body=None):
    """Handler for ``/test_invalid_streaming_upload``; delegates to ``check_if_streaming``."""
    result = check_if_streaming(body)
    return result
55 changes: 55 additions & 0 deletions tests/fixtures/simple/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,61 @@ paths:
application/json:
schema:
type: object
/test_streaming_upload:
post:
operationId: fakeapi.hello.test_streaming_upload
requestBody:
x-stream-upload: true
content:
application/octet-stream:
schema:
type: string
format: binary
responses:
'200':
description: >
JSON indicating whether the request body was streamed.
content:
application/json:
schema:
type: object
/test_non_streaming_upload:
post:
operationId: fakeapi.hello.test_non_streaming_upload
requestBody:
x-stream-upload: false
content:
application/octet-stream:
schema:
type: string
format: binary
responses:
'200':
description: >
JSON indicating whether the request body was streamed.
content:
application/json:
schema:
type: object

/test_invalid_streaming_upload:
post:
operationId: fakeapi.hello.test_invalid_streaming_upload
requestBody:
x-stream-upload: abc
content:
application/octet-stream:
schema:
type: string
format: binary
responses:
'200':
description: >
JSON indicating whether the request body was streamed.
content:
application/json:
schema:
type: object

servers:
- url: http://localhost:{port}/{basePath}
Expand Down
Loading