無人で動かしているパイプラインで一番こわいのは、止まることではありません。止まらずに、こっそり質を落としたまま動き続けること です。
個人開発で複数のサイトの定期実行を回していて、深夜のバッチが主力モデルにアクセスできず、自動で軽量モデルへ落ちていたことがありました。リトライは成功し、ログには 200 OK が並び、処理は完走していました。気づいたのは翌朝で、その間に出力された分は明らかに密度が足りていませんでした。エラーで止まっていれば即座に分かったのに、フォールバックが「成功」を演出してしまったのです。
このとき痛感したのは、フォールバックの設計で本当に難しいのは「どのモデルに落とすか」ではなく、**「落ちてはいけない下限をどこに引くか」**だということでした。落とせるだけ落とす実装は、無人運用では静かな品質事故の温床になります。
「落とす」前に「落ちてよいか」を決める
多くのフォールバック実装は、エラーを受け取ってから次のモデルを選びます。順序が逆です。先に決めるべきは、そのタスクが弱いモデルに落ちても許されるのか という一点です。
たとえば私の運用では、タスクをおおまかに二種類に分けています。
タスク種別 降格して許されるか 理由
記事本文の生成・要約の最終確定 許されない 質が落ちると読者に届く成果物が劣化する。やり直しが効かない
タグ付け・カテゴリ分類・短い整形 許される 多少精度が落ちても後段で補正でき、影響範囲が小さい
前者で軽量モデルへ落ちるくらいなら、その回はスキップして次の実行に回す方がよい のです。「動き続けること」より「下限を割らないこと」を優先する。この判断を、エラーが起きてからその場の勢いで決めてはいけません。コードに契約として書いておきます。
タスクごとに「最低限必要な能力」を契約にする
フォールバックを「モデルの並び順」で書くと、新しいモデルが増えるたびに並びを直す羽目になります。代わりに、タスクが要求する能力 と、各モデルが満たす能力 を別々に宣言し、突き合わせる形にします。
from dataclasses import dataclass, field
from enum import IntEnum
class Tier ( IntEnum ):
"""能力の段階。数値が大きいほど高能力。比較で下限判定に使う。"""
LIGHT = 1 # 軽量・高速。分類や整形向け
BALANCED = 2 # 標準。多くの実務タスクの主力
DEEP = 3 # 高能力。最終成果物や難しい推論向け
@dataclass ( frozen = True )
class ModelSpec :
"""モデル1つ分の能力宣言。model_id は環境変数などから差し込む前提。"""
model_id: str
tier: Tier
supports_thinking: bool = False
max_output_tokens: int = 8192
@dataclass
class TaskContract :
"""このタスクが満たすべき下限。ここを割るモデルは候補から外す。"""
name: str
min_tier: Tier
needs_thinking: bool = False
min_output_tokens: int = 1024
# 降格を1段でも許すか。最終成果物は False(=主力以外に落ちない)にする
allow_downgrade: bool = True
def is_satisfied_by (self, spec: ModelSpec) -> bool :
if spec.tier < self .min_tier:
return False
if self .needs_thinking and not spec.supports_thinking:
return False
if spec.max_output_tokens < self .min_output_tokens:
return False
return True
model_id を直接ハードコードしない点が肝心です。モデルの実IDは環境ごとに変わりますし、引退・改名もあります。コードが依存するのは「能力」であって「名前」ではない、という形にしておきます。
フォールバック表ではなく「下限」で連鎖を断つ
候補モデルを能力順に並べ、契約を満たすものだけを残します。allow_downgrade=False のタスクでは、主力(先頭候補)以外を切り捨てます。
def build_chain (catalog: list[ModelSpec], contract: TaskContract) -> list[ModelSpec]:
"""契約を満たすモデルだけを能力の高い順に並べたフォールバック連鎖を返す。"""
eligible = [m for m in catalog if contract.is_satisfied_by(m)]
eligible.sort( key =lambda m: m.tier, reverse = True )
if not eligible:
raise RuntimeError (
f "タスク ' { contract.name } ' の下限(min_tier= { contract.min_tier.name } )を"
f "満たすモデルが1つもありません。カタログか契約を見直してください。"
)
if not contract.allow_downgrade:
# 降格を許さないタスクは「主力1つ」だけ。落ちるくらいなら止める
return eligible[: 1 ]
return eligible
ここで重要なのは、build_chain が空になる前に例外を投げる ことです。「候補がゼロのまま黙って次へ進む」のが、冒頭の静かな事故そのものでした。下限を満たすモデルが消えたら、それは設定の異常として大きく鳴らすべきです。
降格予算 — 「落ちたまま」を時間と件数で止める
連鎖の中で一段下のモデルに落ちること自体は許容できても、それが何時間も続く のは別問題です。一時的な不在のつもりが、実は主力が長時間ダメな状態のこともあります。そこで「降格してよい予算」を持たせます。
import time
class DegradationBudget :
"""一定時間内に許す降格の上限。超えたら降格を禁止し、エラーで止める。"""
def __init__ (self, max_events: int = 5 , window_seconds: int = 3600 ):
self .max_events = max_events
self .window_seconds = window_seconds
self ._events: list[ float ] = []
def _prune (self, now: float ) -> None :
cutoff = now - self .window_seconds
self ._events = [t for t in self ._events if t >= cutoff]
def can_degrade (self) -> bool :
now = time.time()
self ._prune(now)
return len ( self ._events) < self .max_events
def record (self) -> None :
self ._events.append(time.time())
予算を使い切ったら、その後の降格は「許可しない」に倒します。私はこの場合は厳しめに倒すことを好みます。つまり主力が使えなければ、その回は失敗として記録して次の実行に回す 。無人運用では、これが「気づかないまま薄い成果物を量産する」最悪のシナリオを防ぐ最後の砦になります。1時間に5回まで、という数字は運用の体感で決めたもので、実行頻度に応じて調整してください。
一時的な不在と、恒久的な消滅を分ける
すべての失敗を同じ「フォールバック」で扱うと判断を誤ります。少なくとも次の3つは別物です。
分類 典型的なサイン とるべき対応
一時的な不在 過負荷(529)、「現在利用できません」、タイムアウト 同じモデルで間を置いて再試行。連鎖の降格は最後の手段
恒久的な消滅 model_not_found、引退、地域・規約による提供停止 そのモデルを候補から永続的に外し、切り替えを記録する
こちら側の誤り 401(鍵)、400(リクエスト不正) 降格しても直らない。即座に止めて通知する
本番運用を続けていると、恒久的な消滅は実際に起きます。新しいモデルが出ても、地域や規約の事情で特定モデルへのアクセスが急に止まることがありますし、旧モデルは予告のうえ引退していきます。一時的な不在として延々リトライすると、無駄なレイテンシとコストを払い続けることになります。だからこそ、恒久消滅だけは「切り替えを覚える」のです。
class ModelHealth :
"""恒久的に消えたモデルを記録し、以後カタログから除外する。"""
PERMANENT_HINTS = ( "model_not_found" , "not_found" ,
"deprecated" , "decommissioned" , "unsupported_region" )
def __init__ (self) -> None :
self ._dead: set[ str ] = set ()
def classify (self, status_code: int , error_text: str ) -> str :
text = error_text.lower()
if status_code in ( 401 , 400 ):
return "caller_error"
if any (h in text for h in self . PERMANENT_HINTS ):
return "permanent"
# 529 や「現在利用できません」「タイムアウト」はここに落ちる
return "transient"
def mark_dead (self, model_id: str ) -> None :
self ._dead.add(model_id)
def filter_catalog (self, catalog: list[ModelSpec]) -> list[ModelSpec]:
return [m for m in catalog if m.model_id not in self ._dead]
無人プロセスでは _dead をメモリに持つだけでなく、KV や小さなファイルなど再起動を越えて残る場所 に書き出しておくことを推奨します。そうしないと、プロセスが落ちて立ち上がるたびに、すでに消えたモデルへ毎回ぶつかりに行くことになります。
連鎖・予算・健康状態を1か所で回す
ここまでの部品を組み合わせると、呼び出し本体はとても素直になります。
def call_with_floor (client, contract, catalog, health, budget, build_payload):
"""契約の下限を守りつつ呼び出す。下限を割るくらいなら例外で止める。"""
live = health.filter_catalog(catalog)
chain = build_chain(live, contract) # 候補ゼロならここで例外
last_error = None
for index, spec in enumerate (chain):
is_downgrade = index > 0
if is_downgrade and not budget.can_degrade():
raise RuntimeError (
f "' { contract.name } ': 降格予算を使い切りました。"
f "主力が回復するまでこの回は失敗として扱います。"
)
try :
resp = client.messages.create( ** build_payload(spec))
if is_downgrade:
budget.record() # 降格して成功 = 予算を消費。鳴らすべき事象
log_degradation(contract.name, spec.model_id)
return resp
except APIError as e:
kind = health.classify(e.status_code, str (e))
last_error = e
if kind == "caller_error" :
raise # 落としても直らない。即停止
if kind == "permanent" :
health.mark_dead(spec.model_id) # 次回以降この連鎖から消える
# transient はこのモデルを諦めて次の候補へ
continue
raise RuntimeError ( f "' { contract.name } ': 連鎖を使い切りました。" ) from last_error
build_payload(spec) はモデルごとの messages.create 引数を組み立てる関数で、spec.model_id や max_tokens、思考の有無を差し込みます(実IDはプレースホルダー YOUR_MODEL_ID 等から環境変数で注入する想定です)。log_degradation は降格が起きた事実を、成功ログとは別の場所に明示的に残すための関数です。**降格は「成功」ではなく「記録すべき劣化」**として扱う、という思想がコードに表れています。
最終成果物のタスクなら、契約を allow_downgrade=False にしておくだけで、連鎖は主力1つに縮まります。主力が一時的に不在なら transient で候補を使い切り、最後に例外で止まる。これが「弱いモデルへ静かに落ちる」事故を構造的に回避します。
落ちたこと自体を、見える場所に出す
無人運用での最後の仕上げは通知です。次の3つを成功ログとは別のチャンネルに出します。
降格が起きた回数
恒久消滅として候補から外したモデル
降格予算の残量
これを私の場合は日次のサマリにまとめて出すだけで、「翌朝になって初めて気づく」がほぼなくなりました。
個人開発で自動運用を長く続けてきて思うのは、自動化の価値は「人が見ていない時間」にあるからこそ、見ていない時間に何が劣化したかを必ず残す ことが対になる、ということです。私自身、静かな成功ほど疑うようにしています。フォールバックは、その疑いをコードに埋め込む場所です。
まず一歩進めるなら、いま動いているパイプラインのフォールバック実装を開いて、**「候補がゼロになったとき、黙って次へ進む経路があるか」**だけを確認してみてください。そこに静かな事故が潜んでいます。