Claude Lab
API & SDK · 2026-06-28 · Advanced

Measure Streaming CPU and Dropped Chunks to Stabilize Long Batch Jobs

You start an overnight batch, and by morning only half of it finished. The culprits were CPU pinned during streaming and a quiet connection drop. Here is a monitor wrapper that measures stream CPU and throughput, and resumes from interruptions.

Claude API · streaming · batch processing · monitoring · indie developer

You kick off an article-generation batch before bed, and in the morning only half of it has run. There are no exceptions in the logs, yet it stalled somewhere in the middle. As an indie developer running several Dolice Labs sites on my own, I have hit this "silent drop in unattended runs" more times than I'd like to admit. When I finally sat down to measure it, the cause came down to two things: the CPU getting pinned during long streaming responses, and connections quietly dying without anyone noticing while the loop marched on regardless.

An update in June 2026 noted that CPU usage during streaming in Claude Code dropped by roughly 37%. That's welcome news, but how you put that headroom to work in your own pipeline, and what is actually happening on your machine in the first place, is something you only learn by measuring. In this article we build a runnable monitor wrapper that measures the CPU, wall time, and token throughput of streaming responses, detects interruptions, and resumes safely.

Separate the three flavors of "it stalled"

"The batch stalled" sounds like one problem, but on the ground it splits into three. Pile on fixes without separating them and you just accumulate defenses that don't help.

The first is CPU pinned to a single core because the code consuming streamed events is heavy. The response is arriving fine, but the formatting work can't keep up, so overall throughput sags. The second is a stream that breaks mid-flight on the network path — the final chunk never arrives and the read loop hangs without raising anything. The third is rate limits or transient server errors, where, without a retry design, a single failure drags the whole batch down with it.

The first move is to make these visible as numbers. Before you stretch timeouts or add threads on a hunch, record how many seconds each request takes, how much CPU it burns, and how many tokens per second you actually receive.

Measure what a stream actually costs

Start with an instrument that measures CPU time, wall time, and received tokens for a single stream. We read process CPU time with psutil and wall time with time.perf_counter(). Dividing CPU time by wall time tells you roughly how many cores you were effectively burning over that span.

import time
import psutil
from dataclasses import dataclass, field
from anthropic import Anthropic
 
client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment
proc = psutil.Process()
 
 
@dataclass
class StreamStats:
    wall_seconds: float = 0.0       # elapsed wall-clock seconds
    cpu_seconds: float = 0.0        # CPU seconds this process consumed
    output_tokens: int = 0          # output tokens received
    chunks: int = 0                 # number of text deltas received
    text: str = field(default="")
 
    @property
    def cpu_cores(self) -> float:
        # 1.0 means you occupied one core on average over the span
        return self.cpu_seconds / self.wall_seconds if self.wall_seconds else 0.0
 
    @property
    def tokens_per_sec(self) -> float:
        return self.output_tokens / self.wall_seconds if self.wall_seconds else 0.0
 
 
def measured_stream(model: str, messages: list, max_tokens: int = 2048) -> StreamStats:
    stats = StreamStats()
    cpu_before = sum(proc.cpu_times()[:2])   # user + system CPU seconds
    wall_before = time.perf_counter()
 
    with client.messages.stream(
        model=model,
        max_tokens=max_tokens,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            stats.text += text
            stats.chunks += 1
        final = stream.get_final_message()
        stats.output_tokens = final.usage.output_tokens
 
    stats.wall_seconds = time.perf_counter() - wall_before
    stats.cpu_seconds = sum(proc.cpu_times()[:2]) - cpu_before
    return stats

The key detail here is summing user and system CPU time from cpu_times() and then taking the difference. You can get a close value from time.process_time(), but the way it handles child threads and I/O waits drifts across environments, so I prefer the straightforward per-process reading that psutil gives.
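If psutil isn't installed, a stdlib-only approximation is possible with time.process_time(), which returns the user plus system CPU time of the current process. This is a minimal sketch assuming you only need the same cores ratio around an arbitrary block of work. The `cpu_meter` and `CpuReading` names are hypothetical.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass
class CpuReading:
    wall_seconds: float = 0.0
    cpu_seconds: float = 0.0

    @property
    def cpu_cores(self) -> float:
        # Same ratio as StreamStats.cpu_cores: 1.0 means one core busy on average
        return self.cpu_seconds / self.wall_seconds if self.wall_seconds else 0.0


@contextmanager
def cpu_meter():
    """Measure wall time and process CPU time (user + system) of the enclosed block."""
    reading = CpuReading()
    cpu_before = time.process_time()
    wall_before = time.perf_counter()
    try:
        yield reading
    finally:
        # Filled in when the with-block exits, so read the fields afterwards
        reading.wall_seconds = time.perf_counter() - wall_before
        reading.cpu_seconds = time.process_time() - cpu_before


if __name__ == "__main__":
    with cpu_meter() as idle:
        time.sleep(0.3)            # waiting, like a network-bound stream
    with cpu_meter() as busy:
        deadline = time.perf_counter() + 0.3
        while time.perf_counter() < deadline:
            pass                   # spinning, like heavy per-delta formatting
    print(f"idle cores={idle.cpu_cores:.2f} busy cores={busy.cpu_cores:.2f}")
```

The sleeping block should report a cores value near 0 and the spinning block a value near 1. That is the same contrast the psutil instrument draws between network wait and receive-side work.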

Run the instrument and you get output like the following. If cpu_cores sits around 0.2, the receive side is light and network wait dominates. If it climbs past 0.9, that's a sign the per-delta formatting work is too heavy.

stats = measured_stream(
    "claude-sonnet-4-6",
    [{"role": "user", "content": "Summarize the benefits of streaming in 200 words."}],
)
print(f"wall={stats.wall_seconds:.1f}s "
      f"cpu={stats.cpu_seconds:.1f}s "
      f"cores={stats.cpu_cores:.2f} "
      f"tok/s={stats.tokens_per_sec:.1f}")
# e.g. wall=6.3s cpu=1.4s cores=0.22 tok/s=33.5
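To read those numbers the same way every morning, the thresholds can live in a small helper. This is a sketch. The 0.9 cut-off is the one given above. The 0.3 lower cut-off is an assumption, chosen so that readings around 0.2 land in the network-wait bucket. `diagnose_cores` and its labels are hypothetical.

```python
def diagnose_cores(cpu_cores: float, heavy: float = 0.9, light: float = 0.3) -> str:
    """Classify a StreamStats.cpu_cores reading with the rule of thumb (heuristic thresholds)."""
    if cpu_cores > heavy:
        return "receive-heavy"   # per-delta processing is eating a whole core
    if cpu_cores <= light:
        return "network-wait"    # receive side is light; waiting on the network dominates
    return "mixed"               # measure more runs before changing code


for cores in (0.22, 0.60, 0.95):
    print(f"{cores:.2f} -> {diagnose_cores(cores)}")
# 0.22 -> network-wait
# 0.60 -> mixed
# 0.95 -> receive-heavy
```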

Detect interruptions and resume safely

Once you can measure, the next defense is against the "silent drop." The danger of streaming is that when a connection dies before the final message_stop event, it shows up not as an exception but as silence. The practical way I found to detect it is to cap the idle time between deltas.
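One subtlety applies here. A gap check placed inside the `for` loop only runs once the next delta has already arrived, so it cannot fire while the loop is blocked on a dead connection. The wait itself needs a deadline.

Below is a stdlib-only sketch of that idea, assuming you can afford one background thread per stream. `iter_with_idle_timeout` is a hypothetical helper, not part of the Anthropic SDK. With the SDK, an HTTP read timeout configured through `httpx.Timeout` achieves a similar effect at the socket level.

```python
import queue
import threading
import time
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the normal end of the source iterable


def iter_with_idle_timeout(source: Iterable[T], idle_timeout: float) -> Iterator[T]:
    """Yield items from source; raise TimeoutError if no item becomes available
    within idle_timeout seconds of asking for the next one."""
    q: queue.Queue = queue.Queue()

    def pump() -> None:
        # Daemon thread: a read that never returns cannot block interpreter shutdown
        try:
            for item in source:
                q.put((True, item))
            q.put((True, _DONE))
        except Exception as exc:  # forward producer errors to the consumer
            q.put((False, exc))

    threading.Thread(target=pump, daemon=True).start()
    while True:
        try:
            ok, item = q.get(timeout=idle_timeout)
        except queue.Empty:
            raise TimeoutError(f"no item within {idle_timeout}s") from None
        if not ok:
            raise item
        if item is _DONE:
            return
        yield item


def slow_source():
    yield "a"
    yield "b"
    time.sleep(1.0)  # simulates a connection that goes silent
    yield "c"


if __name__ == "__main__":
    received = []
    try:
        for chunk in iter_with_idle_timeout(slow_source(), idle_timeout=0.2):
            received.append(chunk)
    except TimeoutError as e:
        print(f"interrupted after {received}: {e}")
```

After a timeout, the abandoned pump thread keeps waiting until the source returns or fails. That is why it is a daemon thread. The consumer, meanwhile, is free to abort and retry.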

The wrapper below treats a gap longer than a threshold as an interruption, aborts, and retries with exponential backoff. Because the API offers no way to truly resume a stream mid-flight, I re-submit at the level of an idempotent unit. I split the work into grains (one article, one record) that don't produce duplicates when redone.

import time
import httpx
from anthropic import Anthropic, APIConnectionError, APIStatusError

client = Anthropic()


class IncompleteStreamError(Exception):
    """The stream closed without delivering a stop_reason."""


def resilient_stream(
    model: str,
    messages: list,
    max_tokens: int = 2048,
    idle_timeout: float = 30.0,   # no data within this many seconds = interrupted
    max_retries: int = 4,
) -> str:
    # An HTTP read timeout fires while the loop is blocked waiting for data;
    # a gap check inside the for loop would only run after the next delta arrived.
    # SDK-level retries are disabled so this loop alone decides when to retry.
    stream_client = client.with_options(
        timeout=httpx.Timeout(idle_timeout, connect=10.0),
        max_retries=0,
    )
    attempt = 0
    while True:
        attempt += 1
        buf = []
        try:
            with stream_client.messages.stream(
                model=model, max_tokens=max_tokens, messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    buf.append(text)
                final = stream.get_final_message()
            if final.stop_reason is None:   # no message_delta arrived: treat as interrupted
                raise IncompleteStreamError("stream ended without a stop_reason")
            return "".join(buf)

        except (httpx.HTTPError, APIConnectionError, IncompleteStreamError) as e:
            # httpx.ReadTimeout / RemoteProtocolError surface mid-stream;
            # APIConnectionError (including APITimeoutError) surfaces while connecting
            if attempt > max_retries:
                raise
            backoff = min(2 ** attempt, 30)
            print(f"[retry {attempt}] {type(e).__name__}: {e} -> {backoff}s")
            time.sleep(backoff)

        except APIStatusError as e:
            # retry only 429 (rate limit) and 5xx, including 529 (overloaded); raise logical 4xx immediately
            if not (e.status_code == 429 or e.status_code >= 500) or attempt > max_retries:
                raise
            backoff = min(2 ** attempt, 30)
            print(f"[retry {attempt}] HTTP {e.status_code} -> {backoff}s")
            time.sleep(backoff)

The deliberate split here is between errors worth retrying and errors that aren't. Retrying a 400-class error, where the request itself is malformed, only burns money, so I wait and re-submit only for 429, server-side 5xx, and the 529 that signals Anthropic overload. The 30-second idle_timeout is a rule of thumb: in a normal stream, deltas arrive every few hundred milliseconds to a few seconds, so 30 seconds of silence is almost certainly an anomaly.
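The retry decision and backoff can also be pulled out into a pure function, which makes them easy to test without a network. This sketch rests on two assumptions. First, the caller passes the HTTP status, or None for a dropped connection or idle timeout. Second, for a 429 the caller passes the value of the `retry-after` response header if one was sent. Anthropic documents that header on rate-limit responses, but treat the parsing here as illustrative. `retry_delay` is a hypothetical helper.

```python
from typing import Optional


def retry_delay(
    attempt: int,
    status_code: Optional[int] = None,
    retry_after: Optional[str] = None,
    max_retries: int = 4,
    cap: float = 30.0,
) -> Optional[float]:
    """Seconds to wait before the next attempt, or None to stop retrying.

    status_code=None means a transport-level failure (idle timeout, dropped connection).
    """
    if attempt > max_retries:
        return None
    if status_code is not None and not (status_code == 429 or status_code >= 500):
        return None  # malformed request, auth error, etc.: retrying only burns money
    backoff = min(2.0 ** attempt, cap)
    if status_code == 429 and retry_after:
        try:
            # honour the server's hint when it asks for a longer wait
            backoff = max(backoff, float(retry_after))
        except ValueError:
            pass  # e.g. an HTTP-date value; fall back to exponential backoff
    return backoff


for args in [(1, None), (3, 503), (1, 429, "12"), (1, 400), (5, 529)]:
    print(args, "->", retry_delay(*args))
# (1, None) -> 2.0
# (3, 503) -> 8.0
# (1, 429, '12') -> 12.0
# (1, 400) -> None
# (5, 529) -> None
```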

Run the whole batch in idempotent units

Finally, tie the instrument and the resume wrapper into something you can run safely as an overnight batch. The linchpin is persisting progress. Mark each item done as you finish it, and on restart pick up only the unfinished work. I simply use a JSON Lines checkpoint file — it avoids adding a database and you can peek at it mid-run to see where things stand.
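The checkpoint only works if an item keeps the same `id` across restarts. If your jobs don't already carry one, one option is to derive the id from the request inputs, so the same input always maps to the same checkpoint key. This is a minimal sketch assuming the inputs are JSON-serializable. `stable_job_id` is hypothetical.

```python
import hashlib
import json


def stable_job_id(messages: list, model: str) -> str:
    """Derive a deterministic id from the request inputs (same inputs -> same id)."""
    # sort_keys and compact separators make the serialization canonical,
    # so dict key order does not change the hash
    canonical = json.dumps(
        {"model": model, "messages": messages},
        sort_keys=True, separators=(",", ":"), ensure_ascii=False,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]


msgs = [{"role": "user", "content": "Summarize the benefits of streaming in 200 words."}]
job = {"id": stable_job_id(msgs, "claude-sonnet-4-6"), "messages": msgs}
print(job["id"])
```

One trade-off to keep in mind: two jobs with identical inputs collapse into one id. If you deliberately want several generations of the same prompt, add a variant field to the hashed payload.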

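import json
import os
from pathlib import Path

# measured_stream is the instrument defined earlier in this article (same module)

CKPT = Path("batch_progress.jsonl")


def load_done() -> set:
    if not CKPT.exists():
        return set()
    done = set()
    for line in CKPT.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        try:
            done.add(json.loads(line)["id"])
        except json.JSONDecodeError:
            continue   # torn last line from a crash mid-write: treat that item as not done
    return done


def run_batch(jobs: list[dict], model: str = "claude-sonnet-4-6"):
    done = load_done()
    pending = [j for j in jobs if j["id"] not in done]
    print(f"total={len(jobs)} done={len(done)} pending={len(pending)}")

    with CKPT.open("a", encoding="utf-8") as ck:
        if CKPT.stat().st_size and not CKPT.read_bytes().endswith(b"\n"):
            ck.write("\n")   # terminate a torn last line so new records start on a fresh line
        for job in pending:
            stats = measured_stream(model, job["messages"])
            # if interruptions are likely, swap in resilient_stream here; it returns only
            # the text, so wrap it to build a StreamStats first
            # stats.text holds the generated output: persist it before writing the checkpoint line
            ck.write(json.dumps({
                "id": job["id"],
                "tokens": stats.output_tokens,
                "wall": round(stats.wall_seconds, 2),
                "cores": round(stats.cpu_cores, 2),
            }, ensure_ascii=False) + "\n")
            ck.flush()
            os.fsync(ck.fileno())   # don't lose the last completion on power loss
            print(f"done {job['id']} ({stats.tokens_per_sec:.0f} tok/s)")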

I call os.fsync() after flush() because in unattended operation a VM stop or power cut is routine. Skip it and you can lose the "done" records sitting in the OS buffer, then redo the same work after a restart. I caused a double-generation this way once, and ever since I've treated fsync as the minimum etiquette for an unattended batch.
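The checkpoint records that an item finished, but the generated text also has to reach disk before that record is written. Otherwise, a crash between the two leaves an item marked done with no output.

One common pattern is to write to a temporary file, fsync it, and rename it into place with os.replace(). On POSIX the rename is atomic when source and destination sit on the same filesystem. This sketch assumes a hypothetical `out/` directory with one file per job id. `write_output_atomically` is a hypothetical name, and fsyncing the directory after the rename (for full durability of the rename itself) is omitted.

```python
import os
import tempfile
from pathlib import Path


def write_output_atomically(out_dir: Path, job_id: str, text: str) -> Path:
    """Write text to out_dir/<job_id>.txt so readers never see a half-written file."""
    out_dir.mkdir(parents=True, exist_ok=True)
    final_path = out_dir / f"{job_id}.txt"
    # temp file in the same directory so os.replace() stays on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=out_dir, prefix=f".{job_id}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())      # content is on disk before the rename
        os.replace(tmp_name, final_path)  # atomic swap into the final name
    except BaseException:
        # clean up the temp file if anything failed before the rename
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return final_path


# In run_batch the order would be: output first, then the checkpoint line, e.g.
# write_output_atomically(Path("out"), job["id"], stats.text)
```

Because the output file is replaced wholesale, redoing an item after a crash simply overwrites it. That keeps the unit idempotent.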

Keeping each item's cores in the checkpoint lets you review afterward which jobs had heavy receive-side processing. You only need to lighten the delta formatting or split the output into shorter requests for jobs where the value stays consistently high. That pays off far better than tuning everything blindly.
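That after-the-fact review can be a few lines over the same JSON Lines format. This sketch averages `cores` per job id across several checkpoint files (for example, one per night) and lists the jobs above a threshold. The 0.9 default reuses the earlier rule of thumb. `min_runs` is an assumption about what "consistently" means, and `heavy_jobs` is a hypothetical name.

```python
import json
from collections import defaultdict
from pathlib import Path
from statistics import mean


def heavy_jobs(paths: list[Path], threshold: float = 0.9, min_runs: int = 2) -> dict[str, float]:
    """Return {job_id: mean cores} for jobs whose average cores exceeds threshold
    across at least min_runs recorded runs."""
    samples: dict[str, list[float]] = defaultdict(list)
    for path in paths:
        for line in path.read_text(encoding="utf-8").splitlines():
            if not line.strip():
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip a torn last line from an interrupted write
            samples[rec["id"]].append(rec["cores"])
    return {
        job_id: round(mean(vals), 2)
        for job_id, vals in samples.items()
        if len(vals) >= min_runs and mean(vals) > threshold
    }
```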

Measure first, then touch the code

The shortest path to stable streaming, counterintuitively, was measuring before adding defenses. Where the CPU pins, when the stream goes silent, how many seconds each item costs: only once these are visible can you choose an idle_timeout and the status codes to retry based on actual evidence.
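As one way to ground idle_timeout in numbers, record the arrival time of each delta and set the timeout to a generous multiple of a high percentile of the gaps. For example, you could note time.monotonic() per delta inside a modified measured_stream. This is a sketch: the 99th percentile, the 5x multiplier and the 10-second floor are assumptions to tune. `inter_chunk_gaps` and `suggest_idle_timeout` are hypothetical helpers.

```python
import statistics


def inter_chunk_gaps(timestamps: list[float]) -> list[float]:
    """Gaps in seconds between consecutive chunk arrival times."""
    return [b - a for a, b in zip(timestamps, timestamps[1:])]


def suggest_idle_timeout(gaps: list[float], multiplier: float = 5.0, floor: float = 10.0) -> float:
    """Pick an idle timeout well above the slowest normal gaps seen in measurements."""
    if len(gaps) < 2:
        return max(floor, 30.0)  # not enough data: fall back to the 30 s rule of thumb
    # "inclusive" keeps the percentile within the observed range (no extrapolation)
    p99 = statistics.quantiles(gaps, n=100, method="inclusive")[98]
    return max(floor, round(p99 * multiplier, 1))
```

With typical sub-second gaps, the floor wins, so the result stays at 10 seconds. Only streams that genuinely pause for seconds at a time push the timeout higher.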

For your next step, drop measured_stream into tonight's batch for a single day and record cores and tok/s in the checkpoint. A glance at those numbers the next morning will tell you whether your bottleneck lives on the receive side or the network side. If you're wrestling with unattended runs the same way, I hope this gives you a foothold for that first measurement.


Related Articles

API & SDK · 2026-06-25
Reach a Remote MCP Server in a Single API Request: Implementing the Messages API MCP Connector
How to call a remote MCP server's tools using only the Messages API's mcp_servers and mcp_toolset—no local MCP client. Covers allowlist/denylist design, response handling, and the pitfalls to avoid before unattended production use.
API & SDK · 2026-06-23
When Thinking Is Always On, Prefill Quietly Stops Working — Fixing Streaming and Token Budgets for Fable 5
Fable 5 thinks by default. Prefill no longer applies, the first streamed block isn't text, and max_tokens has to leave room for reasoning. Here is how I fixed those three broken assumptions in my own automated publishing pipeline.
API & SDK · 2026-06-22
Claude API Streaming Breaks the "Everything Arrives" Assumption — Field Notes on Recovering from Partial Failure
Once concurrency climbs, Claude API streams disconnect mid-response, replay events, and emit half-finished tool arguments. Treating partial failure as the norm rather than an anomaly, here is how I rebuilt the implementation and monitoring to recover quietly.