FastAPI es la mejor elección para exponer un LLM como API: async nativo, validación Pydantic, documentación automática. Esta es la estructura que uso en producción.
Estructura Base
api/
├── main.py # app FastAPI, rutas
├── llm.py # cliente Anthropic/OpenAI
├── models.py # esquemas Pydantic
└── middleware.py # rate limiting, auth
Cliente LLM Robusto
# llm.py
import anthropic
from tenacity import retry, stop_after_attempt, wait_exponential
client = anthropic.AsyncAnthropic()
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
reraise=True,
)
async def complete(prompt: str, max_tokens: int = 1024) -> str:
response = await client.messages.create(
model="claude-sonnet-4-6",
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}],
)
return response.content[0].text
tenacity gestiona los reintentos automáticamente en errores transitorios (rate limit 429, timeout 5xx).
Endpoint Estándar
# models.py
from pydantic import BaseModel, Field
class CompletionRequest(BaseModel):
prompt: str = Field(..., min_length=1, max_length=10000)
max_tokens: int = Field(default=512, ge=1, le=4096)
class CompletionResponse(BaseModel):
result: str
model: str
tokens_used: int
# main.py
from fastapi import FastAPI, HTTPException
from .models import CompletionRequest, CompletionResponse
from .llm import complete
app = FastAPI(title="AI API")
@app.post("/complete", response_model=CompletionResponse)
async def run_completion(req: CompletionRequest):
try:
result = await complete(req.prompt, req.max_tokens)
return CompletionResponse(
result=result,
model="claude-sonnet-4-6",
tokens_used=len(result.split()),
)
except anthropic.RateLimitError:
raise HTTPException(status_code=429, detail="Rate limit exceeded")
except anthropic.APIError as e:
raise HTTPException(status_code=502, detail=f"LLM error: {e}")
Streaming
Para respuestas largas, el streaming es indispensable — el cliente ve los tokens llegar en lugar de esperar 10 segundos.
from fastapi.responses import StreamingResponse
@app.post("/complete/stream")
async def stream_completion(req: CompletionRequest):
async def generator():
async with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=req.max_tokens,
messages=[{"role": "user", "content": req.prompt}],
) as stream:
async for text in stream.text_stream:
yield f"data: {text}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generator(), media_type="text/event-stream")
Rate Limiting
Sin rate limiting, un solo cliente puede agotar toda tu cuota de API.
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
@app.post("/complete")
@limiter.limit("20/minute")
async def run_completion(request: Request, req: CompletionRequest):
...
20 peticiones/minuto por IP — ajusta según tu modelo de facturación.
Despliegue
FROM python:3.13-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
En Vercel o Fly.io, un solo worker es suficiente (async = concurrencia nativa). En una VM, escala --workers al número de CPUs.
Lo Que Añado Siempre en Producción
- Autenticación: Bearer token o API key via
Depends() - Logging estructurado: hash del prompt + latencia + tokens por petición
- Timeout explícito:
asyncio.wait_for(complete(...), timeout=30.0) - Health check:
GET /healthretornando{"status": "ok", "llm": "reachable"}
SC
Stéphanie Caumont
Product Owner de IA · Saber más