Claude API を Webhook や非同期処理パイプラインに組み込むと、同期呼び出しとは異なるエラーパターンが次々と現れます。「Webhook が届いたのに処理が実行されない」「同じメッセージが2回処理された」「Claude が応答する前にジョブがタイムアウトした」――こういった問題は、適切なリカバリー戦略があれば確実に防げます。
非同期処理における Claude API のエラー分類
Claude API を非同期で呼び出す際のエラーは、大きく4カテゴリに分類できます。
一時的なエラー(Transient Errors) :自動リトライで解決できるもの。429 Too Many Requests(レート制限)、500/502/503/529 サーバーエラー、ネットワークタイムアウトなどがこれに当たります。
恒久的なエラー(Permanent Errors) :リトライしても解決しないもの。400 Bad Request(リクエスト形式の誤り)、401 Unauthorized(APIキー無効)、404 Not Found(モデル名のタイポ等)が該当します。
ビジネスロジックエラー :Claude の応答内容に起因するもの。期待した構造化データが返ってこない、コンテンツポリシーによる refusal、不完全な生成(途中で止まった)などが挙げられます。
インフラエラー :処理基盤に起因するもの。Webhook 配信の失敗、キューのオーバーフロー、処理時間超過によるワーカーの強制終了などがあります。
この4カテゴリを意識することで、どの層で対処すべきかが明確になります。
Webhook 配信エラーの3大パターンと対処
パターン1:配信タイムアウト(At-least-once 配信の落とし穴)
多くの Webhook サービスは「少なくとも1回配信する(at-least-once delivery)」保証を持ちます。エンドポイントが一定時間内にレスポンスを返さないと、再送が発生します。Claude API の呼び出しに時間がかかる場合、これが重複処理の原因になります。
悪い実装例(Webhook ハンドラーで直接 Claude API を呼ぶ):
from flask import Flask, request
app = Flask( __name__ )
@app.route ( '/webhook' , methods = [ 'POST' ])
def handle_webhook ():
data = request.json
# ❌ Webhookハンドラーで直接呼び出すと最大30秒かかり、
# Webhookサービスが再送(重複処理)を引き起こす
result = call_claude_api(data[ 'message' ])
return { 'status' : 'ok' , 'result' : result}
良い実装例(即時受付→非同期処理):
from flask import Flask, request
import redis
import json
import uuid
app = Flask( __name__ )
redis_client = redis.Redis( host = 'localhost' , port = 6379 )
@app.route ( '/webhook' , methods = [ 'POST' ])
def handle_webhook ():
data = request.json
job_id = str (uuid.uuid4())
# 冪等キーで重複チェック
idempotency_key = data.get( 'idempotency_key' ) or data.get( 'event_id' )
if idempotency_key:
existing = redis_client.get( f 'processed: { idempotency_key } ' )
if existing:
return { 'status' : 'already_processed' , 'job_id' : existing.decode()}
# ✅ キューに積んで即座に202を返す
redis_client.rpush( 'claude_jobs' , json.dumps({
'job_id' : job_id,
'idempotency_key' : idempotency_key,
'payload' : data
}))
if idempotency_key:
redis_client.setex( f 'processed: { idempotency_key } ' , 86400 , job_id)
return { 'status' : 'accepted' , 'job_id' : job_id}, 202
パターン2:重複処理と冪等性の確保
ネットワーク障害や再送ポリシーにより、同じ Webhook が複数回届くことは避けられません。Claude API の呼び出しコストを考えると、重複処理は財務的にも機能的にも問題になります。
冪等性を保証するワーカー実装:
import anthropic
import redis
import json
import logging
client = anthropic.Anthropic()
redis_client = redis.Redis( host = 'localhost' , port = 6379 )
logger = logging.getLogger( __name__ )
def process_job (job_data: dict ) -> dict :
"""冪等性を保証するジョブ処理"""
job_id = job_data[ 'job_id' ]
idempotency_key = job_data.get( 'idempotency_key' )
# 処理結果キャッシュの確認
result_key = f 'result: { idempotency_key or job_id } '
cached_result = redis_client.get(result_key)
if cached_result:
logger.info( f "キャッシュ済み結果を返します: { job_id } " )
return json.loads(cached_result)
# 処理中フラグのセット(重複実行防止)
lock_key = f 'lock: { idempotency_key or job_id } '
lock_acquired = redis_client.set(lock_key, '1' , ex = 300 , nx = True )
if not lock_acquired:
logger.warning( f "既に処理中: { job_id } " )
return { 'status' : 'in_progress' }
try :
response = call_claude_with_retry(job_data[ 'payload' ])
result = { 'status' : 'success' , 'job_id' : job_id, 'response' : response}
# 結果をキャッシュ(24時間)
redis_client.setex(result_key, 86400 , json.dumps(result))
return result
except Exception as e:
logger.error( f "処理失敗: { job_id } , エラー: { e } " )
raise
finally :
redis_client.delete(lock_key)
パターン3:大量受信時のバックプレッシャー制御
Claude API にはレート制限があります。Webhook が大量に届いた際に全リクエストを同時に処理しようとすると、429 エラーが頻発します。Token Bucket アルゴリズムでスロットリングを実装しましょう。
import time
import threading
class TokenBucket :
"""Claude APIのレート制限に合わせたトークンバケット"""
def __init__ (self, tokens_per_minute: int ):
self .capacity = tokens_per_minute
self .tokens = tokens_per_minute
self .last_refill = time.time()
self .lock = threading.Lock()
def consume (self, tokens: int = 1 ) -> bool :
with self .lock:
now = time.time()
elapsed = now - self .last_refill
self .tokens = min ( self .capacity, self .tokens + elapsed * ( self .capacity / 60 ))
self .last_refill = now
if self .tokens >= tokens:
self .tokens -= tokens
return True
return False
def wait_and_consume (self, timeout: float = 300 ) -> bool :
start = time.time()
while time.time() - start < timeout:
if self .consume():
return True
time.sleep( 0.1 )
return False
rate_limiter = TokenBucket( tokens_per_minute = 60 )
def rate_limited_worker ():
while True :
job = redis_client.blpop( 'claude_jobs' , timeout = 1 )
if not job:
continue
job_data = json.loads(job[ 1 ])
if not rate_limiter.wait_and_consume():
redis_client.rpush( 'claude_jobs_dead' , json.dumps({
** job_data,
'failure_reason' : 'rate_limit_timeout' ,
'failed_at' : time.time()
}))
continue
try :
process_job(job_data)
except Exception as e:
handle_job_failure(job_data, e)
エクスポネンシャルバックオフ付きリトライの実装
Claude API の一時的なエラー(429・5xx)には、指数的なバックオフでリトライすることが推奨されます。
import anthropic
import time
import random
import logging
from typing import Optional
logger = logging.getLogger( __name__ )
def call_claude_with_retry (
payload: dict ,
max_retries: int = 5 ,
base_delay: float = 1.0 ,
max_delay: float = 60.0
) -> Optional[ str ]:
"""エクスポネンシャルバックオフ付きリトライ"""
client = anthropic.Anthropic()
PERMANENT_ERROR_CODES = { 400 , 401 , 403 , 404 }
for attempt in range (max_retries + 1 ):
try :
response = client.messages.create(
model = payload.get( 'model' , 'claude-sonnet-4-6' ),
max_tokens = payload.get( 'max_tokens' , 2048 ),
messages = payload[ 'messages' ]
)
return response.content[ 0 ].text
except anthropic.RateLimitError as e:
if attempt >= max_retries:
raise
retry_after = e.response.headers.get( 'retry-after' )
wait_time = float (retry_after) if retry_after else min (
base_delay * ( 2 ** attempt) + random.uniform( 0 , 1 ), max_delay
)
logger.warning( f "レート制限: { wait_time :.1f } 秒後にリトライ (試行 { attempt + 1 } / { max_retries } )" )
time.sleep(wait_time)
except anthropic.APIStatusError as e:
if e.status_code in PERMANENT_ERROR_CODES :
logger.error( f "恒久的エラー { e.status_code } : リトライ不可" )
raise
if attempt >= max_retries:
raise
wait_time = min (base_delay * ( 2 ** attempt) + random.uniform( 0 , 1 ), max_delay)
logger.warning( f "サーバーエラー { e.status_code } : { wait_time :.1f } 秒後にリトライ" )
time.sleep(wait_time)
except anthropic.APIConnectionError:
if attempt >= max_retries:
raise
wait_time = min (base_delay * ( 2 ** attempt), max_delay)
logger.warning( f "接続エラー: { wait_time :.1f } 秒後にリトライ" )
time.sleep(wait_time)
return None
デッドレターキュー(DLQ)の設計
すべてのリトライが尽きたジョブは、デッドレターキューに移動してアラートと手動確認を可能にします。
import time
def handle_job_failure (job_data: dict , error: Exception , max_retries: int = 3 ):
"""失敗ジョブのハンドリングとDLQ移動"""
retry_count = job_data.get( 'retry_count' , 0 )
if retry_count < max_retries:
delay = 2 ** retry_count # 1, 2, 4秒
retry_job = {
** job_data,
'retry_count' : retry_count + 1 ,
'last_error' : str (error),
'retry_after' : time.time() + delay
}
redis_client.zadd( 'claude_jobs_delayed' ,
{json.dumps(retry_job): retry_job[ 'retry_after' ]})
logger.info( f "リトライキューに追加: { job_data[ 'job_id' ] } ( { retry_count + 1 } 回目)" )
else :
dead_job = {
** job_data,
'final_error' : str (error),
'failed_at' : time.time(),
'total_attempts' : retry_count + 1
}
redis_client.rpush( 'claude_jobs_dead' , json.dumps(dead_job))
send_alert(
f "[Claude API] ジョブが完全に失敗: { job_data[ 'job_id' ] }\n "
f "エラー: { error }\n 試行回数: { retry_count + 1 } "
)
logger.error( f "DLQに移動: { job_data[ 'job_id' ] } " )
サーキットブレーカーパターンの実装
Claude API が不安定な状態のとき、連続してリトライすることは状況を悪化させます。サーキットブレーカーを実装することで、障害を早期検出して処理を一時停止できます。
from enum import Enum
import threading
class CircuitState ( Enum ):
CLOSED = "closed" # 正常:リクエストを通す
OPEN = "open" # 異常:リクエストをブロック
HALF_OPEN = "half_open" # 回復確認中
class ClaudeAPICircuitBreaker :
def __init__ (self, failure_threshold = 5 , recovery_timeout = 60.0 , success_threshold = 2 ):
self .failure_threshold = failure_threshold
self .recovery_timeout = recovery_timeout
self .success_threshold = success_threshold
self .state = CircuitState. CLOSED
self .failure_count = 0
self .success_count = 0
self .last_failure_time = None
self .lock = threading.Lock()
def call (self, func, * args, ** kwargs):
with self .lock:
if self .state == CircuitState. OPEN :
if time.time() - self .last_failure_time > self .recovery_timeout:
self .state = CircuitState. HALF_OPEN
self .success_count = 0
else :
raise Exception ( "サーキットブレーカーがOPEN状態: Claude APIへのアクセスをブロック中" )
try :
result = func( * args, ** kwargs)
with self .lock:
if self .state == CircuitState. HALF_OPEN :
self .success_count += 1
if self .success_count >= self .success_threshold:
self .state = CircuitState. CLOSED
self .failure_count = 0
logger.info( "サーキットブレーカー: CLOSEDに回復" )
elif self .state == CircuitState. CLOSED :
self .failure_count = 0
return result
except Exception as e:
with self .lock:
self .failure_count += 1
self .last_failure_time = time.time()
if ( self .state == CircuitState. CLOSED and
self .failure_count >= self .failure_threshold):
self .state = CircuitState. OPEN
logger.error( f "サーキットブレーカーがOPENに: { self .failure_count } 回連続失敗" )
elif self .state == CircuitState. HALF_OPEN :
self .state = CircuitState. OPEN
raise
circuit_breaker = ClaudeAPICircuitBreaker( failure_threshold = 5 , recovery_timeout = 120.0 )
Graceful Degradation:障害時のフォールバック設計
Claude API が利用できない状況でも、サービスを完全には止めないためのフォールバック戦略を用意しておく点が肝心です。優先度は「Claude API → レスポンスキャッシュ → 事前定義フォールバック」の順です。
import hashlib
class GracefulClaudeHandler :
"""Claude API 障害時のグレースフルデグラデーション"""
def __init__ (self):
self .circuit_breaker = ClaudeAPICircuitBreaker()
self .fallback_responses = {
'summarize' : 'サービスが一時的に利用できないため、要約を提供できません。後ほどお試しください。' ,
'analyze' : '分析機能は現在メンテナンス中です。' ,
'default' : 'ご不便をおかけして申し訳ありません。現在サービスが混み合っています。'
}
def process (self, task_type: str , payload: dict ) -> dict :
# 1. キャッシュを確認
cache_key = hashlib.md5(json.dumps(payload, sort_keys = True ).encode()).hexdigest()
cached = redis_client.get( f 'response_cache: { cache_key } ' )
if cached:
return { 'source' : 'cache' , 'result' : json.loads(cached)}
# 2. Claude API を試みる(サーキットブレーカー経由)
try :
result = self .circuit_breaker.call(call_claude_with_retry, payload)
redis_client.setex( f 'response_cache: { cache_key } ' , 3600 , json.dumps(result))
return { 'source' : 'claude_api' , 'result' : result}
except Exception as e:
logger.error( f "Claude API 失敗: { e } " )
# 3. フォールバックレスポンスを返す
fallback = self .fallback_responses.get(task_type, self .fallback_responses[ 'default' ])
return { 'source' : 'fallback' , 'result' : fallback, 'degraded' : True }
監視とアラートの設定
健全な非同期処理システムには、適切な可観測性が欠かせません。最低限、以下のメトリクスをモニタリングすることをお勧めします。
エラー率(全ジョブのうち最終的に失敗した割合)は 10% を超えたらアラートを発火させましょう。DLQ の蓄積件数は 10 件を超えたら即時確認が必要です。平均処理時間が 30 秒を超えている場合は、Claude API の応答が遅くなっているサインです。
def check_and_alert (metrics: dict ):
if metrics[ 'error_rate' ] > 0.1 :
send_alert( f "⚠️ Claude API エラー率が高い: { metrics[ 'error_rate' ] :.1% } " )
if metrics[ 'dlq_count' ] > 10 :
send_alert( f "🚨 DLQ に { metrics[ 'dlq_count' ] } 件の失敗ジョブが蓄積" )
if metrics[ 'avg_processing_time' ] > 30 :
send_alert( f "⏱️ Claude API の平均応答時間が遅い: { metrics[ 'avg_processing_time' ] :.1f } 秒" )
全体を振り返って:本番環境で実装すべき7つのパターン
Claude API を Webhook・非同期処理に組み込む際に実装しておきたい設計パターンをまとめます。
**即時対応(優先度:高)**として、まず Webhook の即時受付(Webhook ハンドラーでは Claude API を呼ばず、キューに積んで即座に 202 を返す)、次に冪等性の保証(処理前にキャッシュを確認し、重複実行を防止する)を実装してください。
**インフラ設計(優先度:高)**として、エクスポネンシャルバックオフ(一時的エラーは指数的バックオフでリトライ)とデッドレターキュー(最大リトライを超えたジョブは DLQ に移動してアラートを発火)が必要です。
**高可用性(優先度:中)**として、サーキットブレーカー(連続失敗時に回路を開いて連鎖障害を防ぐ)と Graceful Degradation(Claude API 障害時もサービスを完全に止めない)を加えましょう。
**運用(優先度:中)**として、エラー率・処理時間・DLQ 件数を継続的に監視するメトリクス収集とアラートが大切です。
これらを順番に実装していくことで、突発的なエラーや障害があっても安定してサービスを提供できる、本番品質の Claude API 連携を構築できます。少し手間はかかりますが、「夜中にアラートで起こされない安心感」はプライスレスです。ぜひ参考にしていただければ幸いです。