Strumieniowanie agentów AI w czasie rzeczywistym z WebSockets i FastAPI
Spis treści
Dlaczego strumieniowanie przez WebSocket?
Standardowy model żądanie-odpowiedź HTTP sprawdza się przy szybkich wywołaniach API, ale odpowiedzi agentów AI mogą trwać 5-30 sekund. Bez strumieniowania użytkownicy wpatrują się w kręcący się spinner. Dzięki strumieniowaniu przez WebSocket widzą tokeny pojawiające się w czasie rzeczywistym — dokładnie tak, jak w ChatGPT czy Claude.
Oto jak zaimplementować to poprawnie z FastAPI.
TL;DR
- WebSockety bija HTTP dla agentow AI - uzytkownicy widza tokeny w czasie rzeczywistym zamiast wpatrywac sie w spinner przez 5-30 sekund.
- Connection manager obsluguje wielu klientow - sledzi aktywne polaczenia, broadcastuje eventy i automatycznie sprząta po rozlaczeniach.
- Ustrukturyzowane eventy dla frontendu - oddzielne typy eventow
text_delta,tool_call,tool_resulticompleteumozliwiaja bogate renderowanie UI. - Uwierzytelnianie dziala przez query params - WebSocket nie wspiera headerow, wiec tokeny JWT przekazuje sie jako parametry zapytania i waliduje przy polaczeniu.
- Wzorce produkcyjne maja znaczenie - heartbeat pingi, logika reconnect, persystencja historii wiadomosci i rate limiting zapobiegaja problemom, ktore zabijaja aplikacje WebSocket w skali.
Architektura
Browser ←→ WebSocket ←→ FastAPI ←→ LLM Provider ↑ ↑ │ token-by-token │ SSE/streaming │ JSON messages │ responsePrzeglądarka otwiera trwałe połączenie WebSocket. Kiedy użytkownik wysyła wiadomość, FastAPI przesyła odpowiedź z LLM token po tokenie. Bez odpytywania, bez długotrwałych żądań HTTP.
Podstawowa implementacja
1. Endpoint WebSocket w FastAPI
from fastapi import WebSocket, WebSocketDisconnectfrom pydantic_ai import Agent
agent = Agent("openai:gpt-4o")
@app.websocket("/ws/chat")async def chat(websocket: WebSocket): await websocket.accept()
try: while True: data = await websocket.receive_json() message = data["message"]
async with agent.run_stream(message) as response: async for chunk in response.stream_text(delta=True): await websocket.send_json({ "type": "chunk", "content": chunk, })
await websocket.send_json({"type": "done"})
except WebSocketDisconnect: pass2. Hook frontendowy
import { useCallback, useRef, useState } from "react";
export function useChat() { const ws = useRef<WebSocket | null>(null); const [messages, setMessages] = useState<Message[]>([]); const [isStreaming, setIsStreaming] = useState(false);
const connect = useCallback(() => { ws.current = new WebSocket("ws://localhost:8000/ws/chat");
ws.current.onmessage = (event) => { const data = JSON.parse(event.data);
if (data.type === "chunk") { setMessages((prev) => { const last = prev[prev.length - 1]; if (last?.role === "assistant") { return [ ...prev.slice(0, -1), { ...last, content: last.content + data.content }, ]; } return [...prev, { role: "assistant", content: data.content }]; }); }
if (data.type === "done") { setIsStreaming(false); } }; }, []);
const send = useCallback((message: string) => { setIsStreaming(true); setMessages((prev) => [...prev, { role: "user", content: message }]); ws.current?.send(JSON.stringify({ message })); }, []);
return { messages, send, connect, isStreaming };}Wzorce produkcyjne
Wersja podstawowa działa, ale produkcja wymaga więcej. Oto wzorce, które stosujemy w każdym wdrożeniu.
Uwierzytelnianie
Nigdy nie akceptuj anonimowych połączeń WebSocket. Zweryfikuj token podczas nawiązywania połączenia:
from fastapi import WebSocket, Depends, status
async def get_ws_user( websocket: WebSocket, token: str = Query(...),) -> User: try: return await verify_token(token) except InvalidToken: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) raise
@app.websocket("/ws/chat")async def chat( websocket: WebSocket, user: User = Depends(get_ws_user),): await websocket.accept() # użytkownik jest teraz uwierzytelnionyHistoria konwersacji
Przechowuj wiadomości w bazie danych i przekazuj historię do agenta:
from pydantic_ai.messages import ModelMessage
@app.websocket("/ws/chat")async def chat(websocket: WebSocket, user: User = Depends(get_ws_user)): await websocket.accept()
# Load conversation history history: list[ModelMessage] = await db.get_messages(user.id)
try: while True: data = await websocket.receive_json()
async with agent.run_stream( data["message"], message_history=history, deps=Deps(user=user, db=db), ) as response: async for chunk in response.stream_text(delta=True): await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
# Update history with new messages history = response.all_messages() await db.save_messages(user.id, history)
except WebSocketDisconnect: passObsługa błędów
Dostawcy LLM ulegają awariom. Limity zapytań zostają przekroczone. Połączenia się zrywają. Obsłuż to elegancko:
from pydantic_ai.exceptions import ModelRetryError
try: async with agent.run_stream(message) as response: async for chunk in response.stream_text(delta=True): await websocket.send_json({"type": "chunk", "content": chunk}) await websocket.send_json({"type": "done"})
except ModelRetryError as e: await websocket.send_json({ "type": "error", "message": "The AI model is temporarily unavailable. Please try again.", })
except Exception as e: logger.error(f"Stream error: {e}") await websocket.send_json({ "type": "error", "message": "An unexpected error occurred.", })Zarządzanie połączeniami
Śledź aktywne połączenia w celu płynnego zamykania serwera i kontroli limitów:
class ConnectionManager: def __init__(self, max_connections: int = 100): self.active: dict[str, WebSocket] = {} self.max_connections = max_connections
async def connect(self, user_id: str, websocket: WebSocket): if len(self.active) >= self.max_connections: await websocket.close(code=1013) # Try Again Later return False
# Close existing connection for same user if user_id in self.active: await self.active[user_id].close()
self.active[user_id] = websocket return True
def disconnect(self, user_id: str): self.active.pop(user_id, None)Wskazówki dotyczące wydajności
- Używaj
delta=True— wysyłaj tylko nowe tokeny, a nie cały skumulowany tekst - Grupuj małe fragmenty — jeśli tokeny przychodzą szybciej niż WebSocket jest w stanie wysłać, buforuj je
- Ustawiaj limity czasu — rozłączaj bezczynne połączenia po 5-10 minutach
- Używaj puli połączeń — wykorzystuj ponownie połączenia z dostawcą LLM między żądaniami
- Monitoruj za pomocą Logfire — śledź każdą wiadomość WebSocket w celach debugowania
Pomiń boilerplate
Wszystkie te wzorce są wstępnie skonfigurowane w szablonie Full-Stack AI Agent Template. Wybierz “WebSocket streaming” w konfiguratorze i otrzymaj uwierzytelnione, trwałe strumieniowanie z obsługą błędów od razu po wyjęciu z pudełka.
Powiązane artykuły
Od create-react-app do create-ai-app: Nowy standard dla aplikacji AI
W 2016 roku create-react-app ustandaryzował budowanie frontendów. W 2026 roku aplikacje AI potrzebują tego samego moment...
AGENTS.md: Jak przygotować repozytorium dla agentów AI (Copilot, Cursor, Codex, Claude Code)
Każde narzędzie AI do kodowania czyta Twoje repozytorium inaczej. Sprawdź, jak AGENTS.md — wschodzący standard — daje im...
Od zera do produkcyjnego agenta AI w 30 minut — szablon full-stack z 5 frameworkami AI
Krok po kroku: konfigurator webowy, wybierz preset, wybierz framework AI, skonfiguruj 75+ opcji, docker-compose up — dzi...