Skip to content
Volver al blog
Open Source

Streaming en tiempo real de agentes IA con WebSockets y FastAPI

Vstorm · · 5 min de lectura
Disponible en: Deutsch · English · Polski
Tabla de contenidos

¿Por qué streaming con WebSocket?

El modelo HTTP de petición-respuesta funciona bien para llamadas rápidas a la API, pero las respuestas de agentes IA pueden tardar entre 5 y 30 segundos. Sin streaming, los usuarios se quedan mirando un spinner de carga. Con streaming por WebSocket, ven los tokens aparecer en tiempo real — la misma experiencia que en ChatGPT o Claude.

Así es como implementarlo correctamente con FastAPI.

TL;DR

  • WebSockets superan a HTTP para agentes AI - los usuarios ven tokens en tiempo real en vez de mirar un spinner durante 5-30 segundos.
  • El connection manager maneja multiples clientes - rastrea conexiones activas, transmite eventos y limpia desconexiones automaticamente.
  • Eventos estructurados para el frontend - tipos de eventos separados text_delta, tool_call, tool_result y complete permiten renderizado UI enriquecido.
  • La autenticacion funciona via query params - WebSocket no soporta headers, asi que los tokens JWT se pasan como parametros de consulta y se validan al conectar.
  • Los patrones de produccion importan - heartbeat pings, logica de reconexion, persistencia del historial de mensajes y rate limiting previenen los problemas que matan apps WebSocket a escala.

La arquitectura

architecture
Browser ←→ WebSocket ←→ FastAPI ←→ LLM Provider
↑ ↑
│ token-by-token │ SSE/streaming
│ JSON messages │ response

El navegador abre una conexión WebSocket persistente. Cuando el usuario envía un mensaje, FastAPI transmite la respuesta del LLM token por token. Sin polling, sin peticiones HTTP de larga duración.

Implementación básica

1. Endpoint WebSocket en FastAPI

backend/app/api/ws.py
from fastapi import WebSocket, WebSocketDisconnect
from pydantic_ai import Agent
agent = Agent("openai:gpt-4o")
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
message = data["message"]
async with agent.run_stream(message) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({
"type": "chunk",
"content": chunk,
})
await websocket.send_json({"type": "done"})
except WebSocketDisconnect:
pass

2. Hook del frontend

frontend/src/hooks/useWebSocket.ts
import { useCallback, useRef, useState } from "react";
export function useChat() {
const ws = useRef<WebSocket | null>(null);
const [messages, setMessages] = useState<Message[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const connect = useCallback(() => {
ws.current = new WebSocket("ws://localhost:8000/ws/chat");
ws.current.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === "chunk") {
setMessages((prev) => {
const last = prev[prev.length - 1];
if (last?.role === "assistant") {
return [
...prev.slice(0, -1),
{ ...last, content: last.content + data.content },
];
}
return [...prev, { role: "assistant", content: data.content }];
});
}
if (data.type === "done") {
setIsStreaming(false);
}
};
}, []);
const send = useCallback((message: string) => {
setIsStreaming(true);
setMessages((prev) => [...prev, { role: "user", content: message }]);
ws.current?.send(JSON.stringify({ message }));
}, []);
return { messages, send, connect, isStreaming };
}

Patrones de producción

La versión básica funciona, pero producción necesita más. Estos son los patrones que usamos en cada despliegue.

Autenticación

Nunca aceptes conexiones WebSocket anónimas. Valida el token durante el handshake:

backend/app/api/ws.py
from fastapi import WebSocket, Depends, status
async def get_ws_user(
websocket: WebSocket,
token: str = Query(...),
) -> User:
try:
return await verify_token(token)
except InvalidToken:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
raise
@app.websocket("/ws/chat")
async def chat(
websocket: WebSocket,
user: User = Depends(get_ws_user),
):
await websocket.accept()
# el usuario está autenticado

Historial de conversación

Almacena los mensajes en la base de datos y pasa el historial al agente:

backend/app/api/ws.py
from pydantic_ai.messages import ModelMessage
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket, user: User = Depends(get_ws_user)):
await websocket.accept()
# Load conversation history
history: list[ModelMessage] = await db.get_messages(user.id)
try:
while True:
data = await websocket.receive_json()
async with agent.run_stream(
data["message"],
message_history=history,
deps=Deps(user=user, db=db),
) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
# Update history with new messages
history = response.all_messages()
await db.save_messages(user.id, history)
except WebSocketDisconnect:
pass

Manejo de errores

Los proveedores de LLM fallan. Se alcanzan los límites de tasa. Las conexiones se caen. Manéjalo con elegancia:

backend/app/api/ws.py
from pydantic_ai.exceptions import ModelRetryError
try:
async with agent.run_stream(message) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
except ModelRetryError as e:
await websocket.send_json({
"type": "error",
"message": "The AI model is temporarily unavailable. Please try again.",
})
except Exception as e:
logger.error(f"Stream error: {e}")
await websocket.send_json({
"type": "error",
"message": "An unexpected error occurred.",
})

Gestión de conexiones

Rastrea las conexiones activas para un apagado limpio y límites de conexión:

backend/app/core/connections.py
class ConnectionManager:
def __init__(self, max_connections: int = 100):
self.active: dict[str, WebSocket] = {}
self.max_connections = max_connections
async def connect(self, user_id: str, websocket: WebSocket):
if len(self.active) >= self.max_connections:
await websocket.close(code=1013) # Try Again Later
return False
# Close existing connection for same user
if user_id in self.active:
await self.active[user_id].close()
self.active[user_id] = websocket
return True
def disconnect(self, user_id: str):
self.active.pop(user_id, None)

Consejos de rendimiento

  1. Usa delta=True — envía solo los tokens nuevos, no el texto acumulado completo
  2. Agrupa fragmentos pequeños — si los tokens llegan más rápido de lo que el WebSocket puede enviar, almacénalos en un buffer
  3. Establece timeouts — desconecta las conexiones inactivas después de 5-10 minutos
  4. Usa connection pooling — reutiliza las conexiones al proveedor de LLM entre peticiones
  5. Monitoriza con Logfire — rastrea cada mensaje WebSocket para depuración

Ahorra el boilerplate

Todos estos patrones vienen preconfigurados en el Full-Stack AI Agent Template. Selecciona “WebSocket streaming” en el configurador y obtén streaming autenticado, persistente y con manejo de errores listo para usar.

Compartir artículo

Artículos relacionados

¿Listo para desplegar tu app de IA?

Elige tus frameworks, genera un proyecto listo para producción y despliega. 75+ opciones, un comando, cero deuda de configuración.

¿Necesitas ayuda construyendo agentes de IA?