Real-Time AI Agent Streaming with WebSockets and FastAPI
Why WebSocket Streaming?
HTTP request-response is fine for quick API calls, but AI agent responses can take 5-30 seconds. Without streaming, users stare at a loading spinner. With WebSocket streaming, they see tokens appear in real-time — the same experience as ChatGPT or Claude.
Here’s how to implement it properly with FastAPI.
TL;DR
- WebSockets beat HTTP for AI agents - users see tokens in real-time instead of staring at a spinner for 5-30 seconds.
- Connection manager handles multiple clients - track active connections, broadcast events, and clean up disconnects automatically.
- Structured events for the frontend - separate `text_delta`, `tool_call`, `tool_result`, and `complete` event types enable rich UI rendering.
- Authentication works via query params - the browser WebSocket API doesn't support custom headers, so pass JWT tokens as query parameters and validate on connect.
- Production patterns matter - heartbeat pings, reconnection logic, message history persistence, and rate limiting prevent the issues that kill WebSocket apps at scale.
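The structured event types above can be given a concrete shape with plain typed dicts. This is only a sketch: the fields beyond `type` (such as `tool_name` and `args`) are illustrative, not part of any library.

```python
from typing import Literal, TypedDict, Union


class TextDeltaEvent(TypedDict):
    type: Literal["text_delta"]
    content: str


class ToolCallEvent(TypedDict):
    type: Literal["tool_call"]
    tool_name: str
    args: dict


class ToolResultEvent(TypedDict):
    type: Literal["tool_result"]
    tool_name: str
    result: str


class CompleteEvent(TypedDict):
    type: Literal["complete"]


# The frontend switches on the "type" field of each JSON payload.
AgentEvent = Union[TextDeltaEvent, ToolCallEvent, ToolResultEvent, CompleteEvent]


def text_delta(content: str) -> TextDeltaEvent:
    """Build a text_delta event ready for websocket.send_json()."""
    return {"type": "text_delta", "content": content}
```

Keeping every event behind a single discriminated `type` field means the frontend can render each kind differently (streamed text, tool spinners, completion state) without guessing at payload shapes.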
The Architecture
```
Browser ←→ WebSocket ←→ FastAPI ←→ LLM Provider
   ↑                       ↑
   │ token-by-token        │ SSE/streaming
   │ JSON messages         │ response
```

The browser opens a persistent WebSocket connection. When the user sends a message, FastAPI streams the LLM response back token by token. No polling, no long-running HTTP requests.
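Concretely, the JSON traffic on that connection is simple: the client sends one object per user message, and the server answers with a stream of typed events. A sketch of the protocol used in the basic implementation below:

```python
# Client -> server: one JSON object per user message.
outgoing = {"message": "What's the weather in Paris?"}

# Server -> client: a stream of typed events (contents illustrative).
incoming = [
    {"type": "chunk", "content": "It"},
    {"type": "chunk", "content": " looks"},
    {"type": "chunk", "content": " sunny."},
    {"type": "done"},
]

# The client concatenates chunk contents to rebuild the reply.
reply = "".join(e["content"] for e in incoming if e["type"] == "chunk")
print(reply)  # It looks sunny.
```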
Basic Implementation
1. FastAPI WebSocket Endpoint
```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic_ai import Agent

app = FastAPI()
agent = Agent("openai:gpt-4o")


@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_json()
            message = data["message"]

            async with agent.run_stream(message) as response:
                async for chunk in response.stream_text(delta=True):
                    await websocket.send_json({
                        "type": "chunk",
                        "content": chunk,
                    })

            await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        pass
```

2. Frontend Hook
```typescript
import { useCallback, useRef, useState } from "react";

interface Message {
  role: "user" | "assistant";
  content: string;
}

export function useChat() {
  const ws = useRef<WebSocket | null>(null);
  const [messages, setMessages] = useState<Message[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const connect = useCallback(() => {
    ws.current = new WebSocket("ws://localhost:8000/ws/chat");

    ws.current.onmessage = (event) => {
      const data = JSON.parse(event.data);

      if (data.type === "chunk") {
        setMessages((prev) => {
          const last = prev[prev.length - 1];
          if (last?.role === "assistant") {
            return [
              ...prev.slice(0, -1),
              { ...last, content: last.content + data.content },
            ];
          }
          return [...prev, { role: "assistant", content: data.content }];
        });
      }

      if (data.type === "done") {
        setIsStreaming(false);
      }
    };
  }, []);

  const send = useCallback((message: string) => {
    setIsStreaming(true);
    setMessages((prev) => [...prev, { role: "user", content: message }]);
    ws.current?.send(JSON.stringify({ message }));
  }, []);

  return { messages, send, connect, isStreaming };
}
```

Production Patterns
The basic version works, but production needs more. Here are the patterns we use in every deployment.
Authentication
Never accept anonymous WebSocket connections. Validate the token during the handshake:
```python
from fastapi import Depends, Query, WebSocket, status


async def get_ws_user(
    websocket: WebSocket,
    token: str = Query(...),
) -> User:
    try:
        return await verify_token(token)
    except InvalidToken:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        raise


@app.websocket("/ws/chat")
async def chat(
    websocket: WebSocket,
    user: User = Depends(get_ws_user),
):
    await websocket.accept()
    # user is now authenticated
```

Conversation History
Store messages in the database and pass history to the agent:
```python
from pydantic_ai.messages import ModelMessage


@app.websocket("/ws/chat")
async def chat(websocket: WebSocket, user: User = Depends(get_ws_user)):
    await websocket.accept()

    # Load conversation history
    history: list[ModelMessage] = await db.get_messages(user.id)

    try:
        while True:
            data = await websocket.receive_json()

            async with agent.run_stream(
                data["message"],
                message_history=history,
                deps=Deps(user=user, db=db),
            ) as response:
                async for chunk in response.stream_text(delta=True):
                    await websocket.send_json({"type": "chunk", "content": chunk})

            await websocket.send_json({"type": "done"})

            # Update history with new messages
            history = response.all_messages()
            await db.save_messages(user.id, history)
    except WebSocketDisconnect:
        pass
```

Error Handling
LLM providers fail. Rate limits hit. Connections drop. Handle it gracefully:
```python
from pydantic_ai.exceptions import ModelHTTPError

try:
    async with agent.run_stream(message) as response:
        async for chunk in response.stream_text(delta=True):
            await websocket.send_json({"type": "chunk", "content": chunk})
    await websocket.send_json({"type": "done"})
except ModelHTTPError:
    await websocket.send_json({
        "type": "error",
        "message": "The AI model is temporarily unavailable. Please try again.",
    })
except Exception as e:
    logger.error(f"Stream error: {e}")
    await websocket.send_json({
        "type": "error",
        "message": "An unexpected error occurred.",
    })
```

Connection Management
Track active connections for graceful shutdown and connection limits:
```python
class ConnectionManager:
    def __init__(self, max_connections: int = 100):
        self.active: dict[str, WebSocket] = {}
        self.max_connections = max_connections

    async def connect(self, user_id: str, websocket: WebSocket) -> bool:
        if len(self.active) >= self.max_connections:
            await websocket.close(code=1013)  # Try Again Later
            return False

        # Close existing connection for same user
        if user_id in self.active:
            await self.active[user_id].close()

        self.active[user_id] = websocket
        return True

    def disconnect(self, user_id: str):
        self.active.pop(user_id, None)
```

Performance Tips
- Use `delta=True` — send only new tokens, not the full accumulated text
- Batch small chunks — if tokens arrive faster than the WebSocket can send, buffer them
- Set timeouts — disconnect idle connections after 5-10 minutes
- Use connection pooling — reuse LLM provider connections across requests
- Monitor with Logfire — trace every WebSocket message for debugging
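The heartbeat and idle-timeout tips can be combined in a small wrapper around `receive_json`. This is an illustrative helper, not a FastAPI API, and the intervals are examples to tune per deployment:

```python
import asyncio

# Illustrative defaults; tune per deployment.
PING_INTERVAL = 30.0   # seconds between heartbeat pings while idle
IDLE_TIMEOUT = 600.0   # disconnect after this much total silence


async def receive_with_heartbeat(
    websocket,
    ping_interval: float = PING_INTERVAL,
    idle_timeout: float = IDLE_TIMEOUT,
):
    """Wait for the next client message.

    While waiting, send a {"type": "ping"} every ping_interval seconds so
    intermediaries keep the connection alive. After idle_timeout of total
    silence, close the socket and return None so the caller can exit its
    receive loop.
    """
    idle = 0.0
    while idle < idle_timeout:
        try:
            return await asyncio.wait_for(
                websocket.receive_json(), timeout=ping_interval
            )
        except asyncio.TimeoutError:
            idle += ping_interval
            await websocket.send_json({"type": "ping"})
    await websocket.close(code=1000)
    return None
```

In the endpoint loop, replace `data = await websocket.receive_json()` with `data = await receive_with_heartbeat(websocket)` and break out of the loop when it returns `None`.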
Skip the Boilerplate
All of these patterns come pre-configured in the Full-Stack AI Agent Template. Select “WebSocket streaming” in the configurator and get authenticated, persistent, error-handled streaming out of the box.