
Voice AI Agents: Building Real-Time Conversational Systems

A comprehensive guide to building voice AI agents—real-time speech APIs, WebRTC integration, turn-taking, interruption handling, telephony integration with Twilio, and production patterns for voice-first AI systems.


The Voice-First Future

Voice interfaces are becoming the primary way users interact with AI. OpenAI's Realtime API, ElevenLabs conversational AI, and similar technologies have made natural, low-latency voice conversations with AI not just possible but practical.

This guide covers everything you need to build production voice AI agents: real-time speech APIs, audio streaming, turn-taking, interruption handling, telephony integration, and the unique challenges of voice-first design.

Prerequisites:

  • Familiarity with building agentic AI systems
  • Basic understanding of WebSockets and streaming
  • Python or JavaScript experience

What you'll learn:

  • Real-time speech-to-speech architecture
  • OpenAI Realtime API integration
  • WebRTC for browser-based voice
  • Turn-taking and interruption handling
  • Twilio telephony integration
  • Production latency optimization

Voice AI Architecture

Voice AI systems have unique requirements compared to text-based agents:

Code
┌─────────────────────────────────────────────────────────────────┐
│                     Voice AI Agent                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────┐    ┌───────────┐    ┌─────────┐    ┌──────────┐  │
│  │  Audio  │───▶│   Voice   │───▶│   LLM   │───▶│  Speech  │  │
│  │  Input  │    │  Activity │    │  Agent  │    │ Synthesis│  │
│  │  (Mic)  │    │ Detection │    │         │    │  (TTS)   │  │
│  └─────────┘    └───────────┘    └─────────┘    └──────────┘  │
│       │              │                │              │         │
│       │              ▼                ▼              │         │
│       │        ┌───────────┐    ┌─────────┐         │         │
│       │        │   Turn    │    │  Tool   │         │         │
│       │        │  Taking   │    │  Use    │         │         │
│       │        │  Manager  │    │         │         │         │
│       │        └───────────┘    └─────────┘         │         │
│       │                                             │         │
│       └─────────────────────────────────────────────┘         │
│                    Interruption Loop                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Key differences from text agents:

Aspect        | Text Agent              | Voice Agent
------------- | ----------------------- | -----------------------------
Input         | Complete messages       | Continuous audio stream
Latency       | Seconds acceptable      | <500ms critical
Turn-taking   | Explicit (send button)  | Implicit (silence detection)
Interruption  | N/A                     | Must handle mid-utterance
Context       | Full message history    | Limited by working memory
Errors        | Can re-read             | Gone once spoken

OpenAI Realtime API

The Realtime API provides speech-to-speech with function calling, enabling true voice agents.

Basic Connection

Python
import asyncio
import websockets
import json
import base64
import pyaudio

class RealtimeVoiceClient:
    """Client for OpenAI Realtime API."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.ws = None
        self.audio_buffer = []

        # Audio settings
        self.sample_rate = 24000
        self.channels = 1
        self.chunk_size = 1024

    async def connect(self):
        """Connect to Realtime API."""
        url = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "OpenAI-Beta": "realtime=v1"
        }

        self.ws = await websockets.connect(url, extra_headers=headers)

        # Configure session
        await self.ws.send(json.dumps({
            "type": "session.update",
            "session": {
                "modalities": ["text", "audio"],
                "instructions": "You are a helpful voice assistant. Be concise and conversational.",
                "voice": "alloy",
                "input_audio_format": "pcm16",
                "output_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "whisper-1"
                },
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                    "prefix_padding_ms": 300,
                    "silence_duration_ms": 500
                }
            }
        }))

        print("Connected to Realtime API")

    async def send_audio(self, audio_data: bytes):
        """Send audio chunk to API."""
        if self.ws:
            await self.ws.send(json.dumps({
                "type": "input_audio_buffer.append",
                "audio": base64.b64encode(audio_data).decode()
            }))

    async def receive_loop(self, on_audio: callable, on_transcript: callable):
        """Receive and process responses."""
        async for message in self.ws:
            event = json.loads(message)
            event_type = event.get("type")

            if event_type == "response.audio.delta":
                # Audio response chunk
                audio_data = base64.b64decode(event["delta"])
                await on_audio(audio_data)

            elif event_type == "response.audio_transcript.delta":
                # Transcript of AI response
                await on_transcript(event["delta"])

            elif event_type == "conversation.item.input_audio_transcription.completed":
                # User's speech transcribed
                print(f"User said: {event['transcript']}")

            elif event_type == "response.done":
                print("Response complete")

            elif event_type == "error":
                print(f"Error: {event['error']}")

    async def close(self):
        """Close connection."""
        if self.ws:
            await self.ws.close()

Key session configuration options:

  • modalities: Set to ["text", "audio"] for full voice capabilities. You can use ["text"] alone if you want text-only responses.

  • voice: Choose from available voices (alloy, echo, fable, onyx, nova, shimmer). Each has distinct characteristics for different use cases.

  • input_audio_format/output_audio_format: PCM16 is the most common for low-latency streaming. The API also supports G.711 mu-law for telephony integration.

  • turn_detection: Server-side VAD (Voice Activity Detection) handles speech/silence detection automatically. Key parameters:

    • threshold: Sensitivity (0.0-1.0). Lower = more sensitive to quiet speech
    • prefix_padding_ms: Audio to include before detected speech start
    • silence_duration_ms: How long to wait before considering turn complete

Adding Function Calling

Python
class RealtimeVoiceAgent(RealtimeVoiceClient):
    """Voice agent with tool use."""

    def __init__(self, api_key: str, tools: list[dict]):
        super().__init__(api_key)
        self.tools = tools
        self.tool_handlers = {}

    async def connect(self):
        """Connect with tools configured."""
        await super().connect()

        # Add tools to session
        await self.ws.send(json.dumps({
            "type": "session.update",
            "session": {
                "tools": self.tools,
                "tool_choice": "auto"
            }
        }))

    def register_tool(self, name: str, handler: callable):
        """Register a tool handler."""
        self.tool_handlers[name] = handler

    async def receive_loop(self, on_audio: callable, on_transcript: callable):
        """Receive and process responses including tool calls."""
        async for message in self.ws:
            event = json.loads(message)
            event_type = event.get("type")

            if event_type == "response.audio.delta":
                audio_data = base64.b64decode(event["delta"])
                await on_audio(audio_data)

            elif event_type == "response.function_call_arguments.done":
                # Tool call requested
                await self._handle_tool_call(event)

            elif event_type == "response.audio_transcript.delta":
                await on_transcript(event["delta"])

    async def _handle_tool_call(self, event: dict):
        """Handle a tool call from the model."""
        call_id = event["call_id"]
        name = event["name"]
        arguments = json.loads(event["arguments"])

        print(f"Tool call: {name}({arguments})")

        if name in self.tool_handlers:
            try:
                result = await self.tool_handlers[name](**arguments)
                output = json.dumps(result)
            except Exception as e:
                output = json.dumps({"error": str(e)})
        else:
            output = json.dumps({"error": f"Unknown tool: {name}"})

        # Send tool result
        await self.ws.send(json.dumps({
            "type": "conversation.item.create",
            "item": {
                "type": "function_call_output",
                "call_id": call_id,
                "output": output
            }
        }))

        # Request continuation
        await self.ws.send(json.dumps({
            "type": "response.create"
        }))

# Define tools
weather_tool = {
    "type": "function",
    "name": "get_weather",
    "description": "Get current weather for a location",
    "parameters": {
        "type": "object",
        "properties": {
            "location": {
                "type": "string",
                "description": "City name"
            }
        },
        "required": ["location"]
    }
}

# Usage
agent = RealtimeVoiceAgent(api_key, tools=[weather_tool])

async def get_weather(location: str):
    # Actual implementation would call weather API
    return {"temperature": 72, "condition": "sunny", "location": location}

agent.register_tool("get_weather", get_weather)

Tool handling flow:

  1. Tool call detection: When the model decides to call a tool, you receive a response.function_call_arguments.done event with the function name and arguments.

  2. Execute locally: The tool handler runs on your server (not in the API). This gives you full control over what tools can do.

  3. Return result: Send the result back via conversation.item.create with type function_call_output. Include the call_id to match the request.

  4. Continue generation: After providing the tool result, send response.create to trigger the model to continue speaking with the tool's information.

Why not wait for user confirmation? Unlike text chat where users can review before sending, voice conversations should flow naturally. The model decides when to use tools based on the conversation context.

Complete Voice Application

Python
import asyncio
import pyaudio
import numpy as np
from queue import Queue, Empty
from threading import Thread

class VoiceApp:
    """Complete voice application."""

    def __init__(self, api_key: str):
        self.agent = RealtimeVoiceAgent(api_key, tools=[weather_tool])
        self.audio = pyaudio.PyAudio()
        self.input_queue = Queue()
        self.output_queue = Queue()
        self.running = False

    async def start(self):
        """Start the voice application."""
        await self.agent.connect()
        self.running = True

        # Register tools
        self.agent.register_tool("get_weather", self._get_weather)

        # Start audio threads
        input_thread = Thread(target=self._capture_audio)
        output_thread = Thread(target=self._play_audio)
        input_thread.start()
        output_thread.start()

        # Start send/receive tasks
        await asyncio.gather(
            self._send_audio_loop(),
            self.agent.receive_loop(
                on_audio=self._queue_output_audio,
                on_transcript=self._on_transcript
            )
        )

    def _capture_audio(self):
        """Capture audio from microphone."""
        stream = self.audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=24000,
            input=True,
            frames_per_buffer=1024
        )

        while self.running:
            data = stream.read(1024, exception_on_overflow=False)
            self.input_queue.put(data)

        stream.close()

    def _play_audio(self):
        """Play audio responses."""
        stream = self.audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=24000,
            output=True,
            frames_per_buffer=1024
        )

        while self.running:
            try:
                # Block briefly instead of busy-waiting on an empty queue
                data = self.output_queue.get(timeout=0.1)
            except Empty:
                continue
            stream.write(data)

        stream.close()

    async def _send_audio_loop(self):
        """Send captured audio to API."""
        while self.running:
            if not self.input_queue.empty():
                data = self.input_queue.get()
                await self.agent.send_audio(data)
            await asyncio.sleep(0.01)

    async def _queue_output_audio(self, audio_data: bytes):
        """Queue audio for playback."""
        self.output_queue.put(audio_data)

    async def _on_transcript(self, text: str):
        """Handle transcript updates."""
        print(f"Assistant: {text}", end="", flush=True)

    async def _get_weather(self, location: str):
        """Weather tool implementation."""
        return {"temperature": 72, "condition": "sunny", "location": location}

    async def stop(self):
        """Stop the application."""
        self.running = False
        await self.agent.close()
        self.audio.terminate()

# Run
async def main():
    app = VoiceApp(api_key="your-api-key")
    try:
        await app.start()
    except KeyboardInterrupt:
        await app.stop()

asyncio.run(main())

Browser-Based Voice with WebRTC

WebRTC Voice Client

JavaScript
// voice-client.js
class VoiceClient {
  constructor(serverUrl) {
    this.serverUrl = serverUrl;
    this.ws = null;
    this.mediaStream = null;
    this.audioContext = null;
    this.processor = null;
    this.isListening = false;
  }

  async connect() {
    // Get microphone access
    this.mediaStream = await navigator.mediaDevices.getUserMedia({
      audio: {
        sampleRate: 24000,
        channelCount: 1,
        echoCancellation: true,
        noiseSuppression: true,
      }
    });

    // Set up audio processing
    this.audioContext = new AudioContext({ sampleRate: 24000 });
    const source = this.audioContext.createMediaStreamSource(this.mediaStream);

    // Create processor for sending audio
    await this.audioContext.audioWorklet.addModule('audio-processor.js');
    this.processor = new AudioWorkletNode(this.audioContext, 'audio-processor');

    this.processor.port.onmessage = (event) => {
      if (this.isListening && this.ws?.readyState === WebSocket.OPEN) {
        this.ws.send(event.data);
      }
    };

    source.connect(this.processor);

    // Connect to server
    this.ws = new WebSocket(this.serverUrl);
    this.ws.binaryType = 'arraybuffer';

    this.ws.onmessage = (event) => {
      this.handleMessage(event.data);
    };

    this.ws.onopen = () => {
      console.log('Connected to voice server');
      this.isListening = true;
    };
  }

  handleMessage(data) {
    if (data instanceof ArrayBuffer) {
      // Audio data - play it
      this.playAudio(data);
    } else {
      // JSON message
      const message = JSON.parse(data);
      this.handleEvent(message);
    }
  }

  async playAudio(audioData) {
    // Raw PCM16 can't go through decodeAudioData (it expects a container
    // format like WAV), so build an AudioBuffer from the samples directly.
    const int16 = new Int16Array(audioData);
    const float32 = Float32Array.from(int16, s => s / 32768);
    const audioBuffer = this.audioContext.createBuffer(1, float32.length, 24000);
    audioBuffer.copyToChannel(float32, 0);
    const source = this.audioContext.createBufferSource();
    source.buffer = audioBuffer;
    source.connect(this.audioContext.destination);
    source.start();
  }

  handleEvent(event) {
    switch (event.type) {
      case 'transcript':
        this.onTranscript?.(event.text, event.isFinal);
        break;
      case 'response_start':
        this.onResponseStart?.();
        break;
      case 'response_end':
        this.onResponseEnd?.();
        break;
      case 'tool_call':
        this.onToolCall?.(event.name, event.arguments);
        break;
    }
  }

  startListening() {
    this.isListening = true;
  }

  stopListening() {
    this.isListening = false;
  }

  disconnect() {
    this.isListening = false;
    this.processor?.disconnect();
    this.mediaStream?.getTracks().forEach(track => track.stop());
    this.ws?.close();
  }
}

// Audio processor worklet
// audio-processor.js
class AudioProcessor extends AudioWorkletProcessor {
  constructor() {
    super();
    this.buffer = [];
    this.bufferSize = 2400; // 100ms at 24kHz
  }

  process(inputs) {
    const input = inputs[0][0];
    if (input) {
      // Convert float32 to int16
      const int16 = new Int16Array(input.length);
      for (let i = 0; i < input.length; i++) {
        int16[i] = Math.max(-32768, Math.min(32767, input[i] * 32768));
      }

      this.buffer.push(...int16);

      if (this.buffer.length >= this.bufferSize) {
        const chunk = new Int16Array(this.buffer.splice(0, this.bufferSize));
        this.port.postMessage(chunk.buffer);
      }
    }
    return true;
  }
}

registerProcessor('audio-processor', AudioProcessor);

Voice UI Component

JavaScript
// VoiceUI.jsx
import React, { useState, useEffect, useRef } from 'react';

const VoiceUI = ({ serverUrl }) => {
  const [isConnected, setIsConnected] = useState(false);
  const [isListening, setIsListening] = useState(false);
  const [isSpeaking, setIsSpeaking] = useState(false);
  const [transcript, setTranscript] = useState('');
  const [response, setResponse] = useState('');

  const clientRef = useRef(null);

  useEffect(() => {
    const client = new VoiceClient(serverUrl);

    client.onTranscript = (text, isFinal) => {
      setTranscript(prev => isFinal ? text : prev + text);
    };

    client.onResponseStart = () => {
      setIsSpeaking(true);
      setResponse('');
    };

    client.onResponseEnd = () => {
      setIsSpeaking(false);
    };

    clientRef.current = client;

    return () => {
      client.disconnect();
    };
  }, [serverUrl]);

  const handleConnect = async () => {
    await clientRef.current.connect();
    setIsConnected(true);
    setIsListening(true);
  };

  const handleDisconnect = () => {
    clientRef.current.disconnect();
    setIsConnected(false);
    setIsListening(false);
  };

  const toggleListening = () => {
    if (isListening) {
      clientRef.current.stopListening();
    } else {
      clientRef.current.startListening();
    }
    setIsListening(!isListening);
  };

  return (
    <div className="voice-ui">
      <div className="status">
        <span className={`indicator ${isConnected ? 'connected' : ''}`} />
        {isConnected ? 'Connected' : 'Disconnected'}
      </div>

      <div className="visualizer">
        {isSpeaking && <div className="speaking-animation" />}
        {isListening && !isSpeaking && <div className="listening-animation" />}
      </div>

      <div className="transcript">
        <p><strong>You:</strong> {transcript}</p>
        <p><strong>Assistant:</strong> {response}</p>
      </div>

      <div className="controls">
        {!isConnected ? (
          <button onClick={handleConnect}>Connect</button>
        ) : (
          <>
            <button onClick={toggleListening}>
              {isListening ? 'Mute' : 'Unmute'}
            </button>
            <button onClick={handleDisconnect}>Disconnect</button>
          </>
        )}
      </div>
    </div>
  );
};

export default VoiceUI;

Turn-Taking and Interruption

Voice Activity Detection

Python
import numpy as np
from collections import deque

class VoiceActivityDetector:
    """Detect voice activity for turn-taking."""

    def __init__(
        self,
        sample_rate: int = 24000,
        frame_duration_ms: int = 30,
        threshold: float = 0.02,
        speech_pad_ms: int = 300,
        silence_duration_ms: int = 500
    ):
        self.sample_rate = sample_rate
        self.frame_size = int(sample_rate * frame_duration_ms / 1000)
        self.threshold = threshold
        self.speech_pad_frames = int(speech_pad_ms / frame_duration_ms)
        self.silence_frames = int(silence_duration_ms / frame_duration_ms)

        self.is_speaking = False
        self.silence_count = 0
        self.speech_count = 0
        self.ring_buffer = deque(maxlen=self.speech_pad_frames)

    def process_frame(self, audio_frame: np.ndarray) -> dict:
        """Process an audio frame and return VAD state."""
        # Calculate RMS energy
        rms = np.sqrt(np.mean(audio_frame.astype(np.float32) ** 2))
        is_speech = rms > self.threshold

        self.ring_buffer.append(is_speech)

        result = {
            "is_speech": is_speech,
            "rms": rms,
            "state_changed": False,
            "event": None
        }

        if self.is_speaking:
            if is_speech:
                self.silence_count = 0
            else:
                self.silence_count += 1

            if self.silence_count >= self.silence_frames:
                # End of speech
                self.is_speaking = False
                self.silence_count = 0
                result["state_changed"] = True
                result["event"] = "speech_end"
        else:
            if is_speech:
                self.speech_count += 1
                if self.speech_count >= 3:  # Require consecutive speech frames
                    self.is_speaking = True
                    self.speech_count = 0
                    result["state_changed"] = True
                    result["event"] = "speech_start"
            else:
                self.speech_count = 0

        result["is_speaking"] = self.is_speaking
        return result


class TurnManager:
    """Manage conversation turns between user and agent."""

    def __init__(self):
        self.vad = VoiceActivityDetector()
        self.current_turn = "agent"  # "user" or "agent"
        self.agent_speaking = False
        self.user_buffer = []
        self.pending_interruption = False

    def process_user_audio(self, audio: np.ndarray) -> dict:
        """Process user audio and manage turns."""
        vad_result = self.vad.process_frame(audio)

        result = {
            "action": None,
            "audio": None
        }

        if vad_result["event"] == "speech_start":
            if self.agent_speaking:
                # User interrupting agent
                self.pending_interruption = True
                result["action"] = "interrupt"
            else:
                # User starting to speak
                self.current_turn = "user"
                self.user_buffer = []
                result["action"] = "user_start"

        if self.current_turn == "user" and vad_result["is_speaking"]:
            self.user_buffer.append(audio)

        if vad_result["event"] == "speech_end" and self.current_turn == "user":
            # User finished speaking
            result["action"] = "user_end"
            result["audio"] = np.concatenate(self.user_buffer)
            self.user_buffer = []
            self.current_turn = "agent"

        return result

    def start_agent_response(self):
        """Mark agent as speaking."""
        self.agent_speaking = True
        self.current_turn = "agent"

    def end_agent_response(self):
        """Mark agent as done speaking."""
        self.agent_speaking = False

    def handle_interruption(self) -> bool:
        """Check and handle pending interruption."""
        if self.pending_interruption:
            self.pending_interruption = False
            self.agent_speaking = False
            self.current_turn = "user"
            return True
        return False

Voice Activity Detection (VAD) algorithm:

  1. Energy calculation: For each audio frame, compute RMS (root mean square) energy. This indicates how "loud" the audio is.

  2. Threshold comparison: If RMS exceeds the threshold, consider it speech; otherwise, silence.

  3. Hysteresis: Require consecutive speech frames (3 in this implementation) before triggering speech_start. This prevents false triggers from brief noises.

  4. End-of-speech detection: Count consecutive silence frames. Only trigger speech_end after sustained silence (500ms default). This handles natural pauses mid-sentence.

Turn-taking challenges:

  • Backchannel filtering: "Uh-huh" and "mm-hmm" shouldn't trigger turn changes
  • Interruption vs. agreement: User speaking while agent talks could be interruption or just acknowledgment
  • Cross-talk: Both parties speaking simultaneously needs special handling

The TurnManager class coordinates these states, tracking who "owns" the current turn and detecting when control should transfer.

Interruption Handling

Interruption is one of the most complex aspects of voice AI. When a user starts speaking while the agent is responding, you need to decide: Is this an interruption (stop talking), a backchannel acknowledgment ("uh-huh"), or accidental noise? The following implementation cancels the current response when a true interruption is detected:

Python
class InterruptibleVoiceAgent:
    """Voice agent that handles interruptions gracefully."""

    def __init__(self, realtime_client: RealtimeVoiceAgent):
        self.client = realtime_client
        self.turn_manager = TurnManager()
        self.response_buffer = []
        self.is_generating = False

    async def handle_audio(self, audio: np.ndarray):
        """Handle incoming user audio."""
        result = self.turn_manager.process_user_audio(audio)

        if result["action"] == "interrupt":
            await self._handle_interruption()

        elif result["action"] == "user_end":
            # Send complete utterance for processing
            await self._process_user_utterance(result["audio"])

    async def _handle_interruption(self):
        """Handle user interruption."""
        if self.is_generating:
            # Cancel current response
            await self.client.ws.send(json.dumps({
                "type": "response.cancel"
            }))

            self.is_generating = False
            self.response_buffer = []

            # Acknowledge interruption
            print("Response cancelled due to interruption")

    async def _process_user_utterance(self, audio: np.ndarray):
        """Process complete user utterance."""
        # Send audio to API
        await self.client.send_audio(audio.tobytes())

        # Commit the audio buffer
        await self.client.ws.send(json.dumps({
            "type": "input_audio_buffer.commit"
        }))

        # Request response
        await self.client.ws.send(json.dumps({
            "type": "response.create"
        }))

        self.is_generating = True
        self.turn_manager.start_agent_response()

    async def handle_response_chunk(self, audio_chunk: bytes):
        """Handle response audio chunk."""
        # Check for interruption before playing
        if self.turn_manager.handle_interruption():
            return

        self.response_buffer.append(audio_chunk)
        # Play audio...

    async def handle_response_end(self):
        """Handle end of response."""
        self.is_generating = False
        self.turn_manager.end_agent_response()
        self.response_buffer = []

Key patterns in this interruption handler:

  1. Immediate cancellation: When _handle_interruption is called, the agent sends a response.cancel message to the API. This stops generation server-side immediately—no point generating audio the user won't hear.

  2. Buffer clearing: The response_buffer is cleared on interruption. Any audio chunks already generated but not yet played are discarded. This prevents awkward situations where the agent continues speaking after the user interrupted.

  3. State coordination: The TurnManager coordinates between detecting user speech (handle_audio) and checking for interruptions during playback (handle_response_chunk). This two-point check handles both cases: user interrupting before playback starts and user interrupting mid-playback.

  4. Fire-and-forget cancellation: Note that response.cancel doesn't require waiting for confirmation. The agent immediately resets its state and is ready to process the user's new utterance.

Edge cases to consider:

  • Backchannel filtering: "Uh-huh" or "okay" mid-response usually shouldn't trigger interruption. A more sophisticated implementation would check audio duration and energy patterns (see the sketch after this list).
  • Interruption during tool execution: If the agent called a tool and is waiting for results, interruption handling becomes complex—you may want to let the tool complete but suppress the response.
  • Partial response recovery: In some cases, you might want to remember where the agent was interrupted and resume later ("As I was saying...").
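
To make the first edge case concrete, here is a minimal backchannel heuristic—a sketch with illustrative thresholds, not a tuned classifier. It could be called from _handle_interruption before cancelling: if the candidate utterance is short, quiet, or a known acknowledgment word, let the agent keep talking.

Python
import numpy as np

# Common acknowledgment tokens (illustrative, not exhaustive)
BACKCHANNELS = {"uh-huh", "mm-hmm", "mhm", "okay", "ok", "yeah", "right"}

def is_backchannel(audio: np.ndarray, transcript: str = "",
                   sample_rate: int = 24000, max_duration_s: float = 0.8) -> bool:
    """Heuristic: short, low-energy, acknowledgment-like utterances are
    backchannels and should not cancel the agent's response."""
    duration = len(audio) / sample_rate
    if duration > max_duration_s:
        return False  # sustained speech is a real turn, not a backchannel
    rms = float(np.sqrt(np.mean(audio.astype(np.float32) ** 2)))
    words = transcript.strip().lower().strip(".,!?")
    # Known acknowledgment, or a quiet blip with no recognizable words
    return words in BACKCHANNELS or (not words and rms < 0.01)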

Twilio Telephony Integration

While WebRTC works for browser-based voice AI, many production use cases require telephone integration—customer support lines, appointment reminders, outbound campaigns. Twilio provides the bridge between traditional phone networks (PSTN) and your voice AI backend.

Twilio Voice Webhook Server

The architecture involves two connection types: (1) an HTTP webhook that Twilio calls when a phone call arrives, and (2) a WebSocket for streaming audio bidirectionally during the call. The webhook returns TwiML (Twilio Markup Language) instructions, including a directive to open a media stream to your server:

Python
from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import Response
import asyncio
import audioop  # mu-law <-> PCM conversion; deprecated in Python 3.11, removed in 3.13
import base64
import json

app = FastAPI()

# Store active calls
active_calls = {}

@app.post("/incoming-call")
async def handle_incoming_call(request: Request):
    """Handle incoming Twilio call."""
    form_data = await request.form()
    call_sid = form_data.get("CallSid")

    # Return TwiML to connect to WebSocket
    twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
    <Response>
        <Say>Welcome to the AI assistant. How can I help you today?</Say>
        <Connect>
            <Stream url="wss://{request.headers['host']}/media-stream/{call_sid}" />
        </Connect>
    </Response>"""

    return Response(content=twiml, media_type="application/xml")

@app.websocket("/media-stream/{call_sid}")
async def media_stream(websocket: WebSocket, call_sid: str):
    """Handle Twilio media stream."""
    await websocket.accept()

    # Create voice agent for this call
    agent = TwilioVoiceAgent(call_sid)
    agent.websocket = websocket  # the agent needs this handle to send audio back
    active_calls[call_sid] = agent

    try:
        await agent.connect()

        async for message in websocket.iter_text():
            data = json.loads(message)
            event = data.get("event")

            if event == "media":
                # Incoming audio from caller
                audio_payload = data["media"]["payload"]
                audio_bytes = base64.b64decode(audio_payload)

                # Convert mulaw to PCM
                pcm_audio = audioop.ulaw2lin(audio_bytes, 2)

                # Process with agent
                await agent.process_audio(pcm_audio)

            elif event == "start":
                stream_sid = data["start"]["streamSid"]
                agent.stream_sid = stream_sid

            elif event == "stop":
                break

    finally:
        await agent.disconnect()
        del active_calls[call_sid]


class TwilioVoiceAgent:
    """Voice agent for Twilio calls."""

    def __init__(self, call_sid: str):
        self.call_sid = call_sid
        self.stream_sid = None
        self.realtime_client = None
        self.websocket = None

    async def connect(self):
        """Connect to OpenAI Realtime API."""
        self.realtime_client = RealtimeVoiceClient(api_key)
        await self.realtime_client.connect()

    async def process_audio(self, pcm_audio: bytes):
        """Process incoming audio from Twilio."""
        # Resample from 8kHz (Twilio) to 24kHz (OpenAI)
        resampled = self._resample(pcm_audio, 8000, 24000)
        await self.realtime_client.send_audio(resampled)

    async def send_audio_to_twilio(self, audio: bytes):
        """Send audio response back to Twilio."""
        # Resample from 24kHz to 8kHz
        resampled = self._resample(audio, 24000, 8000)

        # Convert PCM to mulaw
        mulaw_audio = audioop.lin2ulaw(resampled, 2)

        # Send via WebSocket
        message = {
            "event": "media",
            "streamSid": self.stream_sid,
            "media": {
                "payload": base64.b64encode(mulaw_audio).decode()
            }
        }
        await self.websocket.send_text(json.dumps(message))

    def _resample(self, audio: bytes, from_rate: int, to_rate: int) -> bytes:
        """Resample audio (stateless call; fine for short chunks)."""
        return audioop.ratecv(audio, 2, 1, from_rate, to_rate, None)[0]

    async def disconnect(self):
        """Disconnect from APIs."""
        if self.realtime_client:
            await self.realtime_client.close()

Understanding the Twilio integration flow:

  1. Incoming call webhook: When someone calls your Twilio number, Twilio makes an HTTP POST to /incoming-call. You respond with TwiML that says "play a greeting, then connect to this WebSocket for media streaming."

  2. Media stream WebSocket: Twilio opens a WebSocket connection to /media-stream/{call_sid}. This carries raw audio in both directions—user audio comes in as base64-encoded mu-law, and you send agent audio back in the same format.

  3. Audio format conversion: Twilio uses mu-law (G.711) at 8kHz—the standard telephone codec. The OpenAI Realtime API uses PCM16 at 24kHz. You must convert between formats:

    • audioop.ulaw2lin() converts mu-law to PCM
    • audioop.lin2ulaw() converts PCM back to mu-law
    • audioop.ratecv() handles resampling (8kHz ↔ 24kHz)
  4. Stream events: Twilio sends three key events:

    • start: Stream initialized, contains streamSid for sending audio back
    • media: Contains payload with base64-encoded audio chunks
    • stop: Call ended, clean up resources

Why resampling matters: Telephone networks operate at 8kHz for historical bandwidth reasons. Modern speech models work at 24kHz for better quality. The 3x resampling adds latency and can introduce artifacts. High-quality resampling libraries like soxr produce better results than audioop (which is deprecated as of Python 3.11 and removed in 3.13) for production use.
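
As a rough sketch of that higher-quality path (assuming the python-soxr package, which is not used elsewhere in this article), a drop-in replacement for the audioop-based _resample could look like:

Python
import numpy as np
import soxr  # pip install soxr (python-soxr)

def resample_pcm16(audio: bytes, from_rate: int, to_rate: int) -> bytes:
    """Resample raw mono PCM16 audio with the SoX resampler."""
    samples = np.frombuffer(audio, dtype=np.int16)
    resampled = soxr.resample(samples, from_rate, to_rate)  # int16 in, int16 out
    return resampled.tobytes()

# e.g. Twilio (8kHz) -> OpenAI Realtime (24kHz)
# pcm_24k = resample_pcm16(pcm_8k, 8000, 24000)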

Outbound Calling

Outbound calling flips the flow—your system initiates the call rather than waiting for incoming calls. This is essential for proactive use cases: appointment reminders, delivery notifications, survey calls, or sales outreach. The pattern is similar but starts with an API call to Twilio:

Python
from twilio.rest import Client

class OutboundCaller:
    """Make outbound calls with voice AI."""

    def __init__(self, account_sid: str, auth_token: str, from_number: str):
        self.client = Client(account_sid, auth_token)
        self.from_number = from_number

    def make_call(self, to_number: str, webhook_url: str) -> str:
        """Initiate an outbound call."""
        call = self.client.calls.create(
            to=to_number,
            from_=self.from_number,
            url=webhook_url,
            status_callback=f"{webhook_url}/status",
            status_callback_event=["initiated", "ringing", "answered", "completed"]
        )
        return call.sid

    def create_greeting_twiml(self, message: str, stream_url: str) -> str:
        """Create TwiML for outbound call."""
        return f"""<?xml version="1.0" encoding="UTF-8"?>
        <Response>
            <Say voice="Polly.Amy">{message}</Say>
            <Connect>
                <Stream url="{stream_url}" />
            </Connect>
        </Response>"""

# Usage
caller = OutboundCaller(
    account_sid="ACxxxxx",
    auth_token="xxxxx",
    from_number="+1234567890"
)

call_sid = caller.make_call(
    to_number="+0987654321",
    webhook_url="https://your-server.com/outbound-call"
)

Outbound calling flow:

  1. Initiate call: Use client.calls.create() to start the call. Twilio dials the recipient and, when they answer, fetches TwiML from your webhook_url.

  2. Status callbacks: The status_callback_event parameter tells Twilio to notify you as the call progresses—initiated, ringing, answered, completed. This is crucial for tracking call outcomes and triggering follow-up actions.

  3. Initial greeting: The TwiML typically includes a <Say> element to deliver an initial message (using Twilio's TTS), then <Connect><Stream> to hand off to your voice AI.

Compliance considerations for outbound calling:

  • TCPA compliance: In the US, automated calls to mobile phones require prior consent. Violations carry significant fines ($500 to $1,500 per call).
  • Do-not-call lists: Maintain and honor opt-out requests within 30 days.
  • Caller ID: Display a valid, answerable number—spoofing caller ID for malicious purposes is illegal.
  • Time restrictions: Many jurisdictions restrict calling hours (e.g., 8am-9pm in the recipient's time zone).
  • Rate limiting: Twilio and carriers may flag or block accounts making too many calls too quickly.

Latency Optimization

Latency is the make-or-break factor for voice AI. Human conversation has natural pauses of 200-500ms between turns. Exceed 1 second of response time and the conversation feels sluggish; exceed 2 seconds and users start wondering if the system is broken.

End-to-End Latency Analysis

Understanding where latency comes from is the first step to optimizing it. The voice AI pipeline has multiple stages, each contributing delay:

Code
User speaks → Silence detected → Audio sent → STT → LLM → TTS → Audio plays
   |              |                |           |      |      |         |
   0ms          ~500ms          ~600ms      ~800ms ~1500ms ~1800ms   ~2000ms

Target: <1000ms total latency for responsive conversation

Breaking down the latency budget:

  • Silence detection (~500ms): VAD must wait for sustained silence to confirm end-of-turn. Too aggressive = cutting off users mid-sentence. Too conservative = slow responses. The 300-500ms range balances these.

  • Network round-trip (~100ms): Audio upload to server, WebSocket latency. Geographically distributed servers help.

  • Speech-to-text (~200ms): Whisper and similar models are fast but not instant. Streaming STT reduces this by processing audio as it arrives.

  • LLM inference (~500-1000ms): The biggest variable. First-token latency matters most—streaming lets TTS start before LLM finishes.

  • Text-to-speech (~200-300ms): Like STT, streaming TTS generates audio chunk-by-chunk rather than waiting for complete text.

The OpenAI Realtime API reduces latency dramatically by fusing STT→LLM→TTS into a single model. Instead of three separate round-trips, you get one—and the model can start speaking before it finishes "thinking."
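
To make the arithmetic concrete, summing mid-range estimates for each stage shows why a strictly sequential pipeline misses the target, and why streaming overlap (or a fused speech-to-speech model) is necessary. The numbers below are the rough estimates from the list above, not measurements:

Python
# Rough per-stage estimates in milliseconds (midpoints of the ranges above)
stage_budget_ms = {
    "silence_detection": 500,
    "network_round_trip": 100,
    "speech_to_text": 200,
    "llm_first_token": 750,
    "tts_first_audio": 250,
}

sequential_total = sum(stage_budget_ms.values())
print(sequential_total)  # ~1800ms if nothing overlaps -- well past the 1000ms target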

Optimization Strategies

The following classes demonstrate key optimization techniques. Each targets a different bottleneck in the pipeline:

Python
import asyncio
import json

class LatencyOptimizedAgent:
    """Voice agent optimized for low latency."""

    def __init__(self):
        # Use server-side VAD to minimize latency
        self.use_server_vad = True

        # Reduce silence detection threshold
        self.silence_duration_ms = 300  # Faster turn detection

        # Enable response prefetching
        self.prefetch_enabled = True

        # Use streaming TTS
        self.streaming_tts = True

    async def configure_session(self, ws):
        """Configure low-latency session."""
        await ws.send(json.dumps({
            "type": "session.update",
            "session": {
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.4,  # Lower threshold for faster detection
                    "prefix_padding_ms": 200,  # Less padding
                    "silence_duration_ms": 300  # Faster end-of-turn
                },
                "input_audio_transcription": {
                    "model": "whisper-1"
                },
                # Request shorter responses for speed
                "instructions": "Be very concise. Answer in 1-2 sentences."
            }
        }))

    def optimize_network(self):
        """Network-level optimizations."""
        return {
            # Use regional endpoints
            "endpoint": "wss://api.openai.com/v1/realtime",

            # Keep connection alive
            "keepalive": True,

            # Disable Nagle's algorithm for lower latency
            "tcp_nodelay": True,

            # Use binary WebSocket frames when possible
            "binary_frames": True
        }

class AudioStreamOptimizer:
    """Optimize audio streaming for low latency."""

    def __init__(self, sample_rate: int = 24000):
        self.sample_rate = sample_rate
        self.buffer_duration_ms = 20  # Small chunks
        self.buffer_size = int(sample_rate * self.buffer_duration_ms / 1000)

    def create_optimized_stream(self):
        """Create optimized audio stream."""
        return {
            "format": "pcm16",
            "sample_rate": self.sample_rate,
            "buffer_size": self.buffer_size,
            # Use low-latency audio APIs
            "latency": "low",
            # Disable audio processing that adds latency
            "echo_cancellation": True,  # Keep this for quality
            "noise_suppression": False,  # Disable for speed
            "auto_gain_control": False  # Disable for speed
        }

class ResponseStreamer:
    """Stream responses with minimal buffering."""

    def __init__(self, min_chunk_ms: int = 50):
        self.min_chunk_size = int(24000 * min_chunk_ms / 1000) * 2  # bytes

    async def stream_response(self, audio_generator, play_func):
        """Stream audio with minimal delay."""
        buffer = bytearray()

        async for chunk in audio_generator:
            buffer.extend(chunk)

            # Play as soon as we have the minimum chunk
            while len(buffer) >= self.min_chunk_size:
                to_play = bytes(buffer[:self.min_chunk_size])
                del buffer[:self.min_chunk_size]
                # Fire and forget: schedule playback without awaiting
                # completion so generation and playback overlap
                asyncio.create_task(play_func(to_play))

Explaining each optimization:

1. Session Configuration (LatencyOptimizedAgent):

  • silence_duration_ms: 300 is aggressive—short pauses won't trigger end-of-turn, but the agent responds faster when the user truly finishes.
  • threshold: 0.4 is lower than default, catching quieter speech but risking more false positives.
  • prefix_padding_ms: 200 keeps less audio before detected speech, reducing buffering delay.
  • Short instructions encourage concise responses, reducing TTS time.

2. Network Optimizations (optimize_network):

  • tcp_nodelay: True disables Nagle's algorithm, which normally batches small packets. For real-time audio, you want every packet sent immediately.
  • binary_frames avoids base64 encoding overhead (33% size increase) when the WebSocket library supports it.
  • Keep-alive prevents connection teardown/setup delays between turns.

3. Audio Streaming (AudioStreamOptimizer):

  • buffer_duration_ms: 20 means tiny chunks (480 samples at 24kHz). Smaller chunks = less latency but more CPU overhead.
  • Echo cancellation is kept enabled—without it, the agent would hear its own output and get confused.
  • Noise suppression and auto-gain are disabled for speed. These add latency and aren't critical if audio quality is decent.

4. Response Streaming (ResponseStreamer):

  • min_chunk_size of 50ms audio (~2400 bytes) balances latency vs. overhead. Too small = excessive function call overhead. Too large = noticeable delay before first audio.
  • Fire-and-forget playback (scheduling play_func with asyncio.create_task instead of awaiting it) allows audio generation and playback to overlap.

Latency Monitoring

You can't improve what you don't measure. Instrumenting your voice pipeline with timing markers lets you identify bottlenecks, track regressions, and set SLOs. The following classes capture per-stage latency and compute statistics:

Python
import time
from dataclasses import dataclass
from collections import deque

@dataclass
class LatencyMeasurement:
    timestamp: float
    vad_latency_ms: float
    stt_latency_ms: float
    llm_latency_ms: float
    tts_latency_ms: float
    total_latency_ms: float

class LatencyMonitor:
    """Monitor and report voice latency metrics."""

    def __init__(self, window_size: int = 100):
        self.measurements = deque(maxlen=window_size)
        self.current_measurement = {}

    def start_turn(self):
        """Start measuring a new turn."""
        self.current_measurement = {
            "speech_start": time.time()
        }

    def mark_vad_complete(self):
        """Mark VAD detection complete."""
        self.current_measurement["vad_complete"] = time.time()

    def mark_stt_complete(self):
        """Mark speech-to-text complete."""
        self.current_measurement["stt_complete"] = time.time()

    def mark_llm_start(self):
        """Mark LLM processing start."""
        self.current_measurement["llm_start"] = time.time()

    def mark_llm_first_token(self):
        """Mark first token from LLM."""
        self.current_measurement["llm_first_token"] = time.time()

    def mark_tts_first_audio(self):
        """Mark first audio output."""
        self.current_measurement["tts_first_audio"] = time.time()

    def end_turn(self):
        """Complete the measurement."""
        m = self.current_measurement
        now = time.time()

        measurement = LatencyMeasurement(
            timestamp=now,
            vad_latency_ms=(m.get("vad_complete", now) - m.get("speech_start", now)) * 1000,
            stt_latency_ms=(m.get("stt_complete", now) - m.get("vad_complete", now)) * 1000,
            llm_latency_ms=(m.get("llm_first_token", now) - m.get("llm_start", now)) * 1000,
            tts_latency_ms=(m.get("tts_first_audio", now) - m.get("llm_first_token", now)) * 1000,
            total_latency_ms=(m.get("tts_first_audio", now) - m.get("speech_start", now)) * 1000
        )

        self.measurements.append(measurement)
        return measurement

    def get_stats(self) -> dict:
        """Get latency statistics."""
        if not self.measurements:
            return {}

        total_latencies = [m.total_latency_ms for m in self.measurements]

        return {
            "count": len(self.measurements),
            "avg_total_ms": sum(total_latencies) / len(total_latencies),
            "p50_total_ms": sorted(total_latencies)[len(total_latencies) // 2],
            "p90_total_ms": sorted(total_latencies)[int(len(total_latencies) * 0.9)],
            "p99_total_ms": sorted(total_latencies)[int(len(total_latencies) * 0.99)],
            "breakdown": {
                "avg_vad_ms": sum(m.vad_latency_ms for m in self.measurements) / len(self.measurements),
                "avg_stt_ms": sum(m.stt_latency_ms for m in self.measurements) / len(self.measurements),
                "avg_llm_ms": sum(m.llm_latency_ms for m in self.measurements) / len(self.measurements),
                "avg_tts_ms": sum(m.tts_latency_ms for m in self.measurements) / len(self.measurements)
            }
        }

How to use latency monitoring:

  1. Instrument your pipeline: Call mark_vad_complete(), mark_stt_complete(), etc. at each stage transition. The timestamps create a trace of where time went (a usage sketch follows this list).

  2. Track percentiles: The get_stats() method returns p50, p90, and p99 latencies. Average latency hides outliers—p99 tells you what slow users actually experience.

  3. Set SLOs and alert: A reasonable target might be p90 < 1000ms, p99 < 2000ms. Alert when these breach, but expect some variance—LLM inference time varies with response length and load.

  4. Identify bottlenecks: The breakdown dict shows average time per stage. If avg_llm_ms dominates, consider a smaller model or request shorter responses. If avg_vad_ms is high, your silence threshold may be too conservative.
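
A usage sketch of the monitor above. In a real pipeline each mark_* call sits at the corresponding stage boundary rather than back-to-back as shown here:

Python
monitor = LatencyMonitor()

# One instrumented turn
monitor.start_turn()            # user starts speaking
monitor.mark_vad_complete()     # VAD declares end-of-turn
monitor.mark_stt_complete()     # transcript available
monitor.mark_llm_start()
monitor.mark_llm_first_token()
monitor.mark_tts_first_audio()  # first audio byte played
turn = monitor.end_turn()
print(f"Turn latency: {turn.total_latency_ms:.0f}ms")

# Periodically check percentiles against your SLO
stats = monitor.get_stats()
if stats and stats["p90_total_ms"] > 1000:
    print("p90 latency SLO breached:", stats["breakdown"])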

Production monitoring recommendations:

  • Export metrics to Prometheus/Datadog/CloudWatch for dashboards and alerting (see the sketch after this list)
  • Include call ID/session ID in traces for debugging specific conversations
  • Sample detailed traces (not every call) to reduce overhead
  • Track latency by user region—geographic distance to servers matters
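
For the export step, a minimal sketch using prometheus_client (assuming that library; Datadog and CloudWatch clients follow the same pattern):

Python
from prometheus_client import Histogram  # assumes prometheus_client is installed

# Buckets centered on the ~1s target; tune to your SLOs
TURN_LATENCY = Histogram(
    "voice_turn_latency_seconds",
    "End-to-end voice turn latency",
    buckets=[0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0],
)

def record_turn(measurement: LatencyMeasurement) -> None:
    # Histograms aggregate across calls; keep per-call traces in sampled logs
    TURN_LATENCY.observe(measurement.total_latency_ms / 1000)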

Production Patterns

Voice AI in production faces unique challenges. WebSocket connections drop. APIs rate-limit you. Users walk through tunnels and lose connectivity. Unlike text chat where the user can wait and retry, voice requires seamless recovery—awkward silence is unacceptable.

Error Recovery

Connection failures are inevitable at scale. The following pattern implements exponential backoff retry with graceful degradation—the agent tries to reconnect transparently, and only escalates to the user when recovery fails:

Python
class ResilientVoiceAgent:
    """Voice agent with error recovery."""

    def __init__(self, config: dict):
        self.config = config
        self.connection_attempts = 0
        self.max_reconnects = 5
        self.backoff_base = 1.0

    async def connect_with_retry(self):
        """Connect with exponential backoff retry."""
        while self.connection_attempts < self.max_reconnects:
            try:
                await self._connect()
                self.connection_attempts = 0
                return True
            except Exception as e:
                self.connection_attempts += 1
                wait_time = self.backoff_base * (2 ** self.connection_attempts)
                print(f"Connection failed ({e}), retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

        return False

    async def handle_error(self, error: Exception):
        """Handle various error types."""
        error_type = type(error).__name__

        if "WebSocket" in error_type or "Connection" in error_type:
            # Connection error - try to reconnect
            await self._reconnect()

        elif "Timeout" in error_type:
            # Timeout - retry the operation
            await self._retry_last_operation()

        elif "RateLimit" in error_type:
            # Rate limited - back off
            await asyncio.sleep(60)
            await self._reconnect()

        else:
            # Unknown error - log and continue
            print(f"Unknown error: {error}")

    async def _reconnect(self):
        """Reconnect to the API."""
        print("Reconnecting...")
        await self._disconnect()
        success = await self.connect_with_retry()
        if success:
            await self._notify_user("Connection restored. Please continue.")

    async def _notify_user(self, message: str):
        """Notify user of system state via TTS."""
        # Play a system message
        pass

Error recovery strategies:

  1. Exponential backoff: Each retry waits longer (1s, 2s, 4s, 8s...). This prevents thundering herd problems when a service recovers—if 1000 agents reconnect simultaneously, they'll overload the service again.

  2. Error classification: Different errors need different responses:

    • WebSocket/Connection errors: Reconnect immediately—these are often transient.
    • Timeout errors: Retry the operation—might be temporary server load.
    • Rate limit errors: Back off for 60+ seconds—retrying immediately makes it worse.
    • Authentication errors: Don't retry—something is fundamentally wrong.
  3. User notification: After successful reconnection, tell the user ("Connection restored. Please continue."). This acknowledges any awkward pause and resets expectations.

  4. Max retry limit: Give up after 5 attempts. Indefinite retries waste resources on dead connections. Better to fail explicitly and let the user retry later.

What to preserve across reconnections (a minimal snapshot sketch follows the list):

  • Conversation history (so context isn't lost)
  • User preferences and session state
  • Any pending tool calls or actions
  • The audio being processed when disconnection occurred
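
A minimal snapshot sketch, assuming session state shaped like the VoiceConversation dataclass introduced in the next section (the helper names here are hypothetical):

Python
import json

def snapshot_session(conv) -> str:
    """Serialize what must survive a reconnect; store keyed by session ID."""
    # In-flight audio is binary; buffer it separately (e.g., base64) if needed
    return json.dumps({
        "history": conv.history,                        # conversation so far
        "context": conv.context,                        # preferences, extracted entities
        "pending_tool_calls": conv.pending_tool_calls,  # actions awaiting results
    })

def restore_session(conv, snapshot: str) -> None:
    """Rehydrate a conversation object after the connection is re-established."""
    data = json.loads(snapshot)
    conv.history = data["history"]
    conv.context = data["context"]
    conv.pending_tool_calls = data["pending_tool_calls"]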

Conversation State Management

Voice conversations are stateful—you need to track what was said, what state the conversation is in, and any context the user has provided. Unlike text chat where history is visible, voice users can't scroll back, so the agent must maintain and utilize context intelligently:

Python
import time
from dataclasses import dataclass, field
from typing import Any, Optional
from enum import Enum

class ConversationState(Enum):
    IDLE = "idle"
    LISTENING = "listening"
    PROCESSING = "processing"
    SPEAKING = "speaking"
    WAITING_FOR_TOOL = "waiting_for_tool"
    ERROR = "error"

@dataclass
class VoiceConversation:
    """Manage voice conversation state."""

    session_id: str
    state: ConversationState = ConversationState.IDLE
    turn_count: int = 0
    context: dict = field(default_factory=dict)
    history: list[dict] = field(default_factory=list)
    pending_tool_calls: list[dict] = field(default_factory=list)

    def add_user_turn(self, transcript: str):
        """Add user turn to history."""
        self.history.append({
            "role": "user",
            "content": transcript,
            "timestamp": time.time()
        })
        self.turn_count += 1

    def add_assistant_turn(self, transcript: str):
        """Add assistant turn to history."""
        self.history.append({
            "role": "assistant",
            "content": transcript,
            "timestamp": time.time()
        })

    def get_context_window(self, max_turns: int = 10) -> list[dict]:
        """Get recent conversation for context."""
        return self.history[-max_turns:]

    def set_context(self, key: str, value: Any):
        """Set conversation context."""
        self.context[key] = value

    def get_context(self, key: str) -> Optional[Any]:
        """Get conversation context."""
        return self.context.get(key)

class ConversationManager:
    """Manage multiple voice conversations."""

    def __init__(self):
        self.conversations: dict[str, VoiceConversation] = {}

    def create_conversation(self, session_id: str) -> VoiceConversation:
        """Create a new conversation."""
        conv = VoiceConversation(session_id=session_id)
        self.conversations[session_id] = conv
        return conv

    def get_conversation(self, session_id: str) -> Optional[VoiceConversation]:
        """Get existing conversation."""
        return self.conversations.get(session_id)

    def end_conversation(self, session_id: str):
        """End and archive a conversation."""
        if session_id in self.conversations:
            conv = self.conversations.pop(session_id)
            self._archive(conv)

    def _archive(self, conversation: VoiceConversation):
        """Archive conversation for analytics."""
        # Store conversation history, metrics, etc.
        pass

Key aspects of conversation state:

  1. State machine: ConversationState enum tracks the conversation phase. This matters for several reasons:

    • LISTENING: Accept and buffer audio
    • PROCESSING: Ignore new audio (or buffer for next turn)
    • SPEAKING: Watch for interruptions
    • WAITING_FOR_TOOL: Agent called a function, waiting for result
    • ERROR: Recovery mode
  2. Turn counting: Track how many exchanges have occurred. Long conversations may need summarization to fit context windows. Turn count also helps with analytics (how many turns to resolution?).

  3. Context dictionary: Stores extracted entities and user preferences across turns. If the user says "Check my order" in turn 1, you store the order ID. In turn 3, when they say "What's the status?", you know which order they mean (see the usage sketch after this list).

  4. Conversation window: get_context_window(max_turns=10) returns recent history for the LLM. Voice conversations tend to be shorter than text, so 10 turns usually suffices. Older context can be summarized if needed.

  5. Multi-session management: ConversationManager handles multiple concurrent calls. In production, this would likely be backed by Redis for horizontal scaling across multiple server instances.
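
A usage sketch tying these pieces together (IDs and utterances are illustrative):

Python
manager = ConversationManager()
conv = manager.create_conversation(session_id="call-abc123")

# Turn 1: user mentions an order; store the extracted entity in context
conv.add_user_turn("Can you check on my order?")
conv.set_context("order_id", "48213")  # extracted by the LLM or a tool
conv.add_assistant_turn("Order 48213 shipped yesterday and arrives Thursday.")

# Later turn: "it" resolves through stored context instead of re-asking
conv.add_user_turn("Can you text me when it's delivered?")
order_id = conv.get_context("order_id")         # "48213"
recent = conv.get_context_window(max_turns=10)  # history passed to the LLM

manager.end_conversation("call-abc123")  # archive when the call ends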

State persistence for telephony:

Phone calls can transfer between agents or reconnect after brief drops. Persist state to Redis/DynamoDB so any server can resume the conversation:

Python
# Store state for 24-hour session persistence
await redis.setex(f"voice:session:{session_id}", 86400, json.dumps(state))
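
The restore path is symmetric: on reconnect (possibly on a different server instance), load and rehydrate the state before processing the next utterance. As above, redis here is assumed to be an async client:

Python
# Restore on reconnect -- any instance holding the session_id can resume
raw = await redis.get(f"voice:session:{session_id}")
state = json.loads(raw) if raw else None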

Emotion and Sentiment Detection

Voice carries emotional information that text lacks—tone, pitch, speaking rate, and energy all convey how the user feels. Detecting emotion enables adaptive responses: a frustrated user needs empathy and quick resolution; an excited user can handle more detailed explanations. This is particularly valuable for customer support where early frustration detection can prevent escalations.

Real-Time Emotion Analysis

The following implementation extracts prosodic features (pitch, energy, tempo) from audio and maps them to emotional states. This is a heuristic approach—production systems often use dedicated ML models, but the prosodic features remain the foundation:

Python
import numpy as np
from dataclasses import dataclass
from typing import Optional
from enum import Enum

class Emotion(str, Enum):
    NEUTRAL = "neutral"
    HAPPY = "happy"
    SAD = "sad"
    ANGRY = "angry"
    FRUSTRATED = "frustrated"
    CONFUSED = "confused"
    EXCITED = "excited"

@dataclass
class EmotionAnalysis:
    primary_emotion: Emotion
    confidence: float
    valence: float  # -1 (negative) to 1 (positive)
    arousal: float  # 0 (calm) to 1 (excited)
    frustration_level: float  # 0 to 1

class VoiceEmotionAnalyzer:
    """Analyze emotions from voice characteristics."""

    def __init__(self):
        self.emotion_history = []
        self.baseline_pitch = None
        self.baseline_energy = None

    def analyze_audio_features(self, audio: np.ndarray, sample_rate: int = 24000) -> dict:
        """Extract emotion-relevant audio features."""
        import librosa

        # Extract features
        pitch, _ = librosa.piptrack(y=audio.astype(float), sr=sample_rate)
        pitch_mean = np.mean(pitch[pitch > 0]) if np.any(pitch > 0) else 0

        energy = np.mean(librosa.feature.rms(y=audio.astype(float)))
        tempo, _ = librosa.beat.beat_track(y=audio.astype(float), sr=sample_rate)

        # Zero-crossing rate as a rough proxy for how fast and energetic the speech is
        zero_crossings = librosa.feature.zero_crossing_rate(y=audio.astype(float))
        speech_rate = float(np.mean(zero_crossings))  # fraction of sign changes per sample

        # Spectral features
        spectral_centroid = np.mean(librosa.feature.spectral_centroid(y=audio.astype(float), sr=sample_rate))
        spectral_rolloff = np.mean(librosa.feature.spectral_rolloff(y=audio.astype(float), sr=sample_rate))

        return {
            "pitch_mean": pitch_mean,
            "pitch_variance": np.var(pitch[pitch > 0]) if np.any(pitch > 0) else 0,
            "energy": energy,
            "tempo": tempo,
            "speech_rate": speech_rate,
            "spectral_centroid": spectral_centroid,
            "spectral_rolloff": spectral_rolloff
        }

    def detect_emotion(self, audio: np.ndarray, transcript: str = "") -> EmotionAnalysis:
        """Detect emotion from audio and optional transcript."""
        features = self.analyze_audio_features(audio)

        # Set baseline on first utterance
        if self.baseline_pitch is None:
            self.baseline_pitch = features["pitch_mean"]
            self.baseline_energy = features["energy"]

        # Calculate relative changes
        pitch_delta = (features["pitch_mean"] - self.baseline_pitch) / max(self.baseline_pitch, 1)
        energy_delta = (features["energy"] - self.baseline_energy) / max(self.baseline_energy, 0.01)

        # Heuristic emotion detection
        valence = 0.0
        arousal = 0.0

        # High pitch + high energy = excited/angry
        if pitch_delta > 0.2 and energy_delta > 0.3:
            arousal = 0.8

        # Low pitch + low energy = sad
        if pitch_delta < -0.1 and energy_delta < -0.2:
            valence = -0.5

        # High zero-crossing rate = fast, energetic speech (frustration or excitement);
        # the 0.1 cutoff is a rough heuristic and should be calibrated per deployment
        if features["speech_rate"] > 0.1:
            arousal = max(arousal, 0.6)

        # Determine primary emotion
        if arousal > 0.6 and valence < -0.3:
            emotion = Emotion.ANGRY
        elif arousal > 0.6 and valence > 0.3:
            emotion = Emotion.EXCITED
        elif valence < -0.3:
            emotion = Emotion.SAD
        elif valence > 0.3:
            emotion = Emotion.HAPPY
        elif arousal > 0.5:
            emotion = Emotion.FRUSTRATED
        else:
            emotion = Emotion.NEUTRAL

        # Calculate frustration from patterns
        frustration = self._detect_frustration(features, transcript)

        analysis = EmotionAnalysis(
            primary_emotion=emotion,
            confidence=0.6,  # Audio-only has limited confidence
            valence=valence,
            arousal=arousal,
            frustration_level=frustration
        )

        self.emotion_history.append(analysis)
        return analysis

    def _detect_frustration(self, features: dict, transcript: str) -> float:
        """Detect user frustration level."""
        frustration_score = 0.0

        # Energy spiking well above baseline suggests frustration
        if features.get("energy", 0) > self.baseline_energy * 1.5:
            frustration_score += 0.3

        # A run of negative-valence turns in recent history
        if len(self.emotion_history) >= 3:
            recent_negative = sum(
                1 for e in self.emotion_history[-3:]
                if e.valence < 0
            )
            frustration_score += recent_negative * 0.15

        # Text indicators
        frustration_words = ["again", "already", "still", "why", "doesn't work", "not working"]
        if transcript:
            for word in frustration_words:
                if word in transcript.lower():
                    frustration_score += 0.2

        return min(frustration_score, 1.0)

class EmotionAwareVoiceAgent:
    """Voice agent that adapts to user emotions."""

    def __init__(self, base_agent, emotion_analyzer: VoiceEmotionAnalyzer):
        self.agent = base_agent
        self.analyzer = emotion_analyzer
        self.escalation_threshold = 0.7

    async def process_with_emotion(self, audio: np.ndarray, transcript: str) -> dict:
        """Process user input with emotion awareness."""
        emotion = self.analyzer.detect_emotion(audio, transcript)

        # Adapt response strategy based on emotion
        response_modifiers = self._get_response_modifiers(emotion)

        # Check for escalation
        if emotion.frustration_level > self.escalation_threshold:
            return await self._handle_escalation(transcript, emotion)

        # Process with emotion context
        result = await self.agent.process(
            transcript,
            emotion_context=emotion,
            **response_modifiers
        )

        return {
            "response": result,
            "emotion_detected": emotion,
            "response_strategy": response_modifiers
        }

    def _get_response_modifiers(self, emotion: EmotionAnalysis) -> dict:
        """Get response modifiers based on emotion."""
        modifiers = {}

        if emotion.primary_emotion == Emotion.FRUSTRATED:
            modifiers["tone"] = "empathetic"
            modifiers["acknowledge_difficulty"] = True
            modifiers["offer_alternatives"] = True

        elif emotion.primary_emotion == Emotion.CONFUSED:
            modifiers["tone"] = "patient"
            modifiers["simplify_language"] = True
            modifiers["provide_examples"] = True

        elif emotion.primary_emotion == Emotion.ANGRY:
            modifiers["tone"] = "calm_professional"
            modifiers["acknowledge_feelings"] = True
            modifiers["prioritize_resolution"] = True

        elif emotion.primary_emotion in [Emotion.HAPPY, Emotion.EXCITED]:
            modifiers["tone"] = "enthusiastic"
            modifiers["match_energy"] = True

        return modifiers

    async def _handle_escalation(self, transcript: str, emotion: EmotionAnalysis) -> dict:
        """Handle escalation to human agent."""
        return {
            "action": "escalate",
            "reason": f"User frustration level: {emotion.frustration_level:.2f}",
            "transcript": transcript,
            "emotion": emotion,
            "message": "I understand this has been frustrating. Let me connect you with a specialist who can help."
        }

Understanding the emotion detection pipeline:

  1. Feature extraction (analyze_audio_features):

    • Pitch (F0): Higher pitch often indicates excitement or stress. Lower pitch suggests calmness or sadness.
    • Energy (RMS): Louder speech typically correlates with strong emotions (anger, excitement). Quiet speech suggests sadness or hesitation.
    • Speech rate: Fast speech indicates urgency, excitement, or anxiety. Slow speech suggests thoughtfulness or depression.
    • Spectral features: Centroid and rolloff describe the "brightness" of the voice—useful for distinguishing emotional intensity.
  2. Baseline calibration: The first utterance establishes the user's baseline pitch and energy. Subsequent analysis measures relative changes from baseline. This accounts for individual differences—some people naturally speak louder or higher than others.

  3. Valence-arousal model: Rather than directly classifying discrete emotions, the code first computes:

    • Valence: Positive (happy) to negative (sad/angry) emotional tone
    • Arousal: Low (calm) to high (excited/agitated) energy level

    Discrete emotions are then mapped from this 2D space.

  4. Frustration detection (_detect_frustration): This is business-critical for customer support. The method looks for:

    • Energy spikes above baseline
    • Consecutive negative-valence turns
    • Linguistic indicators ("again", "still", "doesn't work")
  5. Adaptive response (EmotionAwareVoiceAgent): Different emotions trigger different response strategies:

    • Frustrated: Empathetic tone, acknowledge difficulty, offer alternatives
    • Confused: Patient tone, simplified language, provide examples
    • Angry: Calm professional tone, prioritize quick resolution
    • Excited/Happy: Match their energy, be enthusiastic
  6. Escalation threshold: When frustration exceeds 0.7, the agent offers to transfer to a human. This catches situations where the AI isn't helping before the user becomes truly angry.

Limitations of audio-only emotion detection:

  • Cultural differences in emotional expression
  • Microphone quality affects feature extraction
  • Short utterances provide limited signal
  • Sarcasm and irony are hard to detect
  • The 0.6 confidence score reflects these limitations

For higher accuracy, combine audio features with text sentiment analysis and conversation context (repeated questions, corrections).
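
One simple fusion approach is a weighted blend of the audio-derived valence with a text sentiment score; the sketch below assumes you already have a text score in [-1, 1] from a sentiment model or an LLM call:

Python
def fuse_emotion(audio_analysis: EmotionAnalysis, text_score: float,
                 audio_weight: float = 0.4) -> EmotionAnalysis:
    """Blend audio-derived valence with a text sentiment score in [-1, 1]."""
    blended_valence = (
        audio_weight * audio_analysis.valence
        + (1 - audio_weight) * text_score
    )
    return EmotionAnalysis(
        primary_emotion=audio_analysis.primary_emotion,  # production code would re-map from the blended values
        confidence=min(audio_analysis.confidence + 0.2, 1.0),  # a second agreeing signal warrants more confidence
        valence=blended_valence,
        arousal=audio_analysis.arousal,
        frustration_level=audio_analysis.frustration_level,
    )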

Multi-Speaker Handling

Many voice AI scenarios involve multiple speakers—conference calls, group customer support, or family members using a shared device. Speaker diarization identifies "who spoke when," enabling the agent to maintain separate contexts per person and address speakers individually.

Speaker Diarization

Speaker diarization creates a voice "fingerprint" for each speaker using embedding vectors. When new audio arrives, we compare its embedding to known speakers. If similar enough, it's the same person; if not, it's a new speaker. This simplified implementation uses MFCCs (Mel-Frequency Cepstral Coefficients)—production systems use neural speaker embeddings for better accuracy:

Python
from dataclasses import dataclass
from typing import Optional
import numpy as np

@dataclass
class SpeakerSegment:
    speaker_id: str
    start_time: float
    end_time: float
    audio: np.ndarray
    transcript: Optional[str] = None
    confidence: float = 0.0

class SpeakerDiarizer:
    """Identify and separate multiple speakers."""

    def __init__(self):
        self.speaker_embeddings = {}
        self.speaker_count = 0

    def process_audio_segment(
        self,
        audio: np.ndarray,
        timestamp: float,
        sample_rate: int = 24000
    ) -> SpeakerSegment:
        """Process audio segment and identify speaker."""
        # Extract speaker embedding
        embedding = self._extract_embedding(audio, sample_rate)

        # Match to existing speaker or create new
        speaker_id, confidence = self._match_speaker(embedding)

        if speaker_id is None:
            # New speaker
            speaker_id = f"speaker_{self.speaker_count}"
            self.speaker_count += 1
            self.speaker_embeddings[speaker_id] = [embedding]
            confidence = 1.0
        else:
            # Update embedding history
            self.speaker_embeddings[speaker_id].append(embedding)
            # Keep only recent embeddings
            self.speaker_embeddings[speaker_id] = self.speaker_embeddings[speaker_id][-10:]

        return SpeakerSegment(
            speaker_id=speaker_id,
            start_time=timestamp,
            end_time=timestamp + len(audio) / sample_rate,
            audio=audio,
            confidence=confidence
        )

    def _extract_embedding(self, audio: np.ndarray, sample_rate: int) -> np.ndarray:
        """Extract speaker embedding from audio."""
        # Simplified embedding extraction using MFCC
        import librosa

        mfcc = librosa.feature.mfcc(
            y=audio.astype(float),
            sr=sample_rate,
            n_mfcc=20
        )

        # Use mean of MFCCs as simple embedding
        embedding = np.mean(mfcc, axis=1)
        return embedding / np.linalg.norm(embedding)

    def _match_speaker(self, embedding: np.ndarray) -> tuple[Optional[str], float]:
        """Match embedding to existing speakers."""
        if not self.speaker_embeddings:
            return None, 0.0

        best_match = None
        best_similarity = 0.0

        for speaker_id, embeddings in self.speaker_embeddings.items():
            # Compare to the re-normalized mean of recent embeddings
            mean_embedding = np.mean(embeddings, axis=0)
            mean_embedding = mean_embedding / np.linalg.norm(mean_embedding)
            similarity = np.dot(embedding, mean_embedding)  # cosine similarity of unit vectors

            if similarity > best_similarity:
                best_similarity = similarity
                best_match = speaker_id

        # Threshold for matching
        if best_similarity > 0.7:
            return best_match, best_similarity

        return None, 0.0

class MultiSpeakerVoiceAgent:
    """Handle multi-speaker conversations."""

    def __init__(self, base_agent, diarizer: SpeakerDiarizer):
        self.agent = base_agent
        self.diarizer = diarizer
        self.speaker_contexts = {}  # Per-speaker conversation state

    async def process_multi_speaker(
        self,
        audio: np.ndarray,
        timestamp: float
    ) -> dict:
        """Process audio from potentially multiple speakers."""
        # Identify speaker
        segment = self.diarizer.process_audio_segment(audio, timestamp)

        # Get or create speaker context
        if segment.speaker_id not in self.speaker_contexts:
            self.speaker_contexts[segment.speaker_id] = {
                "history": [],
                "preferences": {},
                "turn_count": 0
            }

        context = self.speaker_contexts[segment.speaker_id]
        context["turn_count"] += 1

        # Process with speaker-specific context
        result = await self.agent.process(
            audio=segment.audio,
            speaker_context=context,
            speaker_id=segment.speaker_id
        )

        # Update history
        context["history"].append({
            "timestamp": timestamp,
            "transcript": result.get("transcript"),
            "response": result.get("response")
        })

        return {
            "speaker_id": segment.speaker_id,
            "speaker_confidence": segment.confidence,
            "result": result,
            "total_speakers": self.diarizer.speaker_count
        }

    def get_speaker_summary(self) -> dict:
        """Get summary of all speakers in conversation."""
        return {
            speaker_id: {
                "turn_count": ctx["turn_count"],
                "history_length": len(ctx["history"])
            }
            for speaker_id, ctx in self.speaker_contexts.items()
        }

How the speaker diarization works:

  1. Embedding extraction (_extract_embedding):

    • MFCCs capture the spectral characteristics of speech
    • Taking the mean across time creates a fixed-size "voice fingerprint"
    • L2 normalization makes similarity comparison consistent
  2. Speaker matching (_match_speaker):

    • Compares new embedding to stored embeddings using cosine similarity (dot product of normalized vectors)
    • 0.7 threshold balances false positives vs. false negatives
    • Returns None for embeddings below threshold → new speaker detected
  3. Embedding history: Stores up to 10 recent embeddings per speaker and averages them. Voice characteristics vary slightly across utterances—averaging reduces this variance.

  4. Per-speaker context (MultiSpeakerVoiceAgent):

    • Each speaker has their own conversation history
    • Enables personalized responses ("Sarah, last time you mentioned...")
    • Turn count per speaker helps detect who's dominating the conversation

Production considerations for multi-speaker:

  • Neural embeddings: Models like Resemblyzer, SpeechBrain, or Pyannote provide much better accuracy than MFCCs
  • Online vs. offline: This implementation is online (real-time). Offline diarization with full conversation audio is more accurate but adds latency.
  • Cross-talk: When speakers overlap, diarization becomes difficult. Some systems detect and flag overlapping segments.
  • Speaker enrollment: For known users, you can pre-register their voice embeddings for more reliable identification (see the sketch after this list).
  • Privacy: Voice embeddings are biometric data—handle with appropriate consent and storage security.
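
Here is a minimal sketch of that enrollment idea, reusing the SpeakerDiarizer internals above; the speaker name and sample clips are placeholders:

Python
def enroll_speaker(diarizer: SpeakerDiarizer, name: str,
                   samples: list[np.ndarray], sample_rate: int = 24000) -> None:
    """Pre-register a known user's voice so later segments match by name."""
    embeddings = [
        diarizer._extract_embedding(audio, sample_rate) for audio in samples
    ]
    diarizer.speaker_embeddings[name] = embeddings
    diarizer.speaker_count += 1

# Usage: enroll from a few short recordings captured with consent
# enroll_speaker(diarizer, "sarah", [clip1, clip2, clip3])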

Cost Optimization

Voice AI is expensive. The OpenAI Realtime API charges roughly $0.06/minute for audio input and $0.24/minute for audio output, so a 5-minute support call costs roughly $1.50. At scale, this adds up quickly. Understanding and optimizing costs is essential for sustainable deployment.

Voice API Cost Tracking

Before optimizing, you need visibility into where costs come from. The following classes track per-session and aggregate costs, enabling analysis and budgeting:

Python
from dataclasses import dataclass
from datetime import datetime
from collections import defaultdict

import numpy as np

@dataclass
class VoiceCostMetrics:
    audio_minutes: float
    api_calls: int
    input_tokens: int
    output_tokens: int
    estimated_cost_usd: float

class VoiceCostTracker:
    """Track and optimize voice API costs."""

    # Pricing (example rates)
    PRICING = {
        "openai_realtime": {
            "audio_input_per_minute": 0.06,
            "audio_output_per_minute": 0.24,
            "text_input_per_1k_tokens": 0.0025,
            "text_output_per_1k_tokens": 0.01
        },
        "elevenlabs": {
            "characters_per_1k": 0.30
        },
        "whisper": {
            "audio_per_minute": 0.006
        }
    }

    def __init__(self):
        self.sessions = {}
        self.daily_costs = defaultdict(float)

    def start_session(self, session_id: str):
        """Start tracking a voice session."""
        self.sessions[session_id] = {
            "start_time": datetime.now(),
            "audio_input_seconds": 0,
            "audio_output_seconds": 0,
            "input_tokens": 0,
            "output_tokens": 0,
            "api_calls": 0
        }

    def track_audio_input(self, session_id: str, duration_seconds: float):
        """Track audio input duration."""
        if session_id in self.sessions:
            self.sessions[session_id]["audio_input_seconds"] += duration_seconds
            self.sessions[session_id]["api_calls"] += 1

    def track_audio_output(self, session_id: str, duration_seconds: float):
        """Track audio output duration."""
        if session_id in self.sessions:
            self.sessions[session_id]["audio_output_seconds"] += duration_seconds

    def track_tokens(self, session_id: str, input_tokens: int, output_tokens: int):
        """Track token usage."""
        if session_id in self.sessions:
            self.sessions[session_id]["input_tokens"] += input_tokens
            self.sessions[session_id]["output_tokens"] += output_tokens

    def end_session(self, session_id: str) -> VoiceCostMetrics:
        """End session and calculate costs."""
        if session_id not in self.sessions:
            return VoiceCostMetrics(0, 0, 0, 0, 0)

        session = self.sessions.pop(session_id)

        # Calculate costs
        pricing = self.PRICING["openai_realtime"]

        audio_input_cost = (session["audio_input_seconds"] / 60) * pricing["audio_input_per_minute"]
        audio_output_cost = (session["audio_output_seconds"] / 60) * pricing["audio_output_per_minute"]
        text_input_cost = (session["input_tokens"] / 1000) * pricing["text_input_per_1k_tokens"]
        text_output_cost = (session["output_tokens"] / 1000) * pricing["text_output_per_1k_tokens"]

        total_cost = audio_input_cost + audio_output_cost + text_input_cost + text_output_cost

        # Track daily costs
        today = datetime.now().strftime("%Y-%m-%d")
        self.daily_costs[today] += total_cost

        total_audio_minutes = (session["audio_input_seconds"] + session["audio_output_seconds"]) / 60

        return VoiceCostMetrics(
            audio_minutes=total_audio_minutes,
            api_calls=session["api_calls"],
            input_tokens=session["input_tokens"],
            output_tokens=session["output_tokens"],
            estimated_cost_usd=total_cost
        )

    def get_daily_summary(self, date: str = None) -> dict:
        """Get daily cost summary."""
        date = date or datetime.now().strftime("%Y-%m-%d")
        return {
            "date": date,
            "total_cost_usd": self.daily_costs.get(date, 0)
        }

    def estimate_monthly_cost(self, daily_sessions: int, avg_session_minutes: float) -> dict:
        """Estimate monthly costs based on usage patterns."""
        pricing = self.PRICING["openai_realtime"]

        # Assume 50/50 input/output split
        input_minutes = avg_session_minutes * 0.5
        output_minutes = avg_session_minutes * 0.5

        per_session_cost = (
            input_minutes * pricing["audio_input_per_minute"] +
            output_minutes * pricing["audio_output_per_minute"]
        )

        daily_cost = per_session_cost * daily_sessions
        monthly_cost = daily_cost * 30

        return {
            "per_session_cost": per_session_cost,
            "daily_cost": daily_cost,
            "monthly_cost": monthly_cost,
            "assumptions": {
                "daily_sessions": daily_sessions,
                "avg_session_minutes": avg_session_minutes
            }
        }

class CostOptimizedVoiceAgent:
    """Voice agent with cost optimization strategies."""

    def __init__(self, agent, cost_tracker: VoiceCostTracker):
        self.agent = agent
        self.tracker = cost_tracker
        self.optimization_enabled = True

    async def process_with_optimization(
        self,
        session_id: str,
        audio: np.ndarray
    ) -> dict:
        """Process with cost optimization strategies."""
        optimizations_applied = []

        # Strategy 1: Skip silence
        if self._is_silence(audio):
            return {"skipped": True, "reason": "silence"}

        # Strategy 2: Batch short utterances
        if len(audio) < 24000 * 0.5:  # Less than 0.5 seconds
            buffered = await self._buffer_short_audio(session_id, audio)
            if buffered:
                optimizations_applied.append("buffered_short_audio")
                return {"buffered": True}

        # Strategy 3: Use text mode for simple responses
        if self._should_use_text_mode(session_id):
            result = await self._process_text_mode(audio)
            optimizations_applied.append("text_mode")
        else:
            result = await self.agent.process(audio)

        # Track costs
        self.tracker.track_audio_input(session_id, len(audio) / 24000)

        return {
            "result": result,
            "optimizations": optimizations_applied
        }

    def _is_silence(self, audio: np.ndarray, threshold: float = 0.01) -> bool:
        """Check if audio is silence."""
        rms = np.sqrt(np.mean(audio.astype(float) ** 2))
        return rms < threshold

    async def _buffer_short_audio(self, session_id: str, audio: np.ndarray) -> bool:
        """Buffer short audio for batching."""
        # Implementation would buffer and return True if buffered
        return False

    def _should_use_text_mode(self, session_id: str) -> bool:
        """Determine if text mode would be more cost-effective."""
        # Use text mode for FAQ-type questions
        return False

    async def _process_text_mode(self, audio: np.ndarray) -> dict:
        """Process using text mode (STT -> LLM -> TTS)."""
        # Cheaper for some use cases
        pass

Cost tracking explained:

  1. Per-session tracking: start_session and end_session bracket each call. Track audio seconds, tokens, and API calls separately—they have different pricing.

  2. Input vs. output audio: Output is 4x more expensive than input for the Realtime API. This matters for optimization—shorter agent responses reduce costs more than shorter user inputs.

  3. Token tracking: Even with audio, there's text involved (transcriptions, tool definitions). Track these for complete cost visibility.

  4. Daily aggregation: daily_costs accumulates across sessions, enabling daily budget enforcement and trend analysis.

  5. Cost estimation (estimate_monthly_cost): Projects costs based on usage patterns. Essential for business planning and deciding if voice AI is economically viable for your use case.
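
As a worked example of point 5, 500 five-minute sessions per day with the assumed 50/50 split comes to roughly $0.75 per session, $375 per day, and about $11,250 per month at the example rates:

Python
tracker = VoiceCostTracker()
estimate = tracker.estimate_monthly_cost(daily_sessions=500, avg_session_minutes=5)

print(estimate["per_session_cost"])  # 0.75   (2.5 min * $0.06 + 2.5 min * $0.24)
print(estimate["daily_cost"])        # 375.0
print(estimate["monthly_cost"])      # 11250.0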

Optimization strategies (CostOptimizedVoiceAgent):

  • Skip silence: Don't send empty audio to the API. The _is_silence check filters out quiet periods.
  • Buffer short utterances: "Yes" and "okay" are expensive if sent individually. Batching with the next substantive utterance reduces API calls.
  • Text mode fallback: For simple FAQ queries, separate STT + text LLM + TTS is cheaper than the unified Realtime API. The trade-off is higher latency.
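
The text-mode path is left as a stub in the class above; a rough sketch of what it could look like with the standard OpenAI Python SDK follows. The model names, the WAV packaging helper, and the decision to hand the reply text to your existing TTS component are all assumptions:

Python
import io
import wave

import numpy as np
from openai import AsyncOpenAI

client = AsyncOpenAI()  # assumes OPENAI_API_KEY is set in the environment

def pcm_to_wav_bytes(audio: np.ndarray, sample_rate: int = 24000) -> bytes:
    """Wrap raw 16-bit PCM samples in a WAV container for the transcription API."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wav:
        wav.setnchannels(1)
        wav.setsampwidth(2)  # 16-bit samples
        wav.setframerate(sample_rate)
        wav.writeframes(audio.astype(np.int16).tobytes())
    return buf.getvalue()

async def process_text_mode(audio: np.ndarray) -> dict:
    """Cheaper STT -> text LLM path for simple queries (higher latency than Realtime)."""
    transcript = await client.audio.transcriptions.create(
        model="whisper-1",
        file=("utterance.wav", pcm_to_wav_bytes(audio)),
    )
    reply = await client.chat.completions.create(
        model="gpt-4o-mini",  # assumed model choice
        messages=[{"role": "user", "content": transcript.text}],
    )
    # The reply text would then go to the TTS component used elsewhere in this guide.
    return {"transcript": transcript.text, "response": reply.choices[0].message.content}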

Budget Alerts and Limits

Cost tracking is defensive—it tells you what you spent. Budget management is preventive—it stops you from spending more than intended. The following implementation enforces both per-session and daily limits, with alerts before hard cutoffs:

Python
class BudgetManager:
    """Manage voice API budgets."""

    def __init__(
        self,
        daily_budget_usd: float = 100,
        session_budget_usd: float = 5,
        alert_threshold: float = 0.8
    ):
        self.daily_budget = daily_budget_usd
        self.session_budget = session_budget_usd
        self.alert_threshold = alert_threshold
        self.cost_tracker = VoiceCostTracker()
        self.alert_callbacks = []

    def register_alert(self, callback):
        """Register alert callback."""
        self.alert_callbacks.append(callback)

    def check_budget(self, session_id: str) -> dict:
        """Check if session/daily budget allows continuing."""
        daily_summary = self.cost_tracker.get_daily_summary()
        daily_used = daily_summary["total_cost_usd"]

        session = self.cost_tracker.sessions.get(session_id, {})
        session_used = self._calculate_session_cost(session)

        # Check limits
        daily_remaining = self.daily_budget - daily_used
        session_remaining = self.session_budget - session_used

        result = {
            "can_continue": True,
            "daily_used": daily_used,
            "daily_remaining": daily_remaining,
            "session_used": session_used,
            "session_remaining": session_remaining
        }

        # Check for alerts
        if daily_used > self.daily_budget * self.alert_threshold:
            self._send_alert("daily_threshold", daily_used)
            result["alerts"] = ["approaching_daily_limit"]

        if daily_remaining <= 0:
            result["can_continue"] = False
            result["reason"] = "daily_budget_exceeded"

        if session_remaining <= 0:
            result["can_continue"] = False
            result["reason"] = "session_budget_exceeded"

        return result

    def _calculate_session_cost(self, session: dict) -> float:
        """Calculate current session cost."""
        if not session:
            return 0

        pricing = VoiceCostTracker.PRICING["openai_realtime"]
        return (
            (session.get("audio_input_seconds", 0) / 60) * pricing["audio_input_per_minute"] +
            (session.get("audio_output_seconds", 0) / 60) * pricing["audio_output_per_minute"]
        )

    def _send_alert(self, alert_type: str, value: float):
        """Send budget alert."""
        for callback in self.alert_callbacks:
            callback(alert_type, value)

Budget management strategies:

  1. Two-tier limits: Both daily and per-session budgets. Daily limits protect against runaway costs overall. Per-session limits prevent individual calls from consuming disproportionate resources (useful for preventing abuse or infinite loops).

  2. Alert threshold (80%): The alert_threshold triggers warnings before hard limits. This gives operations teams time to investigate and react:

    Python
    budget_manager.register_alert(lambda t, v: pagerduty.alert(f"{t}: {v}"))
    
  3. Soft vs. hard enforcement: check_budget returns can_continue: False when limits are exceeded. The calling code decides how to handle it: immediately disconnect, or give the user a warning and one more turn (see the sketch after this list).

  4. Callback system: register_alert allows plugging in various notification channels—Slack, PagerDuty, email. Multiple callbacks can be registered.
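
A sketch of how a call loop might consume check_budget, applying the soft-landing option from point 3; say and handle_turn are hypothetical placeholders for the agent's TTS output and one listen/respond cycle:

Python
async def say(message: str) -> None:
    """Placeholder for the agent's TTS output path."""
    print(f"[agent] {message}")

async def handle_turn(session_id: str) -> None:
    """Placeholder for one listen/respond cycle."""
    ...

async def run_call(session_id: str, budget: BudgetManager) -> None:
    """Consult the budget before every turn and close the call gracefully."""
    while True:
        status = budget.check_budget(session_id)
        if not status["can_continue"]:
            # Soft landing rather than an abrupt disconnect
            await say("I need to wrap up this call now. Thanks for your patience.")
            break
        await handle_turn(session_id)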

Production budget architecture:

Code
┌─────────────────────────────────────────────────────────────┐
│                    Budget Enforcement                       │
├──────────────┬──────────────┬──────────────┬───────────────┤
│ Per-Session  │   Per-User   │    Daily     │   Monthly     │
│ ($5 max)     │ ($50/day)    │ ($1000 org)  │ ($20k org)    │
└──────────────┴──────────────┴──────────────┴───────────────┘

Consider multiple levels: session → user → team → organization. This prevents both individual abuse and aggregate budget overruns.
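
A minimal sketch of that layered enforcement; the tier names mirror the diagram and the spend figures are illustrative, with real numbers coming from a tracker like VoiceCostTracker:

Python
from dataclasses import dataclass

@dataclass
class BudgetTier:
    name: str          # "session", "user_daily", "org_daily", "org_monthly"
    limit_usd: float
    spent_usd: float

def enforce_tiers(tiers: list[BudgetTier]) -> dict:
    """Return the first tier whose limit is exhausted, or allow the call."""
    for tier in tiers:
        if tier.spent_usd >= tier.limit_usd:
            return {"can_continue": False, "blocked_by": tier.name}
    return {"can_continue": True}

# Example, mirroring the diagram above (spend figures are illustrative)
status = enforce_tiers([
    BudgetTier("session", 5, 1.20),
    BudgetTier("user_daily", 50, 12.40),
    BudgetTier("org_daily", 1000, 640.00),
    BudgetTier("org_monthly", 20000, 9800.00),
])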

Accessibility Considerations

Voice interfaces can be more accessible than visual interfaces—but only if designed thoughtfully. Users who are blind rely on voice output being well-structured. Users who are deaf or hard of hearing need transcripts. Users with cognitive disabilities benefit from simpler, confirming interactions.

Screen Reader Integration

For users who interact with screens using screen readers (VoiceOver, NVDA), the voice agent should output content in a structured, navigable way. The following patterns help:

Python
import numpy as np

class AccessibleVoiceAgent:
    """Voice agent with accessibility features."""

    def __init__(self, base_agent):
        self.agent = base_agent
        self.accessibility_mode = False
        self.speech_rate = 1.0
        self.confirmation_mode = True

    async def process_accessible(self, audio: np.ndarray) -> dict:
        """Process with accessibility accommodations."""
        result = await self.agent.process(audio)

        if self.accessibility_mode:
            result = await self._enhance_for_accessibility(result)

        return result

    async def _enhance_for_accessibility(self, result: dict) -> dict:
        """Enhance response for accessibility."""
        response = result.get("response", "")

        # Add structure announcements
        enhanced = self._add_structure_announcements(response)

        # Slow down speech rate
        result["speech_rate"] = self.speech_rate

        # Add confirmation prompts
        if self.confirmation_mode:
            enhanced += " Is there anything you'd like me to clarify?"

        result["response"] = enhanced
        return result

    def _add_structure_announcements(self, text: str) -> str:
        """Add announcements for structure (lists, sections)."""
        # Announce list items
        lines = text.split("\n")
        enhanced_lines = []

        list_count = 0
        for line in lines:
            if line.strip().startswith(("-", "*", "•")):
                list_count += 1
                enhanced_lines.append(f"Item {list_count}: {line.strip()[1:].strip()}")
            else:
                if list_count > 0:
                    # End of list
                    list_count = 0
                enhanced_lines.append(line)

        return "\n".join(enhanced_lines)

    def set_speech_rate(self, rate: float):
        """Set speech rate (0.5 to 2.0)."""
        self.speech_rate = max(0.5, min(2.0, rate))

    def toggle_confirmation_mode(self):
        """Toggle confirmation prompts."""
        self.confirmation_mode = not self.confirmation_mode

Key accessibility adaptations:

  1. Structure announcements (_add_structure_announcements): Screen reader users can't see bullet points or headers. Convert visual structure to spoken structure:

    • "Item 1: ..., Item 2: ..." instead of bullet points
    • "Section: Account Settings" instead of relying on visual hierarchy
  2. Speech rate control: Users can adjust playback speed (0.5x to 2.0x). Faster for experienced users, slower for those who need more processing time or are multitasking.

  3. Confirmation prompts: confirmation_mode adds "Is there anything you'd like me to clarify?" after responses. This helps users who may miss information or need repetition without feeling rushed.

  4. Mode toggle: accessibility_mode can be enabled via a command ("enable accessibility mode") or detected automatically if the user mentions using assistive technology.

Additional accessibility considerations:

  • Transcripts: Provide text transcripts of all voice interactions for deaf/hard-of-hearing users and for later reference.
  • Keyboard commands: Allow users to control the voice agent without speaking (pause, repeat, skip).
  • Multiple modalities: Combine voice output with screen display for users who benefit from both.
  • Timeout extensions: Give users more time before assuming they're done speaking.
  • Clear pronunciation: Use phonetic markup for acronyms, unusual words, or numbers (spell out "API" vs. pronounce as a word); see the sketch after this list.
  • Error recovery: Make it easy to correct misunderstandings without starting over.
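
For the pronunciation point, one lightweight option is to wrap known acronyms in SSML so a TTS engine that accepts SSML spells them out; the acronym list and regex below are assumptions to adapt per product:

Python
import re

SPELL_OUT = {"API", "SDK", "URL", "SQL", "2FA"}  # acronyms to spell letter by letter

def mark_pronunciation(text: str) -> str:
    """Wrap known acronyms in SSML <say-as> tags for TTS engines that accept SSML."""
    def replace(match: re.Match) -> str:
        word = match.group(0)
        if word in SPELL_OUT:
            return f'<say-as interpret-as="characters">{word}</say-as>'
        return word

    return re.sub(r"\b[A-Z0-9]{2,5}\b", replace, text)

# "Your API key is ready" ->
# 'Your <say-as interpret-as="characters">API</say-as> key is ready'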

Legal compliance: In many jurisdictions (US ADA, EU EAA), digital services must be accessible. Voice AI should be part of an overall accessibility strategy, not a replacement for it.

Conclusion

Building voice AI agents requires mastering several interconnected systems:

  1. Real-time audio streaming: Efficient capture, transmission, and playback
  2. Turn-taking: Natural conversation flow with proper timing
  3. Interruption handling: Responsive to user interjections
  4. Low latency: Sub-second response times for natural conversation
  5. Error recovery: Graceful handling of connection issues
  6. Telephony integration: Supporting phone-based interactions

Start with the OpenAI Realtime API for the fastest path to working voice agents. Add WebRTC for browser support and Twilio for telephony as needed. Focus relentlessly on latency—voice interfaces are unforgiving of delays.

Enrico Piovano, PhD

Co-founder & CTO at Goji AI. Former Applied Scientist at Amazon (Alexa & AGI), focused on Agentic AI and LLMs. PhD in Electrical Engineering from Imperial College London. Gold Medalist at the National Mathematical Olympiad.
