AI Automation with Python: What Actually Works in Production

Jul 2, 2026 · 6 min

Python is the reference language for AI automation — not because it’s best at everything, but because the ecosystem (Anthropic SDK, parsing libraries, integrations) is the most mature.

Here’s what I actually use in production, without the unnecessary framework layers.

The Minimalist Stack That Works

For 90% of AI automation use cases, you don’t need LangChain or LlamaIndex:

import anthropic
from pydantic import BaseModel
from typing import Optional

client = anthropic.Anthropic()

class ExtractionResult(BaseModel):
    company_name: str
    # Default to None so a reply that omits an optional key still validates
    registration_number: Optional[str] = None
    contact_email: Optional[str] = None
    confidence: float

def extract_company_info(text: str) -> ExtractionResult:
    response = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": f"""Extract company information from this text.
Respond ONLY with valid JSON, no markdown.

Text: {text}

Expected format:
{{"company_name": "...", "registration_number": "...", "contact_email": "...", "confidence": 0.0-1.0}}"""
        }]
    )
    return ExtractionResult.model_validate_json(response.content[0].text)

Pydantic validates the output. If the LLM returns malformed JSON, or a field with the wrong type, model_validate_json raises a ValidationError you can catch and retry.
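A minimal sketch of what that validation step catches, with no API call involved. The model is shaped like ExtractionResult above; the try_parse helper and the sample replies are hypothetical, and the code assumes Pydantic v2 (where model_validate_json exists).

```python
from typing import Optional

from pydantic import BaseModel, ValidationError


class ExtractionResult(BaseModel):
    company_name: str
    registration_number: Optional[str] = None
    contact_email: Optional[str] = None
    confidence: float


def try_parse(raw: str) -> Optional[ExtractionResult]:
    """Hypothetical helper: return the parsed result, or None if validation fails."""
    try:
        return ExtractionResult.model_validate_json(raw)
    except ValidationError:
        # Raised for invalid JSON as well as for missing or mistyped fields
        return None


# Made-up replies standing in for response.content[0].text
good = '{"company_name": "Acme", "registration_number": null, "contact_email": null, "confidence": 0.9}'
truncated = '{"company_name": "Acme", "confi'
wrong_type = '{"company_name": "Acme", "confidence": "high"}'

parsed = try_parse(good)
```

Only the first reply parses. The truncated one fails as invalid JSON and the last one fails because "high" is not a float, so both would trigger a retry.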

Batch Processing Pipeline

The pattern I use for high-volume processing:

import asyncio
from anthropic import AsyncAnthropic

async_client = AsyncAnthropic()

async def process_document(doc: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:
        try:
            response = await async_client.messages.create(
                model="claude-haiku-4-5-20251001",
                max_tokens=256,
                messages=[{"role": "user", "content": f"Classify this document: {doc[:2000]}"}]
            )
            return {"status": "ok", "result": response.content[0].text}
        except Exception as e:
            return {"status": "error", "error": str(e)}

async def batch_process(documents: list[str], concurrency: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [process_document(doc, semaphore) for doc in documents]
    return await asyncio.gather(*tasks)

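The semaphore caps the number of in-flight requests so the batch does not blow through rate limits, and asyncio.gather returns the results in the same order as the input documents. On Haiku, I run up to 20 concurrent requests without issues.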
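To see the cap in action without spending tokens, here is a small sketch that swaps the API call for asyncio.sleep and records how many tasks are inside the semaphore at once. The names fake_call and run, and the numbers used, are illustrative assumptions.

```python
import asyncio


async def fake_call(i: int, semaphore: asyncio.Semaphore, state: dict) -> int:
    """Stand-in for an API request: holds a semaphore slot for 10 ms."""
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated network latency
        state["active"] -= 1
        return i


async def run(n: int, concurrency: int) -> tuple[list[int], int]:
    semaphore = asyncio.Semaphore(concurrency)
    state = {"active": 0, "peak": 0}
    tasks = [fake_call(i, semaphore, state) for i in range(n)]
    results = await asyncio.gather(*tasks)  # results keep input order
    return list(results), state["peak"]


results, peak = asyncio.run(run(n=50, concurrency=5))
```

The recorded peak never exceeds the concurrency value, whatever the number of tasks, which is the property that keeps a large batch within rate limits.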

Structured Extraction with Retry

For critical cases where output format must be perfect:

from pydantic import ValidationError

def extract_with_retry(text: str, max_retries: int = 3) -> ExtractionResult:
    last_error = None
    for attempt in range(max_retries):
        try:
            return extract_company_info(text)
        except (ValueError, ValidationError) as e:
            last_error = e
            if attempt < max_retries - 1:
                # Append the error so the next attempt can correct its output
                text = f"{text}\n\n[PREVIOUS ERROR: {e}. Fix the JSON format.]"
    raise RuntimeError(f"Extraction failed after {max_retries} attempts: {last_error}")

Feeding the error back to the model on retry resolves ~80% of failure cases.
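The retry loop can be exercised offline with a stub in place of the model. This is a sketch of a slight variation, not the function above: it rebuilds the prompt from the original text on each attempt so error notes do not pile up. retry_with_feedback and flaky_model are hypothetical names.

```python
import json
from typing import Callable


def retry_with_feedback(call: Callable[[str], dict], text: str, max_retries: int = 3) -> dict:
    """Call `call(prompt)`; on a ValueError, retry with the error appended."""
    prompt = text
    last_error = None
    for _ in range(max_retries):
        try:
            return call(prompt)
        except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
            last_error = e
            # Start again from the original text, with only the latest error
            prompt = f"{text}\n\n[PREVIOUS ERROR: {e}. Fix the JSON format.]"
    raise RuntimeError(f"Extraction failed after {max_retries} attempts: {last_error}")


seen_prompts: list[str] = []


def flaky_model(prompt: str) -> dict:
    """Stub model: truncated JSON on the first call, valid JSON afterwards."""
    seen_prompts.append(prompt)
    reply = '{"company_name": "Acme"' if len(seen_prompts) == 1 else '{"company_name": "Acme"}'
    return json.loads(reply)


result = retry_with_feedback(flaky_model, "Acme SAS, contact: hello@acme.example")
```

The second prompt the stub receives contains the parse error from the first attempt, which is the feedback mechanism described above.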

What I Avoid

LangChain for simple pipelines. The abstraction overhead slows debugging and hides what the model actually receives. For complex workflows with memory and multiple agents it can be worth it — but for extraction or classification, no.

Synchronous calls in a for loop. Processing 1,000 documents sequentially on Haiku takes ~3 hours. With async and a semaphore: ~10 minutes.

One model for everything. Classification → Haiku. Critical analysis → Sonnet. Cost difference: ×10.
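The sequential versus async figures above can be sanity-checked with back-of-the-envelope arithmetic. This sketch assumes every call takes the same time (about 10.8 s, which is what ~3 hours for 1,000 documents implies) and that 20 requests run at once. Both numbers are assumptions for illustration, not measurements.

```python
import math


def estimated_minutes(n_docs: int, seconds_per_call: float, concurrency: int = 1) -> float:
    """Rough wall-clock estimate: documents are processed in waves of `concurrency`."""
    waves = math.ceil(n_docs / concurrency)
    return waves * seconds_per_call / 60


sequential = estimated_minutes(1000, 10.8)               # one call at a time
concurrent = estimated_minutes(1000, 10.8, concurrency=20)
speedup = sequential / concurrent
```

Under these assumptions the sequential run lands at about 180 minutes and the concurrent one at about 9, consistent with the ~3 hours and ~10 minutes quoted above.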

Minimal Production Monitoring

import time
import logging

def timed_extract(text: str) -> tuple[ExtractionResult, float]:
    start = time.monotonic()
    result = extract_with_retry(text)
    elapsed = time.monotonic() - start
    logging.info(f"extraction ok | {elapsed:.2f}s | confidence={result.confidence:.2f}")
    return result, elapsed

One structured log per call. Enough to detect performance or quality drift.
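If the logs need to be machine-readable, one option is to emit each line as JSON and to log failures as well as successes. The sketch below is a generic wrapper under those assumptions; timed_call, the "extraction" logger name, and the field names are hypothetical, not part of the code above.

```python
import json
import logging
import time
from typing import Any, Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("extraction")


def timed_call(fn: Callable[..., Any], *args: Any) -> Any:
    """Run fn(*args) and emit exactly one JSON log line, even if fn raises."""
    start = time.monotonic()
    status = "ok"
    try:
        return fn(*args)
    except Exception:
        status = "error"
        raise  # the caller still sees the original exception
    finally:
        elapsed = time.monotonic() - start
        logger.info(json.dumps({
            "event": "extraction",
            "status": status,
            "elapsed_s": round(elapsed, 3),
        }))
```

Wrapping extract_with_retry this way would look like timed_call(extract_with_retry, text). Logging in the finally block means a failed extraction still leaves a trace with its duration.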

Stéphanie Caumont

AI Product Owner