CLAUDE LABEN
CORPS — Anthropicが$150Mの全国フェローシップ「Claude Corps」を発表(6/11)。早期キャリア人材1,000人をAI実務で育成し米国の非営利団体へ配置。第1期は10月開始SUBAGENTS — Claude Codeでサブエージェントの入れ子が可能に(最大5階層)。多段の委譲ワークフローをそのまま設計できるようになりましたWORKFLOWS — Dynamic workflowsがリサーチプレビューで登場。CLI・Desktop・VS Code拡張から、コードベース全域のバグ探索や大規模移行を組めます(Max/Team/Enterprise)BILLING — 6/15の課金変更まで残り2日。Agent SDK・headless・GitHub Actionsが月次クレジット($20/$100/$200)へ移行。同日にSonnet 4とOpus 4がAPIから引退しますFABLE5 — Fable 5の無料同梱期間は6/22まで継続中。Pro/Max/Team/Enterpriseで追加費用なしに試せますCODE80 — 「Claudeが自身のコードの80%超を書く」とIPO関連報道。2025年2月の10%未満から1年余りで急伸CORPS — Anthropicが$150Mの全国フェローシップ「Claude Corps」を発表(6/11)。早期キャリア人材1,000人をAI実務で育成し米国の非営利団体へ配置。第1期は10月開始SUBAGENTS — Claude Codeでサブエージェントの入れ子が可能に(最大5階層)。多段の委譲ワークフローをそのまま設計できるようになりましたWORKFLOWS — Dynamic workflowsがリサーチプレビューで登場。CLI・Desktop・VS Code拡張から、コードベース全域のバグ探索や大規模移行を組めます(Max/Team/Enterprise)BILLING — 6/15の課金変更まで残り2日。Agent SDK・headless・GitHub Actionsが月次クレジット($20/$100/$200)へ移行。同日にSonnet 4とOpus 4がAPIから引退しますFABLE5 — Fable 5の無料同梱期間は6/22まで継続中。Pro/Max/Team/Enterpriseで追加費用なしに試せますCODE80 — 「Claudeが自身のコードの80%超を書く」とIPO関連報道。2025年2月の10%未満から1年余りで急伸
記事一覧/API & SDK
API & SDK/2026-06-13上級

Claude API Python 上級クックブック:本番運用で効く実践パターン集

Claude API × Python の本番実装パターンを20個厳選。リトライ、並列処理、コスト最適化、テストから監視まで、現場で役立つコードレシピ集。

Claude API67Python15本番環境5パターン最適化5実践2

プレミアム記事

ローカルで完璧に動いていたスクリプトを本番に載せた初日、深夜に 429 が連鎖して処理が止まったことがあります。原因はレート制限そのものではなく、リトライを入れていなかった自分の設計でした。Claude API を Python で運用に乗せると、こうした「ローカルでは見えなかった壁」が必ず一度はやってきます。

このクックブックは、そうした壁にぶつかるたびに書き溜めてきたパターンを20個に整理したものです。読み物というより、必要になったときに該当箇所をコピーして手元のコードへ差し込むための実用集として作っています。

ひとつだけ先にお伝えしておきたいのは、20個すべてを最初から入れる必要はない、ということです。本番の堅牢さは「全部入り」ではなく、自分のアプリが実際に踏みやすい地雷から順に潰していくことで積み上がります。個人開発で複数のサービスを運用してきた身として正直に書くと、私自身も最初からすべてを入れていたわけではなく、障害に遭うたびに一つずつ足してきました。どのパターンがどの地雷に対応するのかが見えるよう、4つのまとまりに分けて並べています。

パターン1〜4:堅牢なAPI呼び出し基盤

パターン1:指数バックオフ付きリトライ

レート制限(429)とサーバーエラー(529)を区別しながら自動リトライします。

import anthropic
import time
import random
from typing import TypeVar, Callable, Any
 
T = TypeVar('T')
 
def with_retry(
    func: Callable[..., T],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True
) -> T:
    """指数バックオフ + ジッターでAPIを呼び出す"""
    for attempt in range(max_attempts):
        try:
            return func()
        except anthropic.RateLimitError as e:
            if attempt == max_attempts - 1:
                raise
            # Retry-After ヘッダーがあれば優先
            retry_after = getattr(e, 'retry_after', None)
            delay = retry_after if retry_after else min(
                base_delay * (2 ** attempt),
                max_delay
            )
            if jitter:
                delay *= (0.5 + random.random() * 0.5)
            print(f"レート制限。{delay:.1f}秒後にリトライ ({attempt+1}/{max_attempts})")
            time.sleep(delay)
        except anthropic.APIStatusError as e:
            if e.status_code < 500 or attempt == max_attempts - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay)
 
# 使用例
client = anthropic.Anthropic()
response = with_retry(
    lambda: client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "こんにちは"}]
    )
)

パターン2:サーキットブレーカー

連続失敗時にAPIへの呼び出し自体を遮断し、回復を待ちます。

import time
from enum import Enum
from threading import Lock
 
class CircuitState(Enum):
    CLOSED = "closed"       # 正常
    OPEN = "open"           # 遮断中
    HALF_OPEN = "half_open" # 回復テスト中
 
class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0.0
        self._lock = Lock()
 
    def call(self, func: Callable, *args, **kwargs):
        with self._lock:
            if self.state == CircuitState.OPEN:
                elapsed = time.time() - self.last_failure_time
                if elapsed >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                else:
                    raise Exception(
                        f"サーキットブレーカー OPEN ({self.recovery_timeout - elapsed:.0f}秒後に再試行可能)"
                    )
        try:
            result = func(*args, **kwargs)
            with self._lock:
                if self.state == CircuitState.HALF_OPEN:
                    self.success_count += 1
                    if self.success_count >= self.success_threshold:
                        self.state = CircuitState.CLOSED
                        self.failure_count = 0
                else:
                    self.failure_count = 0
            return result
        except Exception:
            with self._lock:
                self.failure_count += 1
                self.last_failure_time = time.time()
                if self.failure_count >= self.failure_threshold:
                    self.state = CircuitState.OPEN
            raise
 
# グローバルに1つのブレーカーを持つ
claude_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30.0)

パターン3:タイムアウト付きストリーミング

ストリーミングはネットワーク問題で無限に待機することがあります。タイムアウトを設けましょう。

import asyncio
import anthropic
 
async def stream_with_timeout(
    prompt: str,
    timeout_seconds: float = 30.0
) -> str:
    """タイムアウト付きストリーミング。指定秒以内に完了しなければ中断。"""
    client = anthropic.AsyncAnthropic()
    collected_text = []
 
    async def _stream():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}]
        ) as stream:
            async for text in stream.text_stream:
                collected_text.append(text)
        return "".join(collected_text)
 
    try:
        return await asyncio.wait_for(_stream(), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        partial = "".join(collected_text)
        raise TimeoutError(
            f"ストリーミングが{timeout_seconds}秒でタイムアウト。"
            f"取得済み: {len(partial)}文字"
        )

パターン4:複合ガード(リトライ + ブレーカー + タイムアウト)

上記3つを組み合わせた実運用向けのラッパーです。

class RobustClaudeClient:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.breaker = CircuitBreaker()
    
    def complete(self, prompt: str, **kwargs) -> str:
        def _call():
            response = self.client.messages.create(
                model=kwargs.get("model", "claude-sonnet-4-6"),
                max_tokens=kwargs.get("max_tokens", 1024),
                messages=[{"role": "user", "content": prompt}],
                timeout=kwargs.get("timeout", 30.0)
            )
            return response.content[0].text
        
        return self.breaker.call(
            lambda: with_retry(_call, max_attempts=3)
        )

ここまでの4つは、どれか一つではなく重ねて効く土台です。特にサーキットブレーカーは軽視されがちですが、Anthropic 側が一時的に不調なときにリトライだけで押し続けると、回復を遅らせるうえに自分のレート枠を浪費します。落ちているものはいったん諦めて数十秒後に静かに再開する、という割り切りが、結果的に全体の可用性を底上げしてくれます。

パターン5〜8:非同期・並列処理

パターン5:セマフォ付き並列リクエスト

レート制限を守りながら複数リクエストを並列実行します。

import asyncio
import anthropic
from typing import List
 
async def parallel_completions(
    prompts: List[str],
    max_concurrent: int = 5,
    model: str = "claude-haiku-4-5-20251001"
) -> List[str]:
    """
    複数プロンプトを並列処理。
    max_concurrent でAPIへの同時接続数を制御。
    """
    client = anthropic.AsyncAnthropic()
    semaphore = asyncio.Semaphore(max_concurrent)
 
    async def _process(prompt: str) -> str:
        async with semaphore:
            response = await client.messages.create(
                model=model,
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text
 
    tasks = [_process(p) for p in prompts]
    return await asyncio.gather(*tasks, return_exceptions=True)
 
# 使用例
async def main():
    items = ["AIとは", "機械学習とは", "深層学習とは", "LLMとは", "RAGとは"]
    results = await parallel_completions(
        [f"{item}を50文字で説明して" for item in items],
        max_concurrent=3
    )
    for item, result in zip(items, results):
        if isinstance(result, Exception):
            print(f"✗ {item}: {result}")
        else:
            print(f"✓ {item}: {result[:50]}...")

パターン6:バッチ処理キュー

大量のリクエストをバッチ化してレートリミットを効率よく使い切ります。

import asyncio
from collections import deque
from dataclasses import dataclass
from typing import Optional
import anthropic
 
@dataclass
class BatchItem:
    prompt: str
    future: asyncio.Future
    model: str = "claude-haiku-4-5-20251001"
    max_tokens: int = 512
 
class BatchQueue:
    """一定時間内に溜まったリクエストを並列実行するキュー"""
    
    def __init__(
        self,
        flush_interval: float = 0.1,
        max_batch_size: int = 10,
        max_concurrent: int = 5
    ):
        self.queue: deque[BatchItem] = deque()
        self.flush_interval = flush_interval
        self.max_batch_size = max_batch_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.client = anthropic.AsyncAnthropic()
        self._task: Optional[asyncio.Task] = None
 
    async def start(self):
        self._task = asyncio.create_task(self._flush_loop())
 
    async def stop(self):
        if self._task:
            self._task.cancel()
 
    async def submit(self, prompt: str, **kwargs) -> str:
        future = asyncio.get_event_loop().create_future()
        self.queue.append(BatchItem(prompt=prompt, future=future, **kwargs))
        return await future
 
    async def _flush_loop(self):
        while True:
            await asyncio.sleep(self.flush_interval)
            batch = []
            while self.queue and len(batch) < self.max_batch_size:
                batch.append(self.queue.popleft())
            if batch:
                await asyncio.gather(*[self._process(item) for item in batch])
 
    async def _process(self, item: BatchItem):
        async with self.semaphore:
            try:
                resp = await self.client.messages.create(
                    model=item.model,
                    max_tokens=item.max_tokens,
                    messages=[{"role": "user", "content": item.prompt}]
                )
                item.future.set_result(resp.content[0].text)
            except Exception as e:
                item.future.set_exception(e)

パターン7:ストリーミング進捗表示

長い生成をリアルタイムで表示しつつ完了を待つパターンです。

import anthropic
import sys
 
def stream_with_progress(prompt: str, show_stats: bool = True):
    """ストリーミングしながらトークン数と速度を表示"""
    client = anthropic.Anthropic()
    token_count = 0
    start_time = time.time()
    full_text = []
 
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
            full_text.append(text)
            token_count += 1  # 概算
 
        message = stream.get_final_message()
    
    elapsed = time.time() - start_time
    if show_stats:
        usage = message.usage
        tps = usage.output_tokens / elapsed
        print(f"\n\n[入力: {usage.input_tokens} / 出力: {usage.output_tokens} / "
              f"速度: {tps:.1f} tokens/s / 所要: {elapsed:.1f}s]")
    
    return "".join(full_text)

パターン8:会話履歴の自動圧縮

長い会話でコンテキストウィンドウを使い過ぎないように、古いメッセージを自動要約します。

from typing import List
import anthropic
 
class CompressibleConversation:
    """コンテキストが長くなったら自動的に過去の会話を要約する"""
    
    def __init__(
        self,
        model: str = "claude-sonnet-4-6",
        max_tokens_before_compression: int = 80_000,
        summary_model: str = "claude-haiku-4-5-20251001"
    ):
        self.client = anthropic.Anthropic()
        self.model = model
        self.max_tokens = max_tokens_before_compression
        self.summary_model = summary_model
        self.messages: List[dict] = []
        self.compressed_summary: str = ""
 
    def _estimate_tokens(self) -> int:
        return sum(len(m["content"]) // 4 for m in self.messages)
 
    def _compress(self):
        """古いメッセージを要約して圧縮"""
        if len(self.messages) < 4:
            return
        # 直近4件を残して圧縮
        to_compress = self.messages[:-4]
        recent = self.messages[-4:]
        
        conv_text = "\n".join(
            f"{m['role']}: {m['content']}" for m in to_compress
        )
        resp = self.client.messages.create(
            model=self.summary_model,
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"以下の会話を200文字以内で要約してください:\n{conv_text}"
            }]
        )
        self.compressed_summary = resp.content[0].text
        self.messages = recent
 
    def chat(self, user_message: str) -> str:
        if self._estimate_tokens() > self.max_tokens:
            self._compress()
        
        system_prefix = (
            f"[過去の会話の要約]: {self.compressed_summary}\n\n"
            if self.compressed_summary else ""
        )
        self.messages.append({"role": "user", "content": user_message})
        
        response = self.client.messages.create(
            model=self.model,
            max_tokens=2048,
            system=system_prefix + "あなたは親切なアシスタントです。",
            messages=self.messages
        )
        assistant_message = response.content[0].text
        self.messages.append({"role": "assistant", "content": assistant_message})
        return assistant_message

並列化とコスト最適化は隣り合わせのテーマです。スループットを上げようと同時接続数を増やすほどレート制限に当たりやすくなり、リトライが増えてかえって遅くなることがあります。セマフォの値は「速くしたいから大きく」ではなく、実測しながら 429 が出ない上限に寄せていくのが、遠回りに見えていちばん速い、というのが現場での実感です。

ここまでお読みいただきありがとうございます。

この記事の続きを読む

この先には、実装コードやベンチマーク結果など、実務でお役に立てる内容をご用意しています。このサイトは広告を掲載しておらず、サーバーや開発にかかる費用はメンバーの皆様のご支援で成り立っています。もしお役に立てていましたら、ご支援いただけますと大変ありがたいです。

この記事で得られること
リトライ・サーキットブレーカー・タイムアウトを組み合わせた堅牢なAPI呼び出しパターン
非同期並列処理でスループットを最大化しながらレート制限を守る実装方法
コスト削減・テスト戦略・本番監視まで20の実践パターンをコード付きで解説
Stripe による安全な決済 · いつでもキャンセル可能

この記事を購入する

この先の内容をすべてお読みいただけます。一度のご購入で、いつでも何度でもアクセスできます。このサイトは広告を掲載しておらず、皆さまのご支援がサーバー費用などの運営を支えています。

または
メンバーシップなら全記事が読み放題 →
シェア

お読みいただきありがとうございます

Claude Lab は広告なしで運営しており、サーバー費用などの運営コストはメンバーシップのご支援で賄っています。実装コード・ベンチマーク・本番設計パターンなど、実務でお役立ていただける記事を毎日更新しています。もし読んでよかったと感じていただけましたら、ぜひご覧ください。

  • コピー&ペーストで使える実装コード付き
  • 毎日新しい上級ガイドを追加
  • ¥580/月 または ¥1,480 の永久アクセス
メンバーシップを見る →

関連記事

API & SDK2026-06-01
Crashlyticsのクラッシュを根本原因ごとに束ねる — Claude APIで作るトリアージの設計メモ
Crashlyticsの「Issue」は同じ原因を別物として散らしてしまうことがあります。累計5,000万DLのアプリ運用で溜まったクラッシュを、Claude APIで根本原因ごとに束ね直して優先順位をつける設計を、動くコードと実測値とともに共有します。
API & SDK2026-05-12
App Storeのレビュー1,000件をClaude APIで分析したら、自分のアプリの本当の問題が見えてきた
個人開発10年超・累計5,000万DLの経験から語る。App StoreレビューをClaude APIでバッチ分析し、次の改善優先順位を自動生成する実装パターンと、試して気づいた落とし穴を共有します。
API & SDK2026-05-06
Claude API で実装する自律リサーチエージェント: Web検索・要約・知識管理の完全システム設計
Claude API と Web 検索ツールを組み合わせた自律リサーチエージェントの設計・実装を解説。予算制御・品質保証・知識ベース格納まで、個人開発者が本番で使える完全なシステム構成を紹介します。
📚RECOMMENDED BOOKS
大規模言語モデル入門
山田育矢
LLM開発
生成AIプロンプトエンジニアリング入門
我妻幸長
プロンプト
Claude CodeによるAI駆動開発入門
平川知秀
AI駆動開発
※ アフィリエイトリンクを含みます
もっと見る →