The first night I shipped a script that ran flawlessly on my laptop, a cascade of 429s stalled the whole pipeline at 2 AM. The culprit wasn't the rate limit itself—it was my own design, which had no retry logic. Run the Claude API in production with Python and you'll meet at least one of these "invisible-on-localhost" walls.
This cookbook is the set of patterns I've written down each time I hit one of those walls, organized into twenty recipes. It's less something to read front-to-back and more something to copy from when you need it.
One thing worth saying up front: you don't need all twenty from day one. Production resilience isn't a "kitchen sink"—it's built by defusing the specific landmines your app actually steps on, in order. As an indie developer running several services, I didn't start with all of them either; I added each one after an incident forced my hand. To make that mapping clear, I've grouped the patterns into four clusters.
Patterns 1–4: Resilient API Foundation
Pattern 1: Exponential Backoff with Jitter
Distinguishes between rate limits (429) and server errors (5xx), backing off appropriately for each.
import anthropic
import time
import random
from typing import TypeVar, Callable
T = TypeVar("T")


def _retry_after_seconds(exc: BaseException) -> "float | None":
    """Return the server-requested wait in seconds, or None if unavailable.

    Looks first for a numeric ``retry_after`` attribute on the exception, then
    for a ``retry-after`` header on ``exc.response``.
    NOTE(review): the Anthropic SDK documents ``APIStatusError.response`` as
    the underlying HTTP response; confirm the header name against the SDK.
    """
    direct = getattr(exc, "retry_after", None)
    if isinstance(direct, (int, float)) and 0 < direct < float("inf"):
        return float(direct)

    headers = getattr(getattr(exc, "response", None), "headers", None)
    if headers is None:
        return None
    try:
        seconds = float(headers.get("retry-after"))
    except (TypeError, ValueError):
        # Header missing, or given as an HTTP-date: use computed backoff.
        return None
    return seconds if 0 < seconds < float("inf") else None


def _backoff_delay(
    attempt: int, base_delay: float, max_delay: float, jitter: bool
) -> float:
    """Return the capped exponential delay for a zero-based attempt index."""
    delay = min(base_delay * (2 ** attempt), max_delay)
    if jitter:
        # Scale into [0.5, 1.0) of the nominal delay so that concurrent
        # clients do not all retry at the same instant.
        delay *= 0.5 + random.random() * 0.5
    return delay


def with_retry(
    func: Callable[..., T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
) -> T:
    """Call ``func()`` with exponential backoff on 429 and 5xx responses.

    Args:
        func: Zero-argument callable that performs the API request.
        max_attempts: Total number of calls to make before giving up (>= 1).
        base_delay: Delay in seconds before the first retry.
        max_delay: Upper bound in seconds for the computed backoff delay.
        jitter: When true, randomise each computed delay to 50-100% of its
            nominal value.

    Returns:
        Whatever ``func()`` returns on the first successful attempt.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        anthropic.RateLimitError: If the final attempt is still rate limited.
        anthropic.APIStatusError: Immediately for status codes below 500, or
            after the final attempt for 5xx responses.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    final_attempt = max_attempts - 1
    for attempt in range(max_attempts):
        try:
            return func()
        # RateLimitError must be handled before the broader APIStatusError
        # clause, otherwise the 429-specific branch would never run.
        except anthropic.RateLimitError as exc:
            if attempt == final_attempt:
                raise
            # A server-specified wait is used as-is: jittering it downwards
            # would retry before the server said it was ready.
            delay = _retry_after_seconds(exc)
            if delay is None:
                delay = _backoff_delay(attempt, base_delay, max_delay, jitter)
            print(f"Rate limited. Retrying in {delay:.1f}s ({attempt+1}/{max_attempts})")
            time.sleep(delay)
        except anthropic.APIStatusError as exc:
            # Client errors (4xx other than 429) will not succeed on retry.
            if exc.status_code < 500 or attempt == final_attempt:
                raise
            time.sleep(_backoff_delay(attempt, base_delay, max_delay, jitter))

    # Every iteration either returns or re-raises on the final attempt.
    raise AssertionError("with_retry exhausted attempts without raising")
# Example usage: send one Messages API request through the retry helper.
client = anthropic.Anthropic()
response = with_retry(
    lambda: client.messages.create(
        messages=[{"role": "user", "content": "Hello"}],
        max_tokens=1024,
        model="claude-sonnet-4-6",
    )
)
# Pattern 2: Circuit Breaker
When failures cascade, stop hitting the API entirely and wait for recovery.
import time
from enum import Enum
from threading import Lock
class CircuitState(Enum):
    """States a CircuitBreaker moves through."""

    CLOSED = "closed"        # calls pass through; failures are counted
    OPEN = "open"            # calls are rejected until recovery_timeout elapses
    HALF_OPEN = "half_open"  # calls pass through as recovery probes


class CircuitOpenError(Exception):
    """Raised by CircuitBreaker.call when a call is rejected by an open circuit."""


class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    After ``failure_threshold`` failures the circuit opens and calls are
    rejected with CircuitOpenError. Once ``recovery_timeout`` seconds have
    passed, calls are let through again in HALF_OPEN state; after
    ``success_threshold`` successes the circuit closes, while any failure
    re-opens it. State is read and written under a lock; the wrapped callable
    itself runs outside the lock.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        # time.monotonic() reading taken at the most recent failure.
        self.last_failure_time = 0.0
        self._lock = Lock()

    def call(self, func, *args, **kwargs):
        """Invoke ``func(*args, **kwargs)`` through the breaker.

        Returns:
            The value returned by ``func``.

        Raises:
            CircuitOpenError: If the circuit is open and still cooling down.
            Exception: Whatever ``func`` raises, after recording the failure.
        """
        self._admit()
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._record_failure()
            raise
        self._record_success()
        return result

    def _admit(self) -> None:
        """Reject the call if OPEN and cooling down; otherwise let it proceed."""
        with self._lock:
            if self.state != CircuitState.OPEN:
                return
            # Monotonic clock: unaffected by wall-clock adjustments.
            elapsed = time.monotonic() - self.last_failure_time
            if elapsed < self.recovery_timeout:
                raise CircuitOpenError(
                    f"Circuit OPEN. Retry in {self.recovery_timeout - elapsed:.0f}s"
                )
            self.state = CircuitState.HALF_OPEN
            self.success_count = 0

    def _record_success(self) -> None:
        """Count a success; close the circuit after enough probe successes."""
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
            else:
                self.failure_count = 0

    def _record_failure(self) -> None:
        """Count a failure; open the circuit when warranted."""
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.monotonic()
            # A failed probe re-opens immediately, even if failure_count was
            # reset by a call that finished successfully while the circuit
            # was open.
            if (
                self.state == CircuitState.HALF_OPEN
                or self.failure_count >= self.failure_threshold
            ):
                self.state = CircuitState.OPEN


claude_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)
# Pattern 3: Streaming with Timeout
Streaming can hang indefinitely on network issues. Always set a deadline.
import asyncio
import anthropic
async def stream_with_timeout(prompt: str, timeout_seconds: float = 30.0) -> str:
    """Stream a completion for ``prompt``, giving up after ``timeout_seconds``.

    Returns:
        The concatenated streamed text.

    Raises:
        TimeoutError: If the stream does not finish within the deadline. The
            message reports how many characters had arrived by then.
    """
    client = anthropic.AsyncAnthropic()
    # Shared with the inner coroutine so a timeout can still see partial output.
    collected_text = []

    async def _consume() -> str:
        manager = client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}],
        )
        async with manager as stream:
            async for piece in stream.text_stream:
                collected_text.append(piece)
        return "".join(collected_text)

    try:
        return await asyncio.wait_for(_consume(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        partial = "".join(collected_text)
        raise TimeoutError(
            f"Stream timed out after {timeout_seconds}s. "
            f"Got {len(partial)} chars before cutoff."
        )
# Pattern 4: Composite Guard (Retry + Breaker + Timeout)
Retry, circuit breaker, and a per-request timeout combined into a single production-ready wrapper.
class RobustClaudeClient:
    """Synchronous client that layers retry, circuit breaker and timeout."""

    def __init__(self):
        self.client = anthropic.Anthropic()
        self.breaker = CircuitBreaker()

    def complete(self, prompt: str, **kwargs) -> str:
        """Send ``prompt`` as a single user message and return the reply text.

        Args:
            prompt: Text of the user message.
            **kwargs: Request options forwarded to ``messages.create``.
                ``model``, ``max_tokens`` and ``timeout`` default to
                ``"claude-sonnet-4-6"``, ``1024`` and ``30.0``. A ``messages``
                entry is ignored; the request is always built from ``prompt``.

        Returns:
            The ``text`` of the first content block of the response.
        """
        # Defaults first, caller overrides next, and ``messages`` last so the
        # prompt always wins. Previously every other keyword was dropped.
        request = {
            "model": "claude-sonnet-4-6",
            "max_tokens": 1024,
            "timeout": 30.0,
            **kwargs,
            "messages": [{"role": "user", "content": prompt}],
        }

        def _call() -> str:
            response = self.client.messages.create(**request)
            return response.content[0].text

        # The breaker sees one outcome per complete() call: the retries are
        # exhausted inside it before a failure is counted.
        return self.breaker.call(lambda: with_retry(_call, max_attempts=3))
# These first four aren't alternatives—they stack. The circuit breaker is the
# one people skip, but hammering a temporarily degraded endpoint with retries
# only slows its recovery and burns your own rate budget. Giving up on what's
# down and quietly resuming a minute later does more for overall availability
# than any single retry tweak.
Patterns 5–8: Async and Parallel Processing
Pattern 5: Parallel Requests with Semaphore
Run multiple requests concurrently without blowing past rate limits.
import asyncio
import anthropic
from typing import List
async def parallel_completions(
    prompts: List[str],
    max_concurrent: int = 5,
    model: str = "claude-haiku-4-5-20251001",
) -> "List[str | BaseException]":
    """Complete every prompt concurrently, at most ``max_concurrent`` at once.

    Args:
        prompts: User messages to send, one request each.
        max_concurrent: Maximum number of in-flight requests (>= 1).
        model: Model identifier used for every request.

    Returns:
        One entry per prompt, in input order. A request that failed
        contributes its exception object instead of text, so callers must
        check each entry with ``isinstance(entry, BaseException)``.

    Raises:
        ValueError: If ``max_concurrent`` is less than 1.
    """
    if max_concurrent < 1:
        # Semaphore(0) would block every request forever.
        raise ValueError("max_concurrent must be at least 1")
    if not prompts:
        return []

    client = anthropic.AsyncAnthropic()
    gate = asyncio.Semaphore(max_concurrent)

    async def _process(prompt: str) -> str:
        async with gate:
            response = await client.messages.create(
                model=model,
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            )
        return response.content[0].text

    return await asyncio.gather(
        *(_process(prompt) for prompt in prompts),
        return_exceptions=True,
    )
# Pattern 6: Batching Queue
Buffer requests and flush them in parallel bursts, maximizing throughput.
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Optional
import anthropic
@dataclass
class BatchItem:
    """One queued prompt and the future that will receive its completion."""

    prompt: str
    future: asyncio.Future
    model: str = "claude-haiku-4-5-20251001"
    max_tokens: int = 512


class BatchQueue:
    """Buffer prompts and send them in periodic, concurrency-limited bursts.

    Every ``flush_interval`` seconds up to ``max_batch_size`` queued items are
    taken off the queue and sent concurrently, with at most ``max_concurrent``
    requests in flight. The flush loop is started on the first ``submit`` (or
    by calling ``start``) and stopped with ``stop``.
    """

    def __init__(self, flush_interval=0.1, max_batch_size=10, max_concurrent=5):
        if max_batch_size < 1:
            # An empty batch is never flushed, so submitters would wait forever.
            raise ValueError("max_batch_size must be at least 1")
        self.queue: deque[BatchItem] = deque()
        self.flush_interval = flush_interval
        self.max_batch_size = max_batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = anthropic.AsyncAnthropic()
        # Background task running _flush_loop; None until started.
        self._flush_task = None

    def start(self) -> None:
        """Start the background flush loop if it is not already running.

        Must be called from within a running event loop.
        """
        task = self._flush_task
        if task is None or task.done():
            self._flush_task = asyncio.get_running_loop().create_task(
                self._flush_loop()
            )

    async def stop(self) -> None:
        """Cancel the background flush loop and wait for it to finish.

        Items still waiting in the queue are left in place; a later
        ``submit`` or ``start`` resumes flushing them.
        """
        task, self._flush_task = self._flush_task, None
        if task is not None:
            task.cancel()
            # Collect the task's CancelledError instead of re-raising it here.
            await asyncio.gather(task, return_exceptions=True)

    async def submit(self, prompt: str, **kwargs) -> str:
        """Queue ``prompt`` and wait for its completion text.

        Args:
            prompt: Text of the user message.
            **kwargs: Optional ``model`` / ``max_tokens`` overrides passed to
                BatchItem.

        Returns:
            The ``text`` of the first content block of the response.

        Raises:
            Exception: Whatever the underlying API call raised for this item.
        """
        future = asyncio.get_running_loop().create_future()
        self.queue.append(BatchItem(prompt=prompt, future=future, **kwargs))
        # Without a running flush loop the future would never resolve.
        self.start()
        return await future

    async def _flush_loop(self):
        """Periodically drain up to max_batch_size items and process them."""
        while True:
            await asyncio.sleep(self.flush_interval)
            batch = []
            while self.queue and len(batch) < self.max_batch_size:
                batch.append(self.queue.popleft())
            if batch:
                await asyncio.gather(*(self._process(item) for item in batch))

    async def _process(self, item: BatchItem):
        """Send one item and deliver its result or error to the item's future."""
        async with self.semaphore:
            if item.future.done():
                # The submitter already gave up (e.g. was cancelled); skip the
                # request rather than pay for an answer nobody is awaiting.
                return
            try:
                resp = await self.client.messages.create(
                    model=item.model,
                    max_tokens=item.max_tokens,
                    messages=[{"role": "user", "content": item.prompt}],
                )
                text = resp.content[0].text
            except asyncio.CancelledError:
                # The flush loop is shutting down: release the submitter too.
                item.future.cancel()
                raise
            except Exception as e:
                # Setting a result on an already-finished future raises
                # InvalidStateError, which would take the flush loop down.
                if not item.future.done():
                    item.future.set_exception(e)
            else:
                if not item.future.done():
                    item.future.set_result(text)
# Pattern 7: Streaming with Stats
Stream output to the terminal as it arrives, then report token usage and average throughput once the stream finishes.
import anthropic, time
def stream_with_progress(prompt: str, show_stats: bool = True):
    """Print a streamed completion as it arrives and return the full text.

    Args:
        prompt: Text of the user message.
        show_stats: When true, print input/output token counts and average
            output tokens per second after the stream ends.

    Returns:
        The concatenated streamed text.
    """
    client = anthropic.Anthropic()
    pieces = []
    started_at = time.time()

    manager = client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}],
    )
    with manager as stream:
        for piece in stream.text_stream:
            print(piece, end="", flush=True)
            pieces.append(piece)
        # Token usage is read from the final assembled message.
        message = stream.get_final_message()

    if show_stats:
        usage = message.usage
        elapsed = time.time() - started_at
        tps = usage.output_tokens / elapsed
        print(f"\n\n[in: {usage.input_tokens} / out: {usage.output_tokens} / {tps:.1f} tok/s]")

    return "".join(pieces)
# Pattern 8: Auto-Compressing Conversation
Summarize old turns automatically when the context gets long.
from typing import List
import anthropic
class CompressibleConversation:
    """Chat session that summarises old turns once the history grows long.

    Before each turn the history size is estimated; above
    ``max_tokens_before_compression`` everything except the most recent
    messages is replaced by a summary, which is then passed in the system
    prompt.
    """

    # Number of most recent messages kept verbatim when compressing.
    _KEEP_RECENT = 4

    def __init__(
        self,
        model="claude-sonnet-4-6",
        max_tokens_before_compression=80_000,
        summary_model="claude-haiku-4-5-20251001",
    ):
        self.client = anthropic.Anthropic()
        self.model = model
        self.summary_model = summary_model
        self.max_tokens = max_tokens_before_compression
        self.messages: List[dict] = []
        self.compressed_summary = ""

    def _estimate_tokens(self) -> int:
        """Estimate history size at roughly four characters per token.

        Assumes every message's ``content`` is a string; the stored summary
        is not counted.
        """
        return sum(len(m["content"]) // 4 for m in self.messages)

    def _compress(self):
        """Fold all but the most recent messages into ``compressed_summary``."""
        keep = self._KEEP_RECENT
        if len(self.messages) <= keep:
            # Nothing older than the kept tail: summarising would send an
            # empty transcript and overwrite the existing summary.
            return

        to_compress, recent = self.messages[:-keep], self.messages[-keep:]
        conv_text = "\n".join(f"{m['role']}: {m['content']}" for m in to_compress)
        if self.compressed_summary:
            # Carry the previous summary forward so that repeated
            # compressions do not forget the earliest turns.
            conv_text = (
                f"[Summary of earlier conversation]: {self.compressed_summary}\n"
                f"{conv_text}"
            )
        resp = self.client.messages.create(
            model=self.summary_model,
            max_tokens=512,
            messages=[{"role": "user", "content": f"Summarize in 100 words:\n{conv_text}"}],
        )
        # State changes only after the summary call has succeeded.
        self.compressed_summary = resp.content[0].text
        self.messages = recent

    def chat(self, user_message: str) -> str:
        """Send ``user_message`` and return the assistant's reply text.

        The user turn and the reply are added to the history only after the
        API call succeeds, so a failed call leaves the history unchanged.

        Raises:
            Exception: Whatever the underlying API call raises.
        """
        if self._estimate_tokens() > self.max_tokens:
            self._compress()

        system = (
            f"[Summary of prior conversation]: {self.compressed_summary}\n\n"
            if self.compressed_summary
            else ""
        )
        user_turn = {"role": "user", "content": user_message}
        response = self.client.messages.create(
            model=self.model,
            max_tokens=2048,
            system=system + "You are a helpful assistant.",
            messages=[*self.messages, user_turn],
        )
        reply = response.content[0].text
        self.messages.extend([user_turn, {"role": "assistant", "content": reply}])
        return reply
# Parallelism and cost live next door to each other. Crank up concurrency to
# chase throughput and you hit rate limits sooner, trigger more retries, and
# end up slower. Tune the semaphore not by "bigger is faster" but by measuring
# and settling on the highest value that doesn't produce 429s—the long way
# round is usually the fast one here.