From 656d639753a8df15b4bb40cf89178b3e06ed80f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=93=AD=E6=98=95?= <715557344@qq.com> Date: Fri, 20 Dec 2024 15:03:22 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- roc/socket.py | 49 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/roc/socket.py b/roc/socket.py index 956c6ae..a948cbd 100644 --- a/roc/socket.py +++ b/roc/socket.py @@ -38,8 +38,15 @@ def __init__(self, host: str, port: int): self.idGenerator: IdGenerator = IdGenerator() self.dataFormatter: DataFormatter = DataFormatter() + asyncio.create_task(self.loop()) + asyncio.create_task(self.heartbeat()) + async def loop(self): while True: + if self.writer is None: + await asyncio.sleep(1) + continue + try: prefix = await self.recv(4) length = struct.unpack(">I", prefix)[0] @@ -56,18 +63,30 @@ async def loop(self): except SocketException: self.writer = None self.reader = None - break + continue + except Exception as exception: + print(f"loop发生了异常: {exception}") + await asyncio.sleep(1) + continue async def heartbeat(self): while True: - try: - await self.send(Packet(0, PING)) + if self.writer is None: + await asyncio.sleep(1) + continue + try: await asyncio.sleep(10) + + await self.send(Packet(0, PING)) except SocketException: self.writer = None self.reader = None - break + continue + except Exception as exception: + print(f"心跳发生了异常: {exception}") + await asyncio.sleep(1) + continue async def recv(self, length: int) -> bytes: result: bytes = b'' @@ -80,15 +99,11 @@ async def recv(self, length: int) -> bytes: return result async def send(self, packet: Packet) -> bool: - try: - if self.writer is None: - await self.start() + if self.writer is None: + await self.start() - self.writer.write(self.packer.pack(packet)) - return True - except Exception as exception: - print(f"发生了异常: {exception}") - return False + self.writer.write(self.packer.pack(packet)) + return True async def request(self, request: Request) -> Response: key = self.idGenerator.generate() @@ -96,9 +111,9 @@ async def request(self, request: Request) -> Response: packet = Packet(key, body) chan = self.channelManager.get(key, True) - await self.send(packet) - try: + await self.send(packet) + res = await chan.pop() if res is False: raise RequestException("request failed") @@ -106,6 +121,9 @@ async def request(self, request: Request) -> Response: data = json.loads(res) return make_response(data) + except SocketException: + self.reader = None + self.writer = None finally: self.channelManager.close(key) @@ -113,6 +131,3 @@ async def start(self): reader, writer = await asyncio.open_connection(self.host, self.port) self.reader = reader self.writer = writer - - asyncio.create_task(self.loop()) - asyncio.create_task(self.heartbeat())