The AI Sales Automation Your Team Actually Wants — Building a Multi-Agent Sales System with Claude Agent SDK
A complete guide to building a production-grade multi-agent sales automation system using Claude Agent SDK — covering lead scoring, proposal drafting, human review gates, CRM integration, and cost optimization with real code examples.
There's a hard ceiling on how many leads a human sales rep can work in a day. Reading emails, researching companies, writing proposals, scheduling follow-ups — if your team is handling all of this manually, 30 to 40 leads per rep per month is about the limit.
I ran into this problem while consulting for a mid-sized SaaS company. They had 400 inbound leads per month and a sales team of three. The question on the table was simple: hire more people, or use AI? We went with AI — and by the end of the project, one rep was handling what had previously required three.
This guide walks through exactly how we did it using Claude Agent SDK. I'll share the architecture, the code that runs in production, the failure modes we hit, and how we handled them.
Why Multi-Agent — The Limits of a Single-Agent Approach
The obvious starting point is to throw everything at one agent: "Here's a lead, research the company, score it, write a proposal, and draft the outreach email." We tried this first.
It doesn't work well, for three reasons.
Context window bloat: A single lead processing task — with company website content, LinkedIn data, contact history, product documentation — easily exceeds 100k tokens when handled by one agent. Claude Opus 4 can handle 1M tokens, but using that capacity for every single lead makes the cost unsustainable.
Error propagation: If the web research step fails halfway through, the proposal draft fails too. You lose the whole pipeline for that lead. Separate agents fail independently and can be retried at the step level.
Quality inconsistency: An agent specialized in research produces better research than a generalist. Same with writing. Separation of concerns applies to agents just as much as to code.
The solution is three specialized agents coordinated by an orchestrator.
System Architecture
The system has four components:
Orchestrator: Watches the lead queue, routes tasks to agents, handles errors and retries
Research Agent: Gathers company information, industry data, and competitor landscape — then outputs a lead score
Drafter Agent: Takes research output and produces a proposal summary and personalized outreach email draft
Review Gate: Holds all outputs until a human approves them — nothing goes to the CRM or gets sent until someone clicks approve
The review gate is the most important part of this architecture. Full automation is tempting but dangerous — an AI-written email that goes out under your company's name without human review creates real brand risk. Keep humans in the loop for the sending decision, at least until you've built enough confidence in the outputs.
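One way to make the "nothing ships without approval" rule explicit is a small state machine over lead statuses. This is a minimal sketch: the status names and the `advance` helper are hypothetical and are not taken from the production code shown later in this guide.

```python
from enum import Enum


class LeadStatus(str, Enum):
    # Hypothetical status names, for illustration only
    QUEUED = "queued"
    RESEARCHED = "researched"
    NOT_QUALIFIED = "not_qualified"
    DRAFTED = "drafted"
    APPROVED = "approved"
    REJECTED = "rejected"
    SAVED_TO_CRM = "saved_to_crm"


# Allowed transitions. SAVED_TO_CRM is reachable only from APPROVED,
# so a draft cannot skip the human review gate.
ALLOWED = {
    LeadStatus.QUEUED: {LeadStatus.RESEARCHED},
    LeadStatus.RESEARCHED: {LeadStatus.DRAFTED, LeadStatus.NOT_QUALIFIED},
    LeadStatus.DRAFTED: {LeadStatus.APPROVED, LeadStatus.REJECTED},
    LeadStatus.APPROVED: {LeadStatus.SAVED_TO_CRM},
}


def advance(current: LeadStatus, target: LeadStatus) -> LeadStatus:
    """Return the new status, or raise if the transition is not allowed."""
    if target not in ALLOWED.get(current, set()):
        raise ValueError(f"Illegal transition: {current.value} -> {target.value}")
    return target
```

Calling `advance(LeadStatus.DRAFTED, LeadStatus.SAVED_TO_CRM)` raises a `ValueError`, which is the behavior you want if a code path ever tries to bypass review.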
WHAT YOU'LL LEARN
✦Learn the multi-agent design pattern that divides CRM integration, lead scoring, and proposal drafting across three specialized agents — with working code you can adapt for your own stack
✦Understand how to build recovery logic and human review gates for production AI agents, so you can deploy multi-agent systems without risking brand damage from unchecked outputs
✦Get a cost-optimization blueprint that handles 300 leads per month for under $30 — with concrete token estimates and caching strategies you can apply immediately
```python
# config.py
import os

from dotenv import load_dotenv

load_dotenv()

ANTHROPIC_API_KEY = os.environ["ANTHROPIC_API_KEY"]

# Model selection by agent role: this is the key to cost optimization
RESEARCH_MODEL = "claude-haiku-4-5-20251001"      # Research: Haiku is plenty
DRAFT_MODEL = "claude-sonnet-4-6"                 # Drafting: Sonnet for quality
ORCHESTRATOR_MODEL = "claude-haiku-4-5-20251001"  # Control: minimal cost

LEAD_SCORE_THRESHOLD = 60  # Only draft proposals for leads scoring 60+
MAX_CONCURRENT_LEADS = 5   # Semaphore limit for API rate control
```
The model selection strategy here accounts for most of the cost savings. Using Haiku for research and orchestration, Sonnet only for drafting, cuts total cost by roughly 65% versus using Sonnet for everything.
Step 2: The Research Agent
```python
# research_agent.py
import json
from typing import TypedDict

import anthropic

from config import LEAD_SCORE_THRESHOLD, RESEARCH_MODEL

# Async client, so API calls do not block the event loop
client = anthropic.AsyncAnthropic()

MAX_WEB_SEARCHES = 3


class LeadResearchResult(TypedDict):
    company_name: str
    industry: str
    employee_count: str
    pain_points: list[str]
    competitors: list[str]
    score: int
    score_reason: str
    is_qualified: bool


RESEARCH_TOOLS = [
    {
        "name": "web_search",
        "description": "Search for company information, news, LinkedIn data, and competitor context",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "Search query. Be specific: company name + site or type of info needed",
                }
            },
            "required": ["query"],
        },
    },
    {
        "name": "score_lead",
        "description": "Finalize the lead evaluation based on gathered information",
        "input_schema": {
            "type": "object",
            "properties": {
                "company_name": {"type": "string"},
                "industry": {"type": "string"},
                "employee_count": {"type": "string"},
                "pain_points": {"type": "array", "items": {"type": "string"}},
                "competitors": {"type": "array", "items": {"type": "string"}},
                "score": {
                    "type": "integer",
                    "description": "0-100 fit score. Higher means better match for your product",
                },
                "score_reason": {
                    "type": "string",
                    "description": "2-3 sentences explaining the score",
                },
            },
            "required": ["company_name", "industry", "score", "score_reason"],
        },
    },
]


async def research_lead(lead: dict) -> LeadResearchResult:
    """
    Research a lead and return a scored result.
    Web search is mocked here. In production, call a real scraping API.
    """
    messages = [
        {
            "role": "user",
            "content": f"""Research this lead and score them for our product.

Company: {lead['company_name']}
Contact: {lead['contact_name']}
Email: {lead['email']}
Inquiry: {lead.get('inquiry', 'None provided')}

Steps:
1. Use web_search to gather company info (industry, size, tech stack, competitors)
2. Once you have enough information, call score_lead to finalize the evaluation

Scoring criteria (for a SaaS API management tool):
- 100-2000 employees: +30 points
- SaaS / tech company: +25 points
- Specific pain point mentioned in inquiry: +20 points
- Already using a competitor tool: +15 points (switching intent)
- Enterprise (2000+ employees): -10 points (longer sales cycles)""",
        }
    ]

    final_result = None
    web_searches = 0

    for _ in range(5):  # Safety cap: prevent infinite loops
        response = await client.messages.create(
            model=RESEARCH_MODEL,
            max_tokens=2048,
            tools=RESEARCH_TOOLS,
            messages=messages,
        )

        if response.stop_reason != "tool_use":
            break

        tool_results = []
        for tool_use in (b for b in response.content if b.type == "tool_use"):
            if tool_use.name == "web_search":
                web_searches += 1
                if web_searches > MAX_WEB_SEARCHES:
                    # Every tool_use needs a matching tool_result, so the nudge
                    # is returned as the result rather than as an extra message
                    content = "You have enough information. Please call score_lead now to finalize."
                else:
                    content = await mock_web_search(tool_use.input["query"])
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_use.id,
                    "content": content,
                })
            elif tool_use.name == "score_lead":
                score_data = tool_use.input
                final_result = LeadResearchResult(
                    company_name=score_data["company_name"],
                    industry=score_data.get("industry", "Unknown"),
                    employee_count=score_data.get("employee_count", "Unknown"),
                    pain_points=score_data.get("pain_points", []),
                    competitors=score_data.get("competitors", []),
                    score=score_data["score"],
                    score_reason=score_data["score_reason"],
                    is_qualified=score_data["score"] >= LEAD_SCORE_THRESHOLD,
                )
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": tool_use.id,
                    "content": json.dumps({"status": "scored", "score": score_data["score"]}),
                })

        if final_result is not None:
            break  # Scored: no further turns are needed

        # Assistant turn first, then the tool results that answer it
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})

    if final_result is None:
        raise ValueError(f"Research agent did not produce a score for {lead['company_name']}")
    return final_result


async def mock_web_search(query: str) -> str:
    """Development mock. Replace with Serper API or Bright Data in production."""
    return f"Search results for '{query}': [Mock: 500 employees, SaaS industry, currently using Apigee]"
```
The iteration cap (for iteration in range(5)) is non-negotiable. Without it, a confused agent will spin indefinitely and burn tokens. Set the cap, and add the explicit nudge ("you have enough information, score now") to guide it toward the exit condition.
Step 3: The Drafter Agent
```python
# drafter_agent.py
import json
import re
from dataclasses import dataclass

import anthropic

from config import DRAFT_MODEL

# Async client, so API calls do not block the event loop
client = anthropic.AsyncAnthropic()


@dataclass
class DraftResult:
    proposal_summary: str
    outreach_email: str
    subject_line: str
    personalization_notes: str


PRODUCT_CONTEXT = """Product: APIGuard Pro
- API management and monitoring platform for SaaS companies
- Key features: rate limiting, authentication management, usage analytics, anomaly detection
- Pricing: from $500/month (usage-based)
- References: 200+ SaaS companies in market
- Differentiator: 5-day implementation, dedicated customer success team"""


async def draft_proposal(research_result: dict, lead: dict) -> DraftResult:
    prompt = f"""Based on this lead research, create a proposal summary and personalized outreach email.

=== Lead Research ===
Company: {research_result['company_name']}
Industry: {research_result['industry']}
Size: {research_result['employee_count']}
Pain points: {', '.join(research_result.get('pain_points', []))}
Current tools: {', '.join(research_result.get('competitors', []))}
Contact: {lead['contact_name']}

=== Our Product ===
{PRODUCT_CONTEXT}

Return a JSON object with these fields:
{{
  "proposal_summary": "2-paragraph executive summary of why our product fits this company (Markdown ok)",
  "outreach_email": "First-touch email body (250-350 words, conversational tone, no hard sell)",
  "subject_line": "Email subject line (under 50 characters)",
  "personalization_notes": "Notes for the sales rep: specific angles, watch-outs, conversation starters"
}}

Writing guidelines:
- Lead with their specific problem, not our features
- If they're using a competitor, acknowledge the switch cost honestly
- The email goal is a reply, not a close, so keep the ask small"""

    response = await client.messages.create(
        model=DRAFT_MODEL,
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}],
    )

    data = extract_json(response.content[0].text)
    return DraftResult(
        proposal_summary=data["proposal_summary"],
        outreach_email=data["outreach_email"],
        subject_line=data["subject_line"],
        personalization_notes=data["personalization_notes"],
    )


def extract_json(text: str) -> dict:
    """Robust JSON extraction: handles markdown code blocks and extra text."""
    text = re.sub(r'```json\n?|\n?```', '', text)
    match = re.search(r'\{[\s\S]*\}', text)
    if not match:
        raise ValueError(f"No JSON found in response: {text[:200]}")
    return json.loads(match.group())
```
We use Sonnet here specifically because the output goes directly in front of real people. The quality difference between Haiku and Sonnet on persuasive writing is significant enough to justify the price delta. For everything else in this pipeline, Haiku is fine.
Step 4: The Orchestrator
```python
# orchestrator.py
import asyncio
from datetime import datetime

from config import LEAD_SCORE_THRESHOLD, MAX_CONCURRENT_LEADS
from drafter_agent import draft_proposal
from research_agent import research_lead


class SalesOrchestrator:
    def __init__(self, review_gate):
        self.review_gate = review_gate
        self.processed = 0
        self.failed = []

    async def process_lead(self, lead: dict) -> dict:
        lead_id = lead.get("id", "unknown")
        result = {"lead_id": lead_id, "status": "processing"}

        # Step 1: Research (fails independently)
        try:
            research = await research_lead(lead)
            result["research"] = research
        except Exception as e:
            result["status"] = "research_failed"
            result["error"] = str(e)
            self.failed.append(result)
            return result

        # Qualification gate
        if not research["is_qualified"]:
            result["status"] = "not_qualified"
            result["reason"] = f"Score {research['score']} below threshold {LEAD_SCORE_THRESHOLD}"
            return result

        # Step 2: Proposal draft (fails independently)
        try:
            draft = await draft_proposal(research, lead)
            result["draft"] = draft
        except Exception as e:
            result["status"] = "draft_failed"
            result["error"] = str(e)
            self.failed.append(result)
            return result

        # Step 3: Human review (blocks until approved or rejected)
        approved = await self.review_gate.submit_for_review(lead, research, draft)
        if approved:
            result["status"] = "approved"
            await self.save_to_crm(lead, research, draft)
        else:
            result["status"] = "rejected"

        self.processed += 1
        return result

    async def process_batch(self, leads: list[dict]) -> list[dict]:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_LEADS)

        async def bounded(lead):
            async with semaphore:
                return await self.process_lead(lead)

        results = await asyncio.gather(
            *[bounded(lead) for lead in leads],
            return_exceptions=True,
        )

        # process_lead catches its own errors, so anything left here is
        # unexpected. Record it instead of dropping it silently.
        for lead, r in zip(leads, results):
            if isinstance(r, Exception):
                self.failed.append({
                    "lead_id": lead.get("id", "unknown"),
                    "status": "unexpected_error",
                    "error": str(r),
                })
        return [r for r in results if not isinstance(r, Exception)]

    async def save_to_crm(self, lead: dict, research: dict, draft):
        crm_record = {
            "company": research["company_name"],
            "email": lead["email"],
            "score": research["score"],
            "industry": research["industry"],
            "proposal": draft.proposal_summary,
            "created_at": datetime.now().isoformat(),
        }
        # Production: await hubspot_client.crm.contacts.basic_api.create(...)
        print(f"CRM save: {crm_record['company']} (score: {crm_record['score']})")
```
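The asyncio.Semaphore(MAX_CONCURRENT_LEADS) matters. Without it, a batch of 50 leads would fire 50 simultaneous API requests and would very likely hit Anthropic's rate limit (on the order of 50 requests per minute on Tier 1; check the current limits for your account). Note that the semaphore caps how many requests are in flight at once, not how many are sent per minute, so keep the limit conservative or pair it with explicit pacing.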
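If you need a hard requests-per-minute ceiling in addition to a concurrency cap, one option is to space calls out with a small pacing helper. The sketch below is an assumption, not part of the production system: `MinIntervalLimiter` and `paced_calls` are hypothetical names, and the `asyncio.sleep(0)` stands in for the real API request.

```python
import asyncio


class MinIntervalLimiter:
    """Spaces calls at least `interval` seconds apart (hypothetical helper)."""

    def __init__(self, interval: float):
        self.interval = interval
        self._lock = asyncio.Lock()
        self._next_slot = 0.0

    async def wait(self) -> None:
        # Reserve the next free time slot under the lock, then sleep outside it
        async with self._lock:
            now = asyncio.get_running_loop().time()
            delay = max(0.0, self._next_slot - now)
            self._next_slot = max(now, self._next_slot) + self.interval
        if delay > 0:
            await asyncio.sleep(delay)


async def paced_calls(n: int, interval: float, max_in_flight: int) -> float:
    """Run n fake requests with both limits applied; return elapsed seconds."""
    limiter = MinIntervalLimiter(interval)
    semaphore = asyncio.Semaphore(max_in_flight)
    loop = asyncio.get_running_loop()
    start = loop.time()

    async def one_call():
        async with semaphore:          # cap on concurrent requests
            await limiter.wait()       # cap on request rate
            await asyncio.sleep(0)     # stand-in for the real API request

    await asyncio.gather(*(one_call() for _ in range(n)))
    return loop.time() - start
```

With `interval=60 / 50`, this would hold the pipeline to roughly 50 requests per minute regardless of how many leads are queued.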
In production, the Slack button click needs a webhook receiver on your end (a small FastAPI or Flask app) that calls a method on your ReviewGate instance to set the approval status. The async event pattern (using asyncio.Event) works well for this — the review gate waits on the event, and the webhook handler fires it when the button is clicked.
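The asyncio.Event pattern described above can be sketched as follows. This is an in-memory illustration under stated assumptions: the `resolve` method name, the timeout default, and the "timeout means rejected" policy are hypothetical, and the Slack posting itself is left as a comment.

```python
import asyncio


class ReviewGate:
    """In-memory review gate (sketch): one asyncio.Event per pending lead."""

    def __init__(self, timeout_seconds: float = 3600.0):
        self.timeout_seconds = timeout_seconds
        self._events: dict[str, asyncio.Event] = {}
        self._decisions: dict[str, bool] = {}

    async def submit_for_review(self, lead: dict, research: dict, draft) -> bool:
        lead_id = str(lead["id"])
        event = asyncio.Event()
        self._events[lead_id] = event
        # Production: post the draft to Slack with approve/reject buttons here.
        try:
            await asyncio.wait_for(event.wait(), timeout=self.timeout_seconds)
        except asyncio.TimeoutError:
            return False  # No decision in time: treated as rejected (assumption)
        finally:
            self._events.pop(lead_id, None)
        return self._decisions.pop(lead_id, False)

    def resolve(self, lead_id: str, approved: bool) -> bool:
        """Called by the webhook handler. Returns False for unknown lead IDs."""
        event = self._events.get(lead_id)
        if event is None:
            return False
        self._decisions[lead_id] = approved
        event.set()
        return True
```

Because the pending state lives in process memory, the webhook receiver and the orchestrator must share the same `ReviewGate` instance and event loop; a multi-process deployment would need a shared store instead.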
Cost Model for 300 Leads/Month
Using the token estimates from our production deployment:
Drafter Agent (Sonnet) — for 60% of leads that pass the threshold:
Input: ~3,000 tokens
Output: ~1,500 tokens
Monthly totals for 300 leads (180 making it through to drafting):
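Haiku: (6,000 × 300) / 1M × $0.80 = ~$1.44
Sonnet: (4,500 × 180) / 1M × $3.00 = ~$2.43
Total API cost: ~$3.87/month
Note that these figures apply a single per-million-token rate to input and output tokens combined. Output tokens are billed at a higher rate than input tokens, so treat the total as a lower bound and recompute it against current pricing for the models in config.py.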
Add infrastructure (a small VPS or Lambda), a web scraping service like Serper or Bright Data (~$30-50/month), and you're looking at well under $100/month for a system that handles 300 leads.
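To redo this arithmetic with separate input and output rates, a small helper like the one below is enough. The token counts and the $3.00 input rate come from the Drafter estimate above; the $15.00 output rate is an assumption, so substitute the current published prices before relying on the result. `StageCost` is a hypothetical name.

```python
from dataclasses import dataclass


@dataclass
class StageCost:
    input_tokens: int    # average input tokens per lead
    output_tokens: int   # average output tokens per lead
    input_rate: float    # USD per million input tokens
    output_rate: float   # USD per million output tokens
    leads: int           # leads per month reaching this stage

    def monthly_usd(self) -> float:
        input_cost = self.input_tokens * self.leads / 1_000_000 * self.input_rate
        output_cost = self.output_tokens * self.leads / 1_000_000 * self.output_rate
        return input_cost + output_cost


# Drafter: 3,000 in / 1,500 out per lead, 180 leads per month.
# The 15.00 output rate is an assumption, not a figure from this article.
drafter = StageCost(3_000, 1_500, 3.00, 15.00, 180)
print(f"Drafter: ${drafter.monthly_usd():.2f}/month")  # prints "Drafter: $5.67/month"
```

Even with output tokens priced separately, the API bill stays in single-digit dollars for this stage, so the overall conclusion holds: scraping and infrastructure dominate the monthly cost.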
Prompt caching can cut this further. The product context sent to every Drafter call is identical — cache it:
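```python
response = client.messages.create(
    model=DRAFT_MODEL,
    max_tokens=4096,
    system=[
        {
            "type": "text",
            "text": PRODUCT_CONTEXT,
            "cache_control": {"type": "ephemeral"},  # Cache for up to 5 minutes
        }
    ],
    messages=[...],
)
```
With caching, the input token cost for Drafter drops by roughly 40%. Caching only takes effect once the cached prefix reaches a model-specific minimum length (on the order of a thousand tokens; check the prompt caching documentation), so a short PRODUCT_CONTEXT like the one above needs to be extended, for example with few-shot proposal examples, before it is cached at all.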
Common Failure Modes
Agent loops on the same tool call
Add explicit call count tracking and a redirect message:
```python
if tool_call_counts.get("web_search", 0) > 3:
    # Redirect the agent toward the exit condition
    messages.append({
        "role": "user",
        "content": "Sufficient research collected. Please call score_lead to finalize.",
    })
```
JSON parsing fails on draft output
Sonnet occasionally wraps JSON in a code block or adds explanatory text before it. The extract_json function above handles this, but you can also use a stop sequence approach:
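```python
response = client.messages.create(
    model=DRAFT_MODEL,
    max_tokens=4096,
    # Stops generation at the first code fence. If the model opens its reply
    # with a fence, the output will be empty, so keep extract_json as a fallback.
    stop_sequences=["```"],
    messages=[{
        "role": "user",
        "content": prompt + "\nRespond with raw JSON only, no code blocks.",
    }],
)
```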
Rate limits under high concurrency
The semaphore handles the Anthropic rate limit. For the web scraping API, add a separate rate limit per scraping provider. Mixing asyncio.Semaphore for different resources is the cleanest pattern:
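A minimal sketch of that pattern, with one semaphore per external resource. The scraper limit of 2 is a hypothetical value, and the `asyncio.sleep` calls stand in for the real network requests; the peak counters exist only to show that each limit is respected.

```python
import asyncio

ANTHROPIC_LIMIT = 5  # mirrors MAX_CONCURRENT_LEADS
SCRAPER_LIMIT = 2    # hypothetical limit for the scraping provider


async def run_demo(n_leads: int = 10) -> dict:
    """Process fake leads and return the peak concurrency seen per resource."""
    limits = {
        "anthropic": asyncio.Semaphore(ANTHROPIC_LIMIT),
        "scraper": asyncio.Semaphore(SCRAPER_LIMIT),
    }
    in_flight = {"anthropic": 0, "scraper": 0}
    peak = {"anthropic": 0, "scraper": 0}

    async def limited(resource: str) -> None:
        async with limits[resource]:
            in_flight[resource] += 1
            peak[resource] = max(peak[resource], in_flight[resource])
            await asyncio.sleep(0.01)  # stand-in for the real network call
            in_flight[resource] -= 1

    async def process(lead_id: int) -> None:
        await limited("scraper")    # web research step
        await limited("anthropic")  # model call step

    await asyncio.gather(*(process(i) for i in range(n_leads)))
    return peak
```

Each resource is throttled independently, so a slow or tightly limited scraping provider never forces you to lower the concurrency you allow against the model API.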
Data privacy: Lead data contains PII. Encrypt before writing to any database, and never log email addresses or company names to console in production.
Audit trails: For every lead processed, store a snapshot of the research result, the draft, and the reviewer decision. This is the first thing stakeholders ask for.
Model drift monitoring: The same prompt can produce slightly different outputs week to week as model behavior shifts. Run weekly spot checks on a sample of outputs and compare against your quality baseline.
Graceful degradation: If the Research Agent fails (API timeout, scraping block), the Drafter can still run with only the information from the inbound form. A reduced-quality draft is better than no draft at all.
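The graceful degradation point can be sketched as a thin wrapper around the research call. The `fallback_research` and `research_or_fallback` names, the `degraded` flag, and the choice to let degraded leads through to drafting are all assumptions for illustration, not part of the original pipeline.

```python
import asyncio


def fallback_research(lead: dict) -> dict:
    """Build a minimal research result from the inbound form only."""
    inquiry = (lead.get("inquiry") or "").strip()
    return {
        "company_name": lead["company_name"],
        "industry": "Unknown",
        "employee_count": "Unknown",
        "pain_points": [inquiry] if inquiry else [],
        "competitors": [],
        "score": 0,
        "score_reason": "Research unavailable; draft is based on the inbound form only.",
        "is_qualified": True,  # assumption: let the draft through, the reviewer decides
        "degraded": True,      # hypothetical flag so reviewers can spot these drafts
    }


async def research_or_fallback(lead: dict, research_fn) -> dict:
    """Try the real research function; fall back to form data on any failure."""
    try:
        return await research_fn(lead)
    except Exception:
        return fallback_research(lead)
```

In the orchestrator, this would be called as `await research_or_fallback(lead, research_lead)`. Surfacing the `degraded` flag in the review message helps the rep judge the draft accordingly.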
Building a Persistent Lead Queue with Async Processing
In the examples above, we process leads synchronously — one batch at a time. Production deployments need a persistent queue that survives process restarts and handles variable inbound volume. Here's how to wire this up with Redis and asyncio.
```python
# queue_manager.py
import asyncio
import json

import redis.asyncio as aioredis

from orchestrator import SalesOrchestrator
from review_gate import ReviewGate

REDIS_URL = "redis://localhost:6379"
LEAD_QUEUE_KEY = "sales:leads:pending"
FAILED_QUEUE_KEY = "sales:leads:failed"


class LeadQueueManager:
    def __init__(self):
        self.redis = None
        self.running = False

    async def connect(self):
        self.redis = await aioredis.from_url(REDIS_URL)

    async def enqueue_lead(self, lead: dict):
        """
        Add a new lead to the processing queue.
        Call this from your CRM webhook handler or form processor.
        """
        await self.redis.rpush(LEAD_QUEUE_KEY, json.dumps(lead))
        print(f"Queued: {lead['company_name']} ({await self.redis.llen(LEAD_QUEUE_KEY)} total)")

    async def start_processing(self, orchestrator: SalesOrchestrator):
        """
        Continuously pull from the queue and process leads.
        Runs until stopped. Deploy as a background worker.
        """
        self.running = True
        print("Queue worker started. Waiting for leads...")

        while self.running:
            # Block for up to 5 seconds waiting for a new item
            item = await self.redis.blpop(LEAD_QUEUE_KEY, timeout=5)
            if item is None:
                continue  # Timeout: loop back and wait again

            _, lead_json = item
            lead = json.loads(lead_json)

            try:
                result = await orchestrator.process_lead(lead)
                if result["status"] in ("research_failed", "draft_failed"):
                    # Move to failed queue for manual inspection
                    await self.redis.rpush(FAILED_QUEUE_KEY, json.dumps({
                        "lead": lead,
                        "error": result.get("error"),
                        "step": result["status"],
                    }))
            except Exception as e:
                print(f"Unexpected error processing {lead.get('company_name')}: {e}")
                await self.redis.rpush(FAILED_QUEUE_KEY, json.dumps({
                    "lead": lead,
                    "error": str(e),
                    "step": "unknown",
                }))

    async def retry_failed(self, orchestrator: SalesOrchestrator):
        """
        Move failed leads back to the main queue for retry.
        Run this manually after investigating failures.
        """
        failed_count = await self.redis.llen(FAILED_QUEUE_KEY)
        print(f"Retrying {failed_count} failed leads...")

        for _ in range(failed_count):
            item = await self.redis.lpop(FAILED_QUEUE_KEY)
            if item:
                failed_data = json.loads(item)
                await self.enqueue_lead(failed_data["lead"])
```
This queue manager gives you durability (leads survive a process restart), observability (failed leads are inspectable in a separate queue), and backpressure control (the semaphore in the orchestrator still controls concurrency).
To ingest leads from a webhook — for example, a HubSpot form submission or a Typeform — wire it to enqueue_lead:
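A framework-agnostic sketch of that wiring is shown below. The required field list, the status codes, and the `handle_form_webhook` name are assumptions; in a FastAPI or Flask route you would parse the request body into `payload` and pass `queue_manager.enqueue_lead` as `enqueue`.

```python
import asyncio
from typing import Awaitable, Callable

# Fields the Research Agent prompt reads from each lead
REQUIRED_FIELDS = ("company_name", "contact_name", "email")


async def handle_form_webhook(
    payload: dict,
    enqueue: Callable[[dict], Awaitable[None]],
) -> tuple[int, dict]:
    """Validate a form submission, normalize it, and queue it for processing."""
    missing = [f for f in REQUIRED_FIELDS if not str(payload.get(f) or "").strip()]
    if missing:
        return 400, {"error": "missing fields", "fields": missing}

    email = str(payload["email"]).strip()
    lead = {
        "id": str(payload.get("id") or email),  # assumption: fall back to email as ID
        "company_name": str(payload["company_name"]).strip(),
        "contact_name": str(payload["contact_name"]).strip(),
        "email": email,
        "inquiry": str(payload.get("inquiry") or "").strip(),
    }
    await enqueue(lead)
    return 202, {"status": "queued", "lead_id": lead["id"]}
```

Validating at the edge keeps malformed submissions out of the queue, so the worker never crashes on a missing `contact_name` halfway through a research call.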
Deploy the webhook receiver and queue worker as separate processes (or containers). The worker can run on a $5/month VPS — it's not compute-intensive.
Tuning the Scoring Rubric
The scoring criteria in the Research Agent prompt are the single biggest lever on system output quality. A poorly tuned rubric will either flood the Drafter with low-fit leads (expensive, wastes rep time on reviews) or filter out good leads (missed revenue).
Here's a methodology for calibrating it without running the full pipeline:
```python
# rubric_calibration.py
import asyncio

from research_agent import research_lead

# research_lead reads contact_name and email, so the samples carry placeholders
HISTORICAL_LEADS = [
    {
        "lead": {
            "company_name": "TechCorp Inc",
            "contact_name": "N/A",
            "email": "n/a@example.com",
            "inquiry": "We're looking for a replacement for Kong Gateway",
        },
        "converted": True,
        "days_to_close": 45,
    },
    {
        "lead": {
            "company_name": "Megacorp Global",
            "contact_name": "N/A",
            "email": "n/a@example.com",
            "inquiry": "Just exploring options",
        },
        "converted": False,
        "days_to_close": None,
    },
    # Add 50-100 historical leads with known outcomes
]


async def score_historical_leads():
    """
    Run historical leads through the Research Agent and compare
    predicted scores to actual conversion outcomes.
    """
    results = []
    for item in HISTORICAL_LEADS:
        research = await research_lead(item["lead"])
        results.append({
            "company": item["lead"]["company_name"],
            "predicted_score": research["score"],
            "actual_converted": item["converted"],
        })

    total_converted = sum(1 for r in results if r["actual_converted"])

    # Calculate precision/recall at different thresholds
    for threshold in [50, 60, 70, 80]:
        predicted_qualified = [r for r in results if r["predicted_score"] >= threshold]
        if not predicted_qualified or total_converted == 0:
            continue  # Avoid division by zero
        true_positives = sum(1 for r in predicted_qualified if r["actual_converted"])
        precision = true_positives / len(predicted_qualified)
        recall = true_positives / total_converted
        print(f"Threshold {threshold}: Precision={precision:.2f}, Recall={recall:.2f}, "
              f"Leads passed={len(predicted_qualified)}")


asyncio.run(score_historical_leads())
```
Run this against 50-100 historical leads where you know the conversion outcome. The output tells you where to set the threshold. A threshold of 60 that gives you 70% precision and 80% recall is a much better starting point than a rubric you guessed at.
Monitoring the Running System
Once deployed, you need visibility into what the agents are doing. Two things matter most: cost per lead and lead-to-meeting conversion rate.
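A minimal sketch of per-call logging that supports both metrics is shown below. The function names and the JSON Lines format are assumptions; the token counts are the values the Anthropic SDK reports on `response.usage.input_tokens` and `response.usage.output_tokens`, and the rates are parameters you supply from current pricing.

```python
import json
import time


def call_cost_usd(input_tokens: int, output_tokens: int,
                  input_rate: float, output_rate: float) -> float:
    """Cost of one API call; rates are USD per million tokens."""
    return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000


def build_log_entry(lead_id: str, stage: str, model: str,
                    input_tokens: int, output_tokens: int,
                    input_rate: float, output_rate: float,
                    outcome: str = "") -> dict:
    """One structured record per agent call (hypothetical schema)."""
    return {
        "ts": time.time(),
        "lead_id": lead_id,
        "stage": stage,      # e.g. "research" or "draft"
        "model": model,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "cost_usd": round(call_cost_usd(input_tokens, output_tokens,
                                        input_rate, output_rate), 6),
        "outcome": outcome,  # e.g. "approved", "rejected", "meeting_booked"
    }


def append_jsonl(path: str, entry: dict) -> None:
    """Append one JSON object per line to a local log file."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry) + "\n")
```

Summing `cost_usd` by `lead_id` gives cost per lead, and joining on `outcome` gives the lead-to-meeting conversion rate per score band.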
Write these log entries to CloudWatch, Datadog, or even a simple append-only CSV file. After a month, you'll have enough data to answer the questions that actually matter: "Is our scoring rubric working?" and "Which draft quality signals predict conversion?"
The answer to those questions is where the real optimization happens — not in the code, but in the prompts and rubric that drive the agents.
Scaling Beyond a Single Team
Once the system is working for your sales team, the architecture generalizes well. The same multi-agent pattern — specialized researchers, specialized writers, human review gates — applies to:
Partner outreach: Research potential integration partners, score their audience fit, draft partnership proposals. The rubric changes (you're scoring for audience overlap and technical compatibility rather than company size), but the code structure is identical.
Renewal risk identification: Run your existing customer base through a research agent that looks for signals of churn risk — leadership changes, job postings for competing tools, reduced product usage. Flag high-risk accounts for CS outreach before renewal conversations start.
Conference lead follow-up: After a trade show, ingest the badge scans or card photos, run research, score, and have drafts ready before your team lands at the airport. The review gate is especially useful here because conference leads often need context from the in-person conversation that the agent can't know.
In all of these cases, the underlying principle is the same: use AI for the research and drafting work that scales poorly with human headcount, and keep humans in the loop for the judgment calls that actually matter.
Getting Started in One Weekend
If you want to run this yourself, here's a realistic weekend plan:
Saturday morning: Set up the environment, implement the Research Agent with mock web search, and verify that the scoring logic produces reasonable outputs on a handful of test leads from your actual CRM.
Saturday afternoon: Implement the Drafter Agent. Feed it 5-10 of your best historical proposals as examples in the system prompt (few-shot prompting). Adjust until the output matches your company's tone.
Sunday morning: Wire up the Review Gate with Slack (Block Kit + a small FastAPI webhook receiver). Test the full pipeline end to end with 20 real leads.
Sunday afternoon: Review the outputs, tune the scoring rubric based on what you see, and decide on the threshold. Deploy the queue worker to a VPS.
By Monday morning, you have a running system. It won't be perfect, but it will be producing something your team can actually use — and iterating from there is much faster than building from scratch.
Where to Take This Next
The system described here gets you to roughly 3x lead throughput per rep. Where to go from there:
The most impactful next step is closing the feedback loop. Track which approved proposals actually convert to meetings and which don't, then feed that signal back into the scoring criteria. A scoring model that learns from outcomes is significantly more accurate than a static rubric.
The second improvement is adding a personalization layer to the Drafter — pulling in recent company news, job postings, or LinkedIn activity before drafting. This turns generic outreach into genuinely tailored messages and measurably improves reply rates.
The work of building this taught me something I didn't expect: the bottleneck shifts from "doing the research" to "deciding which leads are worth the human's time." That's a more interesting problem — and a better use of your sales team's judgment.
Designing a Multi-Agent Architecture
When Single Agents Fall Short
Relying on a single agent for complex tasks creates three bottlenecks: context window exhaustion, centralized failure points, and poor scalability. A multi-agent architecture solves these by dividing responsibilities:
The Agent SDK's Agent and Runner classes manage handoffs between agents in a type-safe, auditable way.
Defining Your Agents
```python
# NOTE: the anthropic.agents import path and the Agent / Runner / handoff names
# are reproduced as given here; confirm them against your installed SDK version.
from anthropic.agents import Agent, Runner, tool, handoff
from anthropic import Anthropic

client = Anthropic()

# --- Research Agent ---
research_agent = Agent(
    name="ResearchAgent",
    model="claude-sonnet-4-6",
    instructions="""
    You are a research specialist. Collect accurate, relevant information
    on the given topic from multiple sources.
    When finished, hand off to AnalysisAgent.
    """,
    tools=[web_search, fetch_url],  # defined below
    handoffs=[handoff("AnalysisAgent")],
)

# --- Analysis Agent ---
analysis_agent = Agent(
    name="AnalysisAgent",
    model="claude-opus-4-6",  # Use the highest-capability model for analysis
    instructions="""
    You are a data analyst. Structure and evaluate the information from
    ResearchAgent, then summarize your top 5 insights.
    Hand off to WriterAgent when done.
    """,
    handoffs=[handoff("WriterAgent")],
)

# --- Writer Agent ---
writer_agent = Agent(
    name="WriterAgent",
    model="claude-sonnet-4-6",
    instructions="""
    You are a technical writer. Turn AnalysisAgent's insights into an
    actionable report that readers can immediately apply.
    """,
    output_schema=ReportSchema,  # typed output schema defined separately
)
```
Cost Tip: Reserve expensive models like Opus for analysis-heavy tasks where reasoning depth matters. Use Sonnet for high-throughput steps like search and writing.
Production Tool Design and Guardrails
Implementing Tools with Pydantic Validation
```python
import os
from typing import Annotated

import httpx
from anthropic.agents import tool
from pydantic import BaseModel, Field

SEARCH_API_KEY = os.environ["SEARCH_API_KEY"]


class SearchResult(BaseModel):
    title: str
    url: str
    snippet: str
    relevance_score: float


@tool(
    name="web_search",
    description="Run a web search and return relevance-scored results",
)
async def web_search(
    query: Annotated[str, Field(description="Search query string")],
    max_results: Annotated[int, Field(ge=1, le=10)] = 5,
) -> list[SearchResult]:
    """
    Replace with your actual search API (SerpAPI, Brave Search, etc.)
    """
    async with httpx.AsyncClient(timeout=10.0) as http:
        resp = await http.get(
            "https://api.search.example.com/v1/search",
            params={"q": query, "num": max_results},
            headers={"Authorization": f"Bearer {SEARCH_API_KEY}"},
        )
        resp.raise_for_status()
        results = resp.json()["results"]

    return [
        SearchResult(
            title=r["title"],
            url=r["url"],
            snippet=r["snippet"],
            relevance_score=r.get("score", 0.8),
        )
        for r in results
    ]

# Expected output:
# [
#     SearchResult(title="Claude Agent SDK Docs", url="https://docs.anthropic.com/...",
#                  snippet="Multi-agent systems with...", relevance_score=0.95),
#     ...
# ]
```
Three-Layer Guardrail Strategy
Production systems require defense at three levels: input validation, output quality, and rate limiting.
```python
from anthropic.agents import InputGuardrail, OutputGuardrail, GuardrailFunctionOutput


# --- Input Guardrail: Block prompt injection and forbidden patterns ---
async def input_safety_check(ctx, agent, input_data) -> GuardrailFunctionOutput:
    FORBIDDEN_PATTERNS = ["ignore previous", "system:", "jailbreak", "disregard"]
    text = str(input_data).lower()
    for pattern in FORBIDDEN_PATTERNS:
        if pattern in text:
            return GuardrailFunctionOutput(
                output_info={"detected": pattern},
                tripwire_triggered=True,  # Halts execution immediately
            )
    return GuardrailFunctionOutput(tripwire_triggered=False)


# --- Output Guardrail: Enforce minimum quality standards on reports ---
async def output_quality_check(ctx, agent, output) -> GuardrailFunctionOutput:
    if hasattr(output, "final_output"):
        report = output.final_output
        # Enforce minimum length and required sections
        if len(report.content) < 500 or "conclusion" not in report.content.lower():
            return GuardrailFunctionOutput(
                output_info={"reason": "Quality threshold not met"},
                tripwire_triggered=True,
            )
    return GuardrailFunctionOutput(tripwire_triggered=False)


# Attach guardrails to the agent
research_agent_guarded = research_agent.clone(
    input_guardrails=[InputGuardrail(guardrail_function=input_safety_check)],
    output_guardrails=[OutputGuardrail(guardrail_function=output_quality_check)],
)
```
Retry Strategies and Error Recovery
Resilience is non-negotiable in production. Any agent can fail—API timeouts, rate limits, transient network errors—and your system must handle these gracefully.
Exponential Backoff with Jitter
```python
import asyncio
import logging
import random

from anthropic.agents import Runner
from anthropic.types.agents import AgentRunError

logger = logging.getLogger(__name__)


async def run_with_retry(
    runner: Runner,
    initial_message: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> dict:
    """
    Run an agent pipeline with exponential backoff and jitter.

    Returns:
        {"status": "success", "result": ..., "attempts": N}
        or {"status": "failed", "error": ..., "attempts": N}
    """
    for attempt in range(max_retries + 1):
        try:
            result = await runner.run(initial_message)
            logger.info(f"✅ Succeeded on attempt {attempt + 1}")
            return {"status": "success", "result": result, "attempts": attempt + 1}

        except AgentRunError as e:
            if attempt == max_retries:
                logger.error(f"❌ Max retries exceeded: {e}")
                return {"status": "failed", "error": str(e), "attempts": attempt + 1}

            # Exponential backoff + random jitter (0-1 second).
            # random.uniform gives real jitter; the event loop clock does not.
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            logger.warning(f"⚠️ Retry {attempt + 1}/{max_retries}: {e}; waiting {delay:.2f}s")
            await asyncio.sleep(delay)

        except Exception as e:
            # Unexpected errors are not retried
            logger.critical(f"💥 Unexpected error: {e}")
            raise
```
Checkpointing for Long-Running Tasks
For pipelines that run for several minutes or longer, persist progress so you can resume from the last successful stage:
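A minimal file-based sketch of stage-level checkpointing is shown below. It assumes each stage is an async function whose output is JSON-serializable; `run_with_checkpoints`, the checkpoint directory layout, and the stage tuple format are hypothetical and independent of any SDK.

```python
import asyncio
import json
from pathlib import Path


async def run_with_checkpoints(run_id: str, stages: list, initial_input,
                               checkpoint_dir: str = "checkpoints"):
    """
    Run (name, async_fn) stages in order, saving each stage's output.
    On a rerun with the same run_id, completed stages are skipped.
    """
    path = Path(checkpoint_dir) / f"{run_id}.json"
    state = json.loads(path.read_text()) if path.exists() else {"completed": {}}

    data = initial_input
    for name, stage_fn in stages:
        if name in state["completed"]:
            data = state["completed"][name]  # Resume from the saved output
            continue

        data = await stage_fn(data)
        state["completed"][name] = data

        # Write to a temp file, then rename, so a crash mid-write
        # never leaves a corrupt checkpoint behind
        path.parent.mkdir(parents=True, exist_ok=True)
        tmp = path.with_suffix(".tmp")
        tmp.write_text(json.dumps(state))
        tmp.replace(path)

    return data
```

If the analysis stage fails after research has finished, rerunning with the same `run_id` reuses the stored research output instead of paying for it again. For multi-worker deployments, the same idea carries over to Redis or a database row keyed by run ID.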
Observability from the start — Embed tracing and cost attribution before you hit production, not after.
For foundational Agent SDK concepts, see the [Agent SDK Beginner's Guide](/articles/api-sdk/agent-sdk-guide). For more hands-on patterns, check out [Claude Agents SDK Practical Patterns](/articles/api-sdk/claude-agents-sdk-practical-patterns). To automate deployments with these agents, visit the [Claude Code Agent Guide](/articles/claude-code/claude-code-agent-guide).