Skip to content

Commit

Permalink
perf(ffxiv): 獭窝优化
Browse files Browse the repository at this point in the history
  • Loading branch information
LambdaYH committed Aug 9, 2024
1 parent 3557bc4 commit a1060b7
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 34 deletions.
25 changes: 7 additions & 18 deletions migang/plugins/ffxiv/ffxiv_otterbot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from nonebot.drivers import Driver
from nonebot.plugin import PluginMetadata
from nonebot import get_driver, on_message
from nonebot.adapters.onebot.v11 import Bot, MessageEvent
from nonebot.adapters.onebot.v11 import MessageEvent

from migang.core import ConfigItem, get_config

Expand Down Expand Up @@ -51,29 +51,18 @@ async def _message_handler(event: MessageEvent):
await ws_conn.forwardEvent(event)


@driver.on_bot_connect
async def setup_ws(bot: Bot):
@driver.on_startup
async def setup_ws():
access_token = await get_config("access_token")
if not access_token:
return
bot_id = await get_config("bot_id")
if not bot_id:
bot_id = bot.self_id
return
url = await get_config("url")
if not url:
return
global ws_conn
ws_conn = WebSocketConn(bot=bot, url=url, bot_id=bot_id, access_token=access_token)
asyncio.gather(ws_conn.connect())
global is_matcher_created
if not is_matcher_created:
on_message(block=False, rule=_rule).append_handler(_message_handler)
is_matcher_created = True


@driver.on_bot_disconnect
async def stop_ws():
global ws_conn
if ws_conn is not None:
await ws_conn.stop()
ws_conn = None
ws_conn = WebSocketConn(url=url, bot_id=bot_id, access_token=access_token)
asyncio.create_task(ws_conn.connect())
on_message(block=False, rule=_rule).append_handler(_message_handler)
61 changes: 45 additions & 16 deletions migang/plugins/ffxiv/ffxiv_otterbot/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

import ujson
import websockets
from nonebot import get_bot
from nonebot.log import logger
from pydantic import BaseModel
from nonebot.adapters import Bot, Event
from nonebot.adapters import Event
from websockets import WebSocketClientProtocol
from nonebot.adapters.onebot.v11 import Message, MessageSegment
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
Expand Down Expand Up @@ -47,16 +48,16 @@ def _proccess_api(action: str, data: dict[str, Any]):


class WebSocketConn:
def __init__(self, bot: Bot, url: str, bot_id: int, access_token: str) -> None:
def __init__(self, url: str, bot_id: int, access_token: str) -> None:
self.__queue = Queue()
self.__url = url
self.__bot_id = bot_id
self.__access_token = access_token
self.__bot = bot
self.__send_task = None
self.__recv_task = None
self.__heartbeat = None
self.__stop_flag = False
self.__connect = False
self.__websocket: WebSocketClientProtocol = None

async def connect(self):
Expand All @@ -74,6 +75,7 @@ async def connect(self):
) as websocket:
logger.info("与獭窝已成功建立连接")
self.__websocket = websocket
self.__connect = True
self.__send_task = asyncio.create_task(self.__ws_send(websocket))
self.__recv_task = asyncio.create_task(self.__ws_recv(websocket))
self.__heartbeat = asyncio.create_task(
Expand All @@ -86,12 +88,8 @@ async def connect(self):
logger.opt(colors=True).warning(
"<y><bg #f8bbd0>与獭窝连接关闭</bg #f8bbd0></y>"
)
self.__send_task.cancel()
self.__recv_task.cancel()
self.__heartbeat.cancel()
await self.__handle_disconnect()
await asyncio.sleep(1) # 等待重连
except CancelledError:
logger.info("獭窝断开连接,异步线程终止")
except Exception as e:
logger.opt(colors=True).error(
f"<y><bg #f8bbd0>连接獭窝发生意料之外的错误:{e}</bg #f8bbd0></y>"
Expand All @@ -102,19 +100,50 @@ async def connect(self):
await asyncio.sleep(15)

async def __send_heartbeat(self, ws: WebSocketClientProtocol):
while self.__stop_flag:
while self.__connect:
try:
await asyncio.sleep(_HEARTBEAT_INTERVAL)
await ws.send(_get_heartbeat_event(self.__bot_id))
except Exception as e:
logger.error(f"发送心跳事件失败:{e}")

async def __handle_disconnect(self):
self.__connect = False
if self.__send_task:
self.__send_task.cancel()
try:
await self.__send_task
except asyncio.CancelledError:
logger.info("send_task已取消")
except Exception as e:
logger.error(f"send_task 错误: {e}")

if self.__recv_task:
self.__recv_task.cancel()
try:
await self.__recv_task
except asyncio.CancelledError:
logger.info("recv_task已取消")
except Exception as e:
logger.error(f"recv_task 错误: {e}")

if self.__heartbeat:
self.__heartbeat.cancel()
try:
await self.__heartbeat
except asyncio.CancelledError:
logger.info("heartbeat已取消")
except Exception as e:
logger.error(f"heartbeat 错误: {e}")

try:
await self.__websocket.close()
except Exception as e:
logger.info(f"ws连接关闭:{e}")

async def stop(self):
self.__stop_flag = False
self.__send_task.cancel()
self.__recv_task.cancel()
self.__heartbeat.cancel()
await self.__websocket.close()
await self.__handle_disconnect()

async def forwardEvent(self, event: Event):
if hasattr(event, "self_id"):
Expand All @@ -128,7 +157,7 @@ async def _call_api(self, raw_data: str) -> Any:
echo = data.get("echo", "")
action = data["action"]
_proccess_api(action=action, data=data)
resp = await self.__bot.call_api(data["action"], **data["params"])
resp = await get_bot().call_api(data["action"], **data["params"])
resp_data = _build_ret_msg(resp)
if echo:
resp_data["echo"] = echo
Expand All @@ -142,7 +171,7 @@ async def _call_api(self, raw_data: str) -> Any:
return {"status": "failed", "echo": echo}

async def __ws_send(self, ws: WebSocketClientProtocol):
while self.__stop_flag:
while self.__connect:
try:
event = await self.__queue.get()
send_data: str
Expand All @@ -161,7 +190,7 @@ async def __ws_send(self, ws: WebSocketClientProtocol):
logger.error(f"发送獭窝信息异常:{e}")

async def __ws_recv(self, ws: WebSocketClientProtocol):
while self.__stop_flag:
while self.__connect:
try:
raw_data = await ws.recv()
logger.info(f"收到獭窝信息:{raw_data}")
Expand Down

0 comments on commit a1060b7

Please sign in to comment.