Skip to content
Wróć do bloga
Open Source

Strumieniowanie agentów AI w czasie rzeczywistym z WebSockets i FastAPI

Vstorm · · 5 min czytania
Spis treści

Dlaczego strumieniowanie przez WebSocket?

Standardowy model żądanie-odpowiedź HTTP sprawdza się przy szybkich wywołaniach API, ale odpowiedzi agentów AI mogą trwać 5-30 sekund. Bez strumieniowania użytkownicy wpatrują się w kręcący się spinner. Dzięki strumieniowaniu przez WebSocket widzą tokeny pojawiające się w czasie rzeczywistym — dokładnie tak, jak w ChatGPT czy Claude.

Oto jak zaimplementować to poprawnie z FastAPI.

TL;DR

  • WebSockety bija HTTP dla agentow AI - uzytkownicy widza tokeny w czasie rzeczywistym zamiast wpatrywac sie w spinner przez 5-30 sekund.
  • Connection manager obsluguje wielu klientow - sledzi aktywne polaczenia, broadcastuje eventy i automatycznie sprząta po rozlaczeniach.
  • Ustrukturyzowane eventy dla frontendu - oddzielne typy eventow text_delta, tool_call, tool_result i complete umozliwiaja bogate renderowanie UI.
  • Uwierzytelnianie dziala przez query params - WebSocket nie wspiera headerow, wiec tokeny JWT przekazuje sie jako parametry zapytania i waliduje przy polaczeniu.
  • Wzorce produkcyjne maja znaczenie - heartbeat pingi, logika reconnect, persystencja historii wiadomosci i rate limiting zapobiegaja problemom, ktore zabijaja aplikacje WebSocket w skali.

Architektura

architecture
Browser ←→ WebSocket ←→ FastAPI ←→ LLM Provider
↑ ↑
│ token-by-token │ SSE/streaming
│ JSON messages │ response

Przeglądarka otwiera trwałe połączenie WebSocket. Kiedy użytkownik wysyła wiadomość, FastAPI przesyła odpowiedź z LLM token po tokenie. Bez odpytywania, bez długotrwałych żądań HTTP.

Podstawowa implementacja

1. Endpoint WebSocket w FastAPI

backend/app/api/ws.py
from fastapi import WebSocket, WebSocketDisconnect
from pydantic_ai import Agent
agent = Agent("openai:gpt-4o")
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
message = data["message"]
async with agent.run_stream(message) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({
"type": "chunk",
"content": chunk,
})
await websocket.send_json({"type": "done"})
except WebSocketDisconnect:
pass

2. Hook frontendowy

frontend/src/hooks/useWebSocket.ts
import { useCallback, useRef, useState } from "react";
export function useChat() {
const ws = useRef<WebSocket | null>(null);
const [messages, setMessages] = useState<Message[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const connect = useCallback(() => {
ws.current = new WebSocket("ws://localhost:8000/ws/chat");
ws.current.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === "chunk") {
setMessages((prev) => {
const last = prev[prev.length - 1];
if (last?.role === "assistant") {
return [
...prev.slice(0, -1),
{ ...last, content: last.content + data.content },
];
}
return [...prev, { role: "assistant", content: data.content }];
});
}
if (data.type === "done") {
setIsStreaming(false);
}
};
}, []);
const send = useCallback((message: string) => {
setIsStreaming(true);
setMessages((prev) => [...prev, { role: "user", content: message }]);
ws.current?.send(JSON.stringify({ message }));
}, []);
return { messages, send, connect, isStreaming };
}

Wzorce produkcyjne

Wersja podstawowa działa, ale produkcja wymaga więcej. Oto wzorce, które stosujemy w każdym wdrożeniu.

Uwierzytelnianie

Nigdy nie akceptuj anonimowych połączeń WebSocket. Zweryfikuj token podczas nawiązywania połączenia:

backend/app/api/ws.py
from fastapi import WebSocket, Depends, status
async def get_ws_user(
websocket: WebSocket,
token: str = Query(...),
) -> User:
try:
return await verify_token(token)
except InvalidToken:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
raise
@app.websocket("/ws/chat")
async def chat(
websocket: WebSocket,
user: User = Depends(get_ws_user),
):
await websocket.accept()
# użytkownik jest teraz uwierzytelniony

Historia konwersacji

Przechowuj wiadomości w bazie danych i przekazuj historię do agenta:

backend/app/api/ws.py
from pydantic_ai.messages import ModelMessage
@app.websocket("/ws/chat")
async def chat(websocket: WebSocket, user: User = Depends(get_ws_user)):
await websocket.accept()
# Load conversation history
history: list[ModelMessage] = await db.get_messages(user.id)
try:
while True:
data = await websocket.receive_json()
async with agent.run_stream(
data["message"],
message_history=history,
deps=Deps(user=user, db=db),
) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
# Update history with new messages
history = response.all_messages()
await db.save_messages(user.id, history)
except WebSocketDisconnect:
pass

Obsługa błędów

Dostawcy LLM ulegają awariom. Limity zapytań zostają przekroczone. Połączenia się zrywają. Obsłuż to elegancko:

backend/app/api/ws.py
from pydantic_ai.exceptions import ModelRetryError
try:
async with agent.run_stream(message) as response:
async for chunk in response.stream_text(delta=True):
await websocket.send_json({"type": "chunk", "content": chunk})
await websocket.send_json({"type": "done"})
except ModelRetryError as e:
await websocket.send_json({
"type": "error",
"message": "The AI model is temporarily unavailable. Please try again.",
})
except Exception as e:
logger.error(f"Stream error: {e}")
await websocket.send_json({
"type": "error",
"message": "An unexpected error occurred.",
})

Zarządzanie połączeniami

Śledź aktywne połączenia w celu płynnego zamykania serwera i kontroli limitów:

backend/app/core/connections.py
class ConnectionManager:
def __init__(self, max_connections: int = 100):
self.active: dict[str, WebSocket] = {}
self.max_connections = max_connections
async def connect(self, user_id: str, websocket: WebSocket):
if len(self.active) >= self.max_connections:
await websocket.close(code=1013) # Try Again Later
return False
# Close existing connection for same user
if user_id in self.active:
await self.active[user_id].close()
self.active[user_id] = websocket
return True
def disconnect(self, user_id: str):
self.active.pop(user_id, None)

Wskazówki dotyczące wydajności

  1. Używaj delta=True — wysyłaj tylko nowe tokeny, a nie cały skumulowany tekst
  2. Grupuj małe fragmenty — jeśli tokeny przychodzą szybciej niż WebSocket jest w stanie wysłać, buforuj je
  3. Ustawiaj limity czasu — rozłączaj bezczynne połączenia po 5-10 minutach
  4. Używaj puli połączeń — wykorzystuj ponownie połączenia z dostawcą LLM między żądaniami
  5. Monitoruj za pomocą Logfire — śledź każdą wiadomość WebSocket w celach debugowania

Pomiń boilerplate

Wszystkie te wzorce są wstępnie skonfigurowane w szablonie Full-Stack AI Agent Template. Wybierz “WebSocket streaming” w konfiguratorze i otrzymaj uwierzytelnione, trwałe strumieniowanie z obsługą błędów od razu po wyjęciu z pudełka.

Udostępnij artykuł

Powiązane artykuły

Gotowy, żeby wdrożyć swoją aplikację AI?

Wybierz frameworki, wygeneruj projekt gotowy do produkcji i wdróż. 75+ opcji, jedna komenda, zero długu konfiguracyjnego.

Potrzebujesz pomocy przy budowie agentów AI?