ローカルで完璧に動いていたスクリプトを本番に載せた初日、深夜に 429 が連鎖して処理が止まったことがあります。原因はレート制限そのものではなく、リトライを入れていなかった自分の設計でした。Claude API を Python で運用に乗せると、こうした「ローカルでは見えなかった壁」が必ず一度はやってきます。
このクックブックは、そうした壁にぶつかるたびに書き溜めてきたパターンを20個に整理したものです。読み物というより、必要になったときに該当箇所をコピーして手元のコードへ差し込むための実用集として作っています。
ひとつだけ先にお伝えしておきたいのは、20個すべてを最初から入れる必要はない、ということです。本番の堅牢さは「全部入り」ではなく、自分のアプリが実際に踏みやすい地雷から順に潰していくことで積み上がります。個人開発で複数のサービスを運用してきた身として正直に書くと、私自身も最初からすべてを入れていたわけではなく、障害に遭うたびに一つずつ足してきました。どのパターンがどの地雷に対応するのかが見えるよう、4つのまとまりに分けて並べています。
パターン1〜4:堅牢なAPI呼び出し基盤
パターン1:指数バックオフ付きリトライ
レート制限(429)とサーバーエラー(529)を区別しながら自動リトライします。
import anthropic
import time
import random
from typing import TypeVar, Callable, Any
T = TypeVar('T')
def with_retry(
func: Callable[..., T],
max_attempts: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True
) -> T:
"""指数バックオフ + ジッターでAPIを呼び出す"""
for attempt in range(max_attempts):
try:
return func()
except anthropic.RateLimitError as e:
if attempt == max_attempts - 1:
raise
# Retry-After ヘッダーがあれば優先
retry_after = getattr(e, 'retry_after', None)
delay = retry_after if retry_after else min(
base_delay * (2 ** attempt),
max_delay
)
if jitter:
delay *= (0.5 + random.random() * 0.5)
print(f"レート制限。{delay:.1f}秒後にリトライ ({attempt+1}/{max_attempts})")
time.sleep(delay)
except anthropic.APIStatusError as e:
if e.status_code < 500 or attempt == max_attempts - 1:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
time.sleep(delay)
# 使用例
client = anthropic.Anthropic()
response = with_retry(
lambda: client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": "こんにちは"}]
)
)パターン2:サーキットブレーカー
連続失敗時にAPIへの呼び出し自体を遮断し、回復を待ちます。
import time
from enum import Enum
from threading import Lock
class CircuitState(Enum):
CLOSED = "closed" # 正常
OPEN = "open" # 遮断中
HALF_OPEN = "half_open" # 回復テスト中
class CircuitBreaker:
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: float = 60.0,
success_threshold: int = 2
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0.0
self._lock = Lock()
def call(self, func: Callable, *args, **kwargs):
with self._lock:
if self.state == CircuitState.OPEN:
elapsed = time.time() - self.last_failure_time
if elapsed >= self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise Exception(
f"サーキットブレーカー OPEN ({self.recovery_timeout - elapsed:.0f}秒後に再試行可能)"
)
try:
result = func(*args, **kwargs)
with self._lock:
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
else:
self.failure_count = 0
return result
except Exception:
with self._lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
raise
# グローバルに1つのブレーカーを持つ
claude_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)パターン3:タイムアウト付きストリーミング
ストリーミングはネットワーク問題で無限に待機することがあります。タイムアウトを設けましょう。
import asyncio
import anthropic
async def stream_with_timeout(
prompt: str,
timeout_seconds: float = 30.0
) -> str:
"""タイムアウト付きストリーミング。指定秒以内に完了しなければ中断。"""
client = anthropic.AsyncAnthropic()
collected_text = []
async def _stream():
async with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": prompt}]
) as stream:
async for text in stream.text_stream:
collected_text.append(text)
return "".join(collected_text)
try:
return await asyncio.wait_for(_stream(), timeout=timeout_seconds)
except asyncio.TimeoutError:
partial = "".join(collected_text)
raise TimeoutError(
f"ストリーミングが{timeout_seconds}秒でタイムアウト。"
f"取得済み: {len(partial)}文字"
)パターン4:複合ガード(リトライ + ブレーカー + タイムアウト)
上記3つを組み合わせた実運用向けのラッパーです。
class RobustClaudeClient:
def __init__(self):
self.client = anthropic.Anthropic()
self.breaker = CircuitBreaker()
def complete(self, prompt: str, **kwargs) -> str:
def _call():
response = self.client.messages.create(
model=kwargs.get("model", "claude-sonnet-4-6"),
max_tokens=kwargs.get("max_tokens", 1024),
messages=[{"role": "user", "content": prompt}],
timeout=kwargs.get("timeout", 30.0)
)
return response.content[0].text
return self.breaker.call(
lambda: with_retry(_call, max_attempts=3)
)ここまでの4つは、どれか一つではなく重ねて効く土台です。特にサーキットブレーカーは軽視されがちですが、Anthropic 側が一時的に不調なときにリトライだけで押し続けると、回復を遅らせるうえに自分のレート枠を浪費します。落ちているものはいったん諦めて数十秒後に静かに再開する、という割り切りが、結果的に全体の可用性を底上げしてくれます。
パターン5〜8:非同期・並列処理
パターン5:セマフォ付き並列リクエスト
レート制限を守りながら複数リクエストを並列実行します。
import asyncio
import anthropic
from typing import List
async def parallel_completions(
prompts: List[str],
max_concurrent: int = 5,
model: str = "claude-haiku-4-5-20251001"
) -> List[str]:
"""
複数プロンプトを並列処理。
max_concurrent でAPIへの同時接続数を制御。
"""
client = anthropic.AsyncAnthropic()
semaphore = asyncio.Semaphore(max_concurrent)
async def _process(prompt: str) -> str:
async with semaphore:
response = await client.messages.create(
model=model,
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
tasks = [_process(p) for p in prompts]
return await asyncio.gather(*tasks, return_exceptions=True)
# 使用例
async def main():
items = ["AIとは", "機械学習とは", "深層学習とは", "LLMとは", "RAGとは"]
results = await parallel_completions(
[f"{item}を50文字で説明して" for item in items],
max_concurrent=3
)
for item, result in zip(items, results):
if isinstance(result, Exception):
print(f"✗ {item}: {result}")
else:
print(f"✓ {item}: {result[:50]}...")パターン6:バッチ処理キュー
大量のリクエストをバッチ化してレートリミットを効率よく使い切ります。
import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Optional
import anthropic
@dataclass
class BatchItem:
prompt: str
future: asyncio.Future
model: str = "claude-haiku-4-5-20251001"
max_tokens: int = 512
class BatchQueue:
"""一定時間内に溜まったリクエストを並列実行するキュー"""
def __init__(
self,
flush_interval: float = 0.1,
max_batch_size: int = 10,
max_concurrent: int = 5
):
self.queue: deque[BatchItem] = deque()
self.flush_interval = flush_interval
self.max_batch_size = max_batch_size
self.semaphore = asyncio.Semaphore(max_concurrent)
self.client = anthropic.AsyncAnthropic()
self._task: Optional[asyncio.Task] = None
async def start(self):
self._task = asyncio.create_task(self._flush_loop())
async def stop(self):
if self._task:
self._task.cancel()
async def submit(self, prompt: str, **kwargs) -> str:
future = asyncio.get_event_loop().create_future()
self.queue.append(BatchItem(prompt=prompt, future=future, **kwargs))
return await future
async def _flush_loop(self):
while True:
await asyncio.sleep(self.flush_interval)
batch = []
while self.queue and len(batch) < self.max_batch_size:
batch.append(self.queue.popleft())
if batch:
await asyncio.gather(*[self._process(item) for item in batch])
async def _process(self, item: BatchItem):
async with self.semaphore:
try:
resp = await self.client.messages.create(
model=item.model,
max_tokens=item.max_tokens,
messages=[{"role": "user", "content": item.prompt}]
)
item.future.set_result(resp.content[0].text)
except Exception as e:
item.future.set_exception(e)パターン7:ストリーミング進捗表示
長い生成をリアルタイムで表示しつつ完了を待つパターンです。
import anthropic
import sys
def stream_with_progress(prompt: str, show_stats: bool = True):
"""ストリーミングしながらトークン数と速度を表示"""
client = anthropic.Anthropic()
token_count = 0
start_time = time.time()
full_text = []
with client.messages.stream(
model="claude-sonnet-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
full_text.append(text)
token_count += 1 # 概算
message = stream.get_final_message()
elapsed = time.time() - start_time
if show_stats:
usage = message.usage
tps = usage.output_tokens / elapsed
print(f"\n\n[入力: {usage.input_tokens} / 出力: {usage.output_tokens} / "
f"速度: {tps:.1f} tokens/s / 所要: {elapsed:.1f}s]")
return "".join(full_text)パターン8:会話履歴の自動圧縮
長い会話でコンテキストウィンドウを使い過ぎないように、古いメッセージを自動要約します。
from typing import List
import anthropic
class CompressibleConversation:
"""コンテキストが長くなったら自動的に過去の会話を要約する"""
def __init__(
self,
model: str = "claude-sonnet-4-6",
max_tokens_before_compression: int = 80_000,
summary_model: str = "claude-haiku-4-5-20251001"
):
self.client = anthropic.Anthropic()
self.model = model
self.max_tokens = max_tokens_before_compression
self.summary_model = summary_model
self.messages: List[dict] = []
self.compressed_summary: str = ""
def _estimate_tokens(self) -> int:
return sum(len(m["content"]) // 4 for m in self.messages)
def _compress(self):
"""古いメッセージを要約して圧縮"""
if len(self.messages) < 4:
return
# 直近4件を残して圧縮
to_compress = self.messages[:-4]
recent = self.messages[-4:]
conv_text = "\n".join(
f"{m['role']}: {m['content']}" for m in to_compress
)
resp = self.client.messages.create(
model=self.summary_model,
max_tokens=1024,
messages=[{
"role": "user",
"content": f"以下の会話を200文字以内で要約してください:\n{conv_text}"
}]
)
self.compressed_summary = resp.content[0].text
self.messages = recent
def chat(self, user_message: str) -> str:
if self._estimate_tokens() > self.max_tokens:
self._compress()
system_prefix = (
f"[過去の会話の要約]: {self.compressed_summary}\n\n"
if self.compressed_summary else ""
)
self.messages.append({"role": "user", "content": user_message})
response = self.client.messages.create(
model=self.model,
max_tokens=2048,
system=system_prefix + "あなたは親切なアシスタントです。",
messages=self.messages
)
assistant_message = response.content[0].text
self.messages.append({"role": "assistant", "content": assistant_message})
return assistant_message並列化とコスト最適化は隣り合わせのテーマです。スループットを上げようと同時接続数を増やすほどレート制限に当たりやすくなり、リトライが増えてかえって遅くなることがあります。セマフォの値は「速くしたいから大きく」ではなく、実測しながら 429 が出ない上限に寄せていくのが、遠回りに見えていちばん速い、というのが現場での実感です。