FastAPI is the best choice for exposing an LLM as an API: native async, Pydantic validation, automatic docs. Here’s the structure I use in production.
Base Structure
api/
├── main.py # FastAPI app, routes
├── llm.py # Anthropic/OpenAI client
├── models.py # Pydantic schemas
└── middleware.py # rate limiting, auth
Robust LLM Client
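# llm.py
import anthropic
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

# Reads ANTHROPIC_API_KEY from the environment.
client = anthropic.AsyncAnthropic()

# Retry only errors that can succeed on a second try; a 400 will not fix itself.
TRANSIENT_ERRORS = (
    anthropic.RateLimitError,       # 429
    anthropic.APIConnectionError,   # network failures, including timeouts
    anthropic.InternalServerError,  # 5xx
)

@retry(
    retry=retry_if_exception_type(TRANSIENT_ERRORS),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    reraise=True,
)
async def complete(prompt: str, max_tokens: int = 1024) -> str:
    response = await client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=max_tokens,
        messages=[{"role": "user", "content": prompt}],
    )
    return response.content[0].text
tenacity retries the call with exponential backoff, here limited to transient errors: rate limits (429), connection failures and timeouts, and 5xx server errors. Without the retry= predicate, the decorator would retry on every exception, including invalid requests. The Anthropic SDK also retries some of these errors on its own (the max_retries client option), so keep the combined number of attempts in mind.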
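To make the retry policy concrete, here is a hand-rolled, standard-library sketch of roughly what such a decorator does: a bounded number of attempts, with an exponentially growing wait clamped between a minimum and a maximum. It is not tenacity's implementation; `TransientError`, `backoff_delay`, `retry_async`, and the injectable `sleep` parameter are hypothetical names introduced for illustration.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Stand-in for a retryable failure (e.g. HTTP 429 or 5xx)."""


def backoff_delay(attempt: int, multiplier: float = 1.0,
                  min_s: float = 2.0, max_s: float = 10.0) -> float:
    """Exponential delay for a 1-based attempt number, clamped to [min_s, max_s]."""
    return max(min_s, min(multiplier * 2 ** (attempt - 1), max_s))


async def retry_async(
    fn: Callable[[], Awaitable[T]],
    attempts: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call fn until it succeeds, retrying only on TransientError."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await fn()
        except TransientError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error to the caller
            await sleep(backoff_delay(attempt))
    raise AssertionError("unreachable")
```

Any exception other than `TransientError` propagates immediately, which is the behaviour you want for errors such as an invalid request. Injecting `sleep` keeps the function easy to test without real waiting.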
Standard Endpoint
# models.py
from pydantic import BaseModel, Field

class CompletionRequest(BaseModel):
    prompt: str = Field(..., min_length=1, max_length=10000)
    max_tokens: int = Field(default=512, ge=1, le=4096)

class CompletionResponse(BaseModel):
    result: str
    model: str
    tokens_used: int
# main.py
import anthropic
from fastapi import FastAPI, HTTPException

from .llm import complete
from .models import CompletionRequest, CompletionResponse

app = FastAPI(title="AI API")

@app.post("/complete", response_model=CompletionResponse)
async def run_completion(req: CompletionRequest):
    try:
        result = await complete(req.prompt, req.max_tokens)
    except anthropic.RateLimitError:
        # Still rate limited after the retries in llm.py
        raise HTTPException(status_code=429, detail="Rate limit exceeded")
    except anthropic.APIError as e:
        raise HTTPException(status_code=502, detail=f"LLM error: {e}")
    return CompletionResponse(
        result=result,
        model="claude-sonnet-4-6",
        tokens_used=len(result.split()),  # word count, a rough approximation of tokens
    )
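The word count above only approximates token usage. A Messages API response carries a `usage` object with `input_tokens` and `output_tokens`, so the real figure can be reported instead. A minimal sketch, assuming `complete()` were changed to hand back the whole response rather than just the text; `summarize_response` is a hypothetical helper, and the `SimpleNamespace` object only stands in for a real SDK response.

```python
from types import SimpleNamespace
from typing import Any


def summarize_response(response: Any) -> dict:
    """Extract text, model and actual token usage from a Messages API response."""
    # Concatenate every text block instead of assuming content[0] is text.
    text = "".join(
        block.text for block in response.content
        if getattr(block, "type", None) == "text"
    )
    usage = response.usage
    return {
        "result": text,
        "model": response.model,
        "tokens_used": usage.input_tokens + usage.output_tokens,
    }


# Stand-in shaped like an SDK response, for illustration only.
fake = SimpleNamespace(
    content=[SimpleNamespace(type="text", text="Hello there")],
    model="claude-sonnet-4-6",
    usage=SimpleNamespace(input_tokens=12, output_tokens=3),
)
summary = summarize_response(fake)
```

The resulting dictionary has the same three fields as `CompletionResponse`, so it could be passed to it as keyword arguments.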
Streaming
For long responses, streaming is essential — the client sees tokens as they arrive instead of waiting 10 seconds.
from fastapi.responses import StreamingResponse

from .llm import client

@app.post("/complete/stream")
async def stream_completion(req: CompletionRequest):
    async def generator():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=req.max_tokens,
            messages=[{"role": "user", "content": req.prompt}],
        ) as stream:
            async for text in stream.text_stream:
                # One "data:" line per line of text, so chunks containing
                # newlines do not break the SSE framing.
                yield "".join(f"data: {line}\n" for line in text.split("\n")) + "\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(generator(), media_type="text/event-stream")
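On the consuming side, a client has to undo the SSE framing. Here is a minimal standard-library sketch, assuming the server sends `data:` lines terminated by a blank line and finishes with the `[DONE]` sentinel shown above; `iter_sse_data` is a hypothetical helper name, and it deliberately ignores the other SSE fields.

```python
from typing import Iterable, Iterator, List


def iter_sse_data(lines: Iterable[str], done: str = "[DONE]") -> Iterator[str]:
    """Yield the data payload of each SSE event until the done sentinel."""
    buffer: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the event: dispatch buffered data, if any.
            if buffer:
                payload = "\n".join(buffer)
                buffer.clear()
                if payload == done:
                    return
                yield payload
            continue
        if line.startswith("data:"):
            value = line[5:]
            # The SSE format strips exactly one leading space after the colon.
            buffer.append(value[1:] if value.startswith(" ") else value)
        # Other fields (event:, id:, retry:) and comments are ignored here.
```

Feed it the decoded lines of the HTTP response body from whichever HTTP client you use. Several `data:` lines within one event are joined with newlines, which is how a text chunk containing a line break survives the trip.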
Rate Limiting
Without rate limiting, a single client can exhaust your entire API quota.
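from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
# Turn RateLimitExceeded into an HTTP 429 response instead of a 500.
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/complete")
@limiter.limit("20/minute")
async def run_completion(request: Request, req: CompletionRequest):
    # slowapi needs the `request` argument to identify the caller
    ...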
20 requests/minute per IP — adjust to your billing model.
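To illustrate what a per-key limit such as "20/minute" means, here is a small fixed-window counter in plain Python. It is a sketch of the general idea, not slowapi's implementation; `FixedWindowLimiter` and its injectable `clock` parameter are hypothetical names.

```python
import time
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """Allow at most `limit` hits per key in each `window_s`-second window."""

    def __init__(self, limit: int = 20, window_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window_s = window_s
        self.clock = clock
        # key -> (index of the current window, hits counted in that window)
        self._hits: Dict[str, Tuple[int, int]] = {}

    def allow(self, key: str) -> bool:
        """Record a hit for key and report whether it is within the limit."""
        window = int(self.clock() // self.window_s)
        seen_window, count = self._hits.get(key, (window, 0))
        if seen_window != window:
            count = 0  # a new window started: reset the counter
        if count >= self.limit:
            return False
        self._hits[key] = (window, count + 1)
        return True
```

The state in this sketch lives in process memory, so with several workers each process would count separately and the effective limit would be multiplied by the worker count.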
Deployment
FROM python:3.13-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
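On Vercel or Fly.io, a single worker is usually enough: the handlers spend most of their time awaiting the LLM API, so one async event loop already serves many requests concurrently. On a VM, scale --workers to the CPU count.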
What I Always Add in Production
- Auth: Bearer token or API key via Depends()
- Structured logging: prompt hash + latency + tokens per request
- Explicit timeout: asyncio.wait_for(complete(...), timeout=30.0)
- Health check: GET /health returning {"status": "ok", "llm": "reachable"}
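The timeout and logging items can be combined in one small wrapper. A minimal standard-library sketch, assuming the completion function takes a prompt and returns text; `timed_completion`, the logger name, and the log field names are hypothetical, and the token figure is the same word-count approximation used earlier.

```python
import asyncio
import hashlib
import json
import logging
import time
from typing import Awaitable, Callable

logger = logging.getLogger("api.requests")


async def timed_completion(
    complete_fn: Callable[[str], Awaitable[str]],
    prompt: str,
    timeout_s: float = 30.0,
) -> str:
    """Run complete_fn under a timeout and log one JSON record per request."""
    started = time.perf_counter()
    status = "ok"
    result = ""
    try:
        result = await asyncio.wait_for(complete_fn(prompt), timeout=timeout_s)
        return result
    except asyncio.TimeoutError:
        status = "timeout"
        raise
    except Exception:
        status = "error"
        raise
    finally:
        logger.info(json.dumps({
            # Log a hash rather than the raw prompt to keep user content out of logs.
            "prompt_sha256": hashlib.sha256(prompt.encode("utf-8")).hexdigest()[:16],
            "latency_ms": round((time.perf_counter() - started) * 1000, 1),
            "approx_tokens": len(result.split()),
            "status": status,
        }))
```

In an endpoint, this would wrap the call to `complete`, with `asyncio.TimeoutError` mapped to an HTTP 504 or similar. Emitting one JSON object per request keeps the logs easy to filter by status or latency.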
Stéphanie Caumont
AI Product Owner