ストリーミングのトラブルで厄介なのは、例外が飛んでくれる切断ではなく、何のエラーも出さないまま、ただデルタが届かなくなる停止 です。try/except には何もかからない。ログにもスタックトレースは残らない。プロセスは生きていて、ソケットも開いている。それでも本文は途中で終わっている——この「沈黙停止」を、私自身しばらく見逃していました。
個人開発で複数のサイトの記事生成を無人で回していたある時期、生成物のうち数本がいつも末尾の数段落だけ欠けて保存されていました。エラー通知は一度も鳴っていません。原因にたどり着くまで時間がかかったのは、stream.text_stream を最後まで for で回しているのに途中で抜けていた、という挙動が、コードの見た目からはまったく異常に見えなかったからです。本稿は、その沈黙停止を検知して途中から続けるための、実運用で落ち着いた組み立てを残しておくものです。一般的なタイムアウト延長やリトライの話は前提として、その先で必要になった部分に絞ります。
なぜ ReadTimeout は沈黙停止で発火しないのか
多くの解説は「read タイムアウトを延ばせ」で止まります。ところが沈黙停止では、そのタイムアウトがしばしば発火しません。理由は、SDK の read タイムアウトが測っているのがソケットからのバイト到着間隔 だからです。
Server-Sent Events の経路には、リバースプロキシやロードバランサーがコメント行(: ping のような無害な行)を一定間隔で流していることがあります。あるいは Anthropic 側の ping イベントが届き続けることもあります。すると、本文のデルタは一つも来ていないのに、ソケットには定期的に何かが届いている という状態が生まれます。socket は「生きている」ので read タイムアウトはリセットされ続け、永遠に発火しません。接続は健康、けれど中身は死んでいる。これが沈黙停止の正体です。
つまり監視すべき粒度が違います。守るべきは「バイトが来ているか」ではなく「意味のあるコンテンツ(text_delta)が進んでいるか 」です。ここを測るのが、以下のウォッチドッグです。
トークン間隔ウォッチドッグ
考え方は単純で、最後に text_delta を受け取った時刻を記録し、その間隔がしきい値を超えたら自分から打ち切る、というものです。SDK 任せのソケットタイムアウトとは別の、コンテンツ層の番人を一つ足すイメージです。
import time
import threading
import anthropic
class StreamStalled ( Exception ):
"""意味のあるデルタが一定時間届かなかった(沈黙停止)"""
def stream_with_watchdog (
client: anthropic.Anthropic,
messages: list ,
model: str = "claude-sonnet-4-6" ,
max_tokens: int = 8192 ,
stall_seconds: float = 25.0 , # text_delta の途切れ許容時間
):
"""
text_delta の到着間隔を監視し、stall_seconds を超えたら StreamStalled を投げる。
受信済みテキストは buf に貯め、停止時でも呼び出し側が引き継げるようにする。
"""
buf: list[ str ] = []
last_delta = { "t" : time.monotonic()}
stop = threading.Event()
def watchdog ():
while not stop.wait( 1.0 ):
gap = time.monotonic() - last_delta[ "t" ]
if gap > stall_seconds:
stop.set()
# ストリームの基盤接続を閉じて for ループを抜けさせる
raise_close()
# close 用のフックは stream オブジェクト確定後に差し込む
closer = { "fn" : lambda : None }
def raise_close ():
closer[ "fn" ]()
wd = threading.Thread( target = watchdog, daemon = True )
with client.messages.stream(
model = model, max_tokens = max_tokens, messages = messages,
) as stream:
closer[ "fn" ] = stream.close # ウォッチドッグから接続を閉じられるように
wd.start()
try :
for text in stream.text_stream:
last_delta[ "t" ] = time.monotonic()
buf.append(text)
yield text
finally :
stop.set()
received = "" .join(buf)
if last_delta_gap_exceeded(last_delta, stall_seconds) and stream_incomplete(stream):
raise StreamStalled(received)
実装上のキモは二つあります。一つは、ウォッチドッグから stream.close() を呼べるようにしておくこと。for ループはブロッキングなので、外側から接続を閉じない限り、沈黙したまま待ち続けてしまいます。もう一つは、停止時点までの received を必ず手元に残す ことです。これがなければ「途中から続ける」が成立しません。
stall_seconds は経路によって調整します。私の環境では、初トークンまで(思考が長いモデルでは特に)20 秒前後かかることがあるため、初トークン専用にもう少し長い猶予(後述の first_token_seconds)を別に持たせ、いったん本文が流れ始めてからは 25 秒を上限にしています。本文が動き出した後に 25 秒も無言なら、それは経路側の停止とみなして打ち切ったほうが、待ち続けるより回復が速いという判断です。
受信済みテキストを引き継いで「途中から」続ける
沈黙停止を検知できたら、次は再開です。ここで素朴にリクエストをそのまま投げ直すと、また最初から全文を生成し直す ことになります。長い記事や長いレポートでは、これがトークンと時間の二重の無駄になります。
代わりに使うのが、受信済みのテキストを assistant ロールの最後のメッセージとして渡す プレフィル(prefill)です。Claude は直前の assistant の発話を引き継いで続きを書くため、止まった地点のおおよそ続きから再開できます。
def resume_stream (client, base_messages, received_text, ** kw):
"""
received_text(停止時点までの本文)を assistant プレフィルとして渡し、続きを生成する。
"""
messages = base_messages + [
{ "role" : "assistant" , "content" : received_text},
]
# continued は received_text の「続き」だけが返る
for text in stream_with_watchdog(client, messages, ** kw):
yield text
def robust_generate (client, base_messages, max_resumes = 3 , ** kw):
full = ""
messages = base_messages
for attempt in range (max_resumes + 1 ):
try :
chunks = (stream_with_watchdog if attempt == 0 else resume_stream)
args = (client, base_messages, full, * ([] )) if attempt else (client, base_messages,)
gen = resume_stream(client, base_messages, full, ** kw) if attempt else stream_with_watchdog(client, base_messages, ** kw)
for text in gen:
full += text
return full # 正常完了
except StreamStalled as e:
full += e.args[ 0 ] if e.args and isinstance (e.args[ 0 ], str ) else ""
# 次のループでプレフィル再開
continue
return full
注意点が一つあります。プレフィル再開は「シームレスな結合」を保証しません。停止地点が単語や記号の途中だった場合、再開分の先頭で表現が微妙に重なったり、逆に文がつながらなかったりします。実運用では、結合部の重なりを刈り取る 後処理を入れておくと安定します。
def stitch (prev: str , cont: str , max_overlap: int = 80 ) -> str :
"""prev の末尾と cont の先頭の重複を検出して除去し、滑らかに結合する"""
window = prev[ - max_overlap:]
for size in range ( min ( len (window), len (cont)), 0 , - 1 ):
if window.endswith(cont[:size]):
return prev + cont[size:]
return prev + cont
完璧な接合ではありませんが、「末尾の数段落が欠けたまま保存される」よりはるかにましな結果になります。私の場合、欠落そのものがなくなったことのほうが、わずかな結合の粗さよりずっと価値がありました。プレフィル再開後の本文は、保存前に一度だけ通読チェック(後述の検証ステップ)に通すようにしています。
タイムアウトを「予算」として4層に分けて設計する
沈黙停止に一度向き合うと、タイムアウトは単一の数字ではなく、目的の違う複数の層であることが見えてきます。私は次の4層に分けて管理しています。
層 監視対象 目安 超過時の意味
接続 TCP/TLS 確立まで 10 秒 到達性・DNS・プロキシの問題
初トークン 最初の text_delta まで 30〜45 秒 モデルの思考が長い/キューイング
トークン間 delta と delta の間隔 20〜25 秒 沈黙停止(経路側の停止が濃厚)
全体 1リクエストの総時間 用途依存(例 8 分) 暴走・想定外の長文
接続層と全体層は SDK や asyncio.wait_for で素直に守れます。初トークン層とトークン間層は、前述のウォッチドッグの last_delta["t"] を、初回更新前か更新後かで分岐させれば一つの仕組みで両方守れます。
def deadline_for (received_any: bool , first_token_s: float , inter_token_s: float ) -> float :
"""まだ1つも delta が来ていなければ初トークン予算、来ていればトークン間予算を返す"""
return first_token_s if not received_any else inter_token_s
ここで大事なのは、しきい値を勘で置かず、実測の分布から決める ことです。各層の経過時間をメトリクスに出し、p50 / p95 / p99 を一週間ほど眺めてから上限を引きます。私の自動運用では、初トークンの p95 が約 18 秒だったので初トークン予算を 45 秒(p95 の 2 倍以上)に、トークン間の p99 が 6 秒程度だったので上限を 25 秒に置きました。正常系の p99 を巻き込まない位置 に線を引くのが、誤検知で無駄に再開しないコツです。
import logging, time
logger = logging.getLogger( "claude.stream" )
def timed_stream (client, messages, ** kw):
t0 = time.monotonic()
first_t = None
gaps = []
last = t0
for text in stream_with_watchdog(client, messages, ** kw):
now = time.monotonic()
if first_t is None :
first_t = now - t0
else :
gaps.append(now - last)
last = now
yield text
total = time.monotonic() - t0
p99_gap = sorted (gaps)[ int ( len (gaps) * 0.99 )] if gaps else 0.0
logger.info( "first_token= %.2f s total= %.2f s max_gap= %.2f s p99_gap= %.2f s" ,
first_t or - 1 , total, max (gaps) if gaps else 0.0 , p99_gap)
この一行のログが貯まると、しきい値の妥当性も、経路の劣化も、後から数字で振り返れます。「なんとなく止まる」が「初トークンの p95 が先週より 6 秒延びている」に変わると、対処の精度がまるで違ってきます。
経路側で沈黙を減らす(プロキシのバッファリング)
アプリ側のウォッチドッグは「止まったら気づいて続ける」仕組みですが、そもそも沈黙を生みにくくする経路設定も併せて効きます。最も多いのはプロキシのバッファリングです。Nginx を挟んでいるなら、応答バッファリングを切らないとデルタがまとめて遅延し、トークン間隔が見かけ上ばらついてウォッチドッグの誤検知を誘います。
location /api/chat {
proxy_pass http://backend;
proxy_http_version 1.1 ;
proxy_set_header Connection "" ;
proxy_buffering off ; # これが無いとデルタがまとまって遅延する
proxy_cache off ;
chunked_transfer_encoding on ;
proxy_connect_timeout 10s ;
proxy_read_timeout 600s ; # ソケット層の保険(沈黙停止はアプリ側で別に守る)
proxy_send_timeout 600s ;
}
アプリ自身が SSE を中継するなら、レスポンスヘッダに X-Accel-Buffering: no を付けて Nginx のバッファリングを明示的に無効化しておきます。経路でバッファをためないことと、アプリ側でコンテンツ進捗を測ることは、別々の対策であり、両方あって初めて沈黙停止に強くなります。
保存前の最終チェックを一つだけ挟む
沈黙停止対策で再開を入れると、今度は「再開はしたが、本当に最後まで書けたのか」を確かめたくなります。私は保存の直前に、stop_reason を見る一行だけのゲートを置いています。
def assert_complete (final_message) -> None :
sr = final_message.stop_reason
if sr == "max_tokens" :
raise RuntimeError ( "max_tokens 到達。続きが残っている可能性" )
if sr not in ( "end_turn" , "stop_sequence" ):
raise RuntimeError ( f "未完了の終了理由: { sr } " )
プレフィル再開を挟んだ場合は最後の final_message の stop_reason を見ます。end_turn であれば、少なくともモデルは「書き終えた」と判断しています。沈黙停止のときに私が一番困ったのは、エラーが無いせいで「未完了だと気づけない」ことでした。この一行は、その盲点をふさぐためのものです。
振り返りとして
沈黙停止への対処は、要素だけ見れば派手ではありません。コンテンツ進捗を測るウォッチドッグ、受信済みテキストのプレフィル再開、4層のタイムアウト予算、経路のバッファ無効化、そして保存前の stop_reason チェック。どれも単体では小さな工夫です。それでも、これらが揃うと「無言で末尾が欠ける」という、いちばん気づきにくい事故がほぼ起きなくなりました。
長く回す運用ほど、機能の華やかさより「途中で静かに崩れないこと」がありがたく感じます。エラーが出ない不具合は、出る不具合より厄介です。だからこそ、まず沈黙を観測可能にすること から始めるのが、私にとっては遠回りに見えていちばんの近道でした。同じように末尾が欠ける症状に心当たりがある方の、手がかりになれば幸いです。