From 4e54f83f583c812fbde71abdfd9dd0cebc0312cf Mon Sep 17 00:00:00 2001 From: flukexp Date: Sun, 19 Jan 2025 00:21:38 +0700 Subject: [PATCH] added amicaHandler --- src/features/chat/chat.ts | 99 +++++++++++++- src/features/vrmViewer/viewer.ts | 73 +++++++++- src/pages/api/amicaHandler.ts | 221 ++++++++++++++++++++++++++++++- 3 files changed, 384 insertions(+), 9 deletions(-) diff --git a/src/features/chat/chat.ts b/src/features/chat/chat.ts index 17d7b4e..2b177b4 100644 --- a/src/features/chat/chat.ts +++ b/src/features/chat/chat.ts @@ -20,13 +20,14 @@ import { localXTTSTTS} from "@/features/localXTTS/localXTTS"; import { AmicaLife } from '@/features/amicaLife/amicaLife'; -import { config } from "@/utils/config"; +import { config, updateConfig } from "@/utils/config"; import { cleanTalk } from "@/utils/cleanTalk"; import { processResponse } from "@/utils/processResponse"; import { wait } from "@/utils/wait"; import { isCharacterIdle, characterIdleTime, resetIdleTimer } from "@/utils/isIdle"; import { getOpenRouterChatResponseStream } from './openRouterChat'; import { handleUserInput } from '../externalAPI/externalAPI'; +import { loadVRMAnimation } from '@/lib/VRMAnimation/loadVRMAnimation'; type Speak = { @@ -124,6 +125,8 @@ export class Chat { this.updateAwake(); this.initialized = true; + + this.serverSentEvent(); } public setMessageList(messages: Message[]) { @@ -359,6 +362,100 @@ export class Chat { await this.makeAndHandleStream(messages); } + public serverSentEvent() { + // Client-side code in a React component or elsewhere + const eventSource = new EventSource('/api/amicaHandler'); + + eventSource.onmessage = async (event) => { + const data = JSON.parse(event.data); + console.log('Received message:', data); + + }; + // Listen for incoming messages from the server + eventSource.onmessage = async (event) => { + try { + // Parse the incoming JSON message + const message = JSON.parse(event.data); + + console.log(message); + + // Destructure to get the message type and data + const { type, data } = message; + + // Handle the message based on its type + switch (type) { + case 'normal': + console.log('Normal message received:', data); + const messages: Message[] = [ + { role: "system", content: config("system_prompt") }, + ...this.messageList!, + { role: "user", content: data}, + ]; + let stream = await getEchoChatResponseStream(messages); + this.streams.push(stream); + this.handleChatResponseStream(); + break; + + case 'animation': + console.log('Animation data received:', data); + const animation = await loadVRMAnimation(`/animations/${data}`); + if (!animation) { + throw new Error("Loading animation failed"); + } + this.viewer?.model?.playAnimation(animation,data); + requestAnimationFrame(() => { this.viewer?.resetCameraLerp(); }); + break; + + case 'playback': + console.log('Playback flag received:', data); + this.viewer?.startRecording(); + // Automatically stop recording after 10 seconds + setTimeout(() => { + this.viewer?.stopRecording((videoBlob) => { + // Log video blob to console + console.log("Video recording finished", videoBlob); + + // Create a download link for the video file + const url = URL.createObjectURL(videoBlob!); + const a = document.createElement("a"); + a.href = url; + a.download = "recording.webm"; // Set the file name for download + document.body.appendChild(a); + a.click(); + document.body.removeChild(a); + + // Revoke the URL to free up memory + URL.revokeObjectURL(url); + }); + }, data); // Stop recording after 10 seconds + break; + + case 'systemPrompt': + console.log('System Prompt data received:', data); + updateConfig("system_prompt",data); + break; + + default: + console.warn('Unknown message type:', type); + } + } catch (error) { + console.error('Error parsing SSE message:', error); + } + }; + + + eventSource.addEventListener('end', () => { + console.log('SSE session ended'); + eventSource.close(); + }); + + eventSource.onerror = (error) => { + console.error('Error in SSE connection:', error); + eventSource.close(); + }; + + } + public async makeAndHandleStream(messages: Message[]) { try { diff --git a/src/features/vrmViewer/viewer.ts b/src/features/vrmViewer/viewer.ts index e6b4100..dea0c7e 100644 --- a/src/features/vrmViewer/viewer.ts +++ b/src/features/vrmViewer/viewer.ts @@ -26,6 +26,10 @@ export class Viewer { private sendScreenshotToCallback: boolean; private screenshotCallback: BlobCallback | undefined; + private mediaRecorder?: MediaRecorder; + private recordedChunks: Blob[] = []; + private videoStream: any; + constructor() { this.isReady = false; this.sendScreenshotToCallback = false; @@ -218,11 +222,78 @@ export class Viewer { if (this.sendScreenshotToCallback && this.screenshotCallback) { this._renderer.domElement.toBlob(this.screenshotCallback, "image/jpeg"); this.sendScreenshotToCallback = false; - } } }; + public startStreaming(videoElement: HTMLVideoElement) { + if (!this._renderer) return; + + // Create a stream from the renderer's canvas + const stream = this._renderer.domElement.captureStream(60); // 60 FPS for smooth streaming + + this.videoStream = stream; + + // Assign the stream to the provided video element for live view + videoElement.srcObject = stream; + videoElement.play(); + + console.log("Start streaming!") + } + + public stopStreaming() { + if (!this.videoStream) return; + + // Stop all tracks on the stream to end streaming + this.videoStream.getTracks().forEach((track: { stop: () => any; }) => track.stop()); + this.videoStream = null; // Clear the stream reference + + console.log("Streaming stopped!"); +} + + + // Method to start recording + public startRecording() { + if (!this._renderer) return; + + // Create a stream from the renderer's canvas + const stream = this._renderer.domElement.captureStream(60); // 30 FPS + + // Higher quality and bit rate for better video clarity + const options = { + mimeType: 'video/webm;codecs=vp9', + videoBitsPerSecond: 8000000, // 8 Mbps for higher quality + }; + + this.mediaRecorder = new MediaRecorder(stream, options); + + // Collect data in chunks + this.mediaRecorder.ondataavailable = (event) => { + if (event.data.size > 0) { + this.recordedChunks.push(event.data); + } + }; + + // Start recording + this.mediaRecorder.start(); + } + + + // Method to stop recording and trigger callback + public stopRecording(callback: BlobCallback) { + if (!this.mediaRecorder) return; + + // Stop recording and create the video blob + this.mediaRecorder.onstop = () => { + const recordedBlob = new Blob(this.recordedChunks, { type: 'video/webm' }); + callback(recordedBlob); // Pass the video blob to the callback + this.recordedChunks = []; // Clear chunks for the next recording + }; + + // Stop the recorder + this.mediaRecorder.stop(); + } + public onMouseClick(event: MouseEvent): boolean { if (!this._renderer || !this._camera || !this.model?.vrm) return false; diff --git a/src/pages/api/amicaHandler.ts b/src/pages/api/amicaHandler.ts index ef04e47..3c25e94 100644 --- a/src/pages/api/amicaHandler.ts +++ b/src/pages/api/amicaHandler.ts @@ -1,10 +1,217 @@ -import { NextApiRequest, NextApiResponse } from 'next'; +import { askLLM } from "@/utils/askLlm"; +import { TimestampedPrompt } from "@/features/amicaLife/eventHandler"; +import { randomBytes } from "crypto"; +import type { NextApiRequest, NextApiResponse } from "next"; +import { twitterClientInstance as twitterClient } from "@/features/externalAPI/socialMedia/twitterClient"; +import { telegramClientInstance as telegramCLient } from "@/features/externalAPI/socialMedia/telegramClient"; +import { config } from "@/utils/config"; +import isDev from "@/utils/isDev"; +import { handleConfig, subconsciousUrl, userInputUrl } from "@/features/externalAPI/externalAPI"; -export default function handler(req: NextApiRequest, res: NextApiResponse) { - if (req.method === 'GET') { - res.status(200).json({ message: 'Amica Handler' }); - } else { - res.setHeader('Allow', ['GET']); - res.status(405).end(`Method ${req.method} Not Allowed`); +interface ApiResponse { + sessionId?: string; + outputType?: string; + response?: any; + error?: string; +} + +interface LogEntry { + sessionId: string; + timestamp: string; + inputType: string; + outputType: string; + response?: any; + error?: string; +} + +const logs: LogEntry[] = []; +const clients: Array<{ res: NextApiResponse }> = []; + +let logsUrl = new URL(`${process.env.NEXT_PUBLIC_DEVELOPMENT_BASE_URL}/api/dataHandler`); +logsUrl.searchParams.append("type", "logs"); + +// Helper Functions +const generateSessionId = (sessionId?: string): string => + sessionId || randomBytes(8).toString("hex"); + +const sendError = ( + res: NextApiResponse, + sessionId: string, + message: string, + status = 400 +) => res.status(status).json({ sessionId, error: message }); + +const sendToClients = (message: { type: string; data: any }) => { + const formattedMessage = JSON.stringify(message); + clients.forEach((client) => client.res.write(`data: ${formattedMessage}\n\n`)); +}; + +// Processors +const processNormalChat = async (message: string): Promise => + await askLLM(config("system_prompt"), message, null); + +const requestMemory = async (): Promise => { + const response = await fetch(subconsciousUrl); + return response.json(); +}; + +const requestLogs = async (): Promise<[]> => { + const response = await fetch(logsUrl); + return response.json(); +}; + +const requestUserInputMessages = async (): Promise<[]> => { + const response = await fetch(userInputUrl); + return response.json(); +}; + +const updateSystemPrompt = async (payload: any): Promise => { + const { prompt } = payload; + let response = sendToClients({ type: "systemPrompt", data: prompt }); + return response; +}; + +const triggerAmicaActions = async (payload: any): Promise => { + const { text, socialMedia, playback, reprocess, animation } = payload; + let response; + + if (text) { + const message = reprocess + ? await askLLM(config("system_prompt"), text, null) + : text; + + response = await handleSocialMediaActions(message, socialMedia); + } + + if (playback) { + response = sendToClients({ type: "playback", data: 10000 }); + } + + if (animation) { + response = sendToClients({ type: "animation", data: animation }); + } + + return response; +}; + +const handleSocialMediaActions = async ( + message: string, + socialMedia: string +): Promise => { + switch (socialMedia) { + case "twitter": + return await twitterClient.postTweet(message); + case "tg": + return await telegramCLient.postMessage(message); + case "none": + sendToClients({ type: "normal", data: message }); + return "Broadcasted to clients"; + default: + console.log("No social media selected for posting."); + return "No action taken for social media."; + } +}; + +// API Handler +export default async function handler( + req: NextApiRequest, + res: NextApiResponse +) { + // Syncing config to be accessible from server side + await handleConfig("fetch"); + + if (req.method === "GET") { + handleSSEConnection(req, res); + return; + } + + if (config("external_api_enabled") !== "true") { + return sendError(res, "", "API is currently disabled.", 503); + } + + const { sessionId, inputType, payload, noProcessChat = false } = req.body; + const currentSessionId = generateSessionId(sessionId); + const timestamp = new Date().toISOString(); + + if (!inputType) { + return sendError(res, currentSessionId, "inputType are required."); + } + + try { + const { response, outputType } = await processRequest(inputType, payload); + logs.push({ + sessionId: currentSessionId, + timestamp, + inputType, + outputType, + response, + }); + res.status(200).json({ sessionId: currentSessionId, outputType, response }); + } catch (error) { + handleProcessingError(res, error, currentSessionId, inputType, timestamp); } } + +// Sub-functions +const handleSSEConnection = ( + req: NextApiRequest, + res: NextApiResponse +): void => { + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache, no-transform"); + res.setHeader("X-Accel-Buffering", "no"); + res.setHeader("Connection", "keep-alive"); + + const client = { res }; + clients.push(client); + + req.on("close", () => { + console.log("Client disconnected"); + clients.splice(clients.indexOf(client), 1); + res.end(); + }); +}; + +const processRequest = async ( + inputType: string, + payload: any +): Promise<{ response: any; outputType: string }> => { + + switch (inputType) { + case "Normal Chat Message": + return { response: await processNormalChat(payload), outputType: "Complete stream" }; + case "Memory Request": + return { response: await requestMemory(), outputType: "Memory Array" }; + case "RPC Logs": + return { response: await requestLogs(), outputType: "Webhook" }; + case "RPC User Input Messages": + return { response: await requestUserInputMessages(), outputType: "Webhook" }; + case "Update System Prompt": + return { response: await updateSystemPrompt(payload), outputType: "Updated system prompt" }; + case "Twitter Message": + case "Brain Message": + return { response: payload, outputType: "Text" }; + case "Reasoning Server": + return { response: await triggerAmicaActions(payload), outputType: "Action Triggered" }; + default: + throw new Error("Unknown input type"); + } +}; + +const handleProcessingError = ( + res: NextApiResponse, + error: any, + sessionId: string, + inputType: string, + timestamp: string +): void => { + console.error("Handler error:", error); + logs.push({ + sessionId, + timestamp, + inputType, + outputType: "Error", + error: String(error), + }); + sendError(res, sessionId, "An error occurred while processing the request.", 500); +}; \ No newline at end of file