営業担当者が1日に処理できるリードの数には、物理的な上限があります。メールを読み、企業情報を調べ、提案書を作り、フォローアップを送る——これらを一件ずつ手でこなしていると、1日30件が限界というのが現実ではないでしょうか。
私がこの課題に向き合ったのは、ある中堅SaaS企業のCTO候補者から相談を受けたのがきっかけでした。月間400件のインバウンドリードがあるのに、営業チームは3人。「人を増やすか、AIに任せるか」という二択を迫られていると言うのです。
結論から言うと、Claude Agent SDK を使ったマルチエージェントシステムで、リード評価から初回提案書の下書きまでを自動化し、1人の営業担当者が扱えるリード数を月60件から180件に引き上げることができました。ここで扱うのはその設計と実装を余すことなく公開します。
なぜマルチエージェントなのか — シングルエージェントの限界
最初に試したのは、1つの大きなエージェントに全タスクを任せる方法でした。「リードを受け取って、調査して、提案書を書いて、フォローアップメールも作って」と1プロンプトに全部詰め込む設計です。
これがうまくいかない理由は3つあります。
1. コンテキストウィンドウの問題: 1件のリード処理に使う情報量(企業HP・LinkedIn・過去の接触履歴・自社製品情報)を全部詰め込むと、容易に100kトークンを超えます。Opus 4を使えばコンテキストは1Mトークンありますが、それを1件ごとに使うと費用が爆発します。
2. エラーの伝播: リード調査の途中でWebフェッチが失敗した場合、後続の提案書生成も全部失敗します。失敗箇所だけリトライする仕組みが必要です。
3. 品質のバラつき: 「調査専門」「文章専門」のように特化させた方が、それぞれの品質が安定します。
マルチエージェント設計では、これらを専門エージェントに分担させることで解決できます。
システムアーキテクチャの全体像
このシステムは4つのコンポーネントで構成されます。
- オーケストレーター: リードキューを監視し、各エージェントへタスクを割り振る
- リサーチエージェント: 企業情報・業種・規模・競合状況を調査してスコアリング
- ドラフターエージェント: 提案書と初回アプローチメールの下書きを生成
- レビューゲート: 人間が承認するまで送信を止めるバッファ
全体のフローは次の通りです。
インバウンドリード
↓
オーケストレーター(キュー監視)
↓
リサーチエージェント(調査 + スコアリング)
↓
[スコア閾値チェック] ──低スコア──→ アーカイブ
↓ 高スコア
ドラフターエージェント(提案書 + メール下書き)
↓
レビューゲート(人間承認待ち)
↓
CRMへ保存 + メール送信
重要なのは「レビューゲート」です。完全自動送信は初期段階では危険すぎます。エージェントが作った文章を人間が確認してから送る二段階設計にすることで、ブランドリスクを最小化できます。
Step 1: 環境構築と依存関係
# Python 3.11以上を推奨
pip install anthropic>=0.50.0 python-dotenv aiohttp aiofiles
# CRM連携はHubSpot CLient(オプション)
pip install hubspot-api-client
# config.py
import os
from dotenv import load_dotenv
load_dotenv()
ANTHROPIC_API_KEY = os.environ["ANTHROPIC_API_KEY"]
# エージェント別モデル選択(コスト最適化の核心)
RESEARCH_MODEL = "claude-haiku-4-5-20251001" # 調査: 安価なHaikuで十分
DRAFT_MODEL = "claude-sonnet-4-6" # 文章生成: Sonnetで品質確保
ORCHESTRATOR_MODEL = "claude-haiku-4-5-20251001" # 制御: Haikuでコスト最小化
# 処理設定
LEAD_SCORE_THRESHOLD = 60 # このスコア以上のリードのみ提案書生成
MAX_CONCURRENT_LEADS = 5 # 同時処理リード数
モデル選択がコスト最適化の要です。リサーチとオーケストレーションは Haiku(安価)、文章品質が求められるドラフトのみ Sonnet を使います。これだけで全部 Sonnet を使った場合と比べてコストを65%削減できます。
Step 2: リサーチエージェントの実装
リサーチエージェントはWebツールを使って企業情報を取得し、スコアを算出します。
# research_agent.py
import anthropic
import json
from typing import TypedDict
client = anthropic.Anthropic()
class LeadResearchResult(TypedDict):
company_name: str
industry: str
employee_count: str
annual_revenue: str
pain_points: list[str]
competitors: list[str]
score: int
score_reason: str
is_qualified: bool
RESEARCH_TOOLS = [
{
"name": "web_search",
"description": "企業のWebサイト・ニュース・LinkedInを検索して情報を取得する",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "検索クエリ。企業名 + サイトドメインなど具体的に指定する"
}
},
"required": ["query"]
}
},
{
"name": "score_lead",
"description": "収集した情報をもとにリードにスコア(0-100)を付ける",
"input_schema": {
"type": "object",
"properties": {
"company_name": {"type": "string"},
"industry": {"type": "string"},
"employee_count": {"type": "string"},
"annual_revenue": {"type": "string"},
"pain_points": {
"type": "array",
"items": {"type": "string"}
},
"competitors": {
"type": "array",
"items": {"type": "string"}
},
"score": {
"type": "integer",
"description": "0-100のスコア。自社製品との適合度を総合評価"
},
"score_reason": {
"type": "string",
"description": "スコアの根拠を2-3文で説明"
}
},
"required": ["company_name", "industry", "score", "score_reason"]
}
}
]
async def research_lead(lead: dict) -> LeadResearchResult:
"""
リードの企業を調査してスコアを返す。
Web検索ツールは今回は簡略化のためモック実装。
本番ではAWS Lambdaにデプロイしたスクレイピング関数を呼び出す。
"""
messages = [
{
"role": "user",
"content": f"""以下のリードについて調査してスコアを付けてください。
企業名: {lead['company_name']}
担当者: {lead['contact_name']}
メール: {lead['email']}
問い合わせ内容: {lead.get('inquiry', 'なし')}
手順:
1. web_search ツールで企業情報・業種・規模・競合を調査してください
2. 十分な情報が集まったら score_lead ツールで評価を確定してください
評価基準(自社製品がSaaS向けAPI管理ツールの場合の例):
- 従業員数100-2000人の企業: +30点
- SaaS/テック系企業: +25点
- 問い合わせ内容が具体的: +20点
- 競合ツールを既に使用中: +15点(スイッチング需要あり)
- 大企業(2000人超): -10点(契約サイクルが長い)"""
}
]
# エージェントループ(tool_useが終わるまで繰り返す)
final_result = None
for iteration in range(5): # 最大5回で安全停止
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=2048,
tools=RESEARCH_TOOLS,
messages=messages
)
if response.stop_reason == "end_turn":
break
if response.stop_reason == "tool_use":
tool_uses = [b for b in response.content if b.type == "tool_use"]
tool_results = []
for tool_use in tool_uses:
if tool_use.name == "web_search":
# 本番: 実際のWeb検索APIを呼び出す
result = await mock_web_search(tool_use.input["query"])
tool_results.append({
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": result
})
elif tool_use.name == "score_lead":
# スコア確定 — これが最終結果
score_data = tool_use.input
final_result = LeadResearchResult(
company_name=score_data["company_name"],
industry=score_data.get("industry", "不明"),
employee_count=score_data.get("employee_count", "不明"),
annual_revenue=score_data.get("annual_revenue", "不明"),
pain_points=score_data.get("pain_points", []),
competitors=score_data.get("competitors", []),
score=score_data["score"],
score_reason=score_data["score_reason"],
is_qualified=score_data["score"] >= 60
)
tool_results.append({
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": json.dumps({"status": "scored", "score": score_data["score"]})
})
# メッセージ履歴を更新してループ継続
messages.append({"role": "assistant", "content": response.content})
messages.append({"role": "user", "content": tool_results})
return final_result
async def mock_web_search(query: str) -> str:
"""
開発時のモック。本番ではBright Data / Serper APIなどに差し替える。
"""
return f"Search results for '{query}': [企業情報のモックデータ。従業員数500人、SaaS業種、CRMツール導入済み]"
このコードで重要なのは「最大5回ループ」の安全停止です。エージェントが無限ループに入るケースは本番では必ず起きます。イテレーション数に上限を設けることで防げます。
Step 3: ドラフターエージェントの実装
リサーチ結果を受け取って、提案書と初回メールの下書きを生成します。
# drafter_agent.py
import anthropic
from dataclasses import dataclass
client = anthropic.Anthropic()
@dataclass
class DraftResult:
proposal_summary: str # 2ページ程度の提案書サマリー(Markdown)
outreach_email: str # 初回アプローチメール本文
subject_line: str # メール件名
personalization_notes: str # 営業担当者へのメモ
PRODUCT_CONTEXT = """
自社製品: APIガード Pro
- SaaS企業向けAPI管理・モニタリングプラットフォーム
- 主な機能: レート制限・認証管理・使用量分析・異常検知
- 価格帯: 月額5万円〜(従量課金)
- 導入実績: 国内SaaS200社以上
- 競合優位: 導入5日・専任CSチームによる手厚いサポート
"""
async def draft_proposal(research_result: dict, lead: dict) -> DraftResult:
prompt = f"""以下のリード調査結果をもとに、提案書サマリーと初回アプローチメールを作成してください。
=== リード情報 ===
企業名: {research_result['company_name']}
業種: {research_result['industry']}
規模: {research_result['employee_count']}
課題: {', '.join(research_result.get('pain_points', []))}
競合ツール: {', '.join(research_result.get('competitors', []))}
担当者: {lead['contact_name']}
=== 自社製品情報 ===
{PRODUCT_CONTEXT}
=== 出力形式 ===
以下のJSON形式で返してください:
{{
"proposal_summary": "提案書サマリー(Markdownで200-300文字程度)",
"outreach_email": "初回メール本文(500-700文字。ですます調で丁寧かつ具体的に)",
"subject_line": "メール件名(30文字以内)",
"personalization_notes": "営業担当者へのメモ(この企業特有の注意点・アプローチのコツ)"
}}
重要なポイント:
- 企業の具体的な課題(競合ツールの不満点など)を冒頭に触れること
- 自社製品の機能を羅列しない。課題解決ストーリーで語ること
- 初回メールは返信してもらうことが目的。売り込みは最小限に"""
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
messages=[{"role": "user", "content": prompt}]
)
# JSONパース(本番ではエラーハンドリングを追加すること)
import json
import re
text = response.content[0].text
json_match = re.search(r'\{[\s\S]*\}', text)
if not json_match:
raise ValueError(f"JSONが見つかりません: {text[:200]}")
data = json.loads(json_match.group())
return DraftResult(
proposal_summary=data["proposal_summary"],
outreach_email=data["outreach_email"],
subject_line=data["subject_line"],
personalization_notes=data["personalization_notes"]
)
Sonnet を使っている理由は、提案書の文章品質が受注率に直結するからです。ここだけはコストを惜しまない設計にしました。
Step 4: オーケストレーターの実装
全体を制御するオーケストレーターは、キューからリードを取り出して各エージェントを順番に呼び出します。
# orchestrator.py
import asyncio
import json
from datetime import datetime
from typing import Optional
from research_agent import research_lead, LeadResearchResult
from drafter_agent import draft_proposal, DraftResult
class SalesOrchestrator:
def __init__(self, review_gate):
self.review_gate = review_gate
self.processed_count = 0
self.failed_leads = []
async def process_lead(self, lead: dict) -> dict:
"""
1件のリードを処理する完全なパイプライン。
各ステップのエラーを独立してハンドリングする。
"""
lead_id = lead.get("id", "unknown")
result = {"lead_id": lead_id, "status": "processing"}
# Step 1: リサーチ
try:
print(f"[{lead_id}] リサーチ開始: {lead['company_name']}")
research = await research_lead(lead)
result["research"] = research
result["score"] = research["score"]
except Exception as e:
result["status"] = "research_failed"
result["error"] = str(e)
self.failed_leads.append(result)
return result
# スコアチェック — 低スコアはここで止める
if not research["is_qualified"]:
result["status"] = "not_qualified"
result["reason"] = f"スコア {research['score']} が閾値60未満"
print(f"[{lead_id}] 除外: {result['reason']}")
return result
# Step 2: 提案書ドラフト
try:
print(f"[{lead_id}] ドラフト生成開始")
draft = await draft_proposal(research, lead)
result["draft"] = draft
except Exception as e:
result["status"] = "draft_failed"
result["error"] = str(e)
self.failed_leads.append(result)
return result
# Step 3: 人間レビューゲート
approved = await self.review_gate.submit_for_review(lead, research, draft)
if approved:
result["status"] = "approved"
# ここでCRM保存・メール送信を実行
await self.save_to_crm(lead, research, draft)
else:
result["status"] = "rejected_by_reviewer"
self.processed_count += 1
return result
async def process_batch(self, leads: list[dict]) -> list[dict]:
"""
複数リードを並列処理する。同時実行数は設定値で制限。
"""
semaphore = asyncio.Semaphore(5) # 同時5件まで
async def process_with_semaphore(lead):
async with semaphore:
return await self.process_lead(lead)
tasks = [process_with_semaphore(lead) for lead in leads]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [r for r in results if not isinstance(r, Exception)]
async def save_to_crm(self, lead: dict, research: dict, draft: dict):
"""
HubSpot/Salesforce CRMへの保存。
本番では hubspot-api-client を使う。
"""
crm_data = {
"company": research["company_name"],
"contact_email": lead["email"],
"lead_score": research["score"],
"industry": research["industry"],
"proposal_draft": draft.proposal_summary,
"created_at": datetime.now().isoformat()
}
# await hubspot_client.crm.contacts.basic_api.create(...)
print(f"CRM保存完了: {crm_data['company']}")
並列処理のセマフォ制限(asyncio.Semaphore(5))は見落としがちですが重要です。Anthropic APIのレート制限(デフォルトはTier1で分間60リクエスト)を超えないよう、同時リクエスト数を制御しています。
Step 5: 人間レビューゲートの実装
このシステムで最も重要な部分かもしれません。エージェントの出力を盲目的に送信しないための仕組みです。
# review_gate.py
import asyncio
from dataclasses import dataclass
from typing import Optional
@dataclass
class ReviewRequest:
lead_id: str
company_name: str
score: int
proposal_summary: str
email_draft: str
subject_line: str
reviewer_notes: str
status: str = "pending" # pending / approved / rejected
class ReviewGate:
"""
Slack通知 + Webダッシュボードと連携したレビューゲート。
本番ではSlack Botで承認ボタンを実装する。
"""
def __init__(self, slack_webhook_url: Optional[str] = None):
self.pending_reviews: dict[str, ReviewRequest] = {}
self.slack_webhook = slack_webhook_url
self._review_events: dict[str, asyncio.Event] = {}
async def submit_for_review(
self,
lead: dict,
research: dict,
draft
) -> bool:
lead_id = lead["id"]
request = ReviewRequest(
lead_id=lead_id,
company_name=research["company_name"],
score=research["score"],
proposal_summary=draft.proposal_summary,
email_draft=draft.outreach_email,
subject_line=draft.subject_line,
reviewer_notes=draft.personalization_notes
)
self.pending_reviews[lead_id] = request
self._review_events[lead_id] = asyncio.Event()
# Slack通知を送る(本番実装)
if self.slack_webhook:
await self._notify_slack(request)
print(f"\n{'='*60}")
print(f"[レビュー待ち] {request.company_name}(スコア: {request.score})")
print(f"件名: {request.subject_line}")
print(f"メール本文:\n{request.email_draft[:300]}...")
print(f"{'='*60}")
# 開発環境: コンソールで承認/拒否を受け付ける
approved = await self._wait_for_console_input(lead_id)
return approved
async def _wait_for_console_input(self, lead_id: str) -> bool:
"""開発用のコンソール承認。本番はSlack/Webに置き換える。"""
loop = asyncio.get_event_loop()
def get_input():
while True:
answer = input("承認しますか? [y/n]: ").strip().lower()
if answer in ("y", "n"):
return answer == "y"
return await loop.run_in_executor(None, get_input)
async def _notify_slack(self, request: ReviewRequest):
"""
Slackにインタラクティブメッセージを送る。
本番ではBlock Kit + Action Buttonを使う。
"""
import aiohttp
message = {
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*新規リード承認依頼*\n企業: {request.company_name}\nスコア: {request.score}/100"
}
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "承認して送信"},
"style": "primary",
"value": f"approve_{request.lead_id}"
},
{
"type": "button",
"text": {"type": "plain_text", "text": "修正して差し戻し"},
"value": f"reject_{request.lead_id}"
}
]
}
]
}
async with aiohttp.ClientSession() as session:
await session.post(self.slack_webhook, json=message)
Slack の Block Kit を使ったインタラクティブ承認は、営業チームにとって最も使いやすいUIです。メールや管理画面に飛ぶことなく、Slack上で承認ボタンを押すだけで処理が進みます。
Step 6: 全体を動かすエントリーポイント
# main.py
import asyncio
from orchestrator import SalesOrchestrator
from review_gate import ReviewGate
import os
async def main():
# テスト用リードデータ
test_leads = [
{
"id": "lead_001",
"company_name": "テックベンチャー株式会社",
"contact_name": "山田 太郎",
"email": "yamada@techventure.co.jp",
"inquiry": "現在のAPI管理ツールのコストが高く、代替を探しています"
},
{
"id": "lead_002",
"company_name": "グローバル商事",
"contact_name": "佐藤 花子",
"email": "sato@global-shoji.jp",
"inquiry": "APIを使った社内システム構築に興味があります"
}
]
slack_webhook = os.getenv("SLACK_WEBHOOK_URL")
gate = ReviewGate(slack_webhook_url=slack_webhook)
orchestrator = SalesOrchestrator(review_gate=gate)
print(f"=== 営業AIエージェント 起動 ===")
print(f"処理対象: {len(test_leads)}件")
results = await orchestrator.process_batch(test_leads)
print(f"\n=== 処理完了 ===")
for r in results:
status_emoji = {
"approved": "✅",
"not_qualified": "⏭️",
"research_failed": "❌",
"draft_failed": "❌",
"rejected_by_reviewer": "🚫"
}.get(r.get("status", ""), "❓")
print(f"{status_emoji} {r.get('lead_id')}: {r.get('status')}")
if __name__ == "__main__":
asyncio.run(main())
Step 7: コスト試算と最適化
このシステムを月間300件のリードに適用した場合の費用を計算してみます。
モデル別の使用量(1件あたり推定)
リサーチエージェント(Haiku):
- 入力: 約5,000トークン(企業情報・ツール定義・指示)
- 出力: 約1,000トークン(スコアリング結果)
ドラフターエージェント(Sonnet):
- 入力: 約3,000トークン(リサーチ結果・プロダクト情報)
- 出力: 約1,500トークン(提案書・メール)
300件処理した場合の月間コスト概算:
- Haiku: (5,000 × 300) ÷ 1M × $0.8 = $1.2 ≈ 180円
- Sonnet(優秀リードの60%が通過 = 180件): (4,500 × 180) ÷ 1M × $3 = $2.43 ≈ 360円
- 合計: 約540円/月
これは驚くほど安いように見えますが、Webスクレイピング費用(Bright Data等で月3,000円〜)やサーバー費用を加えても月2万円以内に収まります。
さらにプロンプトキャッシングを有効にすれば(自社製品情報などの固定部分)、追加で40%のコスト削減が可能です。
# プロンプトキャッシングの実装例
def create_cached_system_message(product_context: str) -> dict:
return {
"type": "text",
"text": product_context,
"cache_control": {"type": "ephemeral"} # 最大5分間キャッシュ
}
# メッセージ作成時に使用
response = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
system=[
create_cached_system_message(PRODUCT_CONTEXT),
{"type": "text", "text": "あなたはB2B営業の専門家です。"}
],
messages=[...]
)
よくある問題と対処法
エージェントが同じツールを繰り返し呼ぶ
# イテレーション数に加え、ツール呼び出し回数も制限する
tool_call_count = {}
for iteration in range(5):
# ...
for tool_use in tool_uses:
tool_call_count[tool_use.name] = tool_call_count.get(tool_use.name, 0) + 1
if tool_call_count.get("web_search", 0) > 3:
# 強制的にスコアリングを促す
messages.append({
"role": "user",
"content": [{"type": "text", "text": "調査は十分です。score_lead ツールで評価を確定してください。"}]
})
break
レスポンスのJSON解析が失敗する
Sonnet は指示通りにJSONを返してくれますが、前後にMarkdownコードブロックが付くことがあります。
import re, json
def extract_json(text: str) -> dict:
# コードブロック除去
text = re.sub(r'```json\n?|\n?```', '', text)
# 最初のJSONオブジェクトを抽出
match = re.search(r'\{[\s\S]*\}', text)
if not match:
raise ValueError("JSONが見つかりません")
return json.loads(match.group())
Agent SDK を使ったより本格的な実装パターンについては、Claude Agent SDK 実践パターン完全ガイドも参考になります。また、エラー処理とイべント駆動設計についてはClaude API Webhook 非同期エラーリカバリガイドで詳しく解説しています。
本番デプロイ時の追加考慮事項
1. リードデータの暗号化
メールアドレスや企業情報は個人情報に該当することがあります。データベースへの保存前に暗号化し、ログにも出力しないよう注意が必要です。
2. 監査ログの整備
「エージェントがこのリードにスコア75を付けた理由」を後から確認できるよう、各処理のスナップショットをDynamoDBやCloudWatch Logsに保存してください。
3. ドリフト検知
モデルの振る舞いは同じプロンプトでも日によって微妙に変わります。週次でサンプリング評価を実施し、スコアのバラつきが許容範囲内かを確認することを勧めします。
4. フォールバック戦略
APIが一時的に利用不可になった場合(529エラー)は、指数バックオフでリトライします。詳しい実装はClaude API 529 エラー完全対応ガイドを参照してください。
全体を振り返って — 次のステップ
このシステムを手元で動かすための最小手順をまとめます。
pip install anthropic python-dotenv で依存関係をインストール
.env に ANTHROPIC_API_KEY を設定
research_agent.py の mock_web_search を実際のWeb検索APIに置き換え
main.py でテストリードを自社のCRMデータに差し替えて実行
- レビューゲートを Slack Bot に接続(Slack Block Kit + ngrok で試作可能)
人間の判断が価値を持つのは「どのリードを優先するか」「どのアプローチが相手に刺さるか」という戦略的判断の部分です。調査・スコアリング・下書き生成という時間を食う作業をエージェントに任せ、営業担当者は「この人の課題に本当に響くか」という判断だけに集中する——そういう分業が、長期的に最も高い成果をもたらすと考えています。
私自身まだこのシステムを改良中ですが、同じ課題に取り組んでいる方の参考になれば幸いです。
マルチエージェントアーキテクチャの基本設計
なぜ単一エージェントでは足りないのか
大規模なタスクを単一エージェントに任せると、コンテキストウィンドウの上限・集中型エラー・スケールアウト困難という 3 つの壁に直面します。マルチエージェントシステムはこれを分業で解決します。
[Orchestrator Agent]
├── [Research Agent] → Web検索・情報収集
├── [Analysis Agent] → データ分析・評価
├── [Writer Agent] → レポート・ドキュメント生成
└── [Review Agent] → 品質チェック・承認
Agent SDK では Agent クラスと Runner クラスが連携し、エージェント間の**ハンドオフ(handoff)**を型安全に管理します。
基本的なエージェント定義
from anthropic.agents import Agent, Runner, tool, handoff
from anthropic import Anthropic
client = Anthropic()
# ─── リサーチエージェント ───
research_agent = Agent(
name="ResearchAgent",
model="claude-sonnet-4-6",
instructions="""
あなたは情報収集の専門家です。
与えられたトピックについて、信頼性の高い情報を収集・整理します。
収集完了後は必ず AnalysisAgent へハンドオフしてください。
""",
tools=[web_search, fetch_url], # 後述のツール定義
handoffs=[handoff("AnalysisAgent")], # 次エージェントを明示
)
# ─── 分析エージェント ───
analysis_agent = Agent(
name="AnalysisAgent",
model="claude-opus-4-6", # 高精度モデルを分析に充当
instructions="""
あなたはデータ分析の専門家です。
ResearchAgent から渡された情報を構造化・評価し、
主要な洞察を 5 点以内で箇条書きにまとめてください。
完了後は WriterAgent へハンドオフしてください。
""",
handoffs=[handoff("WriterAgent")],
)
# ─── ライターエージェント ───
writer_agent = Agent(
name="WriterAgent",
model="claude-sonnet-4-6",
instructions="""
あなたはテクニカルライターです。
AnalysisAgent の洞察をもとに、読み手が即実践できるレポートを生成します。
""",
output_schema=ReportSchema, # 型付き出力(後述)
)
POINT: 高コストなモデル(Opus)は分析のみに使い、入出力量が多い検索や文章生成は Sonnet を充当することでコストを最適化できます。
プロダクション向けツール設計とガードレール
ツール定義の実装
import httpx
from anthropic.agents import tool
from pydantic import BaseModel, Field
from typing import Annotated
class SearchResult(BaseModel):
title: str
url: str
snippet: str
relevance_score: float
@tool(
name="web_search",
description="指定キーワードで Web 検索を実行し、関連性スコア付き結果を返す",
)
async def web_search(
query: Annotated[str, Field(description="検索クエリ(日本語・英語対応)")],
max_results: Annotated[int, Field(ge=1, le=10, default=5)] = 5,
) -> list[SearchResult]:
"""
実装例: SerpAPI / Brave Search API など実際のAPIに差し替えること
"""
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
"https://api.search.example.com/v1/search",
params={"q": query, "num": max_results},
headers={"Authorization": f"Bearer {SEARCH_API_KEY}"},
)
resp.raise_for_status()
results = resp.json()["results"]
return [
SearchResult(
title=r["title"],
url=r["url"],
snippet=r["snippet"],
relevance_score=r.get("score", 0.8),
)
for r in results
]
# ─── 期待出力 ───
# [
# SearchResult(title="Claude Agent SDK 公式ドキュメント", url="https://docs.anthropic.com/...",
# snippet="Agent SDK を使ったマルチエージェント...", relevance_score=0.95),
# ...
# ]
ガードレール(Guardrails)の設定
本番環境では入力バリデーション・出力検証・レート制限の三層ガードレールが必須です。
from anthropic.agents import InputGuardrail, OutputGuardrail, GuardrailFunctionOutput
# ─── 入力ガードレール: 禁止ワードとインジェクション攻撃を検出 ───
async def input_safety_check(ctx, agent, input_data) -> GuardrailFunctionOutput:
FORBIDDEN_PATTERNS = ["ignore previous", "system:", "jailbreak"]
text = str(input_data).lower()
for pattern in FORBIDDEN_PATTERNS:
if pattern in text:
return GuardrailFunctionOutput(
output_info={"detected": pattern},
tripwire_triggered=True, # ← これが True になると処理が即停止
)
return GuardrailFunctionOutput(tripwire_triggered=False)
# ─── 出力ガードレール: レポートの最低品質基準を検証 ───
async def output_quality_check(ctx, agent, output) -> GuardrailFunctionOutput:
if hasattr(output, "final_output"):
report = output.final_output
# 最低文字数・必須セクションの存在チェック
if len(report.content) < 500 or "結論" not in report.content:
return GuardrailFunctionOutput(
output_info={"reason": "品質基準未達"},
tripwire_triggered=True,
)
return GuardrailFunctionOutput(tripwire_triggered=False)
# エージェントにガードレールを適用
research_agent_guarded = research_agent.clone(
input_guardrails=[InputGuardrail(guardrail_function=input_safety_check)],
output_guardrails=[OutputGuardrail(guardrail_function=output_quality_check)],
)
エラーリカバリーとリトライ戦略
マルチエージェントシステムで最も重要なのは障害に対する堅牢性です。どのエージェントがどの段階で失敗しても、システム全体が適切にリカバリーする設計が求められます。
指数バックオフ付きリトライ
import asyncio
import logging
from anthropic.agents import Runner
from anthropic.types.agents import AgentRunError
logger = logging.getLogger(__name__)
async def run_with_retry(
runner: Runner,
initial_message: str,
max_retries: int = 3,
base_delay: float = 1.0,
) -> dict:
"""
指数バックオフ + ジッター付きリトライでエージェントを実行する
Args:
runner: 設定済み Runner インスタンス
initial_message: 初期プロンプト
max_retries: 最大リトライ回数(デフォルト 3)
base_delay: 基本待機秒数(デフォルト 1.0 秒)
Returns:
{"status": "success", "result": ...} または
{"status": "failed", "error": ..., "attempts": N}
"""
for attempt in range(max_retries + 1):
try:
result = await runner.run(initial_message)
logger.info(f"✅ 成功 (試行 {attempt + 1}/{max_retries + 1})")
return {"status": "success", "result": result, "attempts": attempt + 1}
except AgentRunError as e:
if attempt == max_retries:
logger.error(f"❌ 最大リトライ超過: {e}")
return {"status": "failed", "error": str(e), "attempts": attempt + 1}
# 指数バックオフ + ジッター(0〜1秒のランダム遅延追加)
delay = base_delay * (2 ** attempt) + asyncio.get_event_loop().time() % 1
logger.warning(f"⚠️ リトライ {attempt + 1}/{max_retries}: {e} — {delay:.2f}秒後")
await asyncio.sleep(delay)
except Exception as e:
# 予期しないエラーはリトライせずに即終了
logger.critical(f"💥 予期しないエラー: {e}")
raise
# ─── 使用例 ───
# result = await run_with_retry(runner, "2026年のAI市場を調査してください")
# → {"status": "success", "result": <RunResult>, "attempts": 1}
チェックポイントと再開機能
長時間実行するエージェントのために、進捗を永続化してタスクを途中から再開できる仕組みが重要です。
import json
from pathlib import Path
from datetime import datetime
class CheckpointManager:
"""エージェントの実行状態を JSON ファイルに永続化するマネージャー"""
def __init__(self, checkpoint_dir: str = "/tmp/agent_checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
def save(self, task_id: str, stage: str, data: dict) -> None:
checkpoint = {
"task_id": task_id,
"stage": stage,
"data": data,
"saved_at": datetime.utcnow().isoformat(),
}
path = self.checkpoint_dir / f"{task_id}.json"
path.write_text(json.dumps(checkpoint, ensure_ascii=False, indent=2))
logger.info(f"💾 チェックポイント保存: {task_id} @ {stage}")
def load(self, task_id: str) -> dict | None:
path = self.checkpoint_dir / f"{task_id}.json"
if path.exists():
cp = json.loads(path.read_text())
logger.info(f"♻️ チェックポイント復元: {task_id} @ {cp['stage']}")
return cp
return None
def clear(self, task_id: str) -> None:
path = self.checkpoint_dir / f"{task_id}.json"
if path.exists():
path.unlink()
観測性(Observability)の実装
本番システムでは「何が起きているか」を常に把握できる観測性が不可欠です。
トレーシングとメトリクス収集
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
@dataclass
class AgentSpan:
agent_name: str
start_time: float = field(default_factory=time.time)
end_time: float | None = None
input_tokens: int = 0
output_tokens: int = 0
tool_calls: list[str] = field(default_factory=list)
error: str | None = None
@property
def duration_ms(self) -> float:
if self.end_time:
return (self.end_time - self.start_time) * 1000
return 0.0
@property
def cost_usd(self) -> float:
# Sonnet 4.6 の価格例(実際の料金は公式を参照)
input_cost = self.input_tokens * 3e-6 # $3 / 1M tokens
output_cost = self.output_tokens * 15e-6 # $15 / 1M tokens
return input_cost + output_cost
class AgentTracer:
def __init__(self):
self.spans: list[AgentSpan] = []
@asynccontextmanager
async def trace(self, agent_name: str):
span = AgentSpan(agent_name=agent_name)
try:
yield span
span.end_time = time.time()
except Exception as e:
span.error = str(e)
span.end_time = time.time()
raise
finally:
self.spans.append(span)
logger.info(
f"📊 {agent_name}: {span.duration_ms:.0f}ms | "
f"tokens={span.input_tokens + span.output_tokens} | "
f"cost=${span.cost_usd:.4f}"
)
def summary(self) -> dict:
return {
"total_duration_ms": sum(s.duration_ms for s in self.spans),
"total_cost_usd": sum(s.cost_usd for s in self.spans),
"agent_breakdown": [
{"name": s.agent_name, "duration_ms": s.duration_ms,
"cost_usd": s.cost_usd, "error": s.error}
for s in self.spans
],
}
エンドツーエンドの実行パイプライン
ここまでの部品を組み合わせた、完全な実行パイプラインを示します。
import asyncio
from anthropic.agents import Runner
async def run_research_pipeline(topic: str, task_id: str) -> dict:
"""
完全なリサーチパイプラインを実行する
Returns:
{"report": str, "metrics": dict, "status": "success" | "failed"}
"""
tracer = AgentTracer()
checkpoint = CheckpointManager()
runner = Runner(
starting_agent=research_agent_guarded,
agents=[research_agent_guarded, analysis_agent, writer_agent],
)
# チェックポイントから再開チェック
existing = checkpoint.load(task_id)
initial_message = (
f"以下のトピックを調査してください: {topic}"
if not existing
else f"前回の続き({existing['stage']}から): {json.dumps(existing['data'])}"
)
try:
checkpoint.save(task_id, "started", {"topic": topic})
async with tracer.trace("full_pipeline"):
run_result = await run_with_retry(runner, initial_message)
if run_result["status"] == "success":
checkpoint.clear(task_id)
return {
"report": run_result["result"].final_output,
"metrics": tracer.summary(),
"status": "success",
}
else:
checkpoint.save(task_id, "failed", run_result)
return {"status": "failed", "error": run_result["error"]}
except Exception as e:
checkpoint.save(task_id, "crashed", {"error": str(e)})
return {"status": "failed", "error": str(e)}
# ─── 実行例 ───
if __name__ == "__main__":
result = asyncio.run(run_research_pipeline(
topic="2026年のエッジAI市場動向",
task_id="research-edge-ai-2026",
))
print(f"ステータス: {result['status']}")
if result["status"] == "success":
print(f"総コスト: ${result['metrics']['total_cost_usd']:.4f}")
print(f"総時間: {result['metrics']['total_duration_ms']:.0f}ms")
全体を振り返って
Claude Agent SDK でプロダクション対応マルチエージェントシステムを構築するポイントをまとめます。
- 役割分担の設計:コスト最適化のためモデルを役割に応じて使い分ける(分析=Opus、検索・生成=Sonnet)
- ガードレールの三層化:入力検証・出力品質チェック・レート制限を独立して設定する
- 指数バックオフリトライ:一時的な障害に自動対応し、チェックポイントで長時間タスクを永続化する
- 観測性の組み込み:トレーシングとコスト計算を最初から実装し、本番での問題を素早く特定できるようにする
Agent SDK の基礎から学びたい方は Agent SDK 入門ガイド を、実践パターンの詳細は Claude Agents SDK 実践パターン集 を参照してください。コードのデプロイ自動化については Claude Code エージェント活用ガイド も合わせてご覧ください。