個人開発で Claude API をチャット機能に組み込んだとき、最初のうちは何の問題もありませんでした。崩れ始めたのは、同時に使う人がほんの十数人を超えたあたりからです。応答が途中で止まる、同じ文が二度出る、ツール呼び出しがなぜか半分の引数で走る — どれも SDK のサンプルには出てこない挙動で、ローカルでは一切再現しませんでした。私はしばらく「自分のコードのどこかにバグがある」と思い込んで探し続け、最終的に行き着いたのは、もっと身も蓋もない結論でした。ストリーミングは、本番で部分的に失敗するのが普通なのです。
この記事は、その前提を受け入れたうえで実装をどう組み直したか、という運用メモです。完璧に落ちない実装を目指すのではなく、落ちたときに利用者が気づかないうちに立て直す。その視点で、状態管理・重複排除・ツール引数の安全弁・バックオフの分離・監視という五つの層を、実際に手元のサービスで削った跡とともに書いていきます。
一括応答との違いは「半分成功する」こと
通常のリクエスト・レスポンスなら、失敗は分かりやすい形で来ます。タイムアウトすれば何も返らず、エラーが出れば全体がエラーです。再送すればよく、状態は「成功」か「失敗」の二択で済みます。
ストリーミングが厄介なのは、その中間が日常的に発生する点です。サーバーとクライアントが数十秒つながり続ける前提なので、次のどれもが「正常動作の範囲で起こりうる」こととして降ってきます。中間プロキシが無通信を嫌ってコネクションを切る。デプロイのたびにロードバランサが既存接続を落とす。ブラウザのタブが裏に回ってバッファの読み出しが遅れる。HTTP/2 の再試行でサーバーが同じイベントをもう一度流す。
これらはバグではなく環境です。だから「起きないようにする」方向で詰めても、運用環境の組み合わせは無限にあって追いつきません。発想を「壊れない実装」から「壊れた地点を覚えていて、そこから続ける実装」に切り替える。これだけで、コードの形が根本から変わります。具体的には、ストリームのどのブロックのどこまで受け取ったかを常に手元に持っておく、という設計になります。
どのイベントの途中で切れたかを覚えておく
Claude API のストリーミングは Server-Sent Events で届き、主なイベントは message_start、content_block_start、content_block_delta、content_block_stop、message_delta、message_stop、それに keep-alive の ping と error です。回復を考えるうえで決定的に重要なのは、content_block_delta がブロック単位のインデックスを持っていることです。途中で切れても、いまどのブロックの何文字目まで来ているかさえ握っていれば、続きを組み立て直せます。
そこで、生のストリームをそのまま回さず、現在の部分応答を保持する状態オブジェクトでくるみます。下のコードは、ブロックごとのテキストとツール引数の断片を貯めながら、再接続のヒントになる情報を一緒に持たせる最小構成です。
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class StreamState :
"""ストリーミング中の部分応答を保持する。再接続時はここを見て続きを再構築する。"""
message_id: Optional[ str ] = None
model: Optional[ str ] = None
blocks: list[ dict ] = field( default_factory = list )
current_index: int = - 1
stop_reason: Optional[ str ] = None
def apply_delta (self, index: int , delta: dict ) -> None :
# 受信したブロック数まで器を広げてから差分を足す
while len ( self .blocks) <= index:
self .blocks.append({ "type" : "text" , "text" : "" , "partial_json" : "" })
block = self .blocks[index]
kind = delta.get( "type" )
if kind == "text_delta" :
block[ "text" ] += delta.get( "text" , "" )
elif kind == "input_json_delta" :
block[ "partial_json" ] += delta.get( "partial_json" , "" )
def assistant_text (self) -> str :
return "" .join(b[ "text" ] for b in self .blocks if b.get( "type" ) == "text" )
このオブジェクトを一つ挟むだけで、後続の回復処理がすべて「いまの状態を見て判断する」という素直な形に揃います。状態が散らばっていないことが、ストリーミングを安定させる土台になります。
切れたら、続きではなく「ここまで」を渡して書き継がせる
接続が切れたとき、最初に考えたのは「同じリクエストを再開できないか」でした。しかし Claude API はイベント単位の再送機構を持っていないので、同じ呼び出しに再接続しても、どこから再開されるかは保証されません。重複が来るかもしれないし、頭から来るかもしれない。これでは確定的に組み立て直せません。
確実なのは、ここまでに受け取った部分応答を assistant メッセージとして会話履歴に積み、その続きを生成させる方法です。リクエスト回数は増えますが、毎回の結果が決まっているので冪等に扱えます。下は、接続系のエラーだけを捕まえて自動再接続する薄いラッパーです。レート制限を意図的にここで握らないのが要点で、理由は後述します。
import asyncio, logging
import anthropic
log = logging.getLogger( __name__ )
class ResilientStream :
def __init__ (self, client: anthropic.AsyncAnthropic, max_retries: int = 3 ):
self .client = client
self .max_retries = max_retries
self .state = StreamState()
async def run (self, ** kwargs):
attempt = 0
while True :
try :
async with self .client.messages.stream( ** kwargs) as stream:
async for event in stream:
self ._track(event)
yield event
return # message_stop まで到達
except (anthropic.APIConnectionError, anthropic.APITimeoutError) as e:
attempt += 1
if attempt > self .max_retries:
log.error( "stream gave up after %d attempts: %s " , attempt, e)
raise
await asyncio.sleep( min ( 2 ** attempt, 20 ))
kwargs = self ._resume_kwargs(kwargs) # 部分応答を履歴に積む
def _track (self, event) -> None :
t = event.type
if t == "message_start" :
self .state.message_id = event.message.id
self .state.model = event.message.model
elif t == "content_block_start" :
self .state.current_index = event.index
elif t == "content_block_delta" :
self .state.apply_delta(event.index, event.delta.model_dump())
elif t == "message_delta" and event.delta.stop_reason:
self .state.stop_reason = event.delta.stop_reason
def _resume_kwargs (self, original: dict ) -> dict :
partial = self .state.assistant_text()
if not partial:
return original
resumed = dict (original)
resumed[ "messages" ] = list (original.get( "messages" , [])) + [
{ "role" : "assistant" , "content" : partial}
]
return resumed
呼び出し側から見れば、途中で何度切れていても「完全な応答が流れてきた」ように見えます。回復のロジックがすべてこの内側に隠れているので、画面描画やビジネスロジックは何も知らなくて済みます。この境界の引き方が、後から効いてきます。
同じ delta が二度来たときに、文章を二重にしない
重複は派手に失敗します。テキストが二度連結されると、利用者は読んだ瞬間に気づきます。HTTP/2 の再試行や再接続の実装次第で、同じ content_block_delta がもう一度流れてくることは普通に起こるので、重複判定は省けません。
ここで肝心なのは「何をもって同一とみなすか」を明示することです。タイムスタンプは再送で変わりうるので鍵に使えません。私が落ち着いたのは、メッセージ ID とブロックのインデックス、そしてそのブロック内での累計受信位置の三つを合わせる方法でした。位置まで含めると、同じブロックの別の delta を取り違える事故も防げます。
from hashlib import sha256
class DedupWindow :
"""直近 window 件のイベントを覚えて、二度目以降を捨てる。"""
def __init__ (self, window: int = 512 ):
self .seen: set[ str ] = set ()
self .order: list[ str ] = []
self .window = window
def is_duplicate (self, message_id: str , index: int , position: int ) -> bool :
key = sha256( f " { message_id } : { index } : { position } " .encode()).hexdigest()
if key in self .seen:
return True
self .seen.add(key)
self .order.append(key)
if len ( self .order) > self .window:
self .seen.discard( self .order.pop( 0 ))
return False
メモリを無限に伸ばさないよう、覚えておく件数に上限を設けています。一つの応答に含まれる delta はせいぜい数百なので、512 件あれば一周分を十分にカバーできます。この上限がないと、長時間動かしっぱなしのサーバーで集合がじわじわ膨らみます。
半端なツール引数で、本番データを触らせない
私が一番肝を冷やしたのは、テキストの乱れではなく、ツール呼び出しでした。ツールの引数は input_json_delta で JSON 文字列の断片として届きます。途中で切れると、その断片は壊れた JSON のまま残ります。それを気づかずパースしようとして例外で済めばまだ幸運で、最悪なのは、たまたま途中までで JSON として成立してしまい、半分の引数でツールが走るケースです。
実際に一度、途中切断で欠けたパラメータのまま書き込み系のツールが動き、本番のレコードに意図しない更新をかけかけたことがありました。間一髪で気づいて止めましたが、あれ以来、ツール実行の条件そのものを「引数が完全にパースできること」に置き換えています。
import json
from typing import Optional
def parse_tool_input (block: dict ) -> Optional[ dict ]:
"""tool_use ブロックの partial_json を安全に読む。途中なら None。"""
raw = block.get( "partial_json" , "" ).strip()
if not raw:
return None
try :
return json.loads(raw)
except json.JSONDecodeError as e:
log.warning( "incomplete tool input, skipping: %s (head= %r )" , e, raw[: 120 ])
return None
# 実行境界では必ずこの関数を通す
args = parse_tool_input(block)
if args is None :
# 引数が揃っていない間はツールを起動しない
return
run_tool(block[ "name" ], args)
たった数行ですが、これをアプリケーションの境界に一つ置くだけで、ストリーム中断が副作用に化ける経路をほぼ塞げます。安全弁は、派手さはなくても効きます。
接続エラーとレート制限を、別々のリズムで休ませる
先ほどラッパーでレート制限をあえて握らなかったのには理由があります。接続エラーの再試行とレート制限の再試行を同じ仕組みで回すと、互いに干渉して、まるで嵐のように再試行が重なることがあるからです。429 が返っているのに短い間隔で接続を張り直せば、また 429 を呼びます。
そこで、エラーの種別で休み方を変えます。レート制限はサーバーが返す待機時間を尊重し、足りない分をジッタで補う。接続系は短めの指数バックオフにする。ジッタを入れるのは、複数のクライアントが同時に復帰しようとして再びピークを作る現象を散らすためで、ほんの数秒ずらすだけでも、同時復帰の山が目に見えてなだらかになります。
import random
async def backoff_for (attempt: int , error: Exception ) -> None :
if isinstance (error, anthropic.RateLimitError):
base = getattr (error, "retry_after" , None ) or 30
delay = base + random.uniform( 0 , 5 ) # サーバー指示 + 散らし
elif isinstance (error, (anthropic.APIConnectionError, anthropic.APITimeoutError)):
delay = min ( 2 ** attempt, 20 ) + random.uniform( 0 , 1 )
else :
delay = min ( 2 ** attempt, 60 )
log.info( "backoff %.1f s (attempt= %d , %s )" , delay, attempt, type (error). __name__ )
await asyncio.sleep(delay)
種別を分けてからは、負荷が高い時間帯でも再試行が連鎖して自分の首を絞めることがなくなりました。再試行は、間隔の設計まで含めて初めて回復策になります。
200 のまま届く error イベントを見落とさない
もう一つ、自前で HTTP を扱う場合に踏みやすい段差があります。Claude API の error イベントは、HTTP ステータスが 200 のまま SSE のボディに埋め込まれて届きます。公式 SDK は例外に変換してくれますが、低レイヤを自分で書いていると、event.type == "error" を自分でチェックしない限り、エラーが正常応答に紛れて素通りします。
if event.type == "error" :
err = event.error
if err.type == "overloaded_error" :
raise anthropic.APIStatusError(err.message) # 容量不足は再試行の価値あり
raise RuntimeError ( f "fatal stream error: { err.type } : { err.message } " )
ここで overloaded_error だけは回復見込みがあるので再試行に回し、それ以外は失敗として扱います。この区別を入れておかないと、プロンプト側の不備で永遠に再試行し続けるループが、静かに出来上がります。
壊れている兆候を、利用者より先に拾う
実装を堅くしても、劣化に気づけなければ意味がありません。結局いちばん早く気づくのが利用者だった、という状態が一番まずい。私自身が個人開発のサービスを本番運用しながら、エラーの兆候を早めに対処するために見ているのは、次の四つです。
指標 定義 警告ライン
完了率 message_stop まで到達したストリームの割合99.5% を下回ったら
平均再接続回数 1 ストリームあたりの再接続回数 0.1 を超えたら
初トークン遅延 P95 最初のトークンが出るまでの時間の P95 6 秒を超えたら
重複率 重複判定がヒットした delta の割合 1% を上回ったら
完了率は健全性の総合点、再接続回数は経路の不安定さ、初トークン遅延は体感速度、重複率は配信経路の異常をそれぞれ映します。どれか一つでも線を越えたら、利用者から「最近遅くない?」と言われる前に手を打てます。監視は、回復の最後の層です。
三つの順番で入れていく
全部を一度に組み込もうとすると腰が重くなります。私は、まず一か所だけ既存のストリーミングを状態付きのラッパーに置き換え、次に重複判定を足し、最後にツール引数の安全弁を実行境界へ差し込む、という順で入れました。監視はそのあとで構いません。
この順番には理由があって、状態付きラッパーが入っていれば重複判定も安全弁も「いまの状態を見る」だけで書けるからです。土台から積むと、後の二つが驚くほど短いコードで済みます。落ちないことを目指すより、落ちた地点を覚えて静かに続ける。そう構え直すだけで、同じトラフィックでも体感の安定感は確かに一段変わります。利用者が不具合に気づかなくなった時間こそ、この設計が返してくれる成果だと考えています。