CLAUDE LABEN
CODE — Claude Codeにリモート管理者向けTrusted Devicesが追加。リモートセッション前に端末を検証できますCODE — ストリーミング時のCPU使用が約37%削減され、長時間の自動運用がより安定しますCODE — フルスクリーンのマウスクリック操作と音声入力の修正、Linuxの音声検出改善が入りましたAUTH — 静的APIキーを、短命でスコープ付きのWIF資格情報へ置き換えられるようになりましたTEAM — SlackでClaudeを直接タグ付けし、作業しながらタスクを委譲できるようになりましたWORKFLOW — dynamic workflowsがリサーチプレビューで登場。込み入った作業を自分で手順に分解しますCODE — Claude Codeにリモート管理者向けTrusted Devicesが追加。リモートセッション前に端末を検証できますCODE — ストリーミング時のCPU使用が約37%削減され、長時間の自動運用がより安定しますCODE — フルスクリーンのマウスクリック操作と音声入力の修正、Linuxの音声検出改善が入りましたAUTH — 静的APIキーを、短命でスコープ付きのWIF資格情報へ置き換えられるようになりましたTEAM — SlackでClaudeを直接タグ付けし、作業しながらタスクを委譲できるようになりましたWORKFLOW — dynamic workflowsがリサーチプレビューで登場。込み入った作業を自分で手順に分解します
記事一覧/API & SDK
API & SDK/2026-06-28上級

ストリーミング応答のCPUと取りこぼしを実測して長時間バッチを安定させる

夜間に走らせた長時間バッチが、朝には半分しか終わっていない。原因はCPU張り付きとストリームの中断でした。streamのCPU・スループットを実測し、中断から再開する監視ラッパーの実装を紹介します。

Claude API90ストリーミング9バッチ処理3監視2個人開発92

夜のうちに記事生成のバッチを仕掛けて寝て、朝に結果を見ると半分しか進んでいない。私自身、Dolice Labs の複数サイトを個人開発で回す中で、この「無人運用の取りこぼし」に何度もつまずいてきました。ログには例外も出ていないのに、なぜか途中で止まっている。腰を据えて計測してみると、原因はふたつに集約されました。長いストリーミング応答の最中にCPUが張り付くこと、そして接続が静かに切れたまま気づかずに先へ進んでいたことです。

2026年6月のアップデートで Anthropic 側がストリーミング時のCPU使用をおよそ37%削減したと案内されています。これは朗報ですが、削減されたぶんを自分の運用でどう活かすか、そもそも自分の環境で何が起きているかは、計測しなければ分かりません。以下では、ストリーミング応答のCPU・経過時間・トークンスループットを実測し、中断を検知して安全に再開する監視ラッパーを、そのまま動く形で組み立てていきます。

まず「止まる」の正体を切り分ける

「バッチが止まる」と一口に言っても、現場で起きていることは大きく3種類に分かれます。切り分けないまま対策を足すと、効かない防御だけが積み上がっていきます。

ひとつ目は、ストリーミング中にイベントを逐次処理する側が重く、CPUがコア1つに張り付くケース。応答自体は届いているのに、受け取って整形する処理が追いつかず、全体のスループットが落ちます。ふたつ目は、ネットワーク経路でストリームが途切れ、最後のチャンクが来ないまま読み取りループが宙ぶらりんになるケース。例外が飛ばず、ただ静かに待ち続けます。3つ目が、レート制限やサーバ側の一時エラーで、リトライ設計がないと1件の失敗がバッチ全体を巻き込みます。

最初にやるべきは、これらを「数字で」見えるようにすることです。憶測でタイムアウトを伸ばしたり、スレッドを増やしたりする前に、1リクエストあたり何秒かかり、CPUをどれだけ使い、毎秒何トークン受け取れているかを記録します。

ストリーミングの資源消費を実測する

まず、ストリーミング1回ぶんのCPU時間・実時間・受信トークン数を測る計測器を用意します。psutil でプロセスのCPU時間を取り、time.perf_counter() で壁時計の経過を測ります。CPU時間を実時間で割れば、その区間で実質何コアぶん回っていたかが分かります。

import time
import psutil
from dataclasses import dataclass, field
from anthropic import Anthropic
 
client = Anthropic()  # ANTHROPIC_API_KEY を環境変数から読む
proc = psutil.Process()
 
 
@dataclass
class StreamStats:
    wall_seconds: float = 0.0       # 壁時計の経過秒数
    cpu_seconds: float = 0.0        # このプロセスが消費したCPU秒数
    output_tokens: int = 0          # 受信した出力トークン数
    chunks: int = 0                 # 受信したテキストデルタの個数
    text: str = field(default="")
 
    @property
    def cpu_cores(self) -> float:
        # 1.0 なら平均してコア1つを占有していた、という意味
        return self.cpu_seconds / self.wall_seconds if self.wall_seconds else 0.0
 
    @property
    def tokens_per_sec(self) -> float:
        return self.output_tokens / self.wall_seconds if self.wall_seconds else 0.0
 
 
def measured_stream(model: str, messages: list, max_tokens: int = 2048) -> StreamStats:
    stats = StreamStats()
    cpu_before = sum(proc.cpu_times()[:2])   # user + system のCPU秒
    wall_before = time.perf_counter()
 
    with client.messages.stream(
        model=model,
        max_tokens=max_tokens,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            stats.text += text
            stats.chunks += 1
        final = stream.get_final_message()
        stats.output_tokens = final.usage.output_tokens
 
    stats.wall_seconds = time.perf_counter() - wall_before
    stats.cpu_seconds = sum(proc.cpu_times()[:2]) - cpu_before
    return stats

このコードのポイントは、cpu_times() の user 時間と system 時間を合算してから差分を取っているところです。time.process_time() でも近い値は取れますが、子スレッドやI/O待ちの扱いが環境で揺れるため、プロセス単位で素直に取れる psutil を私は好んで使っています。

実際にこの計測器を回すと、たとえば次のような出力が得られます。cpu_cores が 0.2 前後なら受信処理は軽く、ネットワーク待ちが支配的だと判断できます。逆に 0.9 を超えていれば、テキストデルタごとの整形処理が重すぎる兆候です。

stats = measured_stream(
    "claude-sonnet-4-6",
    [{"role": "user", "content": "Summarize the benefits of streaming in 200 words."}],
)
print(f"wall={stats.wall_seconds:.1f}s "
      f"cpu={stats.cpu_seconds:.1f}s "
      f"cores={stats.cpu_cores:.2f} "
      f"tok/s={stats.tokens_per_sec:.1f}")
# 例: wall=6.3s cpu=1.4s cores=0.22 tok/s=33.5

中断を検知して安全に再開する

計測ができたら、次は「静かな中断」への防御です。ストリーミングの怖さは、最後の message_stop イベントが来ないまま接続が途切れたとき、例外ではなく沈黙として現れることです。これを検知するには、デルタ間の無音時間に上限を設けるのが実用的でした。

下のラッパーは、チャンクが一定秒数届かなければ中断とみなして打ち切り、指数バックオフで再試行します。完全な再開(途中から続きを生成させる)はモデル側の保証がないため、私は「冪等な単位で投げ直す」方針を採っています。記事1本、レコード1件のように、やり直しても二重にならない粒度に仕事を割っておくのが要点です。

import time
import httpx
from anthropic import Anthropic, APIStatusError
 
client = Anthropic()
 
 
def resilient_stream(
    model: str,
    messages: list,
    max_tokens: int = 2048,
    idle_timeout: float = 30.0,   # この秒数チャンクが来なければ中断扱い
    max_retries: int = 4,
) -> str:
    attempt = 0
    while True:
        attempt += 1
        last_recv = time.monotonic()
        buf = []
        try:
            with client.messages.stream(
                model=model, max_tokens=max_tokens, messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    now = time.monotonic()
                    if now - last_recv > idle_timeout:
                        raise TimeoutError(
                            f"idle {now - last_recv:.0f}s > {idle_timeout}s"
                        )
                    last_recv = now
                    buf.append(text)
                stream.get_final_message()   # 完了確定(ここまで来れば成功)
            return "".join(buf)
 
        except (TimeoutError, httpx.HTTPError) as e:
            if attempt > max_retries:
                raise
            backoff = min(2 ** attempt, 30)
            print(f"[retry {attempt}] {type(e).__name__}: {e} -> {backoff}s")
            time.sleep(backoff)
 
        except APIStatusError as e:
            # 429(レート制限)と 5xx だけ再試行。4xx の論理エラーは即座に投げる
            if e.status_code not in (429, 500, 502, 503, 529) or attempt > max_retries:
                raise
            backoff = min(2 ** attempt, 30)
            print(f"[retry {attempt}] HTTP {e.status_code} -> {backoff}s")
            time.sleep(backoff)

ここで意図的に分けているのが、再試行してよいエラーとそうでないエラーです。リクエスト内容そのものが誤っている 400 系を再試行しても無駄に課金されるだけなので、429 とサーバ側 5xx、そして Anthropic の過負荷を示す 529 に限って待ってから投げ直します。idle_timeout を 30 秒にしているのは経験則で、通常のストリームならデルタは数百ミリ秒〜数秒間隔で届くため、30 秒の沈黙はほぼ確実に異常だと判断できます。

バッチ全体を冪等な単位で回す

最後に、計測器と再開ラッパーを束ねて、夜間バッチとして安全に流せる形にします。鍵になるのは進捗の永続化です。1件終えるごとに完了マークを残し、再起動したら未完のぶんだけ拾い直します。私は単純に JSON Lines のチェックポイントファイルを使っています。データベースを増やさずに済み、途中で覗いて状況を確認できるからです。

import json
import os
from pathlib import Path
 
CKPT = Path("batch_progress.jsonl")
 
 
def load_done() -> set:
    if not CKPT.exists():
        return set()
    done = set()
    for line in CKPT.read_text().splitlines():
        if line.strip():
            done.add(json.loads(line)["id"])
    return done
 
 
def run_batch(jobs: list[dict], model: str = "claude-sonnet-4-6"):
    done = load_done()
    pending = [j for j in jobs if j["id"] not in done]
    print(f"total={len(jobs)} done={len(done)} pending={len(pending)}")
 
    with CKPT.open("a") as ck:
        for job in pending:
            stats = measured_stream(model, job["messages"])
            # 中断が起きうるならここを resilient_stream に差し替える
            ck.write(json.dumps({
                "id": job["id"],
                "tokens": stats.output_tokens,
                "wall": round(stats.wall_seconds, 2),
                "cores": round(stats.cpu_cores, 2),
            }, ensure_ascii=False) + "\n")
            ck.flush()
            os.fsync(ck.fileno())   # 電源断でも直前の完了を失わない
            print(f"done {job['id']} ({stats.tokens_per_sec:.0f} tok/s)")

flush() のあとに os.fsync() まで呼んでいるのは、無人運用では VM の停止や電源断が普通に起きるからです。ここを省くと、OSのバッファに溜まった「完了済み」の記録ごと飛んで、再起動後に同じ仕事をもう一度やり直す羽目になります。実際にこれで二重生成を起こしたことがあり、以来 fsync は無人バッチの最低限の作法だと考えています。

各処理の cores をチェックポイントに残しておくと、後から「どのジョブで受信処理が重かったか」を一覧で振り返れます。値が恒常的に高いジョブだけ、テキストデルタの整形を軽くする、あるいは出力長を分割する、といった的を絞った改善ができます。あてずっぽうに全体へ手を入れるより、ずっと費用対効果が高い進め方です。

計測してから手を入れる

ストリーミングの安定運用で遠回りに見えて一番の近道は、防御を足す前に数字を取ることでした。CPUがどこで張り付くのか、ストリームがいつ沈黙するのか、1件あたり何秒かかるのか。これらが見えてはじめて、idle_timeout の値も、リトライ対象のステータスコードも、根拠を持って決められます。

次の一歩としては、今夜のバッチに measured_stream を1日だけ仕込んで、corestok/s をチェックポイントに残してみてください。翌朝その数字を眺めるだけで、自分の環境のボトルネックが受信処理側にあるのか、ネットワーク側にあるのかが、はっきり分かれて見えるはずです。同じように無人運用と向き合っている方の、最初の計測の足がかりになれば嬉しいです。

シェア

お読みいただきありがとうございます

Claude Lab は広告なしで運営しており、サーバー費用などの運営コストはメンバーシップのご支援で賄っています。実装コード・ベンチマーク・本番設計パターンなど、実務でお役立ていただける記事を毎日更新しています。もし読んでよかったと感じていただけましたら、ぜひご覧ください。

  • コピー&ペーストで使える実装コード付き
  • 毎日新しい上級ガイドを追加
  • ¥580/月 または ¥1,480 の永久アクセス
メンバーシップを見る →

もしこの記事がお役に立ちましたら、チップ(¥150)で応援いただけると大変励みになります。広告なしでの運営を続けるため、皆さまのご支援が大きな力になっています。

関連記事

API & SDK2026-06-01
Crashlyticsのクラッシュを根本原因ごとに束ねる — Claude APIで作るトリアージの設計メモ
Crashlyticsの「Issue」は同じ原因を別物として散らしてしまうことがあります。累計5,000万DLのアプリ運用で溜まったクラッシュを、Claude APIで根本原因ごとに束ね直して優先順位をつける設計を、動くコードと実測値とともに共有します。
API & SDK2026-05-14
Claude API でアプリ内 AI チャットを実装して踏んだ6つの罠 — 本番投入までの設計記録
Claude APIをアプリ内チャットに組み込んで本番で直面した6つの設計ミスを、個人開発での実運用経験から解説。コンテキスト設計、ストリーミング、セッション管理、コスト監視の実装パターンを動くコード付きで紹介します。
API & SDK2026-05-12
Haiku 4.5・ストリーミング・プロンプトキャッシングを組み合わせて個人開発アプリのAPIコストを抑えた記録
Claude Haiku 4.5、ストリーミング、プロンプトキャッシングの3つを組み合わせることで、個人開発アプリのAI機能のコストと応答速度を同時に改善した実装パターンを記録します。
📚RECOMMENDED BOOKS
大規模言語モデル入門
山田育矢
LLM開発
生成AIプロンプトエンジニアリング入門
我妻幸長
プロンプト
Claude CodeによるAI駆動開発入門
平川知秀
AI駆動開発
※ アフィリエイトリンクを含みます
もっと見る →