ある朝、決済エージェントのログに同じ請求書 ID が二回並んでいるのを見つけて、背筋が冷えました。
幸い金額が小さく、Stripe 側の Idempotency-Key が二重請求を弾いていたので実害はありませんでした。けれど原因をたどると、SDK のタイムアウト再試行で同じツールが二度呼ばれ、その間に自社 DB へのレコードだけは二件入っていた。Stripe が守ってくれなければ、顧客に二重で請求が飛んでいたところです。
エージェントが副作用を持つ処理を扱うようになると、この種の事故は静かに増えます。Claude Agent SDK はセッション再開やツールのリトライを内蔵していて、長時間タスクや一時障害に強い。その強さの裏側で、決済・メール送信・在庫減算のような「取り消せない副作用」だけは、開発者が冪等性を設計しないと二重実行のリスクが残ります。公式ドキュメントには API 単位の idempotency-key の記述こそありますが、エージェント全体としてどう冪等性を組むかはあまり語られていません。
ここでは、個人開発で運用していた決済エージェントを作り直したときの冪等性レイヤーを、そのまま使える形にまとめます。決定的な冪等性キー、Outbox、軽量ラッパーの3パターン。コピーして動くコードと、本番で何を計測すべきかまで踏み込みます。
なぜエージェントでは冪等性が一段むずかしいのか
普通の API クライアントでも冪等性は要りますが、エージェントには「失敗と成功の境界が曖昧」という事情が重なります。私が事故を起こして理解したのは、次の三つが同時に効いてくるという点でした。
一つ目はモデル応答の非決定性です。同じ意図でもツール引数の細部が揺れることがあるため、「同じ操作かどうか」の判定をモデル生成のキーに任せられません。二つ目は部分的成功です。ツールが成功した直後にループがクラッシュすると、次の起動でモデルは「まだ呼んでいない」と解釈して再実行します。三つ目は再開です。セッション再開やチェックポイントは状態を巻き戻すので、副作用側で重複を検知できないと素通りします。
Stripe のような成熟した SDK は Idempotency-Key を受け取ってくれますが、自社 DB への INSERT、メール送信、社内 API は自前で冪等性を作る必要があります。私は「ツールを全部冪等にしてから本番に出す」を自分のルールにしていて、この順序を逆にすると必ずどこかで事故が起きると実感しています。
パターン1: 決定的な冪等性キーを入力から導出する
冪等性キーに必要な性質はただ一つ、「同じ意図の操作なら、何度生成しても同じ値になる」ことです。その場で UUID を振っては意味がないので、操作の入力からハッシュで決定的に導出します。
# idempotency_key.py — 同じ意図から決定的に同じキーを導出する
from __future__ import annotations
import hashlib
import json
from typing import Any
def stable_idempotency_key (
session_id: str ,
tool_name: str ,
logical_args: dict[ str , Any],
* ,
version: str = "v1" ,
) -> str :
"""決定的な冪等性キーを生成する。
logical_args には「意図」を表す最小限の引数だけを渡す。
timestamp や retry_count を混ぜると、再試行のたびにキーが変わって破綻する。
"""
# 辞書順を固定して正規化(引数の順番が違っても同じキーにする)
canonical = json.dumps(logical_args, sort_keys = True , separators = ( "," , ":" ), default = str )
raw = f " { version } | { session_id } | { tool_name } | { canonical } "
digest = hashlib.sha256(raw.encode( "utf-8" )).hexdigest()
return f "idem_ { version } _ { digest[: 32 ] } "
if __name__ == "__main__" :
k1 = stable_idempotency_key(
"sess_abc" , "charge_payment" ,
{ "customer_id" : "cus_001" , "amount_jpy" : 2480 , "invoice_id" : "inv_555" },
)
k2 = stable_idempotency_key(
"sess_abc" , "charge_payment" ,
{ "invoice_id" : "inv_555" , "amount_jpy" : 2480 , "customer_id" : "cus_001" },
)
assert k1 == k2 # 引数の順番が違っても同じキー
キー生成に time.time() やランダム要素を混ぜると、再試行のたびにキーが変わって冪等性が崩れます。リトライ回数のような内部メタデータも同罪です。「意図」だけを抜き出してキーに含める——これが鉄則です。
version フィールドは、将来キー生成ロジックを変えたくなったときに旧キーと衝突させないための安全弁です。私はこれを怠ったことがあり、キー形式の変更時に DB クレンジングで一晩を溶かしました。最初から付けておくと安いコストで済みます。
セッション ID は、エージェントを起動する呼び出し側で発番して渡すのが素直です。ClaudeSDKClient はセッションを内部管理しますが、外部の永続化層と共有するには明示的な ID を握っておく必要があります。
import uuid
from claude_agent_sdk import ClaudeSDKClient, ClaudeAgentOptions
session_id = f "sess_ { uuid.uuid4().hex } "
options = ClaudeAgentOptions(
system_prompt = "あなたは決済処理エージェントです。" ,
extra_context = { "session_id" : session_id}, # ツール側から参照できるようにする
)
パターン2: Outbox でトランザクション境界を揃える
キーがあっても、「DB 書き込みは成功したが外部 API を呼ぶ前にクラッシュ」という部分的失敗は残ります。これを構造で解くのが Outbox パターンです。
発想は単純で、「副作用そのもの」と「副作用を起こす意図」を別レイヤーに分けます。エージェントのツールは outbox テーブルに「これを実行してほしい」というレコードを書くだけ。実際の外部呼び出しは、別のワーカーが outbox を読んで行います。
# outbox_tool.py — Outbox による冪等な副作用登録
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
import asyncpg
@dataclass
class OutboxEntry :
idempotency_key: str
operation: str
payload: dict[ str , Any]
status: str # 'pending' | 'completed' | 'failed'
async def enqueue_operation (
conn: asyncpg.Connection,
idempotency_key: str ,
operation: str ,
payload: dict[ str , Any],
) -> tuple[OutboxEntry, bool ]:
"""outbox に冪等に書き込む。返り値 (エントリ, 新規作成か)。"""
# ON CONFLICT DO NOTHING で重複登録を一発で防ぐ
row = await conn.fetchrow(
"""
INSERT INTO outbox (idempotency_key, operation, payload, status, created_at)
VALUES ($1, $2, $3, 'pending', $4)
ON CONFLICT (idempotency_key) DO NOTHING
RETURNING idempotency_key, operation, payload, status
""" ,
idempotency_key, operation, payload, datetime.now(timezone.utc),
)
if row is not None :
return OutboxEntry( ** dict (row)), True
existing = await conn.fetchrow(
"SELECT idempotency_key, operation, payload, status FROM outbox WHERE idempotency_key=$1" ,
idempotency_key,
)
return OutboxEntry( ** dict (existing)), False
async def charge_payment_tool (conn, idempotency_key, customer_id, amount_jpy, invoice_id):
entry, created = await enqueue_operation(
conn, idempotency_key, "stripe_charge" ,
{ "customer_id" : customer_id, "amount_jpy" : amount_jpy, "invoice_id" : invoice_id},
)
if not created:
# 既存 = リトライ。状態だけ返してツールを正常終了させる
return { "status" : "already_enqueued" , "state" : entry.status, "key" : idempotency_key}
return { "status" : "enqueued" , "key" : idempotency_key}
# DDL(PostgreSQL)
# CREATE TABLE outbox (
# idempotency_key TEXT PRIMARY KEY,
# operation TEXT NOT NULL,
# payload JSONB NOT NULL,
# status TEXT NOT NULL,
# attempts INT NOT NULL DEFAULT 0,
# last_error TEXT,
# created_at TIMESTAMPTZ NOT NULL,
# completed_at TIMESTAMPTZ
# );
Outbox が効くのは、ツールが「意図を記録する」だけで完結し、外部 API の成否がツールの戻り値に影響しないからです。モデル側は enqueued か already_enqueued のどちらかを見て次の推論へ進める。これでエージェントループと外部 API のトランザクション境界が揃い、二重請求の余地が構造的に消えます。
後段のワーカーは pending を SELECT ... FOR UPDATE SKIP LOCKED で取り、処理して completed に更新します。Stripe には outbox.idempotency_key をそのまま Idempotency-Key として渡せば、ネットワーク再送にも二重で守られます。私は決済まわりは必ずこの形にしています。
パターン3: 軽量に済ませたいときのラッパー
Outbox を入れるほど重くなく、既存の DB 書き込みをそのまま冪等にしたい場面もあります。そのときはデコレータで包むのが手早いです。
# idempotent_wrapper.py — 既存の async 関数を冪等にする
from __future__ import annotations
import functools
import json
from typing import Awaitable, Callable
import redis.asyncio as redis
def idempotent (store: redis.Redis, * , ttl_seconds: int = 86400 , prefix: str = "idem" ):
"""第1引数を冪等性キーとして扱い、成功結果をキャッシュする。"""
def decorator (func: Callable[ ... , Awaitable[ dict ]]):
@functools.wraps (func)
async def wrapper (idempotency_key: str , * args, ** kwargs) -> dict :
cache_key = f " { prefix } : { idempotency_key } "
inflight = f " { cache_key } :inflight"
cached = await store.get(cache_key)
if cached is not None :
return { "cached" : True , ** json.loads(cached)}
# 処理中ロック(SET NX EX)。並行リクエストの二重実行を防ぐ
locked = await store.set(inflight, "1" , nx = True , ex = 300 )
if not locked:
return { "status" : "in_progress" , "key" : idempotency_key}
try :
result = await func(idempotency_key, * args, ** kwargs)
await store.set(cache_key, json.dumps(result), ex = ttl_seconds)
return { "cached" : False , ** result}
finally :
# 失敗は保存せず(リトライ可能に)、ロックだけ必ず外す
await store.delete(inflight)
return wrapper
return decorator
このパターンは軽い反面、いくつか割り切りがあります。Redis が落ちるとロックが取れずアプリが詰まるので、Sentinel か、エラー時にローカル辞書へ退避するフォールバックを足してください。TTL 経過後に同じキーで呼ばれると「キャッシュなし」と判定され再実行の余地が出るため、絶対に二重実行してはいけない決済は TTL を設けず永続 DB に置きます。そして処理中ロックの TTL は最大処理時間より長く——短いとロック切れで並行実行が始まります。私は単純なメール送信は Redis、決済は Outbox、と割り切って使い分けています。
Claude Agent SDK に組み込む
3つの部品を、SDK の @tool と MCP サーバー経由のツール登録に落とし込みます。session_id をクロージャで保持して、ツール内で冪等性キーを導出する形です。
# payment_agent.py
from typing import Any
from claude_agent_sdk import (
ClaudeAgentOptions, tool, create_sdk_mcp_server, query,
)
import asyncpg
from idempotency_key import stable_idempotency_key
from outbox_tool import charge_payment_tool
def build_payment_tools (session_id: str , conn: asyncpg.Connection):
@tool (
"charge_invoice" ,
"顧客の請求書を決済する。リトライされても二重請求は起きない。" ,
{ "invoice_id" : str , "customer_id" : str , "amount_jpy" : int },
)
async def charge_invoice (args: dict[ str , Any]) -> dict[ str , Any]:
key = stable_idempotency_key(
session_id = session_id,
tool_name = "charge_invoice" ,
logical_args = {
"invoice_id" : args[ "invoice_id" ],
"customer_id" : args[ "customer_id" ],
"amount_jpy" : args[ "amount_jpy" ],
},
)
result = await charge_payment_tool(
conn, key, args[ "customer_id" ], args[ "amount_jpy" ], args[ "invoice_id" ],
)
return { "content" : [{ "type" : "text" ,
"text" : f "決済登録: { result[ 'status' ] } (key= { key[: 16 ] } ...)" }]}
return [charge_invoice]
async def run_payment_agent (session_id: str , instruction: str ):
conn = await asyncpg.connect( "postgresql://localhost/myapp" )
try :
tools = build_payment_tools(session_id, conn)
server = create_sdk_mcp_server( name = "payments" , version = "1.0.0" , tools = tools)
options = ClaudeAgentOptions(
mcp_servers = { "payments" : server},
allowed_tools = [ "mcp__payments__charge_invoice" ],
system_prompt = "あなたは決済処理の担当です。請求書の指示に従って処理してください。" ,
)
async for message in query( prompt = instruction, options = options):
print (message)
finally :
await conn.close()
モデルがネットワーク障害で同じツールを二度出しても、outbox の主キー制約が二重登録を弾きます。後段のワーカーは 1 件の pending を 1 回だけ処理する。多層の防御がそれぞれ独立に効くので、どこか一段が抜けても全体は守られます。関連して、サーキットブレーカーやフォールバックはClaude API の本番レジリエンスパターン が相補的です。
つまずきやすい五点
実装で繰り返しハマった所を、教訓として残します。
一つ目はタイムスタンプをキーに混ぜることです。hash(f"{invoice_id}|{datetime.now()}") のように書くと、再試行のたびに別のキーになり冪等性が成立しません。時刻はペイロードには入れてよいが、キー生成には使わないこと。
二つ目はキーの TTL が短すぎることです。決済のように「2時間後の再送」が起こりうる処理で有効期限を 1 時間にすると、その隙間で二重実行が起きます。決済は最低 30 日、重要通知は 7 日、消えても再送してよいメールは 24 時間、と副作用の重さで階層化するのを推奨します。
三つ目は処理中ロックの TTL が処理時間より短いことです。外部 API が 30 秒かかるのに inflight を 10 秒にすると、ロック切れで並行実行が始まります。ロック TTL は最大処理時間の 3 倍を目安にし、完了時に必ず finally で外します。
四つ目はプロンプトで安易に再試行を促すことです。「失敗したら同じツールをやり直して」と書くと、モデルが違う引数で再試行しかねません。「失敗したら状態確認ツールで現状を見てから判断して」のように、確認を挟む指示にします。
五つ目はキーを平文でログに出すことです。キーから顧客 ID や金額が逆算できると、ログ流出時に PII リスクになります。私は SHA256 の先頭 32 文字だけ残し、元の引数はマスキングしてから出力しています。
本番で何を計測するか
冪等性が効いているかは、見えなければ判断できません。最低限この三つは用意してください。重複検知率(duplicate_detected / tool_call)は、急増したらエージェントが同じ操作を繰り返している兆候で、プロンプト設計かインフラ不安定を疑います。Outbox の滞留件数は、一定時間 pending のままのレコード数で、ワーカーの詰まりを即座に捉えます。キー衝突率は、異なる操作で同じキーが出る事故(正規化ロジックのバグ)で、通常ゼロであるべき指標です。
# metrics.py(Prometheus 例)
from prometheus_client import Counter, Gauge
tool_calls = Counter( "agent_tool_calls_total" , "tool calls" , [ "tool" ])
duplicates = Counter( "agent_duplicate_detected_total" , "duplicates" , [ "tool" ])
outbox_pending = Gauge( "outbox_pending_entries" , "pending outbox entries" )
tool_calls.labels( tool = "charge_invoice" ).inc()
if not created:
duplicates.labels( tool = "charge_invoice" ).inc()
私の運用では、重複検知率が通常の 3 倍を超えたら Slack 通知、Outbox 滞留が 1000 件を超えたら PagerDuty、という二段アラートにしています。あるエージェントで重複検知率が常時 5〜10% を示していたので調べたら、システムプロンプトに「失敗したら必ず再実行」と書いてあり、モデルが無意味に再試行していました。冪等性の計測は、こうした設計ミスの早期発見にも効きます。
最初の一歩
この記事をそのまま本番に使うなら、まず既存エージェントの「副作用を持つツール」を一つだけ棚卸しして、Outbox に置き換えるところから始めてください。全ツールを一度に冪等化しようとするとテストが追いつかず、リリースが止まります。決済・メール送信・在庫操作のうち最も事故リスクが高い一つを選び、そこにパターン2を当てる——これが 1 週間で終わる現実的なスコープです。非同期処理の全体像を掴むならClaude API の Webhook 非同期処理とエラー復旧 もあわせてどうぞ。
冪等性は地味ですが、エージェントが本番で長く生き残れるかを決める基礎体力です。一度レイヤーを作ってしまえば、新しいツールも安全に足していける。最初のコストは必ず元が取れます。