Echtzeit-Streaming von KI-Agenten mit WebSockets und FastAPI
Inhaltsverzeichnis
Warum WebSocket-Streaming?
Das HTTP-Request-Response-Modell funktioniert gut bei schnellen API-Aufrufen, aber Antworten von KI-Agenten können 5-30 Sekunden dauern. Ohne Streaming starren die Nutzer auf einen Lade-Spinner. Mit WebSocket-Streaming sehen sie Tokens in Echtzeit erscheinen — das gleiche Erlebnis wie bei ChatGPT oder Claude.
So implementiert man es richtig mit FastAPI.
TL;DR
- WebSockets schlagen HTTP fuer AI-Agenten - Nutzer sehen Tokens in Echtzeit, statt 5-30 Sekunden auf einen Spinner zu starren.
- Connection Manager verwaltet mehrere Clients - verfolgt aktive Verbindungen, broadcastet Events und raeumt Disconnects automatisch auf.
- Strukturierte Events fuers Frontend - separate Event-Typen
text_delta,tool_call,tool_resultundcompleteermoeglichen reichhaltiges UI-Rendering. - Authentifizierung funktioniert ueber Query-Parameter - WebSocket unterstuetzt keine Header, also werden JWT-Tokens als Query-Parameter uebergeben und beim Verbindungsaufbau validiert.
- Produktionsmuster sind wichtig - Heartbeat-Pings, Reconnection-Logik, Nachrichten-Persistenz und Rate Limiting verhindern die Probleme, die WebSocket-Apps im grossen Massstab zerstoeren.
Die Architektur
Browser ←→ WebSocket ←→ FastAPI ←→ LLM Provider ↑ ↑ │ token-by-token │ SSE/streaming │ JSON messages │ responseDer Browser öffnet eine dauerhafte WebSocket-Verbindung. Wenn der Nutzer eine Nachricht sendet, streamt FastAPI die LLM-Antwort Token für Token zurück. Kein Polling, keine langlebigen HTTP-Requests.
Grundlegende Implementierung
1. FastAPI WebSocket-Endpunkt
from fastapi import WebSocket, WebSocketDisconnectfrom pydantic_ai import Agent
agent = Agent("openai:gpt-4o")
@app.websocket("/ws/chat")async def chat(websocket: WebSocket): await websocket.accept()
try: while True: data = await websocket.receive_json() message = data["message"]
async with agent.run_stream(message) as response: async for chunk in response.stream_text(delta=True): await websocket.send_json({ "type": "chunk", "content": chunk, })
await websocket.send_json({"type": "done"})
except WebSocketDisconnect: pass2. Frontend-Hook
import { useCallback, useRef, useState } from "react";
export function useChat() { const ws = useRef<WebSocket | null>(null); const [messages, setMessages] = useState<Message[]>([]); const [isStreaming, setIsStreaming] = useState(false);
const connect = useCallback(() => { ws.current = new WebSocket("ws://localhost:8000/ws/chat");
ws.current.onmessage = (event) => { const data = JSON.parse(event.data);
if (data.type === "chunk") { setMessages((prev) => { const last = prev[prev.length - 1]; if (last?.role === "assistant") { return [ ...prev.slice(0, -1), { ...last, content: last.content + data.content }, ]; } return [...prev, { role: "assistant", content: data.content }]; }); }
if (data.type === "done") { setIsStreaming(false); } }; }, []);
const send = useCallback((message: string) => { setIsStreaming(true); setMessages((prev) => [...prev, { role: "user", content: message }]); ws.current?.send(JSON.stringify({ message })); }, []);
return { messages, send, connect, isStreaming };}Produktionsmuster
Die Basisversion funktioniert, aber die Produktion erfordert mehr. Hier sind die Muster, die wir in jedem Deployment verwenden.
Authentifizierung
Akzeptiere niemals anonyme WebSocket-Verbindungen. Validiere das Token während des Handshakes:
from fastapi import WebSocket, Depends, status
async def get_ws_user( websocket: WebSocket, token: str = Query(...),) -> User: try: return await verify_token(token) except InvalidToken: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) raise
@app.websocket("/ws/chat")async def chat( websocket: WebSocket, user: User = Depends(get_ws_user),): await websocket.accept() # Benutzer ist jetzt authentifiziertKonversationsverlauf
Speichere Nachrichten in der Datenbank und übergib den Verlauf an den Agenten:
from pydantic_ai.messages import ModelMessage
@app.websocket("/ws/chat")async def chat(websocket: WebSocket, user: User = Depends(get_ws_user)): await websocket.accept()
# Load conversation history history: list[ModelMessage] = await db.get_messages(user.id)
try: while True: data = await websocket.receive_json()
async with agent.run_stream( data["message"], message_history=history, deps=Deps(user=user, db=db), ) as response: async for chunk in response.stream_text(delta=True): await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
# Update history with new messages history = response.all_messages() await db.save_messages(user.id, history)
except WebSocketDisconnect: passFehlerbehandlung
LLM-Anbieter fallen aus. Rate-Limits werden erreicht. Verbindungen brechen ab. Behandle es elegant:
from pydantic_ai.exceptions import ModelRetryError
try: async with agent.run_stream(message) as response: async for chunk in response.stream_text(delta=True): await websocket.send_json({"type": "chunk", "content": chunk}) await websocket.send_json({"type": "done"})
except ModelRetryError as e: await websocket.send_json({ "type": "error", "message": "The AI model is temporarily unavailable. Please try again.", })
except Exception as e: logger.error(f"Stream error: {e}") await websocket.send_json({ "type": "error", "message": "An unexpected error occurred.", })Verbindungsverwaltung
Verfolge aktive Verbindungen für einen sauberen Shutdown und Verbindungslimits:
class ConnectionManager: def __init__(self, max_connections: int = 100): self.active: dict[str, WebSocket] = {} self.max_connections = max_connections
async def connect(self, user_id: str, websocket: WebSocket): if len(self.active) >= self.max_connections: await websocket.close(code=1013) # Try Again Later return False
# Close existing connection for same user if user_id in self.active: await self.active[user_id].close()
self.active[user_id] = websocket return True
def disconnect(self, user_id: str): self.active.pop(user_id, None)Tipps zur Performance
- Verwende
delta=True— sende nur neue Tokens, nicht den gesamten kumulierten Text - Fasse kleine Chunks zusammen — wenn Tokens schneller ankommen als der WebSocket senden kann, puffere sie
- Setze Timeouts — trenne inaktive Verbindungen nach 5-10 Minuten
- Nutze Connection-Pooling — verwende LLM-Anbieterverbindungen über Requests hinweg wieder
- Überwache mit Logfire — verfolge jede WebSocket-Nachricht zum Debugging
Spare dir das Boilerplate
Alle diese Muster sind im Full-Stack AI Agent Template vorkonfiguriert. Wähle “WebSocket streaming” im Konfigurator und erhalte authentifiziertes, persistentes Streaming mit Fehlerbehandlung sofort einsatzbereit.
Verwandte Artikel
Von create-react-app zu create-ai-app: Der neue Standard für KI-Anwendungen
2016 standardisierte create-react-app, wie wir Frontends bauen. 2026 brauchen KI-Anwendungen denselben Moment — und er i...
AGENTS.md: So machen Sie Ihre Codebasis KI-Agenten-freundlich (Copilot, Cursor, Codex, Claude Code)
Jedes KI-Coding-Tool liest Ihr Repository anders. So gibt AGENTS.md — der aufkommende Tool-agnostische Standard — ihnen...
Von 0 zum produktionsreifen KI-Agenten in 30 Minuten — Full-Stack-Template mit 5 KI-Frameworks
Schritt-fuer-Schritt-Anleitung: Web-Konfigurator, Preset waehlen, KI-Framework auswaehlen, 75+ Optionen konfigurieren, d...