-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Encode JSON responses on a thread in C, mk2 #10905
Changes from 3 commits
2310933
02a50ee
b0cee6d
d071f76
5d1f9cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Speed up responding with large JSON objects to requests. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,6 @@ | |
import urllib | ||
from http import HTTPStatus | ||
from inspect import isawaitable | ||
from io import BytesIO | ||
from typing import ( | ||
Any, | ||
Awaitable, | ||
|
@@ -37,15 +36,15 @@ | |
) | ||
|
||
import jinja2 | ||
from canonicaljson import iterencode_canonical_json | ||
from canonicaljson import encode_canonical_json | ||
from typing_extensions import Protocol | ||
from zope.interface import implementer | ||
|
||
from twisted.internet import defer, interfaces | ||
from twisted.python import failure | ||
from twisted.web import resource | ||
from twisted.web.server import NOT_DONE_YET, Request | ||
from twisted.web.static import File, NoRangeStaticProducer | ||
from twisted.web.static import File | ||
from twisted.web.util import redirectTo | ||
|
||
from synapse.api.errors import ( | ||
|
@@ -56,10 +55,11 @@ | |
UnrecognizedRequestError, | ||
) | ||
from synapse.http.site import SynapseRequest | ||
from synapse.logging.context import preserve_fn | ||
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background | ||
from synapse.logging.opentracing import trace_servlet | ||
from synapse.util import json_encoder | ||
from synapse.util.caches import intern_dict | ||
from synapse.util.iterutils import chunk_seq | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
@@ -620,12 +620,11 @@ def stopProducing(self) -> None: | |
self._request = None | ||
|
||
|
||
def _encode_json_bytes(json_object: Any) -> Iterator[bytes]: | ||
def _encode_json_bytes(json_object: Any) -> bytes: | ||
""" | ||
Encode an object into JSON. Returns an iterator of bytes. | ||
""" | ||
for chunk in json_encoder.iterencode(json_object): | ||
yield chunk.encode("utf-8") | ||
return json_encoder.encode(json_object).encode("utf-8") | ||
|
||
|
||
def respond_with_json( | ||
|
@@ -659,7 +658,7 @@ def respond_with_json( | |
return None | ||
|
||
if canonical_json: | ||
encoder = iterencode_canonical_json | ||
encoder = encode_canonical_json | ||
else: | ||
encoder = _encode_json_bytes | ||
|
||
|
@@ -670,7 +669,9 @@ def respond_with_json( | |
if send_cors: | ||
set_cors_headers(request) | ||
|
||
_ByteProducer(request, encoder(json_object)) | ||
run_in_background( | ||
_async_write_json_to_request_in_thread, request, encoder, json_object | ||
) | ||
return NOT_DONE_YET | ||
|
||
|
||
|
@@ -706,15 +707,35 @@ def respond_with_json_bytes( | |
if send_cors: | ||
set_cors_headers(request) | ||
|
||
# note that this is zero-copy (the bytesio shares a copy-on-write buffer with | ||
# the original `bytes`). | ||
bytes_io = BytesIO(json_bytes) | ||
|
||
producer = NoRangeStaticProducer(request, bytes_io) | ||
producer.start() | ||
_write_json_bytes_to_request(request, json_bytes) | ||
return NOT_DONE_YET | ||
|
||
|
||
def _write_json_bytes_to_request(request: Request, json_bytes: bytes) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. From #10844 (comment)
It can be anything that slices to […]. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm mostly asking: why don't we just call it […]? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ooooooooooooooooooooooooooooooooooooooooooooh. Yup, happy to rename. |
||
"""Writes the JSON bytes to the request using an appropriate producer. | ||
|
||
Note: This should be used instead of `Request.write` to correctly handle | ||
large response bodies. | ||
""" | ||
|
||
# The problem with dumping all of the json response into the `Request` | ||
# object at once (via `Request.write`) is that doing so starts the timeout | ||
# for the next request to be received: so if it takes longer than 60s to | ||
# stream back the response to the client, the client never gets it. | ||
# | ||
# The correct solution is to use a Producer; then the timeout is only | ||
# started once all of the content is sent over the TCP connection. | ||
|
||
# To make sure we don't write the whole of the json at once we split it up | ||
# into chunks. | ||
chunk_size = 4096 | ||
bytes_generator = chunk_seq(json_bytes, chunk_size) | ||
|
||
# We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the | ||
# unit tests can't cope with being given a pull producer. | ||
_ByteProducer(request, bytes_generator) | ||
|
||
|
||
def set_cors_headers(request: Request): | ||
"""Set the CORS headers so that javascript running in a web browsers can | ||
use this API | ||
|
@@ -809,3 +830,24 @@ def finish_request(request: Request): | |
request.finish() | ||
except RuntimeError as e: | ||
logger.info("Connection disconnected before response was written: %r", e) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. From #10844 (comment)
Probably not, but this is what it was before. |
||
|
||
|
||
async def _async_write_json_to_request_in_thread( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you move this further up the file (above […])? |
||
request: SynapseRequest, | ||
json_encoder: Callable[[Any], bytes], | ||
json_object: Any, | ||
): | ||
"""Encodes the given JSON object on a thread and then writes it to the | ||
request. | ||
|
||
This is done so that encoding large JSON objects doesn't block the reactor | ||
thread. | ||
|
||
Note: We don't use JsonEncoder.iterencode here as that falls back to the | ||
Python implementation (rather than the C backend), which is *much* more | ||
expensive. | ||
""" | ||
|
||
json_str = await defer_to_thread(request.reactor, json_encoder, json_object) | ||
|
||
_write_json_bytes_to_request(request, json_str) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From #10844 (comment)
True, I can leave as is, but it just felt cleaner to have both
`respond_with_json`
and `respond_with_json_bytes`
use the same producer. Leaving as is does mean a) returning large responses can lead to timeouts, and b) anything that writes responses via this method in the replication unit tests will fail (due to `Request`
converting pull producers to push producers via the default reactor, which is never run. I am sorely tempted to ban all pull producers because I lost like half a day due to that, but I digress). There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍