From 7351591d83f760d592e282236394a94c8b1a56e5 Mon Sep 17 00:00:00 2001 From: "patchback[bot]" <45432694+patchback[bot]@users.noreply.github.com> Date: Tue, 7 Nov 2023 17:58:05 +0000 Subject: [PATCH] [PR #7797/27c308b1 backport][3.9] Fix increase in latency with small messages from websocket compression changes (#7799) **This is a backport of PR #7797 as merged into master (27c308b177a8421e2ce093505e71d74a2082f374).** Changes the threshold that is required to compress in the executor for websocket messages to 5KiB https://github.com/aio-libs/aiohttp/pull/7223 changed the websocket implementation to compress messages > 1KiB in the executor. The threshold was a bit low which caused an increase in latency compressing messages as the overhead to use the executor can exceed the cost to compress tiny messages. When testing 3.9.0 with Home Assistant, we saw a 3 order of magnitude increase in executor usage which resulted in an overall increase in cpu time since all the tiny messages were being compressed in the executor. I could not find the motivation for choosing 1KiB in the original PR - [x] I think the code is well written - [ ] Unit tests for the changes exist - [ ] Documentation reflects the changes - [ ] If you provide code modification, please add yourself to `CONTRIBUTORS.txt` * The format is <Name> <Surname>. * Please keep alphabetical order, the file is sorted by names. - [ ] Add a new news fragment into the `CHANGES` folder * name it `.` for example (588.bugfix) * if you don't have an `issue_id` change it to the pr id after creating the pr * ensure type is one of the following: * `.feature`: Signifying a new feature. * `.bugfix`: Signifying a bug fix. * `.doc`: Signifying a documentation improvement. * `.removal`: Signifying a deprecation or removal of public API. * `.misc`: A ticket has been closed, but it is not of interest to users. * Make sure to use full sentences with correct case and punctuation, for example: "Fix issue with non-ascii contents in doctest text files." Co-authored-by: J. Nick Koston --- CHANGES/7797.bugfix | 1 + aiohttp/http_websocket.py | 39 +++++++++++++++++++++------------------ 2 files changed, 22 insertions(+), 18 deletions(-) create mode 100644 CHANGES/7797.bugfix diff --git a/CHANGES/7797.bugfix b/CHANGES/7797.bugfix new file mode 100644 index 00000000000..a573c861897 --- /dev/null +++ b/CHANGES/7797.bugfix @@ -0,0 +1 @@ +Fix increase in latency with small messages from websocket compression changes diff --git a/aiohttp/http_websocket.py b/aiohttp/http_websocket.py index 475b1f7824a..8e33857ae30 100644 --- a/aiohttp/http_websocket.py +++ b/aiohttp/http_websocket.py @@ -83,6 +83,15 @@ class WSMsgType(IntEnum): CLOSED = 0x101 ERROR = 0x102 + text = TEXT + binary = BINARY + ping = PING + pong = PONG + close = CLOSE + closing = CLOSING + closed = CLOSED + error = ERROR + WS_KEY: Final[bytes] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11" @@ -135,7 +144,7 @@ class WSHandshakeError(Exception): # Used by _websocket_mask_python -@functools.lru_cache +@functools.lru_cache() def _xor_table() -> List[bytes]: return [bytes(a ^ b for a in range(256)) for b in range(256)] @@ -626,17 +635,21 @@ async def _send_frame( if (compress or self.compress) and opcode < 8: if compress: # Do not set self._compress if compressing is for this frame - compressobj = self._make_compress_obj(compress) + compressobj = ZLibCompressor( + level=zlib.Z_BEST_SPEED, + wbits=-compress, + max_sync_chunk_size=WEBSOCKET_MAX_SYNC_CHUNK_SIZE, + ) else: # self.compress if not self._compressobj: - self._compressobj = self._make_compress_obj(self.compress) + self._compressobj = ZLibCompressor( + level=zlib.Z_BEST_SPEED, + wbits=-self.compress, + max_sync_chunk_size=WEBSOCKET_MAX_SYNC_CHUNK_SIZE, + ) compressobj = self._compressobj message = await compressobj.compress(message) - # Its critical that we do not return control to the event - # loop until we have finished sending all the compressed - # data. Otherwise we could end up mixing compressed frames - # if there are multiple coroutines compressing data. message += compressobj.flush( zlib.Z_FULL_FLUSH if self.notakeover else zlib.Z_SYNC_FLUSH ) @@ -674,22 +687,12 @@ async def _send_frame( self._output_size += len(header) + len(message) - # It is safe to return control to the event loop when using compression - # after this point as we have already sent or buffered all the data. - if self._output_size > self._limit: self._output_size = 0 await self.protocol._drain_helper() - def _make_compress_obj(self, compress: int) -> ZLibCompressor: - return ZLibCompressor( - level=zlib.Z_BEST_SPEED, - wbits=-compress, - max_sync_chunk_size=WEBSOCKET_MAX_SYNC_CHUNK_SIZE, - ) - def _write(self, data: bytes) -> None: - if self.transport.is_closing(): + if self.transport is None or self.transport.is_closing(): raise ConnectionResetError("Cannot write to closing transport") self.transport.write(data)