← Volver al blog Dev IA

Automatización IA con Python: Lo Que Realmente Funciona en Producción

2 jul. 20266 min

Python es el lenguaje de referencia para la automatización IA — no porque sea el mejor para todo, sino porque el ecosistema (SDK de Anthropic, librerías de parsing, integraciones) es el más maduro.

Aquí lo que uso realmente en producción, sin capas de frameworks innecesarios.

El Stack Minimalista que Funciona

Para el 90% de los casos de automatización IA, no necesitas LangChain ni LlamaIndex:

import anthropic
from pydantic import BaseModel
from typing import Optional

client = anthropic.Anthropic()

class ExtractionResult(BaseModel):
    company_name: str
    registration_number: Optional[str]
    contact_email: Optional[str]
    confidence: float

def extract_company_info(text: str) -> ExtractionResult:
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": f"""Extrae información de empresa de este texto.
Responde ÚNICAMENTE con JSON válido, sin markdown.

Texto: {text}

Formato esperado:
{{"company_name": "...", "registration_number": "...", "contact_email": "...", "confidence": 0.0-1.0}}"""
        }]
    )
    return ExtractionResult.model_validate_json(response.content[0].text)

Pydantic valida la salida. Si el LLM devuelve JSON malformado, lanza una excepción que puedes capturar y reintentar.

Pipeline de Procesamiento por Lotes

Patrón que uso para procesar grandes volúmenes:

import asyncio
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

async def process_document(doc: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:
        try:
            response = await async_client.messages.create(
                model="claude-haiku-4-5-20251001",
                max_tokens=256,
                messages=[{"role": "user", "content": f"Clasifica este documento: {doc[:2000]}"}]
            )
            return {"status": "ok", "result": response.content[0].text}
        except Exception as e:
            return {"status": "error", "error": str(e)}

async def batch_process(documents: list[str], concurrency: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [process_document(doc, semaphore) for doc in documents]
    return await asyncio.gather(*tasks)

El Semaphore controla la concurrencia para no exceder los rate limits. En Haiku, llego hasta 20 peticiones simultáneas sin problemas.

Lo Que Evito

LangChain para pipelines simples. El overhead de abstracción ralentiza el debugging y oculta lo que el modelo realmente recibe. Para workflows simples de extracción o clasificación: no.

Llamadas síncronas en bucle for. Procesar 1.000 documentos secuencialmente en Haiku = ~3 horas. Con async y semáforo = ~10 minutos.

Un solo modelo para todo. Clasificación → Haiku. Análisis crítico → Sonnet. Diferencia de coste: ×10.

Monitorización Mínima en Producción

import time, logging

def timed_extract(text: str) -> tuple[ExtractionResult, float]:
    start = time.monotonic()
    result = extract_with_retry(text)
    elapsed = time.monotonic() - start
    logging.info(f"extraction ok | {elapsed:.2f}s | confidence={result.confidence:.2f}")
    return result, elapsed

Un log estructurado por llamada. Suficiente para detectar derivas de rendimiento o calidad.

SC

Stéphanie Caumont

Product Owner de IA · Saber más