CLAUDE LABJP
CORPS — Anthropic unveils Claude Corps (Jun 11), a $150M national fellowship placing 1,000 early-career workers inside US nonprofits; the first cohort starts in OctoberSUBAGENTS — Claude Code sub-agents can now spawn their own sub-agents, up to 5 levels deep — multi-stage delegation workflows out of the boxWORKFLOWS — Dynamic workflows arrive in research preview across CLI, Desktop, and VS Code for codebase-wide bug hunts and large migrations (Max/Team/Enterprise)BILLING — 2 days to the Jun 15 change: Agent SDK, headless runs, and GitHub Actions move to monthly credits ($20/$100/$200); Sonnet 4 and Opus 4 retire from the API the same dayFABLE5 — Fable 5 remains included free on Pro, Max, Team, and Enterprise through Jun 22CODE80 — IPO coverage reports Claude now writes over 80% of its own code, up from under 10% in February 2025CORPS — Anthropic unveils Claude Corps (Jun 11), a $150M national fellowship placing 1,000 early-career workers inside US nonprofits; the first cohort starts in OctoberSUBAGENTS — Claude Code sub-agents can now spawn their own sub-agents, up to 5 levels deep — multi-stage delegation workflows out of the boxWORKFLOWS — Dynamic workflows arrive in research preview across CLI, Desktop, and VS Code for codebase-wide bug hunts and large migrations (Max/Team/Enterprise)BILLING — 2 days to the Jun 15 change: Agent SDK, headless runs, and GitHub Actions move to monthly credits ($20/$100/$200); Sonnet 4 and Opus 4 retire from the API the same dayFABLE5 — Fable 5 remains included free on Pro, Max, Team, and Enterprise through Jun 22CODE80 — IPO coverage reports Claude now writes over 80% of its own code, up from under 10% in February 2025
Articles/API & SDK
API & SDK/2026-06-13Advanced

Claude API Python Advanced Cookbook: 20 Production Patterns You'll Actually Use

20 battle-tested Python patterns for the Claude API—retry logic, parallel processing, cost optimization, testing, and monitoring. Copy-paste ready code recipes.

Claude API67Python15production87patternsoptimization4practical

Premium Article

The first night I shipped a script that ran flawlessly on my laptop, a cascade of 429s stalled the whole pipeline at 2 AM. The culprit wasn't the rate limit itself—it was my own design, which had no retry logic. Run the Claude API in production with Python and you'll meet at least one of these "invisible-on-localhost" walls.

This cookbook is the set of patterns I've written down each time I hit one of those walls, organized into twenty recipes. It's less something to read front-to-back and more something to copy from when you need it.

One thing worth saying up front: you don't need all twenty from day one. Production resilience isn't a "kitchen sink"—it's built by defusing the specific landmines your app actually steps on, in order. As an indie developer running several services, I didn't start with all of them either; I added each one after an incident forced my hand. To make that mapping clear, I've grouped the patterns into four clusters.

Patterns 1–4: Resilient API Foundation

Pattern 1: Exponential Backoff with Jitter

Distinguishes between rate limits (429) and server errors (5xx), backing off appropriately for each.

import anthropic
import time
import random
from typing import TypeVar, Callable
 
T = TypeVar('T')
 
def with_retry(
    func: Callable[..., T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
) -> T:
    """Exponential backoff + jitter for API calls."""
    for attempt in range(max_attempts):
        try:
            return func()
        except anthropic.RateLimitError as e:
            if attempt == max_attempts - 1:
                raise
            retry_after = getattr(e, 'retry_after', None)
            delay = retry_after if retry_after else min(
                base_delay * (2 ** attempt), max_delay
            )
            if jitter:
                delay *= (0.5 + random.random() * 0.5)
            print(f"Rate limited. Retrying in {delay:.1f}s ({attempt+1}/{max_attempts})")
            time.sleep(delay)
        except anthropic.APIStatusError as e:
            if e.status_code < 500 or attempt == max_attempts - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay)
 
client = anthropic.Anthropic()
response = with_retry(
    lambda: client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    )
)

Pattern 2: Circuit Breaker

When failures cascade, stop hitting the API entirely and wait for recovery.

import time
from enum import Enum
from threading import Lock
 
class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"
 
class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0
        self._lock = Lock()
 
    def call(self, func, *args, **kwargs):
        with self._lock:
            if self.state == CircuitState.OPEN:
                elapsed = time.time() - self.last_failure_time
                if elapsed >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                else:
                    raise Exception(f"Circuit OPEN. Retry in {self.recovery_timeout - elapsed:.0f}s")
        try:
            result = func(*args, **kwargs)
            with self._lock:
                if self.state == CircuitState.HALF_OPEN:
                    self.success_count += 1
                    if self.success_count >= self.success_threshold:
                        self.state = CircuitState.CLOSED
                        self.failure_count = 0
                else:
                    self.failure_count = 0
            return result
        except Exception:
            with self._lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
            raise
 
claude_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)

Pattern 3: Streaming with Timeout

Streaming can hang indefinitely on network issues. Always set a deadline.

import asyncio
import anthropic
 
async def stream_with_timeout(prompt: str, timeout_seconds: float = 30.0) -> str:
    client = anthropic.AsyncAnthropic()
    collected_text = []
 
    async def _stream():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        ) as stream:
            async for text in stream.text_stream:
                collected_text.append(text)
        return "".join(collected_text)
 
    try:
        return await asyncio.wait_for(_stream(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        partial = "".join(collected_text)
        raise TimeoutError(
            f"Stream timed out after {timeout_seconds}s. "
            f"Got {len(partial)} chars before cutoff."
        )

Pattern 4: Composite Guard (Retry + Breaker + Timeout)

All three combined into a single production-ready wrapper.

class RobustClaudeClient:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.breaker = CircuitBreaker()
    
    def complete(self, prompt: str, **kwargs) -> str:
        def _call():
            response = self.client.messages.create(
                model=kwargs.get("model", "claude-sonnet-4-6"),
                max_tokens=kwargs.get("max_tokens", 1024),
                messages=[{"role": "user", "content": prompt}],
                timeout=kwargs.get("timeout", 30.0)
            )
            return response.content[0].text
        
        return self.breaker.call(lambda: with_retry(_call, max_attempts=3))

These first four aren't alternatives—they stack. The circuit breaker is the one people skip, but hammering a temporarily degraded endpoint with retries only slows its recovery and burns your own rate budget. Giving up on what's down and quietly resuming a minute later does more for overall availability than any single retry tweak.

Patterns 5–8: Async and Parallel Processing

Pattern 5: Parallel Requests with Semaphore

Run multiple requests concurrently without blowing past rate limits.

import asyncio
import anthropic
from typing import List
 
async def parallel_completions(
    prompts: List[str],
    max_concurrent: int = 5,
    model: str = "claude-haiku-4-5-20251001"
) -> List[str]:
    client = anthropic.AsyncAnthropic()
    semaphore = asyncio.Semaphore(max_concurrent)
 
    async def _process(prompt: str) -> str:
        async with semaphore:
            response = await client.messages.create(
                model=model,
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text
 
    return await asyncio.gather(
        *[_process(p) for p in prompts],
        return_exceptions=True
    )

Pattern 6: Batching Queue

Buffer requests and flush them in parallel bursts, maximizing throughput.

import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Optional
import anthropic
 
@dataclass
class BatchItem:
    prompt: str
    future: asyncio.Future
    model: str = "claude-haiku-4-5-20251001"
    max_tokens: int = 512
 
class BatchQueue:
    def __init__(self, flush_interval=0.1, max_batch_size=10, max_concurrent=5):
        self.queue: deque[BatchItem] = deque()
        self.flush_interval = flush_interval
        self.max_batch_size = max_batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = anthropic.AsyncAnthropic()
 
    async def submit(self, prompt: str, **kwargs) -> str:
        future = asyncio.get_event_loop().create_future()
        self.queue.append(BatchItem(prompt=prompt, future=future, **kwargs))
        return await future
 
    async def _flush_loop(self):
        while True:
            await asyncio.sleep(self.flush_interval)
            batch = []
            while self.queue and len(batch) < self.max_batch_size:
                batch.append(self.queue.popleft())
            if batch:
                await asyncio.gather(*[self._process(item) for item in batch])
 
    async def _process(self, item: BatchItem):
        async with self.semaphore:
            try:
                resp = await self.client.messages.create(
                    model=item.model,
                    max_tokens=item.max_tokens,
                    messages=[{"role": "user", "content": item.prompt}]
                )
                item.future.set_result(resp.content[0].text)
            except Exception as e:
                item.future.set_exception(e)

Pattern 7: Streaming with Stats

Stream output while tracking tokens and speed in real time.

import anthropic, time
 
def stream_with_progress(prompt: str, show_stats: bool = True):
    client = anthropic.Anthropic()
    full_text, start_time = [], time.time()
 
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
            full_text.append(text)
        message = stream.get_final_message()
    
    if show_stats:
        usage = message.usage
        tps = usage.output_tokens / (time.time() - start_time)
        print(f"\n\n[in: {usage.input_tokens} / out: {usage.output_tokens} / {tps:.1f} tok/s]")
    
    return "".join(full_text)

Pattern 8: Auto-Compressing Conversation

Summarize old turns automatically when the context gets long.

from typing import List
import anthropic
 
class CompressibleConversation:
    def __init__(self, model="claude-sonnet-4-6", max_tokens_before_compression=80_000):
        self.client = anthropic.Anthropic()
        self.model = model
        self.max_tokens = max_tokens_before_compression
        self.messages: List[dict] = []
        self.compressed_summary = ""
 
    def _estimate_tokens(self) -> int:
        return sum(len(m["content"]) // 4 for m in self.messages)
 
    def _compress(self):
        if len(self.messages) < 4:
            return
        to_compress, recent = self.messages[:-4], self.messages[-4:]
        conv_text = "\n".join(f"{m['role']}: {m['content']}" for m in to_compress)
        resp = self.client.messages.create(
            model="claude-haiku-4-5-20251001", max_tokens=512,
            messages=[{"role": "user", "content": f"Summarize in 100 words:\n{conv_text}"}]
        )
        self.compressed_summary = resp.content[0].text
        self.messages = recent
 
    def chat(self, user_message: str) -> str:
        if self._estimate_tokens() > self.max_tokens:
            self._compress()
        system = f"[Summary of prior conversation]: {self.compressed_summary}\n\n" if self.compressed_summary else ""
        self.messages.append({"role": "user", "content": user_message})
        response = self.client.messages.create(
            model=self.model, max_tokens=2048,
            system=system + "You are a helpful assistant.",
            messages=self.messages
        )
        reply = response.content[0].text
        self.messages.append({"role": "assistant", "content": reply})
        return reply

Parallelism and cost live next door to each other. Crank up concurrency to chase throughput and you hit rate limits sooner, trigger more retries, and end up slower. Tune the semaphore not by "bigger is faster" but by measuring and settling on the highest value that doesn't produce 429s—the long way round is usually the fast one here.

Thank you for reading this far.

Continue Reading

What follows includes implementation code, benchmarks, and practical content we hope you'll find useful. This site runs without ads — server and development costs are supported entirely by members like you. If it's been helpful, we'd be truly grateful for your support.

WHAT YOU'LL LEARN
Resilient API client combining retry with exponential backoff, circuit breaker, and timeout
Async parallel processing that maximizes throughput while respecting rate limits
Cost reduction, testing strategies, and production monitoring—20 patterns with runnable code
Secure payment via Stripe · Cancel anytime

Unlock This Article

Get full access to the rest of this article. Buy once, read anytime. This site is ad-free — your support goes directly toward keeping it running.

or
Unlock all articles with Membership →
Share

Thank You for Reading

Claude Lab is ad-free, supported entirely by members like you. We publish practical guides daily with implementation code, benchmarks, and production-ready patterns. If you've found it useful, we'd love to have you on board.

  • Copy-paste ready implementation code
  • New advanced guides published daily
  • $5/mo or $10 for lifetime access
View Membership →

Related Articles

API & SDK2026-06-01
Grouping Crashes by Root Cause: A Triage Design Built on the Claude API
Crashlytics 'Issues' often scatter the same root cause across separate entries. After years of running apps with 50M+ cumulative downloads, here is how I use the Claude API to regroup crashes by actual root cause and rank them, with working code and real numbers.
API & SDK2026-05-30
Continuing past max_tokens in the Claude API without duplicated text or broken code fences
Detect stop_reason: max_tokens, continue the generation with an assistant prefill, and stitch the parts back together without duplicated seams or broken code fences. A production-tested continuation pattern in TypeScript.
API & SDK2026-05-27
Tail Latency in Scheduled Claude API Workloads: A Three-Layer Guardrail Against Retry Storms
After running six sites in parallel through scheduled Claude API tasks for several months, 14 days of logs revealed three distinct p95/p99 patterns and a retry storm I had been creating from my own client. This is the guardrail design I landed on — jitter, budget, circuit breaker — with the before/after numbers.
📚RECOMMENDED BOOKS
Build a Large Language Model (From Scratch)
Sebastian Raschka
LLM Dev
Prompt Engineering for LLMs
Berryman & Ziegler
Prompting
AI Engineering
Chip Huyen
AI Eng
* Contains affiliate links
See all →