
Real-Time AI Agent Streaming with WebSockets and FastAPI

Vstorm · 5 min read

Why WebSocket Streaming?

HTTP request-response is fine for quick API calls, but AI agent responses can take 5-30 seconds. Without streaming, users stare at a loading spinner. With WebSocket streaming, they see tokens appear in real-time — the same experience as ChatGPT or Claude.

Here’s how to implement it properly with FastAPI.

TL;DR

  • WebSockets beat HTTP for AI agents - users see tokens in real-time instead of staring at a spinner for 5-30 seconds.
  • Connection manager handles multiple clients - track active connections, broadcast events, and clean up disconnects automatically.
  • Structured events for the frontend - separate text_delta, tool_call, tool_result, and complete event types enable rich UI rendering.
  • Authentication works via query params - the browser WebSocket API doesn't allow custom headers, so pass JWT tokens as query parameters and validate during the handshake.
  • Production patterns matter - heartbeat pings, reconnection logic, message history persistence, and rate limiting prevent the issues that kill WebSocket apps at scale.

The Architecture

Browser ←→ WebSocket ←→ FastAPI ←→ LLM Provider
              ↑                ↑
              │ token-by-token │ SSE/streaming
              │ JSON messages  │ response

The browser opens a persistent WebSocket connection. When the user sends a message, FastAPI streams the LLM response back token by token. No polling, no long-running HTTP requests.
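The JSON messages on the wire benefit from a small, explicit schema. A minimal sketch of the event types the TL;DR mentions, using plain stdlib dataclasses for illustration (the exact field names beyond the `type` values are assumptions, not the template's actual schema):

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class TextDelta:
    content: str
    type: str = "text_delta"


@dataclass
class ToolCall:
    tool_name: str
    args: dict = field(default_factory=dict)
    type: str = "tool_call"


@dataclass
class ToolResult:
    tool_name: str
    result: str
    type: str = "tool_result"


@dataclass
class Complete:
    type: str = "complete"


def to_wire(event) -> str:
    """Serialize an event for JSON transport over the WebSocket."""
    return json.dumps(asdict(event))
```

The frontend can then switch on the `type` field to decide how to render each event.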

Basic Implementation

1. FastAPI WebSocket Endpoint

backend/app/api/ws.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic_ai import Agent

app = FastAPI()
agent = Agent("openai:gpt-4o")

@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            message = data["message"]
            async with agent.run_stream(message) as response:
                async for chunk in response.stream_text(delta=True):
                    await websocket.send_json({
                        "type": "chunk",
                        "content": chunk,
                    })
            await websocket.send_json({"type": "done"})
    except WebSocketDisconnect:
        pass

2. Frontend Hook

frontend/src/hooks/useWebSocket.ts
import { useCallback, useRef, useState } from "react";

type Message = { role: "user" | "assistant"; content: string };

export function useChat() {
  const ws = useRef<WebSocket | null>(null);
  const [messages, setMessages] = useState<Message[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const connect = useCallback(() => {
    ws.current = new WebSocket("ws://localhost:8000/ws/chat");
    ws.current.onmessage = (event) => {
      const data = JSON.parse(event.data);
      if (data.type === "chunk") {
        setMessages((prev) => {
          const last = prev[prev.length - 1];
          if (last?.role === "assistant") {
            return [
              ...prev.slice(0, -1),
              { ...last, content: last.content + data.content },
            ];
          }
          return [...prev, { role: "assistant", content: data.content }];
        });
      }
      if (data.type === "done") {
        setIsStreaming(false);
      }
    };
  }, []);

  const send = useCallback((message: string) => {
    setIsStreaming(true);
    setMessages((prev) => [...prev, { role: "user", content: message }]);
    ws.current?.send(JSON.stringify({ message }));
  }, []);

  return { messages, send, connect, isStreaming };
}

Production Patterns

The basic version works, but production needs more. Here are the patterns we use in every deployment.

Authentication

Never accept anonymous WebSocket connections. Validate the token during the handshake:

backend/app/api/ws.py
from fastapi import Depends, Query, WebSocket, status

async def get_ws_user(
    websocket: WebSocket,
    token: str = Query(...),
) -> User:
    try:
        return await verify_token(token)
    except InvalidToken:
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
        raise

@app.websocket("/ws/chat")
async def chat(
    websocket: WebSocket,
    user: User = Depends(get_ws_user),
):
    await websocket.accept()
    # user is now authenticated

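The `verify_token` above is whatever your app already uses for HTTP auth (typically JWT decoding). As a self-contained illustration of the shape, here is a stdlib-only HMAC-signed token sketch; the token format and `SECRET` are assumptions for the example, not the template's actual implementation:

```python
import base64
import hashlib
import hmac
import json
import time

SECRET = b"change-me"  # assumption: a server-side signing secret


class InvalidToken(Exception):
    pass


def make_token(payload: dict) -> str:
    """Encode a payload as base64(json).hex(hmac-sha256)."""
    body = base64.urlsafe_b64encode(json.dumps(payload).encode()).decode()
    sig = hmac.new(SECRET, body.encode(), hashlib.sha256).hexdigest()
    return f"{body}.{sig}"


def verify_token(token: str) -> dict:
    """Check the signature and expiry; raise InvalidToken on any failure."""
    try:
        body, sig = token.rsplit(".", 1)
    except ValueError:
        raise InvalidToken("malformed token")
    expected = hmac.new(SECRET, body.encode(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(sig, expected):
        raise InvalidToken("bad signature")
    payload = json.loads(base64.urlsafe_b64decode(body))
    if payload.get("exp", float("inf")) < time.time():
        raise InvalidToken("expired")
    return payload
```

A real JWT library gives you the same guarantees plus standard claims; the point is that validation happens before `accept()` ever succeeds.
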
Conversation History

Store messages in the database and pass history to the agent:

backend/app/api/ws.py
from pydantic_ai.messages import ModelMessage

@app.websocket("/ws/chat")
async def chat(websocket: WebSocket, user: User = Depends(get_ws_user)):
    await websocket.accept()
    # Load conversation history
    history: list[ModelMessage] = await db.get_messages(user.id)
    try:
        while True:
            data = await websocket.receive_json()
            async with agent.run_stream(
                data["message"],
                message_history=history,
                deps=Deps(user=user, db=db),
            ) as response:
                async for chunk in response.stream_text(delta=True):
                    await websocket.send_json({"type": "chunk", "content": chunk})
            await websocket.send_json({"type": "done"})
            # Update history with the new messages from this run
            history = response.all_messages()
            await db.save_messages(user.id, history)
    except WebSocketDisconnect:
        pass
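
The `db.get_messages` / `db.save_messages` calls above are left abstract. A minimal stdlib sketch with SQLite; in a real deployment you would serialize pydantic-ai's message objects (e.g. via its message type adapter) rather than the plain JSON dicts shown here:

```python
import json
import sqlite3


def init_db(conn: sqlite3.Connection) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS messages (user_id TEXT PRIMARY KEY, payload TEXT)"
    )


def save_messages(conn: sqlite3.Connection, user_id: str, history: list) -> None:
    # Overwrite the stored conversation with the latest full history
    conn.execute(
        "INSERT OR REPLACE INTO messages VALUES (?, ?)",
        (user_id, json.dumps(history)),
    )


def get_messages(conn: sqlite3.Connection, user_id: str) -> list:
    row = conn.execute(
        "SELECT payload FROM messages WHERE user_id = ?", (user_id,)
    ).fetchone()
    return json.loads(row[0]) if row else []
```

Storing the full history per user keeps reads to a single query, at the cost of rewriting the row on every turn.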

Error Handling

LLM providers fail. Rate limits hit. Connections drop. Handle it gracefully:

backend/app/api/ws.py
import logging

from pydantic_ai.exceptions import ModelHTTPError

logger = logging.getLogger(__name__)

try:
    async with agent.run_stream(message) as response:
        async for chunk in response.stream_text(delta=True):
            await websocket.send_json({"type": "chunk", "content": chunk})
    await websocket.send_json({"type": "done"})
except ModelHTTPError:
    await websocket.send_json({
        "type": "error",
        "message": "The AI model is temporarily unavailable. Please try again.",
    })
except Exception as e:
    logger.error(f"Stream error: {e}")
    await websocket.send_json({
        "type": "error",
        "message": "An unexpected error occurred.",
    })
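
For transient provider failures, retrying with exponential backoff before surfacing an error to the client often recovers silently. A generic sketch (the attempt count, delays, and jitter range are arbitrary choices, not values from the template):

```python
import asyncio
import random


async def with_retries(coro_factory, attempts: int = 3, base_delay: float = 0.5):
    """Call an async factory, retrying with exponential backoff and jitter."""
    for i in range(attempts):
        try:
            return await coro_factory()
        except Exception:
            if i == attempts - 1:
                raise  # out of retries; let the caller report the error
            await asyncio.sleep(base_delay * (2 ** i) + random.uniform(0, 0.1))
```

Note that retries only make sense before any tokens have been streamed; once partial output has reached the client, failing fast with an error event is cleaner.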

Connection Management

Track active connections for graceful shutdown and connection limits:

backend/app/core/connections.py
from fastapi import WebSocket

class ConnectionManager:
    def __init__(self, max_connections: int = 100):
        self.active: dict[str, WebSocket] = {}
        self.max_connections = max_connections

    async def connect(self, user_id: str, websocket: WebSocket) -> bool:
        if len(self.active) >= self.max_connections:
            await websocket.close(code=1013)  # Try Again Later
            return False
        # Close any existing connection for the same user
        if user_id in self.active:
            await self.active[user_id].close()
        self.active[user_id] = websocket
        return True

    def disconnect(self, user_id: str):
        self.active.pop(user_id, None)
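
The TL;DR also mentions broadcasting events. A sketch of how a manager like the one above could fan an event out to every active connection, evicting sockets that fail mid-send (the function name and eviction policy are assumptions; it only relies on the manager's `active` dict):

```python
async def broadcast(manager, event: dict) -> None:
    """Send an event to every active connection; drop any that fail."""
    dead = []
    for user_id, ws in manager.active.items():
        try:
            await ws.send_json(event)
        except Exception:
            # Socket died between our bookkeeping and the send
            dead.append(user_id)
    for user_id in dead:
        manager.active.pop(user_id, None)
```

Collecting dead connections first, then evicting, avoids mutating the dict while iterating over it.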

Performance Tips

  1. Use delta=True — send only new tokens, not the full accumulated text
  2. Batch small chunks — if tokens arrive faster than the WebSocket can send, buffer them
  3. Set timeouts — disconnect idle connections after 5-10 minutes
  4. Use connection pooling — reuse LLM provider connections across requests
  5. Monitor with Logfire — trace every WebSocket message for debugging
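
Tip 3 (idle timeouts) can be implemented by wrapping the receive call. A duck-typed sketch using `asyncio.wait_for`; the helper name, close code, and 5-minute default are illustrative:

```python
import asyncio

IDLE_TIMEOUT = 300.0  # seconds; disconnect clients idle for 5 minutes


async def receive_or_close(websocket, timeout: float = IDLE_TIMEOUT):
    """Wait for the next client message; close the socket if it stays idle."""
    try:
        return await asyncio.wait_for(websocket.receive_json(), timeout=timeout)
    except asyncio.TimeoutError:
        await websocket.close(code=1000)  # normal closure after idling out
        return None
```

In the endpoint's `while True` loop, call this instead of `receive_json()` directly and break when it returns `None`.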

Skip the Boilerplate

All of these patterns come pre-configured in the Full-Stack AI Agent Template. Select “WebSocket streaming” in the configurator and get authenticated, persistent, error-handled streaming out of the box.
