-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue_handler.py
169 lines (140 loc) · 7.22 KB
/
queue_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import asyncio
import logging
import tempfile
import os
import discord
import subprocess
from api_handlers import get_api_handler
from config import Config
from pathlib import Path
class QueueHandler:
def __init__(self, max_queue_size: int, tts: 'TTS'):
self.tts = tts
self.queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue_size)
self.users_in_queue: set = set()
self.lock = asyncio.Lock()
self.handler = get_api_handler(Config.AI_HANDLER_TYPE)
self.logger = logging.getLogger(self.__class__.__name__)
async def handle_queue(self):
"""
Continuously processes messages from the queue.
"""
self.logger.info("Queue handler started.")
while True:
message: discord.Message = await self.queue.get()
try:
user_message = message.content
#test
# await asyncio.sleep(5) # Simulate external API call
#response = generate_random_string()
#test
self.logger.debug(f"Processing message from {message.author}: {user_message}")
# Get AI response
response = await self.handler.chat_completion(user_message)
if not response:
response = "Couldn't process your message. Please try again later."
if response:
# Send voice reply if response was received
# await self.send_voice_reply(message, response)
await self.send_video_reply(message, response)
else:
response = "Couldn't process your message. Please try again later."
await message.reply(response, mention_author=True)
self.logger.error(f"Failed to process message {message.id} from {message.author}")
self.logger.info(f"Replied to {message.author}: {response}")
except Exception as e:
self.logger.error(f"Failed to process message {message.id}: {e}", exc_info=True)
await message.reply("An error occurred while processing your message.", mention_author=True)
finally:
async with self.lock:
self.users_in_queue.discard(message.author.id)
self.queue.task_done()
async def send_video_reply(self, msg: discord.Message, response: str):
try:
with tempfile.TemporaryDirectory() as temp_dir:
voice_file_path = await self.generate_voice(response, temp_dir)
current_script_path = os.path.dirname(os.path.abspath(__file__))
sadtalker_path = os.path.join(current_script_path, 'externals', 'SadTalker')
if not os.path.isdir(sadtalker_path):
raise FileNotFoundError(f"The directory {sadtalker_path} does not exist.")
# Path to the source image
source_image_path = os.path.join(current_script_path, 'image.png')
if not os.path.isfile(source_image_path):
raise FileNotFoundError(f"The source image {source_image_path} does not exist.")
command = [
'conda', 'run', '-n', 'sadtalker', 'python', f'{sadtalker_path}/inference.py',
'--driven_audio', voice_file_path,
'--source_image', source_image_path,
'--result_dir', temp_dir,
'--still'
]
# Run the subprocess asynchronously
process = await asyncio.create_subprocess_exec(
*command,
cwd=sadtalker_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
# If returncode is non-zero, log stderr as an error
self.logger.error(f"Subprocess failed with return code {process.returncode}. Error: {stderr.decode()}")
await msg.reply("Failed to generate video reply due to an error in the process.", mention_author=True)
return
# Log stdout as regular output
if stdout:
self.logger.debug(f"Subprocess output: {stdout.decode()}")
video_files = list(Path(temp_dir).glob("*.mp4"))
if not video_files:
raise FileNotFoundError("No video file generated.")
with open(video_files[0], 'rb') as video_file:
discord_file = discord.File(fp=video_file, filename='response.mp4')
await msg.reply(file=discord_file)
self.logger.debug("Video reply sent.")
except Exception as e:
self.logger.error(f"Failed to send video reply: {e}", exc_info=True)
await msg.reply("Failed to generate video reply.", mention_author=True)
async def generate_voice(self, response: str, temp_dir: str) -> str:
loop = asyncio.get_running_loop()
temp_file_path = os.path.join(temp_dir, 'voice.wav')
# Run the TTS conversion in an executor to avoid blocking the event loop
await loop.run_in_executor(
None,
lambda: self.tts.tts_to_file(
text=response,
speaker="p230",
file_path=temp_file_path
)
)
self.logger.debug(f"TTS conversion completed. Voice file saved at: {temp_file_path}")
return temp_file_path
# async def send_voice_reply(self, msg: discord.Message, response: str):
# try:
# loop = asyncio.get_running_loop()
# # Create a temporary file to store the TTS output
# with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as tmp_file:
# temp_file_path = tmp_file.name
# # Run the TTS conversion in an executor to avoid blocking the event loop
# await loop.run_in_executor(
# None,
# lambda: self.tts.tts_to_file(
# text=response,
# speaker="p230",
# file_path=temp_file_path
# )
# )
# # Open the generated audio file in binary read mode and send it as a Discord attachment
# with open(temp_file_path, 'rb') as audio_file:
# discord_file = discord.File(fp=audio_file, filename='response.wav')
# await msg.reply(file=discord_file)
# self.logger.debug("Voice reply sent.")
# except Exception as e:
# self.logger.error(f"Failed to send voice reply: {e}", exc_info=True)
# await msg.reply("Failed to generate voice reply.", mention_author=True)
# finally:
# # Clean up: remove the temporary file
# try:
# os.unlink(temp_file_path)
# self.logger.debug(f"Temporary file {temp_file_path} deleted.")
# except Exception as cleanup_error:
# self.logger.warning(f"Failed to delete temporary file {temp_file_path}: {cleanup_error}")