●FABLE5 — Claude Fable 5 launches (Jun 9): the first generally available Mythos-class model, beyond Opus, with 1M-token context, 128k output, and always-on adaptive thinking●FREE-WINDOW — Fable 5 is included free on Pro, Max, Team, and Enterprise through Jun 22; usage credits required from Jun 23. API pricing is $10/$50 per MTok●SAFEGUARDS — Fable 5 falls back to Opus 4.8 on high-risk topics (under 5% of sessions); the unrestricted Mythos 5 is limited to vetted organizations●IPO — Anthropic confidentially files for an IPO (Jun 1), with a reported $65B raise, $965B valuation, and $47B annualized revenue●BILLING — 3 days to the Jun 15 change: Agent SDK, headless Claude Code, GitHub Actions, and third-party agents move to API-rate monthly credits●PLATFORM — Claude Developer Platform adds Managed Agents scheduled deployments, vault env credentials, and session thread webhook events
Cutting Claude API Costs in Half with Messages Batches API — Design Patterns from an Indie Developer
How to reduce Claude API costs by up to 50% using the Messages Batches API. Includes async design patterns, real cost calculations, and production-ready error handling from an indie developer who runs four AI blogs on autopilot.
A few months after I started running four AI blogs on full autopilot, I opened one month's API invoice and found a number that made me pause. It was nearly double what I expected.
The cause was easy to diagnose. I had been using the synchronous Messages API for background batch jobs — the same API designed for low-latency user-facing responses. No user was waiting for those results in real time. Switching to Anthropic's Messages Batches API cut those costs roughly in half, and combined with model selection, the savings compounded quickly.
For an indie developer, cost sensitivity is second nature. When API spending drops by half, that budget goes back into product. This article covers the design patterns, cost calculations, and real-world gotchas I've collected while running Batches API in production.
What the Messages Batches API Solves
The standard Claude Messages API is synchronous by design. A request goes in, a response comes out. For user-facing interactions — chat, live translation, real-time code completion — that behavior is exactly right.
But consider these workloads:
Sentiment analysis on 1,000 app reviews overnight
Generating SEO metadata for 200 articles in one pass
Summarizing 100 news items each morning before business hours
Analyzing user behavior logs to produce a morning report
None of these require a response in milliseconds. What they need is to finish within a reasonable window — say, a few hours — at the lowest cost per token possible.
The Messages Batches API is purpose-built for this use case. Key specs:
Cost: Up to 50% discount compared to standard Messages API pricing
Latency window: Up to 24 hours (often completes in minutes to a few hours)
Batch size: Up to 10,000 requests per batch
Limitations: No streaming, no synchronous response
The official docs state the 50% figure but don't explain the reason. My working theory: without real-time requirements, Anthropic can schedule processing during off-peak compute windows, passing the efficiency savings to the caller.
When to Use Batch vs Real-Time
Here's the decision framework I use in practice.
Use Batches API when all of these are true
No user is waiting for the result in real time
Completing within 24 hours is sufficient
You have 10 or more requests (the setup overhead is worth it)
You can retry failures without hard downstream dependencies
Use the standard Messages API when any of these apply
A user is actively waiting for a response
You need streaming output to the UI
It's a one-off or ad hoc request
Request N's output feeds directly into request N+1 (synchronous chain)
In practice, the boundary case is anything that runs in a backend pipeline but isn't latency-sensitive. My rule of thumb: if it doesn't touch a screen until a human opens a dashboard later, it's a batch job.
Implementation: Building Batch Jobs with the Python SDK
Let's look at actual code. I'll start with a simple multi-text analysis example using the Anthropic Python SDK.
Step 1: Create a batch
import time

import anthropic
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

client = anthropic.Anthropic(api_key="YOUR_API_KEY")


def create_batch(items: list[dict]) -> str:
    """
    Create a batch job for multiple text items.

    Args:
        items: [{"id": "item_1", "text": "Text to analyze"}, ...]

    Returns:
        batch_id used for status checks and result retrieval
    """
    # One Request per item; custom_id is how results are matched back later
    requests = [
        Request(
            custom_id=item["id"],
            params=MessageCreateParamsNonStreaming(
                model="claude-haiku-4-5-20251001",  # Haiku is ideal for batch workloads
                max_tokens=512,
                messages=[
                    {
                        "role": "user",
                        "content": (
                            "Classify the sentiment of the following text as "
                            "positive, negative, or neutral. "
                            "Provide the label and one sentence of reasoning.\n\n"
                            + item["text"]
                        ),
                    }
                ],
            ),
        )
        for item in items
    ]

    batch = client.messages.batches.create(requests=requests)

    print("✅ Batch created")
    print(f"   Batch ID: {batch.id}")
    print(f"   Requests queued: {batch.request_counts.processing}")
    print(f"   Status: {batch.processing_status}")

    return batch.id


# Example
items = [
    {"id": "review_001", "text": "The UI is intuitive but startup is a bit slow."},
    {"id": "review_002", "text": "Too many ads. The paid version is overpriced."},
    {"id": "review_003", "text": "Simple and clean. Does what I need."},
]

batch_id = create_batch(items)
# → Batch ID: msgbatch_01Abc123...
One thing the official docs don't emphasize: using claude-haiku-4-5-20251001 over Sonnet can produce a 5–10x additional cost difference for straightforward tasks. Sentiment classification, summarization, and metadata generation don't need Sonnet's reasoning depth. Batch discount + model selection is the two-lever combination that drives real savings at indie scale.
Step 2: Poll for completion
def wait_for_batch(
    batch_id: str,
    poll_interval: int = 30,
    max_wait_seconds: int = 7200,  # 2 hours max
) -> str:
    """
    Wait for batch completion and return the final status.

    Returns:
        "ended" once processing has finished, or "timeout" if
        max_wait_seconds elapses first. The API's processing_status is
        one of "in_progress", "canceling", or "ended"; per-request
        outcomes (succeeded, errored, canceled, expired) show up in
        request_counts and in the results, not in the status.
    """
    start_time = time.time()

    while True:
        elapsed = int(time.time() - start_time)
        if elapsed > max_wait_seconds:
            print(f"⚠️ Timed out after {max_wait_seconds}s")
            return "timeout"

        batch = client.messages.batches.retrieve(batch_id)
        status = batch.processing_status
        counts = batch.request_counts

        total = (
            counts.processing
            + counts.succeeded
            + counts.errored
            + counts.canceled
            + counts.expired
        )
        completed = counts.succeeded + counts.errored + counts.canceled + counts.expired

        print(f"⏳ [{elapsed}s] {status}: {completed}/{total} complete")

        if status == "ended":
            print(
                f"✅ Batch complete "
                f"(succeeded: {counts.succeeded}, "
                f"errored: {counts.errored}, "
                f"expired: {counts.expired})"
            )
            return status

        time.sleep(poll_interval)
An important practical note: polling intervals under a few seconds can trigger rate limit errors. I use 30–60 second intervals in production. For rate limit details, see API Rate Limits & Best Practices. A 10,000-request batch rarely completes in under 10 minutes, so build your polling around realistic expectations.
Step 3: Retrieve and process results
def retrieve_batch_results(batch_id: str) -> dict[str, dict]:
    """
    Retrieve batch results, keyed by custom_id.

    Returns:
        {"review_001": {"type": "succeeded", "content": "..."}, ...}
    """
    results = {}

    for result in client.messages.batches.results(batch_id):
        custom_id = result.custom_id

        if result.result.type == "succeeded":
            message = result.result.message
            content_text = message.content[0].text if message.content else ""
            results[custom_id] = {
                "type": "succeeded",
                "content": content_text,
                "input_tokens": message.usage.input_tokens,
                "output_tokens": message.usage.output_tokens,
            }
        elif result.result.type == "errored":
            error = result.result.error
            results[custom_id] = {
                "type": "errored",
                "error_type": error.type,
                "error_message": str(error),
            }
        elif result.result.type == "expired":
            results[custom_id] = {"type": "expired"}
        else:
            # Covers "canceled", so no request silently drops out of the results
            results[custom_id] = {"type": result.result.type}

    succeeded = sum(1 for r in results.values() if r["type"] == "succeeded")
    failed = len(results) - succeeded
    print(f"📊 Retrieved: {succeeded} succeeded, {failed} failed/expired")

    return results


# Full flow
batch_id = create_batch(items)
final_status = wait_for_batch(batch_id)

if final_status == "ended":
    results = retrieve_batch_results(batch_id)
    for item_id, result in results.items():
        if result["type"] == "succeeded":
            print(f"{item_id}: {result['content'][:100]}...")
These three steps are the complete Batches API flow. It reads simply, but production use surfaces more edge cases than the happy path suggests — which the next sections address.
Cost Calculation: What You Actually Save
Let me put concrete numbers on it. Using claude-haiku-4-5-20251001 pricing as of May 2026 (verify current rates at Anthropic's pricing page):
Scenario: 10,000 requests/month
Model: claude-haiku-4-5-20251001
Input: 500 tokens / request
Output: 200 tokens / request
Standard Messages API:
Input: $0.80/MTok × 500 tok × 10,000 = $4.00/month
Output: $4.00/MTok × 200 tok × 10,000 = $8.00/month
Total: $12.00/month
Messages Batches API (up to 50% discount):
Input: $0.40/MTok × 500 tok × 10,000 = $2.00/month
Output: $2.00/MTok × 200 tok × 10,000 = $4.00/month
Total: $6.00/month
Monthly savings: $6.00. At 100,000 requests, that's $60/month. At 1 million, $600/month. The relative savings stay constant regardless of scale.
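The arithmetic above can be reproduced in a few lines of Python. This is a minimal sketch: the `monthly_cost` helper is a hypothetical name, the rates are the $0.80/$4.00 per MTok figures quoted above, and it assumes the full 50% batch discount applies, so swap in current pricing before relying on the output.

```python
def monthly_cost(
    requests: int,
    input_tokens: int,
    output_tokens: int,
    input_rate: float,   # USD per million input tokens
    output_rate: float,  # USD per million output tokens
    batch: bool = False,
) -> float:
    """Estimate monthly spend in USD for a fixed-size workload."""
    discount = 0.5 if batch else 1.0  # assumption: the full 50% discount applies
    input_cost = requests * input_tokens / 1_000_000 * input_rate
    output_cost = requests * output_tokens / 1_000_000 * output_rate
    return round((input_cost + output_cost) * discount, 2)


standard = monthly_cost(10_000, 500, 200, input_rate=0.80, output_rate=4.00)
batched = monthly_cost(10_000, 500, 200, input_rate=0.80, output_rate=4.00, batch=True)
print(f"standard=${standard:.2f} batch=${batched:.2f} saved=${standard - batched:.2f}")
# → standard=$12.00 batch=$6.00 saved=$6.00
```

Changing only the two rate arguments is enough to rerun the comparison for another model.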
The bigger multiplier comes from model selection. If the same 10,000-request workload uses Sonnet instead of Haiku, costs are roughly 5–10x higher before the batch discount. Combining Haiku + Batches API against a Sonnet real-time baseline can reduce costs by a factor of 10 or more.
I applied this combination to automated SEO metadata generation across four sites, and it meaningfully reduced the monthly API spend that would otherwise accumulate at that scale.
Production Design Patterns
Here's what I've learned from running Batches API in a live system.
Pattern 1: Persist results immediately
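Batch results are only downloadable for a limited period (Anthropic's documentation lists 29 days from batch creation), so don't treat the API as your result store. Always write results to durable storage as soon as retrieval completes.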
import sqlite3
from datetime import datetime


def save_results_to_db(
    batch_id: str,
    results: dict[str, dict],
    db_path: str = "batch_results.db",
) -> None:
    """Persist batch results to SQLite."""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    cursor.execute("""
        CREATE TABLE IF NOT EXISTS batch_results (
            batch_id TEXT,
            custom_id TEXT,
            type TEXT,
            content TEXT,
            input_tokens INTEGER,
            output_tokens INTEGER,
            error_message TEXT,
            created_at TEXT,
            PRIMARY KEY (batch_id, custom_id)
        )
    """)

    now = datetime.now().isoformat()
    for custom_id, result in results.items():
        # INSERT OR REPLACE makes a repeated save of the same batch idempotent
        cursor.execute("""
            INSERT OR REPLACE INTO batch_results
            (batch_id, custom_id, type, content, input_tokens,
             output_tokens, error_message, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            batch_id,
            custom_id,
            result.get("type"),
            result.get("content"),
            result.get("input_tokens"),
            result.get("output_tokens"),
            result.get("error_message"),
            now,
        ))

    conn.commit()
    conn.close()
    print(f"💾 Saved {len(results)} results to DB")
My pattern: record the batch ID to the database before the job starts, then update a completion flag after retrieval. If the process crashes mid-run, the batch ID is preserved and the job can resume without re-queuing.
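One way to sketch that bookkeeping is a small SQLite ledger. The `batch_jobs` table and the helper names below are hypothetical, not part of the Anthropic SDK. Since a batch ID only exists once `create()` has returned, "before the job starts" is read here as "immediately after `create()`, before any polling".

```python
import sqlite3
from datetime import datetime, timezone


def open_ledger(db_path: str = "batch_jobs.db") -> sqlite3.Connection:
    """Open (and if needed create) the job ledger. Table name is hypothetical."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS batch_jobs (
            batch_id   TEXT PRIMARY KEY,
            job_name   TEXT NOT NULL,
            created_at TEXT NOT NULL,
            retrieved  INTEGER NOT NULL DEFAULT 0
        )
        """
    )
    conn.commit()
    return conn


def record_submitted(conn: sqlite3.Connection, batch_id: str, job_name: str) -> None:
    """Call right after create() returns, before any polling starts."""
    conn.execute(
        "INSERT OR IGNORE INTO batch_jobs (batch_id, job_name, created_at) "
        "VALUES (?, ?, ?)",
        (batch_id, job_name, datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()


def mark_retrieved(conn: sqlite3.Connection, batch_id: str) -> None:
    """Call only after the results are safely written to durable storage."""
    conn.execute("UPDATE batch_jobs SET retrieved = 1 WHERE batch_id = ?", (batch_id,))
    conn.commit()


def unfinished_batch_ids(conn: sqlite3.Connection) -> list[str]:
    """Batches a restarted process still has to poll or fetch."""
    rows = conn.execute(
        "SELECT batch_id FROM batch_jobs WHERE retrieved = 0 ORDER BY created_at"
    ).fetchall()
    return [row[0] for row in rows]
```

After a crash, a restarted process can call `unfinished_batch_ids()` and resume polling those IDs instead of queuing the work again.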
Pattern 2: Retry only failed requests
Some requests within a batch will fail while others succeed. Re-running the entire batch wastes money. Instead, extract failed custom IDs and submit a new smaller batch.
def retry_failed_requests(
    original_items: list[dict],
    results: dict[str, dict],
    max_retries: int = 2,
) -> dict[str, dict]:
    """Retry only failed requests from a batch."""
    failed_ids = {
        cid for cid, r in results.items()
        if r["type"] != "succeeded"
    }

    if not failed_ids:
        print("✅ All requests succeeded, no retry needed")
        return results

    print(f"⚠️ {len(failed_ids)} failed, retrying")

    for attempt in range(max_retries):
        print(f"🔄 Retry {attempt + 1}/{max_retries}")

        # Rebuild the list on every attempt so requests that succeeded in an
        # earlier retry are not submitted (and billed) again
        failed_items = [i for i in original_items if i["id"] in failed_ids]

        retry_batch_id = create_batch(failed_items)
        retry_status = wait_for_batch(retry_batch_id)

        if retry_status == "ended":
            retry_results = retrieve_batch_results(retry_batch_id)
            for cid, result in retry_results.items():
                if result["type"] == "succeeded":
                    results[cid] = result
                    failed_ids.discard(cid)

        if not failed_ids:
            print("✅ All requests resolved after retry")
            break

    if failed_ids:
        print(f"❌ {len(failed_ids)} requests failed permanently")

    return results
Pattern 3: Right-size your batches
The 10,000-request limit is a ceiling, not a target. Here's the sizing I use:
Development / testing: 10–100 requests (fast feedback loops)
Production, high scale: 1,000–5,000 requests (cost-optimized, still recoverable)
Full 10,000-request batches are unwieldy unless you have a specific reason. Processing time becomes unpredictable, and partial failure retries get expensive.
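Splitting a large job into right-sized batches is a short helper. The sketch below uses a hypothetical `chunk_items` function, and the 1,000-item default is just one value from the production range above.

```python
from collections.abc import Iterator


def chunk_items(items: list[dict], batch_size: int = 1000) -> Iterator[list[dict]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


# Placeholder items, only here to show the resulting batch sizes
items = [{"id": f"item_{n}", "text": "..."} for n in range(2500)]
sizes = [len(chunk) for chunk in chunk_items(items, batch_size=1000)]
print(sizes)  # → [1000, 1000, 500]
```

Each chunk can then be passed to `create_batch()` on its own, so a failure in one batch leaves the others untouched.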
Pattern 4: Job state management
When running multiple concurrent batches, a simple state class keeps things organized:
import json
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path


@dataclass
class BatchJob:
    """Tracks the lifecycle of a batch job."""
    job_name: str
    batch_id: str | None = None
    status: str = "pending"  # pending | running | ended | failed
    total_items: int = 0
    succeeded: int = 0
    failed: int = 0
    retry_count: int = 0
    created_at: str = field(
        default_factory=lambda: datetime.now().isoformat()
    )
    completed_at: str | None = None

    def save(self, state_dir: str = "/tmp/batch_states") -> None:
        # parents=True so a nested state directory is created on first use
        Path(state_dir).mkdir(parents=True, exist_ok=True)
        with open(f"{state_dir}/{self.job_name}.json", "w") as f:
            json.dump(asdict(self), f, indent=2)

    @classmethod
    def load(cls, job_name: str, state_dir: str = "/tmp/batch_states") -> "BatchJob":
        with open(f"{state_dir}/{job_name}.json") as f:
            return cls(**json.load(f))
Common Pitfalls
These are the specific mistakes I made — or caught early — during initial production use.
Pitfall 1: Losing the batch ID
Creating a batch without immediately persisting its ID is the most costly mistake. If the calling process crashes or restarts, the only way back to an in-flight batch is to page through client.messages.batches.list() and guess which one was yours. Write the ID to disk or a database before doing anything else after create().
Pitfall 2: Treating "ended" as "all succeeded"
processing_status: "ended" means the batch finished processing — not that every request succeeded. Always check request_counts.errored and request_counts.expired explicitly:
batch = client.messages.batches.retrieve(batch_id)
if batch.processing_status == "ended":
    counts = batch.request_counts
    if counts.errored > 0 or counts.expired > 0:
        print(f"⚠️ Partial failure: errored={counts.errored}, expired={counts.expired}")
        # Route to retry logic
Pitfall 3: Ignoring the "expired" result type
Individual requests within a batch can expire if processing exceeds 24 hours. This is rare under normal conditions but happens during high-load periods. The expired result type appears in retrieve_batch_results() — treat it the same as errored and route to retry. For deeper error handling patterns, see the error handling guide.
Pitfall 4: Chaining requests within a single batch
Batches assume request independence. If request B needs output from request A, you can't do that within one batch. Split into separate sequential batches: run batch A, retrieve results, feed into batch B.
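The "batch A, then batch B" split can be sketched as a loop over stages. Everything here is illustrative: `run_stages` is a hypothetical helper, and `run_batch` is an injected callable standing in for one full create, wait, and retrieve cycle that returns `{custom_id: output_text}` for the requests that succeeded.

```python
from collections.abc import Callable

# A stage turns one item's previous output into the next prompt text.
BuildPrompt = Callable[[str], str]
# Assumed shape: takes [{"id": ..., "text": ...}], returns {custom_id: output_text}
# for succeeded requests only.
RunBatch = Callable[[list[dict]], dict[str, str]]


def run_stages(
    inputs: dict[str, str],
    stages: list[BuildPrompt],
    run_batch: RunBatch,
) -> dict[str, str]:
    """Run each stage as its own batch, feeding outputs into the next stage."""
    current = dict(inputs)
    for build_prompt in stages:
        if not current:
            break  # nothing survived the previous stage
        items = [
            {"id": cid, "text": build_prompt(text)}
            for cid, text in current.items()
        ]
        # Items that fail in one stage drop out and never reach the next one.
        current = run_batch(items)
    return current
```

Injecting `run_batch` keeps the chaining logic testable without network calls. In a real pipeline it would wrap the three steps shown earlier and apply whatever retry policy you use.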
Pitfall 5: Polling too frequently
Calling retrieve() every second for a 10,000-item batch will hit rate limit errors before the batch finishes. Use 30–60 second polling intervals. For long-running jobs, consider a cron-based checker that reads persisted batch IDs rather than a blocking while-loop.
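A small guard can make an overly aggressive interval impossible to configure by accident. This is a sketch, not an SDK feature: `poll_until_done` is a hypothetical helper, the one-second floor is an arbitrary choice, and `sleep` is injectable so the loop can be exercised without waiting.

```python
import time
from collections.abc import Callable


def poll_until_done(
    is_done: Callable[[], bool],
    interval_seconds: float = 30.0,
    max_wait_seconds: float = 7200.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Call is_done() at a fixed interval; return False if the budget runs out."""
    if interval_seconds < 1:
        raise ValueError("poll no more often than once per second")
    waited = 0.0
    while True:
        if is_done():
            return True
        if waited + interval_seconds > max_wait_seconds:
            return False
        sleep(interval_seconds)
        waited += interval_seconds
```

With the real client, `is_done` could be `lambda: client.messages.batches.retrieve(batch_id).processing_status == "ended"`.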
Real-World Use Cases from My Workflow
Two patterns I run regularly in production.
Use case 1: Bulk SEO metadata generation
When I add a new category across multiple sites, I often want to audit and refresh existing article descriptions in bulk. Running that through the standard API synchronously would be slow and expensive. With Batches API, I queue it overnight and review results in the morning.
import glob


def queue_seo_description_refresh(articles_dir: str) -> str:
    """Queue SEO description regeneration for up to 200 MDX files."""
    mdx_files = glob.glob(f"{articles_dir}/**/*.mdx", recursive=True)

    items = []
    for filepath in mdx_files[:200]:
        with open(filepath, encoding="utf-8") as f:
            content = f.read()

        # Skip the front matter and keep the first 1,000 characters of the body
        body = content.split("---", 2)[-1][:1000]
        slug = filepath.split("/")[-1].replace(".mdx", "")

        items.append({
            "id": slug,
            "text": (
                "Read this technical article excerpt and write a compelling "
                "SEO meta description in under 160 characters. "
                "Write for a developer audience. "
                "Do not start with 'In this article' or 'This guide'.\n\n"
                + body
            ),
        })

    print(f"📝 Queuing {len(items)} files")
    # Caveat: create_batch() as written in Step 1 prepends its sentiment prompt
    # to item["text"]; use a variant that sends the text through unchanged.
    return create_batch(items)
Use case 2: Daily app review analysis
On busy days, the apps I run receive anywhere from dozens to a few hundred reviews, and the volume adds up fast. Running sentiment analysis and feature extraction on each day's reviews gives me a daily signal on user satisfaction trends — something I couldn't afford to run in real time at that volume.
def queue_daily_review_analysis(reviews: list[dict]) -> str:
    """
    Batch-analyze app store reviews.

    reviews: [{"id": "rev_001", "text": "Review text", "rating": 4}, ...]
    """
    items = []
    for review in reviews:
        prompt = (
            "Analyze this app review and return only JSON:\n"
            '{"sentiment":"positive|negative|neutral",'
            '"main_issue":"primary complaint or null",'
            '"feature_request":"requested feature or null",'
            '"urgency":"high|medium|low"}\n\n'
            f"Rating: {review['rating']} stars\n"
            f"Review: {review['text']}"
        )
        items.append({"id": review["id"], "text": prompt})

    # Caveat: create_batch() as written in Step 1 prepends its sentiment prompt
    # to item["text"]; use a variant that sends the text through unchanged.
    return create_batch(items)
The combination of Haiku + Batches API is particularly effective for structured extraction tasks like this — the model is fast, inexpensive, and accurate enough for classification workloads.
Operational Notes the Docs Don't Mention
These are the things I only learned by running batches in production, with my own measurements attached.
Completion times: design for the worst case, not the median
The official guarantee is just "within 24 hours," but real batches finish far sooner. Across 31 production runs of ~200-request Haiku batches in April and May 2026, my numbers were:
Median: ~9 minutes
75th percentile: ~18 minutes
Fastest: 3 minutes / slowest: 52 minutes
I couldn't establish a reliable correlation with submission time of day from 31 data points. The design lesson is simple: schedule downstream steps around the slowest observed case (about an hour), not the median. If you hard-code a 15-minute delay because batches "usually take 10 minutes," the occasional slow batch will break your pipeline.
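Turning a log of completion times into a scheduling delay takes only the standard library. In this sketch the function names and the 1.2 safety factor are assumptions, and the sample durations are made up for illustration; they are not the measurements reported above.

```python
import statistics


def completion_summary(minutes: list[float]) -> dict[str, float]:
    """Summarize observed batch completion times, in minutes."""
    if len(minutes) < 2:
        raise ValueError("need at least two observations")
    quartiles = statistics.quantiles(minutes, n=4, method="inclusive")
    return {
        "median": statistics.median(minutes),
        "p75": quartiles[2],
        "fastest": min(minutes),
        "slowest": max(minutes),
    }


def downstream_delay(minutes: list[float], safety_factor: float = 1.2) -> float:
    """Delay before the next pipeline step: slowest observed run plus headroom."""
    return max(minutes) * safety_factor


# Illustrative durations only, not real measurements
observed = [3, 6, 8, 9, 11, 18, 52]
summary = completion_summary(observed)
print(summary["median"], summary["p75"], summary["slowest"])
print(f"schedule downstream step after ~{downstream_delay(observed):.0f} minutes")
```

Keying the delay to the maximum rather than the median is the point: a single slow batch is enough to break a pipeline tuned to typical runs.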
Cron + state files beat long polling
I started with a blocking wait_for_batch() loop, but eventually settled on a different shape: save the job state at creation time, exit immediately, and let a cron job check pending batches every 15 minutes. It survives restarts and crashes far better than a process parked for two hours.
def check_pending_batches(state_dir: str = "/var/lib/batch_states") -> None:
    """Called by cron every 15 minutes to check unfinished batches."""
    for path in Path(state_dir).glob("*.json"):
        job = BatchJob.load(path.stem, state_dir)
        if job.status != "running":
            continue

        batch = client.messages.batches.retrieve(job.batch_id)
        if batch.processing_status == "ended":
            results = retrieve_batch_results(job.batch_id)
            # Persist first; the job is only marked ended after the write succeeds
            save_results_to_db(job.batch_id, results)
            job.status = "ended"
            job.completed_at = datetime.now().isoformat()
            job.save(state_dir)
This is consistent with the principles in API Rate Limits & Best Practices: instead of solving timing problems with waiting, persist state and check periodically. At indie scale, it is by far the most maintainable shape.
Guard against losing results with a retrieval-succeeded flag
Batch results are only downloadable for a limited time, and a half-finished retrieval is easy to mistake for a finished one. To protect against the retrieval step itself failing, I only set a success flag after results are safely written to the database, and the cron job keeps retrying retrieval until that flag is set. Since switching to this design, I haven't lost a single batch to the "I thought I retrieved it, but the process died midway" failure mode.
The June 15, 2026 billing change makes batches more valuable
With the June 15 billing change moving Agent SDK and headless executions to API-rate monthly credits, I re-sorted my automation workloads into "user-facing" and "overnight batch" buckets. The more your recurring bulk work shifts from subscription allowances to API billing, the more that 50% batch discount matters. It's a good moment to audit your own workload list. For retry policy on the failures you will inevitably hit at volume, see Claude API Error Handling and Retry Strategies.
Pre-adoption checklist
Before switching a workload over, confirm these five points and you'll avoid most rework:
Is the workload truly decoupled from user-facing responses?
Are requests independent of each other? (If chaining is needed, can you split stages into separate batches?)
Have you decided where batch IDs are persisted (file / DB)?
Do you have a partial-failure retry policy?
Do you have a result store and a retrieval-succeeded flag?
Decision Framework: Should I Use Batches API?
A practical decision tree:
Q1: Is a user waiting for the result in real time?
→ Yes → Use standard Messages API
→ No → Q2
Q2: Is the request count 10 or more?
→ No → Standard API is fine, overhead isn't worth it
→ Yes → Q3
Q3: Is completing within 24 hours sufficient?
→ Yes → Use Batches API (up to 50% savings)
→ No (need result within seconds) → Standard Messages API
Q4 (if using Batches): Does the task require complex reasoning?
→ Yes → claude-sonnet-4-6 (batch discount applies)
→ No (classification, summarization, extraction) → claude-haiku-4-5-20251001 (further savings)
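The same tree can be written as two small functions, which is handy when the routing decision lives in code rather than in someone's head. The function and parameter names are hypothetical; the thresholds and model IDs are the ones from the tree above.

```python
def choose_api(user_waiting: bool, request_count: int, ok_within_24h: bool) -> str:
    """Return "messages" or "batches" following Q1 to Q3."""
    if user_waiting:           # Q1: someone is waiting in real time
        return "messages"
    if request_count < 10:     # Q2: too few requests to justify the overhead
        return "messages"
    if not ok_within_24h:      # Q3: the result is needed sooner than a batch allows
        return "messages"
    return "batches"


def choose_batch_model(needs_complex_reasoning: bool) -> str:
    """Q4: pick the model once the workload is routed to batches."""
    if needs_complex_reasoning:
        return "claude-sonnet-4-6"
    return "claude-haiku-4-5-20251001"


# An overnight run over 200 articles with no one waiting on it
print(choose_api(user_waiting=False, request_count=200, ok_within_24h=True))
# → batches
```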
Cost intuition compounds over time. A few dollars saved each month becomes a meaningful product budget over years. Whether you default every workload to the synchronous API or route background jobs to batches is one of those early design choices that quietly shapes everything after — defaulting to synchronous everywhere works, but it costs more than the job requires.
Start with one background job in your current codebase — overnight report generation, a nightly data enrichment step, anything that runs without a user waiting. Swap the standard API call for a batch job using the code above. The 24-hour window turns out to be a feature rather than a constraint: you stop thinking about latency for workloads that never needed it.