Python is the reference language for AI automation — not because it’s best at everything, but because the ecosystem (Anthropic SDK, parsing libraries, integrations) is the most mature.
Here’s what I actually use in production, without the unnecessary framework layers.
The Minimalist Stack That Works
For 90% of AI automation use cases, you don’t need LangChain or LlamaIndex:
import anthropic
from pydantic import BaseModel
from typing import Optional

client = anthropic.Anthropic()

class ExtractionResult(BaseModel):
    company_name: str
    registration_number: Optional[str]
    contact_email: Optional[str]
    confidence: float

def extract_company_info(text: str) -> ExtractionResult:
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": f"""Extract company information from this text.
Respond ONLY with valid JSON, no markdown.
Text: {text}
Expected format:
{{"company_name": "...", "registration_number": "...", "contact_email": "...", "confidence": 0.0-1.0}}"""
        }]
    )
    return ExtractionResult.model_validate_json(response.content[0].text)
Pydantic validates the output. If the LLM returns malformed JSON, or JSON that does not match the schema, model_validate_json raises a ValidationError you can catch and retry.
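Even with the "no markdown" instruction, a model will sometimes wrap its JSON in a code fence or put a sentence in front of it. Below is a minimal sketch of a cleanup step that could run before validation. The helper name extract_json_object is hypothetical, and it assumes the reply contains a single top-level JSON object and no stray brace before it.

```python
import json


def extract_json_object(raw: str) -> str:
    """Return the first top-level JSON object found in raw model output.

    Hypothetical helper: assumes one JSON object, possibly surrounded by a
    markdown fence or a line of prose.
    """
    start = raw.find("{")
    if start == -1:
        raise ValueError("no JSON object found in model output")
    # raw_decode parses one JSON value starting at `start` and reports where
    # it ends, so trailing text (such as a closing fence) is ignored.
    # It raises json.JSONDecodeError (a ValueError) if the object is malformed.
    _, end = json.JSONDecoder().raw_decode(raw, start)
    return raw[start:end]


if __name__ == "__main__":
    fence = "`" * 3  # built at runtime to keep this example readable
    reply = f'{fence}json\n{{"company_name": "Acme", "confidence": 0.9}}\n{fence}'
    print(extract_json_object(reply))
```

The cleaned string can then go to ExtractionResult.model_validate_json in place of response.content[0].text. Because the helper raises ValueError on failure, it fits the same catch-and-retry path.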
Batch Processing Pipeline
The pattern I use for high-volume processing:
import asyncio
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

async def process_document(doc: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:
        try:
            response = await async_client.messages.create(
                model="claude-haiku-4-5-20251001",
                max_tokens=256,
                messages=[{"role": "user", "content": f"Classify this document: {doc[:2000]}"}]
            )
            return {"status": "ok", "result": response.content[0].text}
        except Exception as e:
            return {"status": "error", "error": str(e)}

async def batch_process(documents: list[str], concurrency: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [process_document(doc, semaphore) for doc in documents]
    return await asyncio.gather(*tasks)
The Semaphore controls concurrency to avoid blowing through rate limits. On Haiku, I run up to 20 concurrent requests without issues.
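To see the bound without touching the API, here is a small sketch that replaces the network call with a short sleep and records the peak number of calls in flight. The names fake_call and run_bounded are illustrative, not part of any SDK.

```python
import asyncio


async def fake_call(i: int, semaphore: asyncio.Semaphore, state: dict) -> int:
    """Stand-in for an API call: sleeps instead of hitting the network."""
    async with semaphore:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)
        state["in_flight"] -= 1
        return i * 2


async def run_bounded(n: int, concurrency: int) -> tuple[list[int], int]:
    """Run n fake calls and return (results, peak number of concurrent calls)."""
    semaphore = asyncio.Semaphore(concurrency)
    state = {"in_flight": 0, "peak": 0}
    # gather returns results in the order the awaitables were passed in
    results = await asyncio.gather(
        *(fake_call(i, semaphore, state) for i in range(n))
    )
    return list(results), state["peak"]


results, peak = asyncio.run(run_bounded(n=50, concurrency=5))
print(f"{len(results)} results, peak concurrency {peak}")
```

The peak never exceeds the semaphore size, and the results come back in input order, so each result in batch_process lines up with its document by index.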
Structured Extraction with Retry
For critical cases where output format must be perfect:
from pydantic import ValidationError

def extract_with_retry(text: str, max_retries: int = 3) -> ExtractionResult:
    last_error = None
    for attempt in range(max_retries):
        try:
            return extract_company_info(text)
        except (ValueError, ValidationError) as e:
            last_error = e
            if attempt < max_retries - 1:
                # Append the error so the next attempt sees what went wrong
                text = f"{text}\n\n[PREVIOUS ERROR: {e}. Fix the JSON format]"
    raise RuntimeError(f"Extraction failed after {max_retries} attempts: {last_error}")
Feeding the error back to the model on retry resolves ~80% of failure cases.
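The feedback loop can be exercised in isolation with a stubbed model. This is a sketch under assumptions: ask is a hypothetical callable that takes a prompt and returns raw text, and plain json.loads stands in for Pydantic validation.

```python
import json
from typing import Callable, Optional


def call_with_feedback(ask: Callable[[str], str], prompt: str, max_retries: int = 3) -> dict:
    """Call `ask` until it returns parseable JSON, feeding the last error back."""
    current_prompt = prompt
    last_error: Optional[Exception] = None
    for _ in range(max_retries):
        raw = ask(current_prompt)
        try:
            return json.loads(raw)
        except json.JSONDecodeError as e:
            last_error = e
            # Rebuild from the original prompt so error notes do not pile up
            current_prompt = f"{prompt}\n\n[PREVIOUS ERROR: {e}. Return valid JSON only.]"
    raise RuntimeError(f"failed after {max_retries} attempts: {last_error}")


if __name__ == "__main__":
    # Stub model: fails once, then returns valid JSON
    replies = iter(["Sure! Here is the data", '{"company_name": "Acme"}'])
    print(call_with_feedback(lambda p: next(replies), "Extract the company name."))
```

One design choice here differs from the function above: each retry starts again from the original prompt, so only the most recent error is attached and the prompt does not grow with every attempt.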
What I Avoid
LangChain for simple pipelines. The abstraction overhead slows debugging and hides what the model actually receives. For complex workflows with memory and multiple agents it can be worth it — but for extraction or classification, no.
Synchronous calls in a for loop. Processing 1,000 documents sequentially on Haiku takes ~3 hours. With async and semaphore: ~10 minutes.
One model for everything. Classification → Haiku. Critical analysis → Sonnet. Cost difference: ×10.
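The sequential versus async figures above follow from simple arithmetic. Here is a back-of-the-envelope sketch, assuming the roughly 10.8 seconds per document implied by 3 hours for 1,000 documents, a concurrency of 20, and no rate-limit throttling. The function name is illustrative.

```python
import math


def estimated_wall_time_s(n_docs: int, latency_s: float, concurrency: int = 1) -> float:
    """Rough wall-clock estimate: documents are processed in waves of `concurrency`."""
    waves = math.ceil(n_docs / concurrency)
    return waves * latency_s


# Per-document latency implied by ~3 hours for 1,000 sequential calls
per_doc_s = 3 * 3600 / 1000

sequential_s = estimated_wall_time_s(1000, per_doc_s)
concurrent_s = estimated_wall_time_s(1000, per_doc_s, concurrency=20)

print(f"sequential: {sequential_s / 3600:.1f} h")
print(f"concurrency 20: {concurrent_s / 60:.1f} min")
```

With these inputs the estimate comes to about 3 hours sequentially and about 9 minutes at 20 concurrent requests. That is in line with the ~10 minutes quoted once retries and uneven latencies are included.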
Minimal Production Monitoring
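import time
import logging

# The root logger defaults to WARNING, so INFO lines need an explicit level
logging.basicConfig(level=logging.INFO)

def timed_extract(text: str) -> tuple[ExtractionResult, float]:
    start = time.monotonic()  # monotonic clock is unaffected by system time changes
    result = extract_with_retry(text)
    elapsed = time.monotonic() - start
    logging.info(f"extraction ok | {elapsed:.2f}s | confidence={result.confidence:.2f}")
    return result, elapsed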
One structured log per call. Enough to detect performance or quality drift.
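One way to turn those per-call numbers into a drift signal is a rolling window over latency and confidence. The DriftMonitor class below is a hypothetical sketch, and its thresholds are placeholders to tune against your own baseline.

```python
from collections import deque
from statistics import mean


class DriftMonitor:
    """Rolling-window check on latency and confidence (illustrative helper)."""

    def __init__(self, window: int = 100, max_latency_s: float = 5.0,
                 min_confidence: float = 0.7):
        # deque(maxlen=...) drops the oldest value once the window is full
        self.latencies = deque(maxlen=window)
        self.confidences = deque(maxlen=window)
        self.max_latency_s = max_latency_s
        self.min_confidence = min_confidence

    def record(self, elapsed_s: float, confidence: float) -> list[str]:
        """Store one call's metrics and return alerts for the current window."""
        self.latencies.append(elapsed_s)
        self.confidences.append(confidence)
        alerts = []
        if mean(self.latencies) > self.max_latency_s:
            alerts.append("latency")
        if mean(self.confidences) < self.min_confidence:
            alerts.append("confidence")
        return alerts


if __name__ == "__main__":
    monitor = DriftMonitor(window=3, max_latency_s=1.0, min_confidence=0.8)
    for elapsed, conf in [(0.5, 0.9), (0.5, 0.9), (4.0, 0.3)]:
        print(monitor.record(elapsed, conf))
```

In practice, record would be called with the elapsed time and result.confidence returned by timed_extract, and any non-empty alert list logged at WARNING level.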
Stéphanie Caumont
AI Product Owner