Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Encode JSON responses on a thread in C, mk2 #10905

Merged
merged 5 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10905.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up responding with large JSON objects to requests.
72 changes: 57 additions & 15 deletions synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import urllib
from http import HTTPStatus
from inspect import isawaitable
from io import BytesIO
from typing import (
Any,
Awaitable,
Expand All @@ -37,15 +36,15 @@
)

import jinja2
from canonicaljson import iterencode_canonical_json
from canonicaljson import encode_canonical_json
from typing_extensions import Protocol
from zope.interface import implementer

from twisted.internet import defer, interfaces
from twisted.python import failure
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request
from twisted.web.static import File, NoRangeStaticProducer
from twisted.web.static import File
from twisted.web.util import redirectTo

from synapse.api.errors import (
Expand All @@ -56,10 +55,11 @@
UnrecognizedRequestError,
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import preserve_fn
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
from synapse.util.iterutils import chunk_seq

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -620,12 +620,11 @@ def stopProducing(self) -> None:
self._request = None


def _encode_json_bytes(json_object: Any) -> bytes:
    """Encode an object as UTF-8 encoded JSON bytes.

    Returns the whole encoding as a single ``bytes`` object (rather than an
    iterator of chunks), so that the encoding work can be handed off to a
    worker thread in one call.
    """
    return json_encoder.encode(json_object).encode("utf-8")


def respond_with_json(
Expand Down Expand Up @@ -659,7 +658,7 @@ def respond_with_json(
return None

if canonical_json:
encoder = iterencode_canonical_json
encoder = encode_canonical_json
else:
encoder = _encode_json_bytes

Expand All @@ -670,7 +669,9 @@ def respond_with_json(
if send_cors:
set_cors_headers(request)

_ByteProducer(request, encoder(json_object))
run_in_background(
_async_write_json_to_request_in_thread, request, encoder, json_object
)
return NOT_DONE_YET


Expand Down Expand Up @@ -706,15 +707,35 @@ def respond_with_json_bytes(
if send_cors:
set_cors_headers(request)

# note that this is zero-copy (the bytesio shares a copy-on-write buffer with
# the original `bytes`).
bytes_io = BytesIO(json_bytes)

producer = NoRangeStaticProducer(request, bytes_io)
producer.start()
_write_json_bytes_to_request(request, json_bytes)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From #10844 (comment)

why is this changing in this PR? It seems unrelated to how we encode JSON responses?

True, I can leave as is, but it just felt cleaner to have both respond_with_json and respond_with_json_bytes use the same producer. Leaving as is does mean a) returning large responses can lead to timeouts, and b) anything that writes responses via this method in the replication unit tests will fail (due to Request converting pull producers to push producers via the default reactor, which is never run. I am sorely tempted to ban all pull producers because I lost like half a day due to that, but I digress).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return NOT_DONE_YET


def _write_json_bytes_to_request(request: Request, json_bytes: bytes) -> None:
    """Write an already-encoded JSON payload to the request via a producer.

    Use this instead of calling `Request.write` directly: dumping the whole
    body into the `Request` in one go starts the timeout for the *next*
    request to be received, so a response that takes longer than 60s to
    stream back would never reach the client. Registering a producer means
    the timeout only starts once the full body has been sent over the TCP
    connection.
    """

    # Feed the payload to the producer in fixed-size chunks rather than as
    # one large write.
    chunks = chunk_seq(json_bytes, 4096)

    # A `_ByteProducer` (a push producer) is used here rather than
    # `NoRangeStaticProducer` because the unit tests can't cope with being
    # given a pull producer.
    _ByteProducer(request, chunks)


def set_cors_headers(request: Request):
"""Set the CORS headers so that javascript running in a web browsers can
use this API
Expand Down Expand Up @@ -809,3 +830,24 @@ def finish_request(request: Request):
request.finish()
except RuntimeError as e:
logger.info("Connection disconnected before response was written: %r", e)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From #10844 (comment)

is this really the only possible reason that a RuntimeError can be raised?

Probably not, but this is what it was before.



async def _async_write_json_to_request_in_thread(
    request: SynapseRequest,
    json_encoder: Callable[[Any], bytes],
    json_object: Any,
):
    """Encode `json_object` on a worker thread, then write it to `request`.

    Offloading the encoding keeps large JSON objects from blocking the
    reactor thread while they are serialized.

    Note: `JsonEncoder.iterencode` is deliberately not used here, as it
    falls back to the pure-Python implementation (rather than the C
    backend), which is *much* more expensive.
    """

    encoded = await defer_to_thread(request.reactor, json_encoder, json_object)

    _write_json_bytes_to_request(request, encoded)
2 changes: 1 addition & 1 deletion synapse/push/emailpusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async def _unsafe_process(self) -> None:

should_notify_at = max(notif_ready_at, room_ready_at)

if should_notify_at < self.clock.time_msec():
if should_notify_at <= self.clock.time_msec():
# one of our notifications is ready for sending, so we send
# *one* email updating the user on their notifications,
# we then consider all previously outstanding notifications
Expand Down
19 changes: 17 additions & 2 deletions synapse/util/iterutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,28 @@
Iterable,
Iterator,
Mapping,
Sequence,
Set,
Sized,
Tuple,
TypeVar,
)

from typing_extensions import Protocol

T = TypeVar("T")
S = TypeVar("S", bound="_SelfSlice")


class _SelfSlice(Sized, Protocol):
    """Protocol for sliceable types whose slices have the same type.

    Matches e.g. `bytes` and `str`, where `x[a:b]` is again a `bytes`/`str`.
    This is stricter than `Sequence`, whose `__getitem__` is allowed to
    return some other `Sequence` type.
    """

    def __getitem__(self: S, i: slice) -> S:
        ...


def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
Expand All @@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
return iter(lambda: tuple(islice(sourceiter, size)), ())


def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]:
def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
"""Split the given sequence into chunks of the given size

The last chunk may be shorter than the given size.
Expand Down