夜のうちに記事生成のバッチを仕掛けて寝て、朝に結果を見ると半分しか進んでいない。私自身、Dolice Labs の複数サイトを個人開発で回す中で、この「無人運用の取りこぼし」に何度もつまずいてきました。ログには例外も出ていないのに、なぜか途中で止まっている。腰を据えて計測してみると、原因はふたつに集約されました。長いストリーミング応答の最中にCPUが張り付くこと、そして接続が静かに切れたまま気づかずに先へ進んでいたことです。
2026年6月のアップデートで Anthropic 側がストリーミング時のCPU使用をおよそ37%削減したと案内されています。これは朗報ですが、削減されたぶんを自分の運用でどう活かすか、そもそも自分の環境で何が起きているかは、計測しなければ分かりません。以下では、ストリーミング応答のCPU・経過時間・トークンスループットを実測し、中断を検知して安全に再開する監視ラッパーを、そのまま動く形で組み立てていきます。
まず「止まる」の正体を切り分ける
「バッチが止まる」と一口に言っても、現場で起きていることは大きく3種類に分かれます。切り分けないまま対策を足すと、効かない防御だけが積み上がっていきます。
ひとつ目は、ストリーミング中にイベントを逐次処理する側が重く、CPUがコア1つに張り付くケース。応答自体は届いているのに、受け取って整形する処理が追いつかず、全体のスループットが落ちます。ふたつ目は、ネットワーク経路でストリームが途切れ、最後のチャンクが来ないまま読み取りループが宙ぶらりんになるケース。例外が飛ばず、ただ静かに待ち続けます。3つ目が、レート制限やサーバ側の一時エラーで、リトライ設計がないと1件の失敗がバッチ全体を巻き込みます。
最初にやるべきは、これらを「数字で」見えるようにすることです。憶測でタイムアウトを伸ばしたり、スレッドを増やしたりする前に、1リクエストあたり何秒かかり、CPUをどれだけ使い、毎秒何トークン受け取れているかを記録します。
ストリーミングの資源消費を実測する
まず、ストリーミング1回ぶんのCPU時間・実時間・受信トークン数を測る計測器を用意します。psutil でプロセスのCPU時間を取り、time.perf_counter() で壁時計の経過を測ります。CPU時間を実時間で割れば、その区間で実質何コアぶん回っていたかが分かります。
import time
import psutil
from dataclasses import dataclass, field
from anthropic import Anthropic
client = Anthropic() # ANTHROPIC_API_KEY を環境変数から読む
proc = psutil.Process()
@dataclass
class StreamStats:
wall_seconds: float = 0.0 # 壁時計の経過秒数
cpu_seconds: float = 0.0 # このプロセスが消費したCPU秒数
output_tokens: int = 0 # 受信した出力トークン数
chunks: int = 0 # 受信したテキストデルタの個数
text: str = field(default="")
@property
def cpu_cores(self) -> float:
# 1.0 なら平均してコア1つを占有していた、という意味
return self.cpu_seconds / self.wall_seconds if self.wall_seconds else 0.0
@property
def tokens_per_sec(self) -> float:
return self.output_tokens / self.wall_seconds if self.wall_seconds else 0.0
def measured_stream(model: str, messages: list, max_tokens: int = 2048) -> StreamStats:
stats = StreamStats()
cpu_before = sum(proc.cpu_times()[:2]) # user + system のCPU秒
wall_before = time.perf_counter()
with client.messages.stream(
model=model,
max_tokens=max_tokens,
messages=messages,
) as stream:
for text in stream.text_stream:
stats.text += text
stats.chunks += 1
final = stream.get_final_message()
stats.output_tokens = final.usage.output_tokens
stats.wall_seconds = time.perf_counter() - wall_before
stats.cpu_seconds = sum(proc.cpu_times()[:2]) - cpu_before
return statsこのコードのポイントは、cpu_times() の user 時間と system 時間を合算してから差分を取っているところです。time.process_time() でも近い値は取れますが、子スレッドやI/O待ちの扱いが環境で揺れるため、プロセス単位で素直に取れる psutil を私は好んで使っています。
実際にこの計測器を回すと、たとえば次のような出力が得られます。cpu_cores が 0.2 前後なら受信処理は軽く、ネットワーク待ちが支配的だと判断できます。逆に 0.9 を超えていれば、テキストデルタごとの整形処理が重すぎる兆候です。
stats = measured_stream(
"claude-sonnet-4-6",
[{"role": "user", "content": "Summarize the benefits of streaming in 200 words."}],
)
print(f"wall={stats.wall_seconds:.1f}s "
f"cpu={stats.cpu_seconds:.1f}s "
f"cores={stats.cpu_cores:.2f} "
f"tok/s={stats.tokens_per_sec:.1f}")
# 例: wall=6.3s cpu=1.4s cores=0.22 tok/s=33.5中断を検知して安全に再開する
計測ができたら、次は「静かな中断」への防御です。ストリーミングの怖さは、最後の message_stop イベントが来ないまま接続が途切れたとき、例外ではなく沈黙として現れることです。これを検知するには、デルタ間の無音時間に上限を設けるのが実用的でした。
下のラッパーは、チャンクが一定秒数届かなければ中断とみなして打ち切り、指数バックオフで再試行します。完全な再開(途中から続きを生成させる)はモデル側の保証がないため、私は「冪等な単位で投げ直す」方針を採っています。記事1本、レコード1件のように、やり直しても二重にならない粒度に仕事を割っておくのが要点です。
import time
import httpx
from anthropic import Anthropic, APIStatusError
client = Anthropic()
def resilient_stream(
model: str,
messages: list,
max_tokens: int = 2048,
idle_timeout: float = 30.0, # この秒数チャンクが来なければ中断扱い
max_retries: int = 4,
) -> str:
attempt = 0
while True:
attempt += 1
last_recv = time.monotonic()
buf = []
try:
with client.messages.stream(
model=model, max_tokens=max_tokens, messages=messages,
) as stream:
for text in stream.text_stream:
now = time.monotonic()
if now - last_recv > idle_timeout:
raise TimeoutError(
f"idle {now - last_recv:.0f}s > {idle_timeout}s"
)
last_recv = now
buf.append(text)
stream.get_final_message() # 完了確定(ここまで来れば成功)
return "".join(buf)
except (TimeoutError, httpx.HTTPError) as e:
if attempt > max_retries:
raise
backoff = min(2 ** attempt, 30)
print(f"[retry {attempt}] {type(e).__name__}: {e} -> {backoff}s")
time.sleep(backoff)
except APIStatusError as e:
# 429(レート制限)と 5xx だけ再試行。4xx の論理エラーは即座に投げる
if e.status_code not in (429, 500, 502, 503, 529) or attempt > max_retries:
raise
backoff = min(2 ** attempt, 30)
print(f"[retry {attempt}] HTTP {e.status_code} -> {backoff}s")
time.sleep(backoff)ここで意図的に分けているのが、再試行してよいエラーとそうでないエラーです。リクエスト内容そのものが誤っている 400 系を再試行しても無駄に課金されるだけなので、429 とサーバ側 5xx、そして Anthropic の過負荷を示す 529 に限って待ってから投げ直します。idle_timeout を 30 秒にしているのは経験則で、通常のストリームならデルタは数百ミリ秒〜数秒間隔で届くため、30 秒の沈黙はほぼ確実に異常だと判断できます。
バッチ全体を冪等な単位で回す
最後に、計測器と再開ラッパーを束ねて、夜間バッチとして安全に流せる形にします。鍵になるのは進捗の永続化です。1件終えるごとに完了マークを残し、再起動したら未完のぶんだけ拾い直します。私は単純に JSON Lines のチェックポイントファイルを使っています。データベースを増やさずに済み、途中で覗いて状況を確認できるからです。
import json
import os
from pathlib import Path
CKPT = Path("batch_progress.jsonl")
def load_done() -> set:
if not CKPT.exists():
return set()
done = set()
for line in CKPT.read_text().splitlines():
if line.strip():
done.add(json.loads(line)["id"])
return done
def run_batch(jobs: list[dict], model: str = "claude-sonnet-4-6"):
done = load_done()
pending = [j for j in jobs if j["id"] not in done]
print(f"total={len(jobs)} done={len(done)} pending={len(pending)}")
with CKPT.open("a") as ck:
for job in pending:
stats = measured_stream(model, job["messages"])
# 中断が起きうるならここを resilient_stream に差し替える
ck.write(json.dumps({
"id": job["id"],
"tokens": stats.output_tokens,
"wall": round(stats.wall_seconds, 2),
"cores": round(stats.cpu_cores, 2),
}, ensure_ascii=False) + "\n")
ck.flush()
os.fsync(ck.fileno()) # 電源断でも直前の完了を失わない
print(f"done {job['id']} ({stats.tokens_per_sec:.0f} tok/s)")flush() のあとに os.fsync() まで呼んでいるのは、無人運用では VM の停止や電源断が普通に起きるからです。ここを省くと、OSのバッファに溜まった「完了済み」の記録ごと飛んで、再起動後に同じ仕事をもう一度やり直す羽目になります。実際にこれで二重生成を起こしたことがあり、以来 fsync は無人バッチの最低限の作法だと考えています。
各処理の cores をチェックポイントに残しておくと、後から「どのジョブで受信処理が重かったか」を一覧で振り返れます。値が恒常的に高いジョブだけ、テキストデルタの整形を軽くする、あるいは出力長を分割する、といった的を絞った改善ができます。あてずっぽうに全体へ手を入れるより、ずっと費用対効果が高い進め方です。
計測してから手を入れる
ストリーミングの安定運用で遠回りに見えて一番の近道は、防御を足す前に数字を取ることでした。CPUがどこで張り付くのか、ストリームがいつ沈黙するのか、1件あたり何秒かかるのか。これらが見えてはじめて、idle_timeout の値も、リトライ対象のステータスコードも、根拠を持って決められます。
次の一歩としては、今夜のバッチに measured_stream を1日だけ仕込んで、cores と tok/s をチェックポイントに残してみてください。翌朝その数字を眺めるだけで、自分の環境のボトルネックが受信処理側にあるのか、ネットワーク側にあるのかが、はっきり分かれて見えるはずです。同じように無人運用と向き合っている方の、最初の計測の足がかりになれば嬉しいです。