Automatisation IA avec Python : ce qui marche vraiment en production
Python est le langage de référence pour l’automatisation IA. Pas parce que c’est le meilleur pour tout — mais parce que l’écosystème (SDK Anthropic, bibliothèques de parsing, intégrations) y est le plus mature.
Voici ce que j’utilise réellement en production, sans la couche de frameworks inutiles.
La stack minimaliste qui marche
Pour 90% des cas d’automatisation IA, tu n’as pas besoin de LangChain ni de LlamaIndex :
import anthropic
from pydantic import BaseModel
from typing import Optional
client = anthropic.Anthropic()
class ExtractionResult(BaseModel):
company_name: str
siret: Optional[str]
contact_email: Optional[str]
confidence: float
def extract_company_info(text: str) -> ExtractionResult:
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=512,
messages=[{
"role": "user",
"content": f"""Extrait les informations d'entreprise depuis ce texte.
Réponds UNIQUEMENT en JSON valide, sans markdown.
Texte: {text}
Format attendu:
{{"company_name": "...", "siret": "...", "contact_email": "...", "confidence": 0.0-1.0}}"""
}]
)
return ExtractionResult.model_validate_json(response.content[0].text)
Pydantic fait la validation de la sortie. Si le LLM retourne un JSON malformé, ça lève une exception que tu peux catch et retry.
Pipeline de traitement par lots
Pattern que j’utilise pour traiter des volumes importants :
import asyncio
from anthropic import AsyncAnthropic
async_client = AsyncAnthropic()
async def process_document(doc: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore:
try:
response = await async_client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=256,
messages=[{"role": "user", "content": f"Classifie ce document: {doc[:2000]}"}]
)
return {"status": "ok", "result": response.content[0].text}
except Exception as e:
return {"status": "error", "error": str(e)}
async def batch_process(documents: list[str], concurrency: int = 10) -> list[dict]:
semaphore = asyncio.Semaphore(concurrency)
tasks = [process_document(doc, semaphore) for doc in documents]
return await asyncio.gather(*tasks)
Le Semaphore contrôle la concurrence pour ne pas exploser les rate limits. Sur Haiku, je monte jusqu’à 20 requêtes simultanées sans problème.
Extraction structurée avec retry
Pour les cas critiques où le format de sortie doit être parfait :
def extract_with_retry(text: str, max_retries: int = 3) -> ExtractionResult:
last_error = None
for attempt in range(max_retries):
try:
return extract_company_info(text)
except (ValueError, ValidationError) as e:
last_error = e
# On enrichit le prompt avec l'erreur pour guider le retry
if attempt < max_retries - 1:
text = f"{text}\n\n[ERREUR PRÉCÉDENTE: {e} — corrige le format JSON]"
raise RuntimeError(f"Extraction failed after {max_retries} attempts: {last_error}")
Fournir l’erreur au modèle dans le retry résout ~80% des cas d’échec.
Ce que j’évite
LangChain pour des pipelines simples. L’overhead d’abstraction ralentit le debugging et cache ce que fait vraiment le modèle. Pour des workflows complexes avec mémoire et agents multiples, ça peut valoir le coup — mais pour une extraction ou une classification, non.
Appels synchrones en boucle for. Traiter 1 000 documents en séquentiel sur Haiku = ~3h. En async avec semaphore = ~10 min.
Un seul modèle pour tout. Classification → Haiku. Analyse critique → Sonnet. Différence de coût : ×10.
Monitoring de prod minimal
import time
import logging
def timed_extract(text: str) -> tuple[ExtractionResult, float]:
start = time.monotonic()
result = extract_with_retry(text)
elapsed = time.monotonic() - start
logging.info(f"extraction ok | {elapsed:.2f}s | confidence={result.confidence:.2f}")
return result, elapsed
Un log structuré par appel. C’est suffisant pour détecter les dérives de performance ou de qualité.
Stéphanie Caumont
Product Owner IA · En savoir plus