Skip to content

Commit

Permalink
优化重连
Browse files Browse the repository at this point in the history
  • Loading branch information
limingxinleo committed Dec 20, 2024
1 parent 631ea14 commit 656d639
Showing 1 changed file with 32 additions and 17 deletions.
49 changes: 32 additions & 17 deletions roc/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,15 @@ def __init__(self, host: str, port: int):
self.idGenerator: IdGenerator = IdGenerator()
self.dataFormatter: DataFormatter = DataFormatter()

asyncio.create_task(self.loop())
asyncio.create_task(self.heartbeat())

async def loop(self):
while True:
if self.writer is None:
await asyncio.sleep(1)
continue

try:
prefix = await self.recv(4)
length = struct.unpack(">I", prefix)[0]
Expand All @@ -56,18 +63,30 @@ async def loop(self):
except SocketException:
self.writer = None
self.reader = None
break
continue
except Exception as exception:
print(f"loop发生了异常: {exception}")
await asyncio.sleep(1)
continue

async def heartbeat(self):
while True:
try:
await self.send(Packet(0, PING))
if self.writer is None:
await asyncio.sleep(1)
continue

try:
await asyncio.sleep(10)

await self.send(Packet(0, PING))
except SocketException:
self.writer = None
self.reader = None
break
continue
except Exception as exception:
print(f"心跳发生了异常: {exception}")
await asyncio.sleep(1)
continue

async def recv(self, length: int) -> bytes:
result: bytes = b''
Expand All @@ -80,39 +99,35 @@ async def recv(self, length: int) -> bytes:
return result

async def send(self, packet: Packet) -> bool:
try:
if self.writer is None:
await self.start()
if self.writer is None:
await self.start()

self.writer.write(self.packer.pack(packet))
return True
except Exception as exception:
print(f"发生了异常: {exception}")
return False
self.writer.write(self.packer.pack(packet))
return True

async def request(self, request: Request) -> Response:
key = self.idGenerator.generate()
body = self.dataFormatter.format_request(request)
packet = Packet(key, body)
chan = self.channelManager.get(key, True)

await self.send(packet)

try:
await self.send(packet)

res = await chan.pop()
if res is False:
raise RequestException("request failed")

data = json.loads(res)

return make_response(data)
except SocketException:
self.reader = None
self.writer = None
finally:
self.channelManager.close(key)

async def start(self):
reader, writer = await asyncio.open_connection(self.host, self.port)
self.reader = reader
self.writer = writer

asyncio.create_task(self.loop())
asyncio.create_task(self.heartbeat())

0 comments on commit 656d639

Please sign in to comment.