バッチの状態が ended になったのを確認して、その日は安心して眠りました。異変に気づいたのは3日後です。
タグ再分類のために Message Batches API へ投入した 20,000 件のうち、集計テーブルに入っていたのは 19,959 件。41 件が、エラーログも例外もないまま欠けていました。個人開発で複数の技術ブログを並行運用している関係で、記事メタデータの一括処理にはバッチをよく使うのですが、この「静かな欠落」は私自身、初めての経験でした。
原因を追うと、欠落は API の不具合ではなく、私の思い込みでした。processing_status: "ended" は「バッチという箱の処理が終わった」ことしか意味せず、中身の 1 件 1 件が成功したかどうかは別の話だったのです。
この記録では、結果ストリームのどこで件数が欠けるのか、custom_id 台帳による照合で欠落をゼロにする実装、そして二重処理を起こさない再投入の設計を、実運用の計測値と一緒に残します。
「ended」は全件成功ではない — 4つの終わり方
Message Batches の結果は、リクエスト 1 件ごとに result.type を持ちます。終わり方は 4 つです。
result.type 意味 起きやすい状況 取るべき対応
succeeded 正常完了。message が入っている — そのまま集計
errored 個別リクエストの失敗。error オブジェクトが入る invalid_request(パラメータ不正)、api_error、overloaded エラー種別で分岐(後述)
canceled バッチのキャンセルに巻き込まれた 手動キャンセル時の未処理分 再投入
expired 24時間の処理ウィンドウ内に完了しなかった 大規模バッチ+混雑時間帯 再投入
重要なのは、errored や expired が混ざっていてもバッチ全体は正常に ended になる ことです。例外は投げられませんし、SDK が警告してくれるわけでもありません。成功分だけを素直にループで拾うコードを書くと、失敗分は誰にも知られずに消えます。
私の 41 件の内訳は、errored が 28 件(overloaded 系が 19、invalid_request が 9)、expired が 13 件でした。invalid_request の 9 件は、元データに混ざっていた空文字列の本文をそのまま投げていたもの。つまりリトライしても直らない失敗が混ざっていました。ここを区別せずに全件再投入すると、同じ 9 件が永遠に失敗し続けます。
台帳を先に作る — custom_id manifest の設計
照合の前提は「何を投げたか」を API の外に持っておくことです。投げた側の記録がなければ、結果と突き合わせる基準がありません。
送信時に、custom_id と入力内容のハッシュを JSONL の台帳(manifest)へ書き出します。
import anthropic
import hashlib
import json
from pathlib import Path
client = anthropic.Anthropic() # ANTHROPIC_API_KEY は環境変数から
MANIFEST = Path( "batch_manifest.jsonl" )
def build_requests (items: list[ dict ]) -> list[ dict ]:
"""items: [{"id": "article-0001", "text": "..."}] 形式の入力データ"""
requests = []
with MANIFEST .open( "a" , encoding = "utf-8" ) as mf:
for item in items:
# attempt=1 を custom_id に埋め込む(再投入時に 2, 3 と上げる)
custom_id = f " { item[ 'id' ] } __a1"
requests.append({
"custom_id" : custom_id,
"params" : {
"model" : "claude-sonnet-5" ,
"max_tokens" : 300 ,
"messages" : [{
"role" : "user" ,
"content" : f "次の記事本文に合うタグを3つ、JSON配列で返してください: \n\n{ item[ 'text' ] } "
}],
},
})
# 台帳: custom_id・元ID・入力ハッシュ・投入時刻を記録
mf.write(json.dumps({
"custom_id" : custom_id,
"source_id" : item[ "id" ],
"input_sha" : hashlib.sha256(item[ "text" ].encode()).hexdigest()[: 16 ],
"attempt" : 1 ,
}, ensure_ascii = False ) + " \n " )
return requests
batch = client.messages.batches.create( requests = build_requests(items))
print ( f "batch_id= { batch.id } submitted= { len (items) } " )
custom_id の末尾に __a1 と試行番号を付けているのが要点です。Anthropic 側は同一バッチ内の custom_id 重複を拒否しますが、別バッチ間の重複は関知しません 。再投入分を同じ ID で投げると、結果を集約するときにどちらの応答か区別できなくなります。試行番号を ID に埋め込んでおけば、台帳側で「この source_id の最新 attempt はどれか」を一意に辿れます。
なお空文字列の本文をそのまま投げた失敗を経験してからは、build_requests の入口で if not item["text"].strip(): continue の門番を置き、弾いた件数も台帳に記録するようにしました。invalid_request はバッチに入る前に潰すのが一番安上がりです。
ポーリングは指数バックオフで — 24時間ウィンドウの現実
完了待ちは 5 秒間隔の固定ポーリングで書きがちですが、大きなバッチでは無駄撃ちになります。手元の記録では、20,000 件(平均入力 900 トークン)のバッチは混雑の少ない日で 62 分、週明けの日中帯では 4 時間 11 分かかりました。処理時間は読めない前提で、間隔を伸ばしていく方が実務的です。
import time
def wait_for_batch (batch_id: str , base: float = 30.0 , cap: float = 600.0 ) -> None :
interval = base
while True :
status = client.messages.batches.retrieve(batch_id)
counts = status.request_counts
print ( f "processing= { counts.processing } succeeded= { counts.succeeded } "
f "errored= { counts.errored } expired= { counts.expired } " )
if status.processing_status == "ended" :
return
time.sleep(interval)
interval = min (interval * 1.5 , cap) # 30s → 45s → ... → 最大10分
request_counts には処理中の段階から errored や expired が刻々と積み上がります。ここを完了前から観測しておくと、欠落に「3日後」ではなく処理中に気づけます 。私は errored が投入件数の 1% を超えた時点で通知を飛ばすようにしています。
照合 — 台帳と結果ストリームを突き合わせる
ended を確認したら、結果を 4 分類しつつ台帳と突き合わせます。
from collections import defaultdict
def reconcile (batch_id: str ) -> dict :
# 1. 台帳から「このバッチで投げたはずの custom_id」を読む
expected = {}
with MANIFEST .open( encoding = "utf-8" ) as mf:
for line in mf:
rec = json.loads(line)
expected[rec[ "custom_id" ]] = rec
buckets = defaultdict( list )
seen = set ()
# 2. 結果ストリームを1件ずつ分類
for result in client.messages.batches.results(batch_id):
seen.add(result.custom_id)
rtype = result.result.type
if rtype == "succeeded" :
buckets[ "succeeded" ].append(result)
elif rtype == "errored" :
etype = result.result.error.error.type
# リトライで直る見込みのある失敗だけ retryable に入れる
key = "retryable" if etype in (
"overloaded_error" , "api_error" , "rate_limit_error"
) else "permanent"
buckets[key].append(result)
else : # canceled / expired
buckets[ "retryable" ].append(result)
# 3. 台帳にあるのに結果に現れなかった ID(理論上ゼロのはずだが検証する)
missing = [cid for cid in expected if cid not in seen]
print ( f "succeeded= { len (buckets[ 'succeeded' ]) } "
f "retryable= { len (buckets[ 'retryable' ]) } "
f "permanent= { len (buckets[ 'permanent' ]) } missing= { len (missing) } " )
return { "buckets" : buckets, "missing" : missing, "expected" : expected}
分類の境界線はエラー種別です。overloaded_error や api_error、rate_limit_error は時間を置けば直る失敗。invalid_request_error はリクエスト自体が壊れているので、何度投げても結果は同じです。この 2 つを同じ「エラー」として扱った設計が、私の最初の敗因でした。
もう1点、見落としやすい実装上の注意があります。結果ストリームの並び順は投入順と一致する保証がありません 。「n 件目の結果は n 件目の入力に対応する」前提でインデックス結合すると、順序が入れ替わった瞬間に静かにデータが壊れます。対応付けは必ず custom_id で行います。結果の取得期限が作成から 29 日である点も、月次でまとめて集計する運用だと意外と効いてきます。
再投入 — attempt を上げて、上限を決める
retryable に分類された分だけを、試行番号を上げた新しい custom_id で別バッチとして投げ直します。
MAX_ATTEMPTS = 3
def requeue (recon: dict , items_by_id: dict ) -> list[ dict ]:
retry_requests = []
with MANIFEST .open( "a" , encoding = "utf-8" ) as mf:
for result in recon[ "buckets" ][ "retryable" ]:
rec = recon[ "expected" ][result.custom_id]
next_attempt = rec[ "attempt" ] + 1
if next_attempt > MAX_ATTEMPTS :
print ( f "give up: { rec[ 'source_id' ] } (attempt { rec[ 'attempt' ] } )" )
continue
new_cid = f " { rec[ 'source_id' ] } __a { next_attempt } "
src = items_by_id[rec[ "source_id" ]]
retry_requests.append({
"custom_id" : new_cid,
"params" : { # 元と同一パラメータを再構築(結果オブジェクトからの流用はしない)
"model" : "claude-sonnet-5" ,
"max_tokens" : 300 ,
"messages" : [{ "role" : "user" ,
"content" : f "次の記事本文に合うタグを3つ、JSON配列で返してください: \n\n{ src[ 'text' ] } " }],
},
})
mf.write(json.dumps({ "custom_id" : new_cid, "source_id" : rec[ "source_id" ],
"input_sha" : rec[ "input_sha" ],
"attempt" : next_attempt}, ensure_ascii = False ) + " \n " )
return retry_requests
上限(ここでは 3 回)を必ず決めておきます。上限なしの自動再投入は、恒久的な失敗を retryable と誤分類していた場合に課金だけが積み上がる仕組みになってしまうためです。本番運用で一番怖いのは、この静かな空回りです。3 回失敗した ID は台帳に残るので、翌朝に人間の目で見る運用にしています。20,000 件の実行では、retryable 32 件のうち 31 件が 2 回目で成功し、1 件だけ 3 回目に回りました。
集計側にも保険を掛けます。同じ source_id に複数 attempt の成功が並ぶ可能性はゼロではないので、集計クエリは「source_id ごとに最大 attempt の succeeded を 1 件だけ採用する」形にしておくと、二重集計を構造的に回避できます。
コストの実測 — Sonnet 5 導入価格 × Batch 50%
2026年6月30日に公開された Claude Sonnet 5 は、導入価格が入力 $2 / 出力 $10(100万トークンあたり・2026年8月31日まで、以降は $3 / $15)です。Batch はここからさらに 50% 引きになるため、導入期間中は実質 入力 $1 / 出力 $5 で回せます。
20,000 件(平均入力 900 トークン・平均出力 250 トークン)の実測はこうなりました。
経路 入力 18M tok 出力 5M tok 合計
リアルタイム API(導入価格) $36.00 $50.00 $86.00
Batch(導入価格 × 50%) $18.00 $25.00 $43.00
再投入分 32 件の追加コスト $0.03 $0.04 $0.07
照合と再投入の仕組みを足しても、追加コストは誤差の範囲でした。逆に言えば、41 件の欠落を人手で調べ直す時間の方がずっと高くつきます 。夜間に投げて朝に照合レポートだけ確認する形に落ち着いてからは、バッチ処理が「投げたら祈る」作業ではなくなりました。
なおバッチ 1 個あたりの上限は 100,000 リクエストまたは 256MB です。上限近くまで詰めるより、2〜3 万件で分割するのをお勧めします。私の環境では expired は大きなバッチの後半で出やすく、分割しておくと被害範囲も再投入も小さく済むためです。
運用に組み込む — 照合を「毎回必ず通る道」にする
仕組みは作って終わりではなく、通らないと意味がありません。私の場合は cron で動く処理の最後に照合を固定で挟み、結果を 1 行のサマリーとしてログに残しています。
バッチ投入時に manifest へ追記(投入と台帳記録を同一トランザクション扱いに)
ended 後に reconcile を実行し、succeeded / retryable / permanent / missing の 4 値を必ずログへ
missing > 0 または permanent > 投入件数の 0.5% なら通知して人間が見る
retryable は attempt 上限 3 で自動再投入
この 4 行の運用にしてから、欠落の発見が「3日後の違和感」から「当日のログ 1 行」に変わりました。
次の一歩
まず既存のバッチ処理コードで、result.type を分岐せずに成功分だけ拾っている箇所がないかを確認してみてください。そこが今日直せる一番の急所です。台帳と照合は、この記録のコードを土台にすれば小一時間で組み込めます。
同じように無人のバッチ処理を回している方の、転ばぬ先の杖になれば幸いです。