FastAPI is the best choice I know for exposing an LLM as an API: native async, Pydantic validation, and automatic documentation. Here is the structure I use in production.
Basic structure
api/
├── main.py        # FastAPI app, routes
├── llm.py         # Anthropic/OpenAI client
├── models.py      # Pydantic schemas
└── middleware.py  # rate limiting, auth
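Robust LLM client
# llm.py
import anthropic
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

# Reads ANTHROPIC_API_KEY from the environment.
client = anthropic.AsyncAnthropic()

# Errors worth retrying: rate limits (429), timeouts, connection drops, 5xx.
TRANSIENT_ERRORS = (
    anthropic.RateLimitError,
    anthropic.APIConnectionError,  # includes APITimeoutError
    anthropic.InternalServerError,
)


@retry(
    retry=retry_if_exception_type(TRANSIENT_ERRORS),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    reraise=True,
)
async def complete(prompt: str, max_tokens: int = 1024) -> str:
    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=max_tokens,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text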
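tenacity retries the call automatically with exponential backoff, which covers transient errors (429 rate limits, timeouts, 5xx server errors). After three failed attempts, reraise=True surfaces the last exception to the caller.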
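To make the retry behaviour concrete, here is a dependency-free sketch of the same pattern: retry only on errors flagged as transient, wait with a clamped exponential delay, and re-raise once the attempts are used up. It is not tenacity's implementation; `TransientError`, `retry_async`, and the `sleep` parameter are hypothetical names introduced for illustration, and the delay formula is an approximation of what the decorator settings above suggest.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure (429, timeout, 5xx)."""


async def retry_async(func, *, attempts=3, min_wait=2.0, max_wait=10.0,
                      retry_on=(TransientError,), sleep=asyncio.sleep):
    """Call func() up to `attempts` times, backing off between failures."""
    for attempt in range(1, attempts + 1):
        try:
            return await func()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error to the caller
            # Double the delay on each attempt, clamped to [min_wait, max_wait].
            delay = max(min_wait, min(2.0 ** (attempt - 1), max_wait))
            await sleep(delay)
```

Exceptions that are not listed in `retry_on` (a malformed request, for example) propagate on the first failure, which is usually what you want: retrying them only burns quota.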
Standard endpoint
# models.py
from pydantic import BaseModel, Field


class CompletionRequest(BaseModel):
    prompt: str = Field(..., min_length=1, max_length=10000)
    max_tokens: int = Field(default=512, ge=1, le=4096)


class CompletionResponse(BaseModel):
    result: str
    model: str
    tokens_used: int
# main.py
import anthropic
from fastapi import FastAPI, HTTPException

from .llm import complete
from .models import CompletionRequest, CompletionResponse

app = FastAPI(title="AI API")


@app.post("/complete", response_model=CompletionResponse)
async def run_completion(req: CompletionRequest):
    try:
        result = await complete(req.prompt, req.max_tokens)
    except anthropic.RateLimitError:
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    except anthropic.APIError as e:
        raise HTTPException(status_code=502, detail=f"LLM error: {e}")
    return CompletionResponse(
        result=result,
        model="claude-sonnet-4-6",
        # Rough approximation: word count, not the tokenizer's real count.
        tokens_used=len(result.split()),
    )
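The word-count approximation for tokens_used can drift a long way from what is actually billed. The Messages API response carries the real counts in `usage.input_tokens` and `usage.output_tokens`, so one option is to have the client return them alongside the text. A minimal sketch, assuming `complete` were changed to return the whole response object; `CompletionResult` and `summarize_response` are hypothetical names, not part of the code above.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass
class CompletionResult:
    text: str
    model: str
    tokens_used: int


def summarize_response(response) -> CompletionResult:
    """Collect text, model name and token usage from a Messages API response."""
    # Concatenate all text blocks; non-text blocks (e.g. tool use) are skipped.
    text = "".join(
        block.text for block in response.content
        if getattr(block, "type", None) == "text"
    )
    usage = response.usage
    return CompletionResult(
        text=text,
        model=response.model,
        tokens_used=usage.input_tokens + usage.output_tokens,
    )


# Stand-in object shaped like an SDK response, for illustration only.
fake = SimpleNamespace(
    content=[SimpleNamespace(type="text", text="Hello")],
    model="claude-sonnet-4-6",
    usage=SimpleNamespace(input_tokens=12, output_tokens=5),
)
summary = summarize_response(fake)
```

Reading the model name from the response also avoids hard-coding it in two places.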
Streaming
For long responses, streaming is essential: the client sees tokens arrive as they are generated instead of waiting 10 seconds for the full answer.
# main.py (continued)
from fastapi.responses import StreamingResponse

from .llm import client


@app.post("/complete/stream")
async def stream_completion(req: CompletionRequest):
    async def generator():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=req.max_tokens,
            messages=[{"role": "user", "content": req.prompt}],
        ) as stream:
            async for text in stream.text_stream:
                # One "data:" field per line so chunks containing "\n" stay valid SSE.
                for line in text.split("\n"):
                    yield f"data: {line}\n"
                yield "\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generator(), media_type="text/event-stream")
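SSE framing is line-based: a blank line ends an event, so a chunk that itself contains a newline has to be sent as several `data:` fields, or encoded so that it fits on one line. A minimal sketch of both options; `sse_event` and `sse_json_event` are hypothetical helper names, and the JSON shape is an assumption the client would have to agree on.

```python
import json


def sse_event(data: str) -> str:
    """Format one SSE event; each line of the payload gets its own data: field."""
    lines = data.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


def sse_json_event(payload: dict) -> str:
    """Alternative: JSON-encode the payload so it always fits on a single line."""
    return f"data: {json.dumps(payload)}\n\n"
```

With the first helper the browser's EventSource joins the `data:` fields back together with newlines; with the second the client calls `JSON.parse` on each event, which also leaves room for extra fields such as a stop reason.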
Rate Limiting
Without rate limiting, a single client can exhaust your API quota.
# middleware.py
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

# main.py
from fastapi import Request
from slowapi import _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded

from .middleware import limiter

app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# slowapi needs the endpoint to take an argument named `request`.
@app.post("/complete")
@limiter.limit("20/minute")
async def run_completion(request: Request, req: CompletionRequest):
    ...
That is 20 requests per minute per IP; adjust it to match your billing model.
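Limiting by IP is a reasonable default, but if billing is per customer it may make more sense to key the limit on the caller's credential. slowapi's `key_func` is any callable that takes the request and returns a string, so a custom one is short. A sketch under assumptions: the `x-api-key` header name and the `api_key_or_ip` function are hypothetical, and the request is assumed to be a Starlette-style object with `headers` and `client`.

```python
from types import SimpleNamespace


def api_key_or_ip(request) -> str:
    """Rate-limit key: the caller's API key if present, else its IP address."""
    api_key = request.headers.get("x-api-key")
    if api_key:
        return f"key:{api_key}"
    # request.client can be None (e.g. some test clients), so guard it.
    host = request.client.host if request.client else "unknown"
    return f"ip:{host}"


# Stand-in request objects, for illustration only.
with_key = SimpleNamespace(headers={"x-api-key": "abc"},
                           client=SimpleNamespace(host="1.2.3.4"))
without_key = SimpleNamespace(headers={}, client=SimpleNamespace(host="1.2.3.4"))
```

It would be wired in with `Limiter(key_func=api_key_or_ip)`. Prefixing the key keeps an API key and an IP address from ever colliding in the limiter's storage.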
Deployment
# Minimal Dockerfile
FROM python:3.13-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
On Vercel or Fly.io, a single worker is enough: async I/O already handles concurrent requests inside one process. On a VM, set --workers according to the number of CPUs.
What I always add in production
- Authentication: Bearer token or API key via Depends()
- Structured logging: prompt hash + latency + tokens per request
- Explicit timeout: asyncio.wait_for(complete(...), timeout=30.0)
- Health check: GET /health returning {"status": "ok", "llm": "reachable"}
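Two of these items, the explicit timeout and the structured log line, fit naturally into one wrapper around the LLM call. A minimal sketch using only the standard library; `timed_completion`, the logger name, and the log field names are assumptions for illustration. Token counts are left out because `complete` as written returns only text.

```python
import asyncio
import hashlib
import json
import logging
import time

logger = logging.getLogger("api.requests")  # hypothetical logger name


async def timed_completion(complete, prompt: str, timeout: float = 30.0) -> str:
    """Run complete(prompt) under a timeout and log one JSON line per request."""
    record = {
        # Hash the prompt so logs can be correlated without storing user text.
        "prompt_sha256": hashlib.sha256(prompt.encode("utf-8")).hexdigest()[:16],
        "status": "ok",
    }
    start = time.perf_counter()
    try:
        return await asyncio.wait_for(complete(prompt), timeout=timeout)
    except asyncio.TimeoutError:
        record["status"] = "timeout"
        raise
    except Exception:
        record["status"] = "error"
        raise
    finally:
        # Runs on success and failure alike, so every request gets a log line.
        record["latency_ms"] = round((time.perf_counter() - start) * 1000, 1)
        logger.info(json.dumps(record))
```

In the endpoint, the timeout would typically be caught and mapped to an HTTP 504 so the client can tell a slow upstream apart from a rejected request.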
Stéphanie Caumont
AI Product Owner