diff --git a/lab/voice-assistant/README.md b/lab/voice-assistant/README.md
index 351b8d7e0b6021da200b194c01728f82441e648b..1a1b41edaf9c141cbc6d437f7077dbf5027ccda2 100644
--- a/lab/voice-assistant/README.md
+++ b/lab/voice-assistant/README.md
@@ -1,62 +1,65 @@
 # @adornis/voice-assistant
 
 **Purpose:**
-The `voice-assistant` module provides functionality to integrate a voice-based assistant into the application. It enables real-time, two-way voice conversations, typically between a user (e.g., on a phone call) and an AI-powered conversational agent.
+The `voice-assistant` module provides a modular framework to integrate a voice-based assistant into an application. It enables real-time, two-way voice conversations between a user and an AI-powered conversational agent.
 
-**Core Components:**
+**Core Concept: The VoiceAgent**
 
-1. **Twilio Integration (`twilio.ts`):**
+The central component of this module is the `VoiceAgent`. It orchestrates the voice interaction by connecting three types of providers:
 
-   - Manages WebSocket connections with Twilio for streaming live audio from phone calls.
-   - Sets up an Express router (`twilioRouter`) to handle Twilio's TwiML webhook for initiating media streams (`/twilio-media-stream`).
-   - Handles incoming audio from Twilio (MuLaw format), converts it to PCM for AI processing, and sends AI-generated audio (converted back to MuLaw) to Twilio.
-   - Provides functions like `createTwilioWebSocketServer` to establish the WebSocket server that bridges Twilio and the AI.
+1. **Call Provider:** Manages the connection to a telephony service (e.g., Twilio). It handles receiving incoming calls and streaming audio.
+2. **LLM Provider:** Connects to a Large Language Model (e.g., Google Gemini, OpenAI GPT). It processes the conversation and generates responses.
+3. **Audio Transformer:** Acts as a bridge between the Call Provider and the LLM Provider. It is responsible for converting audio formats and packaging data in the format expected by each end.
 
-2. **Google AI Integration (`google-ai.ts`):**
+This modular design allows for flexible combinations of services. For example, you can use Twilio for phone calls with Google's Gemini model, or switch to a different LLM provider without changing the call handling logic.
 
-   - Connects to Google's Generative AI (specifically models like `gemini-2.5-flash-preview-native-audio-dialog`) using the `@google/genai` SDK.
-   - Manages live sessions (`startLiveSession`) with the AI, sending user audio input and receiving AI-generated audio responses.
-   - Handles the turn-based conversation flow and processes messages from the AI, including audio data and control signals.
+**Available Providers:**
+
+- **Call Providers:**
+  - `twilioCallProvider`: Integrates with Twilio.
+- **LLM Providers:**
+  - `googleAiLLMProvider`: Integrates with Google's Generative AI.
+  - `openaiLLMProvider`: Integrates with OpenAI's models.
+- **Audio Transformers:**
+  - `twilioGeminiTransformer`: Transforms audio between Twilio and Google Gemini.
+  - `twilioOpenAITransformer`: Transforms audio between Twilio and OpenAI.
 
 **Setup and Usage Example:**
 
-Here's a simplified example of how to set up the voice assistant, typically within an Express server environment:
+Here's how to set up the voice assistant within an Express server environment:
 
 ```typescript
-import { twilioRouter, createTwilioWebSocketServer } from './voice-assistant/twilio.js'; // Adjust path as needed
-
-// 1. Add Twilio router for handling TwiML webhooks
-app.use('/api/twilio', twilioRouter()); // Exposes /api/twilio/twilio-media-stream
-
-// 2. Configuration for the voice assistant
-const voiceAssistantConfig = {
-  systemInstruction: `You are a helpful assistant. Your goal is to collect information about the user's issue.`,
-  firstMessage: 'Hello! How can I help you today?',
-  // Other Google GenAI LiveConnectConfig parameters can be added here, e.g.:
-  // speechConfig: {
-  //   languageCode: 'en-US',
-  //   voiceConfig: { prebuiltVoiceConfig: { voiceName: 'some-voice' } },
-  // },
-};
-
-// 3. Create the Twilio WebSocket server to bridge Twilio and Google AI
-createTwilioWebSocketServer(httpServer, voiceAssistantConfig);
+import { VoiceAgent } from '@adornis/voice-assistant/voice-agent.js';
+import { twilioCallProvider } from '@adornis/voice-assistant/call-providers/twilio.js';
+import { googleAiLLMProvider } from '@adornis/voice-assistant/llm-providers/google-ai.js';
+import { twilioGeminiTransformer } from '@adornis/voice-assistant/audio-transformers/twilio-gemini-transformer.js';
+import express from 'express';
+import { createServer } from 'http';
+
+// 1. Create the Express app and the HTTP server the agent attaches to
+const app = express();
+const httpServer = createServer(app);
+
+// 2. Initialize the VoiceAgent with your chosen providers
+const voiceAgent = VoiceAgent({
+  callProvider: twilioCallProvider,
+  llmProvider: googleAiLLMProvider,
+  audioBufferTransformerProvider: twilioGeminiTransformer,
+  config: {
+    tools: [], // Optional: Define tools for the LLM to use
+    systemInstruction: 'You are a helpful assistant.',
+    firstMessage: 'Say: Hello! How can I help you today?',
+  },
+});
+
+// 3. Initialize the agent's routes and handlers
+await voiceAgent.init(app, httpServer);
 ```
 
-To use the twilio router, you will need to configure a twilio phone number to forward to the `<ROOT_URL>/api/twilio/twilio-media-stream` endpoint.
-
 **Explanation:**
 
-1. **Mount Twilio Router:** Use `app.use()` to integrate the `twilioRouter()`. This router typically provides an endpoint (e.g., `/api/twilio/twilio-media-stream`) that Twilio calls to get TwiML instructions for connecting to the WebSocket server.
-2. **Define Configuration:** Create a `voiceAssistantConfig` object. This includes:
-   - `systemInstruction`: A prompt that defines the AI's behavior, persona, and objectives.
-   - `firstMessage`: The initial message the AI will say to the caller.
-   - Optionally, other parameters for Google GenAI's `LiveConnectConfig` like language or voice selection.
-3. **Initialize WebSocket Server:** Call `createTwilioWebSocketServer(httpServer, voiceAssistantConfig)`. This function attaches a WebSocket server (at the path `/api/twilio/ws`) to your existing HTTP server. Twilio will connect to this WebSocket to stream audio.
-
-This setup allows the application to:
+1. **Create the app and server:** The agent needs your Express application (to register HTTP routes) and the underlying HTTP server (to attach its WebSocket server).
+2. **Instantiate `VoiceAgent`:** Create an instance of the `VoiceAgent`, passing in the selected providers and a configuration object. The `config` object allows you to define the AI's behavior (`systemInstruction`), a prompt for its opening line (`firstMessage`), and any `tools` it can use to interact with other systems.
+3. **Initialize Agent:** Call `voiceAgent.init(app, httpServer)`. This registers the call provider's webhook routes (e.g., Twilio's TwiML endpoint) on the Express app and attaches the media-stream WebSocket server to the HTTP server.
 
-- Receive incoming call notifications from Twilio at the webhook endpoint handled by `twilioRouter`.
-- Instruct Twilio to stream call audio to the WebSocket endpoint.
-- Process the audio via Google AI through the `createTwilioWebSocketServer` integration.
-- Stream AI-generated audio responses back to the caller via Twilio.
+To make this work, configure your call provider to point at the webhook endpoints registered by the `voiceAgent`. For the included Twilio provider, set your Twilio phone number's voice webhook to the `/twilio-media-stream` endpoint exposed by your server; the TwiML it returns instructs Twilio to stream call audio to the WebSocket server at `/api/twilio/ws`. Note that the Stream URL in that TwiML is currently hardcoded in `call-providers/twilio.ts` and may need to be adjusted for your deployment.
diff --git a/lab/voice-assistant/audio-transformers/twilio-gemini-transformer.ts b/lab/voice-assistant/audio-transformers/twilio-gemini-transformer.ts
new file mode 100644
index 0000000000000000000000000000000000000000..737579d74d7bd1218f7ea92a38ccbee2b6ff3e3f
--- /dev/null
+++ b/lab/voice-assistant/audio-transformers/twilio-gemini-transformer.ts
@@ -0,0 +1,12 @@
+import { convertMulawToPcm } from '../mulaw-to-pcm.js';
+import { convertPCMToMulaw } from '../pcm-to-mulaw.js';
+import type { IAudioBufferTransformerProvider } from '../voice-agent.js';
+
+export const twilioGeminiTransformer: IAudioBufferTransformerProvider = {
+  callToLLM: async (audioChunk: Buffer) => {
+    return convertMulawToPcm(audioChunk);
+  },
+  llmToCall: (audioChunk: Buffer) => {
+    return convertPCMToMulaw(audioChunk);
+  },
+};
diff --git a/lab/voice-assistant/audio-transformers/twilio-openai-transformer.ts b/lab/voice-assistant/audio-transformers/twilio-openai-transformer.ts
new file mode 100644
index 0000000000000000000000000000000000000000..f3d87b63fb52624c58607707879715e1958503ca
--- /dev/null
+++ b/lab/voice-assistant/audio-transformers/twilio-openai-transformer.ts
@@ -0,0 +1,11 @@
+import { convertMulawToPcm } from '../mulaw-to-pcm.js';
+import type { IAudioBufferTransformerProvider } from '../voice-agent.js';
+
+export const twilioOpenAITransformer: IAudioBufferTransformerProvider = {
+  callToLLM: async (audioChunk: Buffer) => {
+    return convertMulawToPcm(audioChunk);
+  },
+  llmToCall: (audioChunk: Buffer) => {
+    return audioChunk;
+  },
+};
diff --git a/lab/voice-assistant/call-providers/twilio.ts b/lab/voice-assistant/call-providers/twilio.ts
new file mode 100644
index 0000000000000000000000000000000000000000..9b20f5d06503ef3f9e35a224478a994958b204df
--- /dev/null
+++ b/lab/voice-assistant/call-providers/twilio.ts
@@ -0,0 +1,151 @@
+import { logger } from '@adornis/base/logging.js';
+import express from 'express';
+import { WebSocketServer } from 'ws';
+// import { startLiveSession, type LiveConnectAdornisConfig } from './google-ai.js';
+import type { WebSocket } from 'ws';
+import type { ICallProvider, ILLMSession } from '../voice-agent.js';
+
+// --- Twilio WebSocket audio streaming ---
+const twilioWebsockets = new WeakMap<WebSocket, string>();
+
+// function saveBinaryFile(fileName: string, content: Buffer) {
+//   writeFile(fileName, content, 'utf8', err => {
+//     if (err) {
+//       console.error(`Error writing file ${fileName}:`, err);
+//     }
+//   });
+// }
+
+async function sendAudioToTwilio(buffer: Buffer, ws: WebSocket) {
+  const streamSid = twilioWebsockets.get(ws);
+  if (!streamSid) {
+    logger.warn('No active Twilio WebSocket connection to send audio.');
+    return;
+  }
+
+  try {
+    const mulawBase64 = buffer.toString('base64');
+
+    return ws.send(
+      JSON.stringify({
+        event: 'media',
+        streamSid,
+        media: { payload: mulawBase64 },
+      }),
+    );
+  } catch (err) {
+    logger.error({ err }, 'Failed to send audio chunk to the Twilio media stream');
+  }
+}
+
+function closeTwilioConnection(ws: WebSocket) {
+  ws.close();
+}
+
+function flushTwilioStream(ws: WebSocket) {
+  ws.send(
+    JSON.stringify({
+      event: 'clear',
+      streamSid: twilioWebsockets.get(ws),
+    }),
+  );
+}
+
+function twilioRouter() {
+  const router = express.Router();
+
router.use('/twilio-media-stream', (req, res) => { + res.type('xml').send(` + <Response> + <Connect> + <Stream url="wss://zwickroellndi-racct-runtime.nexus.adornis.de/api/twilio/ws" /> + </Connect> + </Response> + `); + }); + + return router; +} + +export const twilioCallProvider: ICallProvider = { + init: ({ app, server, llmProvider, audioBufferTransformerProvider, config }) => { + app.use(twilioRouter()); + + // --- WebSocket server for Twilio media stream audio saving --- + const wss = new WebSocketServer({ server, path: '/api/twilio/ws' }); + + // const audioOutputDir = path.resolve('/home/coder/repositories/ZwickroellNdi-racct/data/', 'audio-output'); + // if (!fs.existsSync(audioOutputDir)) fs.mkdirSync(audioOutputDir, { recursive: true }); + // console.log('audioOutputDir', audioOutputDir); + + // const mediaStreamSaver = new TwilioMediaStreamSaveAudioFile({ + // saveLocation: audioOutputDir, + // saveFilename: 'my-twilio-media-stream-output', + // onSaved: () => console.log('File was saved!'), + // }); + + wss.on('connection', ws => { + twilioWebsockets.set(ws, ''); // Track the active Twilio websocket + ws.on('close', () => { + twilioWebsockets.delete(ws); + }); + + let llmSession: ILLMSession | undefined; + // let audioParts: string[] = []; + ws.on('message', async data => { + const msg = JSON.parse(data); + switch (msg.event) { + case 'connected': + console.log('A new call has connected'); + break; + case 'start': + console.log(`Starting media stream...`); + twilioWebsockets.set(ws, msg.start.streamSid); + // mediaStreamSaver.twilioStreamStart(); + // llmSession = await startLiveSession(ws, config); + llmSession = await llmProvider.init({ + server, + config, + audioBufferTransformerProvider, + flushAudioStream: () => flushTwilioStream(ws), + closeAudioStream: () => closeTwilioConnection(ws), + sendAudioChunkToCall: audioChunk => { + const callAudioChunk = audioBufferTransformerProvider.llmToCall(audioChunk); + void sendAudioToTwilio(callAudioChunk, ws); + }, + }); + + break; + case 'media': { + const muLawBuffer = Buffer.from(msg.media.payload, 'base64'); + const llmAudioBuffer = await audioBufferTransformerProvider.callToLLM(muLawBuffer); + + // only for debugging + // audioParts.push(pcmBuffer.toString('base64')); + // const buffer = convertToWav(audioParts, 'audio/pcm;rate=24000'); + // saveBinaryFile('/home/coder/repositories/ZwickroellNdi-racct/data/test.wav', buffer); + + // geminiSession?.sendRealtimeInput({ + // audio: { data: pcmBuffer.toString('base64'), mimeType: 'audio/pcm;rate=16000' }, + // }); + // llmSession?.send({ + // type: 'input_audio_buffer.append', + // audio: pcmBuffer.toString('base64'), + // }); + void llmSession?.sendAudioChunk(llmAudioBuffer); + // mediaStreamSaver.twilioStreamMedia(msg.media.payload); + break; + } + case 'stop': + console.log('Call has ended'); + // mediaStreamSaver.twilioStreamStop(); + // llmSession?.close(); + void llmSession?.close(); + break; + default: + break; + } + }); + }); + }, +}; diff --git a/lab/voice-assistant/google-ai.ts b/lab/voice-assistant/google-ai.ts deleted file mode 100644 index 5e7ea33e31961105503f222dd44ad31dc40ed757..0000000000000000000000000000000000000000 --- a/lab/voice-assistant/google-ai.ts +++ /dev/null @@ -1,186 +0,0 @@ -// To run this code you need to install the following dependencies: -// npm install @google/genai mime -// npm install -D @types/node -import { logger } from '@adornis/base/logging.js'; -import { - GoogleGenAI, - LiveServerMessage, - MediaResolution, - Modality, - Session, - type 
LiveConnectConfig, -} from '@google/genai'; -import { closeTwilioConnection, flushTwilioStream, sendAudioToTwilio } from './twilio.js'; - -const sessionManagement = new WeakMap< - Session, - { - ws: import('ws').WebSocket; - isWorking: boolean; - responseQueue: LiveServerMessage[]; - } ->(); - -async function handleTurn(session: Session): Promise<LiveServerMessage[]> { - const sessionInfo = sessionManagement.get(session); - if (!sessionInfo) throw new Error('Cannot handle turn with no active session'); - - sessionInfo.isWorking = true; - const turn: LiveServerMessage[] = []; - let done = false; - while (!done) { - const message = await waitMessage(session); - turn.push(message); - if (message.serverContent && message.serverContent.turnComplete) { - console.log('turn complete'); - done = true; - } - } - - // if session was closed before turn is finished - if (!sessionManagement.has(session)) return []; - - sessionInfo.isWorking = false; - return turn; -} - -async function waitMessage(session: Session): Promise<LiveServerMessage> { - let done = false; - let message: LiveServerMessage | undefined = undefined; - while (!done) { - const sessionInfo = sessionManagement.get(session); - if (!sessionInfo) throw new Error('Cannot wait for message with no active session'); - - message = sessionInfo.responseQueue.shift(); - if (message) { - handleModelTurn(message, session); - done = true; - } else { - await new Promise(resolve => setTimeout(resolve, 100)); - } - } - return message!; -} - -function handleModelTurn(message: LiveServerMessage, session: Session) { - const sessionInfo = sessionManagement.get(session); - - // if session was closed before turn is finished - if (!sessionInfo) return; - - if (message.serverContent?.modelTurn?.parts) { - const part = message.serverContent?.modelTurn?.parts?.[0]; - - if (part?.fileData) { - console.log(`File: ${part?.fileData.fileUri}`); - } - - if (part?.inlineData) { - const inlineData = part.inlineData; - - // decode base64 to Buffer - const audioBuffer = Buffer.from(inlineData.data ?? 
'', 'base64'); - - // Stream only the new PCM buffer to Twilio - sendAudioToTwilio(audioBuffer, sessionInfo.ws); - } - - if (part?.text) { - console.log(part.text); - } - } -} - -export async function startLiveSession( - ws: import('ws').WebSocket, - config: Omit<LiveConnectConfig, 'responseModalities'> & { firstMessage: string }, -) { - const ai = new GoogleGenAI({ - apiKey: process.env.GEMINI_API_KEY, - }); - - const model = 'models/gemini-2.5-flash-preview-native-audio-dialog'; - - const internalConfig: LiveConnectConfig = { - responseModalities: [Modality.AUDIO], - mediaResolution: MediaResolution.MEDIA_RESOLUTION_MEDIUM, - speechConfig: { - voiceConfig: { - prebuiltVoiceConfig: { - voiceName: 'Zephyr', - }, - }, - languageCode: 'de-DE', - }, - contextWindowCompression: { - triggerTokens: '25600', - slidingWindow: { targetTokens: '12800' }, - }, - ...config, - }; - - // let modelSpeaking = false; - const session = await ai.live.connect({ - model, - callbacks: { - onmessage: function (message: LiveServerMessage) { - if (!sessionManagement.has(session)) { - sessionManagement.set(session, { - ws, - isWorking: false, - responseQueue: [], - }); - } - - sessionManagement.get(session)!.responseQueue.push(message); - - // handle interruption by human - if (message.serverContent?.interrupted) { - sessionManagement.get(session)!.responseQueue = []; - flushTwilioStream(ws); - } else if (!sessionManagement.get(session)!.isWorking) { - console.log('handle turn'); - handleTurn(session).catch(err => { - if (sessionManagement.has(session)) { - logger.error({ err }, 'Error in ai live connect session'); - } - - // if session was closed before turn is finished -> ignore - }); - } - }, - onerror: function (e: ErrorEvent) { - logger.error({ err: e }, 'Error in ai live connect session'); - sessionManagement.delete(session); - }, - onclose: function (e: CloseEvent) { - logger.debug({ reason: e.reason }, 'Close in ai live connect session'); - closeTwilioConnection(sessionManagement.get(session)?.ws); - sessionManagement.delete(session); - }, - }, - config: internalConfig, - }); - - if (!sessionManagement.has(session)) { - sessionManagement.set(session, { - ws, - isWorking: false, - responseQueue: [], - }); - } - - session.sendClientContent({ - turns: [`Starte das Gespräch mit folgendem Satz: "${config.firstMessage}".`], - }); - - void handleTurn(session).catch(err => { - if (sessionManagement.has(session)) { - logger.error({ err }, 'Error in ai live connect session'); - } - - // if session was closed before turn is finished -> ignore - }); - - return session; -} diff --git a/lab/voice-assistant/llm-providers/google-ai.ts b/lab/voice-assistant/llm-providers/google-ai.ts new file mode 100644 index 0000000000000000000000000000000000000000..75aaf713b49e406cd75f7cdac239cf5d674ca51c --- /dev/null +++ b/lab/voice-assistant/llm-providers/google-ai.ts @@ -0,0 +1,312 @@ +import { logger } from '@adornis/base/logging.js'; +import { + GoogleGenAI, + type LiveConnectConfig, + type LiveServerMessage, + MediaResolution, + Modality, + type Session, + Type, +} from '@google/genai'; +import type { ILLMProvider, LLMProviderConfig } from '../voice-agent.js'; + +export type LiveConnectAdornisConfig = Omit<LiveConnectConfig, 'responseModalities' | 'tools'> & { + firstMessage: string; +} & { + tools: Array< + // @ts-expect-error [number] is fine here because in fact tools is an array + Omit<LiveConnectConfig['tools'][number], 'functionDeclarations'> & { + // @ts-expect-error [number] is fine here because in fact tools is an array + 
functionDeclarations: Array<LiveConnectConfig['tools'][number]['functionDeclarations'][number]> & { + resolve: (...args: any[]) => string | Promise<string>; + }; + } + >; +}; + +const sessionManagement = new WeakMap< + Session, + { + config: LLMProviderConfig; + isWorking: boolean; + responseQueue: LiveServerMessage[]; + sendAudioChunkToCall: (audioChunk: Buffer) => void | Promise<void>; + closeAudioStream: () => void | Promise<void>; + flushAudioStream: () => void | Promise<void>; + } +>(); + +async function handleTurn(session: Session): Promise<LiveServerMessage[]> { + const sessionInfo = sessionManagement.get(session); + if (!sessionInfo) throw new Error('Cannot handle turn with no active session'); + + sessionInfo.isWorking = true; + const turn: LiveServerMessage[] = []; + let done = false; + while (!done) { + const message = await waitMessage(session); + turn.push(message); + if (message.serverContent?.turnComplete) { + console.log('turn complete'); + done = true; + } + } + + // if session was closed before turn is finished + if (!sessionManagement.has(session)) return []; + + sessionInfo.isWorking = false; + return turn; +} + +async function waitMessage(session: Session): Promise<LiveServerMessage> { + let done = false; + let message: LiveServerMessage | undefined; + while (!done) { + const sessionInfo = sessionManagement.get(session); + if (!sessionInfo) throw new Error('Cannot wait for message with no active session'); + + message = sessionInfo.responseQueue.shift(); + if (message) { + handleModelTurn(message, session); + done = true; + } else { + await new Promise(resolve => setTimeout(resolve, 100)); + } + } + return message!; +} + +function handleModelTurn(message: LiveServerMessage, session: Session) { + const sessionInfo = sessionManagement.get(session); + console.log('handle model turn'); + + // if session was closed before turn is finished + if (!sessionInfo) return; + + // TODO needs testing with real tools as soon as gemini properly supports tools + // if (message.toolCall) { + // logger.info({ toolCall: message.toolCall }, 'toolCall'); + + // const tool = sessionInfo.config.tools?.find(tool => tool.name === message.toolCall.name); + // if (!tool) { + // logger.error({ toolCall: message.toolCall }, 'toolCall not found'); + // } else { + // const result = tool.resolve(...message.toolCall.args); + // if (result instanceof Promise) { + // void result.then(result => { + // session.sendToolResponse({ + // functionResponses: [ + // { + // toolName: message.toolCall.name, + // toolArgs: message.toolCall.args, + // toolResult: result, + // }, + // ], + // }); + // }); + // } else { + // logger.info({ result }, 'functionCall result'); + // session.sendToolResponse({ + // functionResponses: [ + // { + // toolName: message.toolCall.name, + // toolArgs: message.toolCall.args, + // toolResult: result, + // }, + // ], + // }); + // } + // } + // } + + if (message.serverContent?.modelTurn?.parts) { + const part = message.serverContent.modelTurn.parts[0]; + + if (part?.fileData) { + console.log(`File: ${part.fileData.fileUri}`); + } + + if (part?.inlineData) { + const inlineData = part.inlineData; + + // decode base64 to Buffer + const audioBuffer = Buffer.from(inlineData.data ?? 
'', 'base64'); + + // Stream only the new PCM buffer to Twilio + void sessionInfo.sendAudioChunkToCall(audioBuffer); + } + + if (part?.text) { + console.log(part.text); + } + } +} + +export async function startLiveSession( + config: LLMProviderConfig, + sendAudioChunkToCall: (audioChunk: Buffer) => void | Promise<void>, + closeAudioStream: () => void | Promise<void>, + flushAudioStream: () => void | Promise<void>, +) { + const ai = new GoogleGenAI({ + apiKey: process.env.GEMINI_API_KEY, + apiVersion: 'v1beta', + }); + + const model = 'models/gemini-2.5-flash-preview-native-audio-dialog'; + // const model = 'models/gemini-2.5-flash-exp-native-audio-thinking-dialog'; + // const model = 'models/gemini-2.0-flash-live-preview-04-09'; + + // TODO: fix the tool conversion + // @ts-expect-error need to fix the tool conversion + const internalConfig: LiveConnectConfig = { + responseModalities: [Modality.AUDIO], + mediaResolution: MediaResolution.MEDIA_RESOLUTION_MEDIUM, + speechConfig: { + voiceConfig: { + prebuiltVoiceConfig: { + voiceName: 'Zephyr', + }, + }, + languageCode: 'de-DE', + }, + contextWindowCompression: { + triggerTokens: '25600', + slidingWindow: { targetTokens: '12800' }, + }, + systemInstruction: config.systemInstruction, + ...(config.tools + ? { + tools: [ + { + functionDeclarations: config.tools.map(tool => ({ + ...tool, + properties: { + type: { + string: Type.STRING, + number: Type.NUMBER, + boolean: Type.BOOLEAN, + array: Type.ARRAY, + object: Type.OBJECT, + }[(tool.parameters as any)?.type], + }, + })), + }, + ], + } + : {}), + }; + + // let modelSpeaking = false; + const session = await ai.live.connect({ + model, + callbacks: { + onmessage: function (message: LiveServerMessage) { + if (!sessionManagement.has(session)) { + sessionManagement.set(session, { + isWorking: false, + responseQueue: [], + config, + sendAudioChunkToCall, + closeAudioStream, + flushAudioStream, + }); + } + + sessionManagement.get(session)!.responseQueue.push(message); + + const copy = JSON.parse(JSON.stringify(message)); + if (copy.serverContent?.modelTurn?.parts) { + copy.serverContent.modelTurn.parts = copy.serverContent.modelTurn.parts.map(part => { + if (part.inlineData) { + part.inlineData = undefined; + } + return part; + }); + } + // logger.info({ copy }, 'MESSAGE'); + + if (message.toolCall) { + logger.info({ toolCall: message.toolCall }, 'toolCall'); + } + + // handle interruption by human + if (message.serverContent?.interrupted) { + sessionManagement.get(session)!.responseQueue = []; + void flushAudioStream(); + } else if (!sessionManagement.get(session)!.isWorking) { + console.log('handle turn'); + handleTurn(session).catch(err => { + if (sessionManagement.has(session)) { + logger.error({ err }, 'Error in ai live connect session'); + } + + // if session was closed before turn is finished -> ignore + }); + } + }, + onerror: function (e: ErrorEvent) { + logger.error({ err: e }, 'Error in ai live connect session'); + sessionManagement.delete(session); + }, + onclose: function (e: CloseEvent) { + logger.debug({ reason: e.reason }, 'Close in ai live connect session'); + void closeAudioStream(); + sessionManagement.delete(session); + }, + }, + config: internalConfig, + }); + + if (!sessionManagement.has(session)) { + sessionManagement.set(session, { + isWorking: false, + responseQueue: [], + config, + sendAudioChunkToCall, + closeAudioStream, + flushAudioStream, + }); + } + + if (config.firstMessage) { + session.sendClientContent({ + turns: [config.firstMessage], + }); + } + + void 
handleTurn(session).catch(err => { + if (sessionManagement.has(session)) { + logger.error({ err }, 'Error in ai live connect session'); + } + + // if session was closed before turn is finished -> ignore + }); + + return session; +} + +export const googleAiLLMProvider: ILLMProvider = { + init: async ({ + server, + config, + audioBufferTransformerProvider, + flushAudioStream, + closeAudioStream, + sendAudioChunkToCall, + }) => { + const session = await startLiveSession(config, sendAudioChunkToCall, closeAudioStream, flushAudioStream); + return { + sendAudioChunk: (audioChunk: Buffer) => { + session.sendRealtimeInput({ + audio: { data: audioChunk.toString('base64'), mimeType: 'audio/pcm;rate=16000' }, + }); + }, + flush: () => {}, + close: () => { + session.close(); + }, + }; + }, +}; diff --git a/lab/voice-assistant/llm-providers/openai.ts b/lab/voice-assistant/llm-providers/openai.ts new file mode 100644 index 0000000000000000000000000000000000000000..534b9a0c626b6666e4d9d5fff1b7410a95f647fb --- /dev/null +++ b/lab/voice-assistant/llm-providers/openai.ts @@ -0,0 +1,257 @@ +import { logger } from '@adornis/base/logging.js'; +import type { OpenAIRealtimeError } from 'openai/beta/realtime/internal-base.mjs'; +import { OpenAIRealtimeWebSocket } from 'openai/beta/realtime/websocket'; +import type { ResponseAudioDeltaEvent, ResponseDoneEvent } from 'openai/resources/beta/realtime/realtime.mjs'; +import type { ILLMProvider, LLMProviderConfig } from '../voice-agent.js'; + +const sessionManagement = new WeakMap< + OpenAIRealtimeWebSocket, + { + config: any; + isWorking: boolean; + responseQueue: Array<ResponseAudioDeltaEvent | ResponseDoneEvent>; + sendAudioChunkToCall: (audioChunk: Buffer) => void | Promise<void>; + closeAudioStream: () => void | Promise<void>; + flushAudioStream: () => void | Promise<void>; + } +>(); + +async function handleTurn( + session: OpenAIRealtimeWebSocket, +): Promise<Array<ResponseAudioDeltaEvent | ResponseDoneEvent>> { + const sessionInfo = sessionManagement.get(session); + if (!sessionInfo) throw new Error('Cannot handle turn with no active session'); + + sessionInfo.isWorking = true; + const turn: Array<ResponseAudioDeltaEvent | ResponseDoneEvent> = []; + let done = false; + while (!done) { + const message = await waitMessage(session); + turn.push(message); + if (message.type === 'response.done') { + console.log('turn complete'); + done = true; + } + } + + // if session was closed before turn is finished + if (!sessionManagement.has(session)) return []; + + sessionInfo.isWorking = false; + return turn; +} + +async function waitMessage(session: OpenAIRealtimeWebSocket): Promise<ResponseAudioDeltaEvent | ResponseDoneEvent> { + let done = false; + let message: ResponseAudioDeltaEvent | ResponseDoneEvent | undefined; + while (!done) { + const sessionInfo = sessionManagement.get(session); + if (!sessionInfo) throw new Error('Cannot wait for message with no active session'); + + message = sessionInfo.responseQueue.shift(); + if (message) { + handleModelTurn(message, session); + done = true; + } else { + await new Promise(resolve => setTimeout(resolve, 100)); + } + } + return message!; +} + +function handleModelTurn(message: ResponseAudioDeltaEvent | ResponseDoneEvent, session: OpenAIRealtimeWebSocket) { + const sessionInfo = sessionManagement.get(session); + console.log('handle model turn'); + + // if session was closed before turn is finished + if (!sessionInfo) return; + + if (message.type === 'response.audio.delta') { + const inlineData = message.delta; + + // decode 
base64 to Buffer + const audioBuffer = Buffer.from(inlineData, 'base64'); + + // Stream only the new PCM buffer to Twilio + void sessionInfo.sendAudioChunkToCall(audioBuffer); + } +} + +// TODO typing config +export async function startLiveSession( + config: LLMProviderConfig, + sendAudioChunkToCall: (audioChunk: Buffer) => void | Promise<void>, + closeAudioStream: () => void | Promise<void>, + flushAudioStream: () => void | Promise<void>, +) { + const rt = new OpenAIRealtimeWebSocket({ + model: 'gpt-4o-realtime-preview-2024-12-17', + }); + + let connectedResolve; + const connectedPromise = new Promise<void>(resolve => { + connectedResolve = resolve; + }); + + rt.on('session.created', () => { + connectedResolve(); + }); + + rt.on('response.audio.delta', message => { + console.log(message); + + if (!sessionManagement.has(rt)) { + sessionManagement.set(rt, { + isWorking: false, + responseQueue: [], + config, + sendAudioChunkToCall, + closeAudioStream, + flushAudioStream, + }); + } + + sessionManagement.get(rt)!.responseQueue.push(message); + + const copy = JSON.parse(JSON.stringify(message)); + if (copy.serverContent?.modelTurn?.parts) { + copy.serverContent.modelTurn.parts = copy.serverContent.modelTurn.parts.map(part => { + if (part.inlineData) { + part.inlineData = undefined; + } + return part; + }); + } + + // handle interruption by human + // if (message.serverContent?.interrupted) { + // sessionManagement.get(session)!.responseQueue = []; + // flushTwilioStream(ws); + // } else + if (!sessionManagement.get(rt)!.isWorking) { + console.log('handle turn'); + handleTurn(rt).catch(err => { + if (sessionManagement.has(rt)) { + logger.error({ err }, 'Error in ai live connect session'); + } + + // if session was closed before turn is finished -> ignore + }); + } + }); + + rt.on('response.done', message => { + sessionManagement.get(rt)!.responseQueue.push(message); + + for (const outputObject of message.response.output ?? 
[]) { + if (outputObject.type === 'function_call') { + logger.info({ functionCall: outputObject }, 'function call'); + rt.send({ + type: 'conversation.item.create', + item: { + type: 'function_call_output', + call_id: outputObject.call_id, + output: 'Dossier Nummer existiert in Datenbank.', + }, + }); + rt.send({ + type: 'response.create', + response: { + modalities: ['audio', 'text'], + }, + }); + } + } + }); + + rt.on('error', (e: OpenAIRealtimeError) => { + logger.error({ err: e }, 'Error in ai live connect session'); + sessionManagement.delete(rt); + }); + + rt.socket.onclose = (e: CloseEvent) => { + logger.debug({ reason: e.reason }, 'Close in ai live connect session'); + void closeAudioStream(); + sessionManagement.delete(rt); + }; + + // rt.socket.onmessage = (e: MessageEvent) => { + // logger.info({ message: e.data }, 'Message from ai live connect session'); + // }; + + if (!sessionManagement.has(rt)) { + sessionManagement.set(rt, { + isWorking: false, + responseQueue: [], + config, + sendAudioChunkToCall, + closeAudioStream, + flushAudioStream, + }); + } + + await connectedPromise; + + rt.send({ + type: 'session.update', + session: { + // can only have audio with text not audio only + modalities: ['audio', 'text'], + instructions: config.systemInstruction, + input_audio_format: 'pcm16', + output_audio_format: 'pcm16', + tools: config.tools, + tool_choice: 'auto', + }, + }); + + if (config.firstMessage) { + rt.send({ + type: 'conversation.item.create', + item: { + type: 'message', + role: 'user', + content: [ + { + type: 'input_text', + text: config.firstMessage, + }, + ], + }, + }); + rt.send({ + type: 'response.create', + response: { + modalities: ['audio', 'text'], + }, + }); + } + + void handleTurn(rt).catch(err => { + if (sessionManagement.has(rt)) { + logger.error({ err }, 'Error in ai live connect session'); + } + + // if session was closed before turn is finished -> ignore + }); + + return rt; +} + +export const openaiLLMProvider: ILLMProvider = { + init: async ({ server, config, flushAudioStream, closeAudioStream, sendAudioChunkToCall }) => { + const session = await startLiveSession(config, sendAudioChunkToCall, closeAudioStream, flushAudioStream); + return { + sendAudioChunk: (audioChunk: Buffer) => { + session.send({ + type: 'input_audio_buffer.append', + audio: audioChunk.toString('base64'), + }); + }, + flush: () => {}, + close: () => { + session.close(); + }, + }; + }, +}; diff --git a/lab/voice-assistant/package.json b/lab/voice-assistant/package.json index 02b0293cc32496127b667e747dc8280f29be67a6..bf80283447e8273fae64ccb55a362af418101e84 100644 --- a/lab/voice-assistant/package.json +++ b/lab/voice-assistant/package.json @@ -13,6 +13,7 @@ "@google/genai": "^1.4.0", "alawmulaw": "^6.0.0", "express": "^4.18.2", + "openai": "^4.100.0", "ws": "^8.18.2" }, "devDependencies": { diff --git a/lab/voice-assistant/pcm-to-mulaw.ts b/lab/voice-assistant/pcm-to-mulaw.ts index 01308d29dd8fe6f65d0b47cdd56ead7735c91f5d..0dfbee46445d09b2675e1930de99c8f68e10d65f 100644 --- a/lab/voice-assistant/pcm-to-mulaw.ts +++ b/lab/voice-assistant/pcm-to-mulaw.ts @@ -13,7 +13,7 @@ function downsamplePCM24kTo8k(input: Int16Array): Int16Array { return output; } -export function convertPCMToMulaw(buffer: Buffer): string { +export function convertPCMToMulaw(buffer: Buffer): Buffer { // Convert buffer to 24kHz PCM samples const pcmSamples24k = new Int16Array(buffer.buffer, buffer.byteOffset, buffer.length / 2); // Downsample to 8kHz @@ -21,6 +21,5 @@ export function convertPCMToMulaw(buffer: Buffer): 
string { // Encode to mulaw const mulawStuff = alawmulaw.mulaw.encode(pcmSamples8k); const mulawBuffer = Buffer.from(mulawStuff); - const mulawBase64 = mulawBuffer.toString('base64'); - return mulawBase64; + return mulawBuffer; } diff --git a/lab/voice-assistant/twilio.ts b/lab/voice-assistant/twilio.ts deleted file mode 100644 index 3060faca18a73e27586d82b3b78cb37c3e1b18ae..0000000000000000000000000000000000000000 --- a/lab/voice-assistant/twilio.ts +++ /dev/null @@ -1,139 +0,0 @@ -import { logger } from '@adornis/base/logging.js'; -import type { LiveConnectConfig, Session } from '@google/genai'; -import express from 'express'; -import { writeFile } from 'fs'; -import { WebSocketServer } from 'ws'; -import { startLiveSession } from './google-ai.js'; -import { convertMulawToPcm } from './mulaw-to-pcm.js'; -import { convertPCMToMulaw } from './pcm-to-mulaw.js'; -import { convertToWav } from './pcm-to-wav.js'; - -// --- Twilio WebSocket audio streaming --- -const twilioWebsockets = new WeakMap<import('ws').WebSocket, string>(); - -function saveBinaryFile(fileName: string, content: Buffer) { - writeFile(fileName, content, 'utf8', err => { - if (err) { - console.error(`Error writing file ${fileName}:`, err); - return; - } - }); -} - -export async function sendAudioToTwilio(buffer: Buffer, ws: import('ws').WebSocket) { - const streamSid = twilioWebsockets.get(ws); - if (!streamSid) { - logger.warn('No active Twilio WebSocket connection to send audio.'); - return; - } - - try { - const mulawBase64 = convertPCMToMulaw(buffer); - - return ws.send( - JSON.stringify({ - event: 'media', - streamSid: streamSid, - media: { payload: mulawBase64 }, - }), - ); - } catch (err) { - logger.error({ err }, 'HÄ'); - return; - } -} - -export function closeTwilioConnection(ws: import('ws').WebSocket) { - ws.close(); -} - -export function flushTwilioStream(ws: import('ws').WebSocket) { - ws.send( - JSON.stringify({ - event: 'clear', - streamSid: twilioWebsockets.get(ws), - }), - ); -} - -export function twilioRouter() { - const router = express.Router(); - - router.use('/twilio-media-stream', (req, res) => { - console.log('twilio-media-stream'); - res.type('xml').send(` - <Response> - <Connect> - <Stream url="wss://zwickroellndi-racct-runtime.nexus.adornis.de/api/twilio/ws" /> - </Connect> - </Response> - `); - }); - - return router; -} - -export function createTwilioWebSocketServer( - actualServer: import('http').Server, - config: Omit<LiveConnectConfig, 'responseModalities'> & { firstMessage: string }, -) { - // --- WebSocket server for Twilio media stream audio saving --- - const wss = new WebSocketServer({ server: actualServer, path: '/api/twilio/ws' }); - - // const audioOutputDir = path.resolve('/home/coder/repositories/ZwickroellNdi-racct/data/', 'audio-output'); - // if (!fs.existsSync(audioOutputDir)) fs.mkdirSync(audioOutputDir, { recursive: true }); - // console.log('audioOutputDir', audioOutputDir); - - // const mediaStreamSaver = new TwilioMediaStreamSaveAudioFile({ - // saveLocation: audioOutputDir, - // saveFilename: 'my-twilio-media-stream-output', - // onSaved: () => console.log('File was saved!'), - // }); - - wss.on('connection', ws => { - twilioWebsockets.set(ws, ''); // Track the active Twilio websocket - ws.on('close', () => { - twilioWebsockets.delete(ws); - }); - - let geminiSession: Session; - let audioParts: string[] = []; - ws.on('message', async data => { - const msg = JSON.parse(data); - switch (msg.event) { - case 'connected': - console.log('A new call has connected'); - break; - case 
'start': - console.log(`Starting media stream...`); - twilioWebsockets.set(ws, msg.start.streamSid); - // mediaStreamSaver.twilioStreamStart(); - geminiSession = await startLiveSession(ws, config); - - break; - case 'media': - const muLawBuffer = Buffer.from(msg.media.payload, 'base64'); - const pcmBuffer = convertMulawToPcm(muLawBuffer); - - // only for debugging - audioParts.push(pcmBuffer.toString('base64')); - const buffer = convertToWav(audioParts, 'audio/pcm;rate=24000'); - saveBinaryFile('/home/coder/repositories/ZwickroellNdi-racct/data/test.wav', buffer); - - // console.log('blob: ', pcmBlob.type); - geminiSession?.sendRealtimeInput({ - audio: { data: pcmBuffer.toString('base64'), mimeType: 'audio/pcm;rate=16000' }, - }); - // mediaStreamSaver.twilioStreamMedia(msg.media.payload); - break; - case 'stop': - console.log('Call has ended'); - // mediaStreamSaver.twilioStreamStop(); - geminiSession?.close(); - break; - default: - break; - } - }); - }); -} diff --git a/lab/voice-assistant/voice-agent.ts b/lab/voice-assistant/voice-agent.ts new file mode 100644 index 0000000000000000000000000000000000000000..f6f6b4ba60c1d6aff2e82d9f4a79d4d4948b617b --- /dev/null +++ b/lab/voice-assistant/voice-agent.ts @@ -0,0 +1,71 @@ +import type express from 'express'; +import type { Session } from 'openai/resources/beta/realtime/sessions.mjs'; + +export interface LLMProviderConfig { + tools?: Array<Session.Tool & { resolve: (...args: any[]) => Promise<string> | string }>; + systemInstruction: string; + firstMessage?: string; +} + +export interface ICallProvider { + init: ({ + app, + server, + llmProvider, + audioBufferTransformerProvider, + config, + }: { + app: express.Application; + server: express.Application; + llmProvider: ILLMProvider; + audioBufferTransformerProvider: IAudioBufferTransformerProvider; + config: LLMProviderConfig; + }) => Promise<void> | void; +} + +export interface ILLMSession { + /** needs to be pcm 16khz little endian buffer */ + sendAudioChunk: (audioChunk: Buffer) => Promise<void> | void; + /** closes the session */ + close: () => Promise<void> | void; +} + +export interface ILLMProvider { + init: ({ + server, + config, + audioBufferTransformerProvider, + flushAudioStream, + closeAudioStream, + sendAudioChunkToCall, + }: { + server: express.Application; + config: LLMProviderConfig; + audioBufferTransformerProvider: IAudioBufferTransformerProvider; + flushAudioStream: () => Promise<void> | void; + closeAudioStream: () => Promise<void> | void; + sendAudioChunkToCall: (audioChunk: Buffer) => Promise<void> | void; + }) => Promise<ILLMSession>; +} + +export interface IAudioBufferTransformerProvider { + callToLLM: (audioChunk: Buffer) => Promise<Buffer>; + llmToCall: (audioChunk: Buffer) => Buffer; +} + +export const VoiceAgent = ({ + callProvider, + llmProvider, + audioBufferTransformerProvider, + config, +}: { + callProvider: ICallProvider; + llmProvider: ILLMProvider; + audioBufferTransformerProvider: IAudioBufferTransformerProvider; + config: LLMProviderConfig; +}) => ({ + init: async (app: express.Application, server: express.Application) => { + // await llmProvider.init({ server: app, callProvider, config, audioBufferTransformerProvider }); + await callProvider.init({ app, server, llmProvider, audioBufferTransformerProvider, config }); + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index f7f7df98383faa60a58e66c55619c4f96839701d..b43d7ecc521d00fd86ac4baadef91c2bf2e95c3e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1011,6 +1011,9 @@ importers: express: 
specifier: ^4.18.2 version: 4.21.2 + openai: + specifier: ^4.100.0 + version: 4.100.0(encoding@0.1.13)(ws@8.18.2)(zod@3.24.4) ws: specifier: ^8.18.2 version: 8.18.2 @@ -15740,7 +15743,7 @@ snapshots: dependencies: bytes: 3.1.2 content-type: 1.0.5 - debug: 4.4.1 + debug: 4.4.1(supports-color@8.1.1) http-errors: 2.0.0 iconv-lite: 0.6.3 on-finished: 2.4.1 @@ -16439,10 +16442,6 @@ snapshots: dependencies: ms: 2.1.2 - debug@4.4.1: - dependencies: - ms: 2.1.3 - debug@4.4.1(supports-color@8.1.1): dependencies: ms: 2.1.3 @@ -17006,7 +17005,7 @@ snapshots: content-type: 1.0.5 cookie: 0.7.1 cookie-signature: 1.2.2 - debug: 4.4.1 + debug: 4.4.1(supports-color@8.1.1) encodeurl: 2.0.0 escape-html: 1.0.3 etag: 1.8.1 @@ -17175,7 +17174,7 @@ snapshots: finalhandler@2.1.0: dependencies: - debug: 4.4.1 + debug: 4.4.1(supports-color@8.1.1) encodeurl: 2.0.0 escape-html: 1.0.3 on-finished: 2.4.1 @@ -17794,7 +17793,7 @@ snapshots: https-proxy-agent@7.0.6: dependencies: agent-base: 7.1.3 - debug: 4.4.1 + debug: 4.4.1(supports-color@8.1.1) transitivePeerDependencies: - supports-color @@ -20452,7 +20451,7 @@ snapshots: router@2.2.0: dependencies: - debug: 4.4.1 + debug: 4.4.1(supports-color@8.1.1) depd: 2.0.0 is-promise: 4.0.0 parseurl: 1.3.3 @@ -20593,7 +20592,7 @@ snapshots: send@1.2.0: dependencies: - debug: 4.4.1 + debug: 4.4.1(supports-color@8.1.1) encodeurl: 2.0.0 escape-html: 1.0.3 etag: 1.8.1