Skip to content
Zurück zum Blog
Open Source

Echtzeit-Streaming von KI-Agenten mit WebSockets und FastAPI

Vstorm · · 4 Min. Lesezeit
Verfügbar in: English · Español · Polski
Inhaltsverzeichnis

Warum WebSocket-Streaming?

Das HTTP-Request-Response-Modell funktioniert gut bei schnellen API-Aufrufen, aber Antworten von KI-Agenten können 5-30 Sekunden dauern. Ohne Streaming starren die Nutzer auf einen Lade-Spinner. Mit WebSocket-Streaming sehen sie Tokens in Echtzeit erscheinen — das gleiche Erlebnis wie bei ChatGPT oder Claude.

So implementiert man es richtig mit FastAPI.

TL;DR

  • WebSockets schlagen HTTP fuer AI-Agenten - Nutzer sehen Tokens in Echtzeit, statt 5-30 Sekunden auf einen Spinner zu starren.
  • Connection Manager verwaltet mehrere Clients - verfolgt aktive Verbindungen, broadcastet Events und raeumt Disconnects automatisch auf.
  • Strukturierte Events fuers Frontend - separate Event-Typen text_delta, tool_call, tool_result und complete ermoeglichen reichhaltiges UI-Rendering.
  • Authentifizierung funktioniert ueber Query-Parameter - WebSocket unterstuetzt keine Header, also werden JWT-Tokens als Query-Parameter uebergeben und beim Verbindungsaufbau validiert.
  • Produktionsmuster sind wichtig - Heartbeat-Pings, Reconnection-Logik, Nachrichten-Persistenz und Rate Limiting verhindern die Probleme, die WebSocket-Apps im grossen Massstab zerstoeren.

Die Architektur

architecture
Browser ←→ WebSocket ←→ FastAPI ←→ LLM Provider
↑ ↑
│ token-by-token │ SSE/streaming
│ JSON messages │ response

Der Browser öffnet eine dauerhafte WebSocket-Verbindung. Wenn der Nutzer eine Nachricht sendet, streamt FastAPI die LLM-Antwort Token für Token zurück. Kein Polling, keine langlebigen HTTP-Requests.

Grundlegende Implementierung

1. FastAPI WebSocket-Endpunkt

backend/app/api/ws.py
from fastapi import WebSocket, WebSocketDisconnect
from pydantic_ai import Agent
agent = Agent("openai:gpt-4o")
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
message = data["message"]
async with agent.run_stream(message) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({
"type": "chunk",
"content": chunk,
})
await websocket.send_json({"type": "done"})
except WebSocketDisconnect:
pass

2. Frontend-Hook

frontend/src/hooks/useWebSocket.ts
import { useCallback, useRef, useState } from "react";
export function useChat() {
const ws = useRef<WebSocket | null>(null);
const [messages, setMessages] = useState<Message[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const connect = useCallback(() => {
ws.current = new WebSocket("ws://localhost:8000/ws/chat");
ws.current.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === "chunk") {
setMessages((prev) => {
const last = prev[prev.length - 1];
if (last?.role === "assistant") {
return [
...prev.slice(0, -1),
{ ...last, content: last.content + data.content },
];
}
return [...prev, { role: "assistant", content: data.content }];
});
}
if (data.type === "done") {
setIsStreaming(false);
}
};
}, []);
const send = useCallback((message: string) => {
setIsStreaming(true);
setMessages((prev) => [...prev, { role: "user", content: message }]);
ws.current?.send(JSON.stringify({ message }));
}, []);
return { messages, send, connect, isStreaming };
}

Produktionsmuster

Die Basisversion funktioniert, aber die Produktion erfordert mehr. Hier sind die Muster, die wir in jedem Deployment verwenden.

Authentifizierung

Akzeptiere niemals anonyme WebSocket-Verbindungen. Validiere das Token während des Handshakes:

backend/app/api/ws.py
from fastapi import WebSocket, Depends, status
async def get_ws_user(
websocket: WebSocket,
token: str = Query(...),
) -> User:
try:
return await verify_token(token)
except InvalidToken:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
raise
@app.websocket("/ws/chat")
async def chat(
websocket: WebSocket,
user: User = Depends(get_ws_user),
):
await websocket.accept()
# Benutzer ist jetzt authentifiziert

Konversationsverlauf

Speichere Nachrichten in der Datenbank und übergib den Verlauf an den Agenten:

backend/app/api/ws.py
from pydantic_ai.messages import ModelMessage
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket, user: User = Depends(get_ws_user)):
await websocket.accept()
# Load conversation history
history: list[ModelMessage] = await db.get_messages(user.id)
try:
while True:
data = await websocket.receive_json()
async with agent.run_stream(
data["message"],
message_history=history,
deps=Deps(user=user, db=db),
) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
# Update history with new messages
history = response.all_messages()
await db.save_messages(user.id, history)
except WebSocketDisconnect:
pass

Fehlerbehandlung

LLM-Anbieter fallen aus. Rate-Limits werden erreicht. Verbindungen brechen ab. Behandle es elegant:

backend/app/api/ws.py
from pydantic_ai.exceptions import ModelRetryError
try:
async with agent.run_stream(message) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
except ModelRetryError as e:
await websocket.send_json({
"type": "error",
"message": "The AI model is temporarily unavailable. Please try again.",
})
except Exception as e:
logger.error(f"Stream error: {e}")
await websocket.send_json({
"type": "error",
"message": "An unexpected error occurred.",
})

Verbindungsverwaltung

Verfolge aktive Verbindungen für einen sauberen Shutdown und Verbindungslimits:

backend/app/core/connections.py
class ConnectionManager:
def __init__(self, max_connections: int = 100):
self.active: dict[str, WebSocket] = {}
self.max_connections = max_connections
async def connect(self, user_id: str, websocket: WebSocket):
if len(self.active) >= self.max_connections:
await websocket.close(code=1013) # Try Again Later
return False
# Close existing connection for same user
if user_id in self.active:
await self.active[user_id].close()
self.active[user_id] = websocket
return True
def disconnect(self, user_id: str):
self.active.pop(user_id, None)

Tipps zur Performance

  1. Verwende delta=True — sende nur neue Tokens, nicht den gesamten kumulierten Text
  2. Fasse kleine Chunks zusammen — wenn Tokens schneller ankommen als der WebSocket senden kann, puffere sie
  3. Setze Timeouts — trenne inaktive Verbindungen nach 5-10 Minuten
  4. Nutze Connection-Pooling — verwende LLM-Anbieterverbindungen über Requests hinweg wieder
  5. Überwache mit Logfire — verfolge jede WebSocket-Nachricht zum Debugging

Spare dir das Boilerplate

Alle diese Muster sind im Full-Stack AI Agent Template vorkonfiguriert. Wähle “WebSocket streaming” im Konfigurator und erhalte authentifiziertes, persistentes Streaming mit Fehlerbehandlung sofort einsatzbereit.

Artikel teilen

Verwandte Artikel

Bereit, deine KI-App zu shippen?

Wähle deine Frameworks, generiere ein produktionsreifes Projekt und deploye. 75+ Optionen, ein Befehl, null Config-Schulden.

Brauchen Sie Hilfe beim Aufbau von KI-Agenten?