You kick off an article-generation batch before bed, and in the morning only half of it has run. No exceptions in the logs, yet it stalled somewhere in the middle. As an indie developer running the several Dolice Labs sites on my own, I hit this "silent drop in unattended runs" more times than I'd like to admit. When I finally sat down to measure it, the cause collapsed into two things: the CPU getting pinned during long streaming responses, and connections quietly dying without anyone noticing, while the loop marched on regardless.
An update in June 2026 noted that Anthropic reduced CPU usage during streaming by roughly 37%. That's welcome news, but how you put that headroom to work in your own pipeline — and what is actually happening on your machine in the first place — is something you only learn by measuring. In this article we build a monitor wrapper, in runnable form, that measures the CPU, wall time, and token throughput of streaming responses, detects interruptions, and resumes safely.
Separate the three flavors of "it stalled"
"The batch stalled" sounds like one problem, but on the ground it splits into three. Pile on fixes without separating them and you just accumulate defenses that don't help.
The first is CPU pinned to a single core because the code consuming streamed events is heavy. The response is arriving fine, but the formatting work can't keep up, so overall throughput sags. The second is a stream that breaks mid-flight on the network path — the final chunk never arrives and the read loop hangs without raising anything. The third is rate limits or transient server errors, where, without a retry design, a single failure drags the whole batch down with it.
The first move is to make these visible as numbers. Before you stretch timeouts or add threads on a hunch, record how many seconds each request takes, how much CPU it burns, and how many tokens per second you actually receive.
Measure what a stream actually costs
Start with an instrument that measures CPU time, wall time, and received tokens for a single stream. We read process CPU time with psutil and wall time with time.perf_counter(). Dividing CPU time by wall time tells you roughly how many cores you were effectively burning over that span.
import time
import psutil
from dataclasses import dataclass, field
from anthropic import Anthropic
client = Anthropic() # reads ANTHROPIC_API_KEY from the environment
proc = psutil.Process()
@dataclass
class StreamStats:
wall_seconds: float = 0.0 # elapsed wall-clock seconds
cpu_seconds: float = 0.0 # CPU seconds this process consumed
output_tokens: int = 0 # output tokens received
chunks: int = 0 # number of text deltas received
text: str = field(default="")
@property
def cpu_cores(self) -> float:
# 1.0 means you occupied one core on average over the span
return self.cpu_seconds / self.wall_seconds if self.wall_seconds else 0.0
@property
def tokens_per_sec(self) -> float:
return self.output_tokens / self.wall_seconds if self.wall_seconds else 0.0
def measured_stream(model: str, messages: list, max_tokens: int = 2048) -> StreamStats:
stats = StreamStats()
cpu_before = sum(proc.cpu_times()[:2]) # user + system CPU seconds
wall_before = time.perf_counter()
with client.messages.stream(
model=model,
max_tokens=max_tokens,
messages=messages,
) as stream:
for text in stream.text_stream:
stats.text += text
stats.chunks += 1
final = stream.get_final_message()
stats.output_tokens = final.usage.output_tokens
stats.wall_seconds = time.perf_counter() - wall_before
stats.cpu_seconds = sum(proc.cpu_times()[:2]) - cpu_before
return statsThe key detail here is summing user and system CPU time from cpu_times() and then taking the difference. You can get a close value from time.process_time(), but the way it handles child threads and I/O waits drifts across environments, so I prefer the straightforward per-process reading that psutil gives.
Run the instrument and you get output like the following. If cpu_cores sits around 0.2, the receive side is light and network wait dominates. If it climbs past 0.9, that's a sign the per-delta formatting work is too heavy.
stats = measured_stream(
"claude-sonnet-4-6",
[{"role": "user", "content": "Summarize the benefits of streaming in 200 words."}],
)
print(f"wall={stats.wall_seconds:.1f}s "
f"cpu={stats.cpu_seconds:.1f}s "
f"cores={stats.cpu_cores:.2f} "
f"tok/s={stats.tokens_per_sec:.1f}")
# e.g. wall=6.3s cpu=1.4s cores=0.22 tok/s=33.5Detect interruptions and resume safely
Once you can measure, the next defense is against the "silent drop." The danger of streaming is that when a connection dies before the final message_stop event, it shows up not as an exception but as silence. The practical way I found to detect it is to cap the idle time between deltas.
The wrapper below treats a gap longer than a threshold as an interruption, aborts, and retries with exponential backoff. Because the model offers no guarantee of true mid-stream resumption, I take the "re-submit at an idempotent unit" approach: split work into grains — one article, one record — that don't double up if you redo them.
import time
import httpx
from anthropic import Anthropic, APIStatusError
client = Anthropic()
def resilient_stream(
model: str,
messages: list,
max_tokens: int = 2048,
idle_timeout: float = 30.0, # no chunk within this many seconds = interrupted
max_retries: int = 4,
) -> str:
attempt = 0
while True:
attempt += 1
last_recv = time.monotonic()
buf = []
try:
with client.messages.stream(
model=model, max_tokens=max_tokens, messages=messages,
) as stream:
for text in stream.text_stream:
now = time.monotonic()
if now - last_recv > idle_timeout:
raise TimeoutError(
f"idle {now - last_recv:.0f}s > {idle_timeout}s"
)
last_recv = now
buf.append(text)
stream.get_final_message() # confirms completion (success if we reach here)
return "".join(buf)
except (TimeoutError, httpx.HTTPError) as e:
if attempt > max_retries:
raise
backoff = min(2 ** attempt, 30)
print(f"[retry {attempt}] {type(e).__name__}: {e} -> {backoff}s")
time.sleep(backoff)
except APIStatusError as e:
# retry only 429 (rate limit) and 5xx. raise logical 4xx immediately
if e.status_code not in (429, 500, 502, 503, 529) or attempt > max_retries:
raise
backoff = min(2 ** attempt, 30)
print(f"[retry {attempt}] HTTP {e.status_code} -> {backoff}s")
time.sleep(backoff)The deliberate split here is between errors worth retrying and errors that aren't. Retrying a 400-class error, where the request itself is malformed, only burns money, so I wait and re-submit only for 429, server-side 5xx, and the 529 that signals Anthropic overload. The 30-second idle_timeout is a rule of thumb: in a normal stream, deltas arrive every few hundred milliseconds to a few seconds, so 30 seconds of silence is almost certainly an anomaly.
Run the whole batch in idempotent units
Finally, tie the instrument and the resume wrapper into something you can run safely as an overnight batch. The linchpin is persisting progress. Mark each item done as you finish it, and on restart pick up only the unfinished work. I simply use a JSON Lines checkpoint file — it avoids adding a database and you can peek at it mid-run to see where things stand.
import json
import os
from pathlib import Path
CKPT = Path("batch_progress.jsonl")
def load_done() -> set:
if not CKPT.exists():
return set()
done = set()
for line in CKPT.read_text().splitlines():
if line.strip():
done.add(json.loads(line)["id"])
return done
def run_batch(jobs: list[dict], model: str = "claude-sonnet-4-6"):
done = load_done()
pending = [j for j in jobs if j["id"] not in done]
print(f"total={len(jobs)} done={len(done)} pending={len(pending)}")
with CKPT.open("a") as ck:
for job in pending:
stats = measured_stream(model, job["messages"])
# swap in resilient_stream here if interruptions are likely
ck.write(json.dumps({
"id": job["id"],
"tokens": stats.output_tokens,
"wall": round(stats.wall_seconds, 2),
"cores": round(stats.cpu_cores, 2),
}, ensure_ascii=False) + "\n")
ck.flush()
os.fsync(ck.fileno()) # don't lose the last completion on power loss
print(f"done {job['id']} ({stats.tokens_per_sec:.0f} tok/s)")I call os.fsync() after flush() because in unattended operation a VM stop or power cut is routine. Skip it and you can lose the "done" records sitting in the OS buffer, then redo the same work after a restart. I caused a double-generation this way once, and ever since I've treated fsync as the minimum etiquette for an unattended batch.
Keeping each item's cores in the checkpoint lets you review afterward which jobs had heavy receive-side processing. Only for jobs where the value is consistently high do you need to lighten the delta formatting or split the output length — a far better cost-to-benefit ratio than reaching into everything blindly.
Measure first, then touch the code
The shortest path to stable streaming, counterintuitively, was taking numbers before adding defenses. Where the CPU pins, when the stream goes silent, how many seconds each item costs — only once these are visible can you set idle_timeout and choose which status codes to retry with actual grounding.
For your next step, drop measured_stream into tonight's batch for a single day and keep cores and tok/s in the checkpoint. Glancing at those numbers the next morning will cleanly separate whether your bottleneck lives on the receive side or the network side. I hope it gives you a foothold for that first measurement, for anyone wrestling with unattended runs the same way.