なぜ「エンタープライズ設計」が必要なのか
Vertex AI で Claude を動かすこと自体は、入門記事 で解説した通り比較的シンプルです。しかし本番環境でサービスを安定稼働させ、コストを抑えながらスケールさせるためには、単なる API 呼び出し以上の設計が必要になります。
プロンプトキャッシング :繰り返しのコンテキストを効率化してコストを大幅に削減
BigQuery 統合ログ :コンプライアンス・品質モニタリング・コスト分析
マルチモーダル処理 :画像・PDF・ドキュメントを含む複合的な入力処理
エージェント設計 :ツール呼び出しとマルチエージェントオーケストレーション
RAG(検索拡張生成) :企業内ナレッジベースとの統合
本番運用の設計パターン :リトライ・サーキットブレーカー・コスト管理
1. プロンプトキャッシングで API コストを最大 90% 削減する
キャッシングの仕組みを理解する
Claude にはプロンプトキャッシング 機能があり、長いシステムプロンプトやコンテキストを一度処理した後にキャッシュしておくことができます。同じキャッシュ済みコンテンツを含むリクエストが来た場合、キャッシュヒット料金(フル料金の約 10〜20%)のみが課金されます。
これは特に以下のユースケースで効果的です。
数千行のシステムプロンプト(ペルソナ定義・ルール・知識ベース)を毎回送信している場合
同一ドキュメントに対して異なる質問を繰り返す RAG システム
長いコードベースを繰り返し参照するコードアシスタント
実装:キャッシュコントロールを使ったシステムプロンプト
from anthropic import AnthropicVertex
client = AnthropicVertex( project_id = "your-project" , region = "asia-southeast1" )
# システムプロンプトのキャッシング
# beta ヘッダーが必要
system_prompt = """あなたは株式会社サンプルの顧客サポートエージェントです。
以下のルールに従って対応してください。
[製品カタログ - 全2,500製品の詳細情報]
製品ID: P001 - スマートウォッチ Pro X
価格: 38,000円
仕様: 心拍数モニター、GPS、防水5ATM、バッテリー7日間
...
[この部分が数千トークンに及ぶ場合、キャッシングが特に効果的]
[対応ポリシー]
1. 返品は購入後30日以内に受け付ける
2. 修理対応は平日9:00〜18:00
3. 緊急の場合は上位サポートにエスカレーション
...
"""
# ユーザーからの最初の質問
response1 = client.beta.messages.create(
model = "claude-sonnet-4-6" ,
max_tokens = 1024 ,
betas = [ "prompt-caching-2024-07-31" ],
system = [
{
"type" : "text" ,
"text" : system_prompt,
"cache_control" : { "type" : "ephemeral" } # キャッシュを有効化
}
],
messages = [
{ "role" : "user" , "content" : "P001 の製品について教えてください" }
]
)
print ( "キャッシュ統計:" , response1.usage)
# {'input_tokens': 2800, 'cache_creation_input_tokens': 2500, 'cache_read_input_tokens': 0, 'output_tokens': 180}
# 同じシステムプロンプトを使った2回目以降のリクエスト
# → cache_read_input_tokens が増加し、コストが大幅削減
response2 = client.beta.messages.create(
model = "claude-sonnet-4-6" ,
max_tokens = 1024 ,
betas = [ "prompt-caching-2024-07-31" ],
system = [
{
"type" : "text" ,
"text" : system_prompt,
"cache_control" : { "type" : "ephemeral" }
}
],
messages = [
{ "role" : "user" , "content" : "返品ポリシーを教えてください" }
]
)
print ( "キャッシュ統計(2回目):" , response2.usage)
# {'input_tokens': 300, 'cache_creation_input_tokens': 0, 'cache_read_input_tokens': 2500, 'output_tokens': 150}
# → システムプロンプト分のトークンがキャッシュヒット!
マルチターン会話でのキャッシング戦略
class CachedConversationManager :
"""プロンプトキャッシングを活用したマルチターン会話管理クラス"""
def __init__ (self, client: AnthropicVertex, system_prompt: str ):
self .client = client
self .system_prompt = system_prompt
self .conversation_history = []
self .total_cache_savings = 0
def chat (self, user_message: str ) -> str :
self .conversation_history.append({
"role" : "user" ,
"content" : user_message
})
response = self .client.beta.messages.create(
model = "claude-sonnet-4-6" ,
max_tokens = 2048 ,
betas = [ "prompt-caching-2024-07-31" ],
system = [
{
"type" : "text" ,
"text" : self .system_prompt,
"cache_control" : { "type" : "ephemeral" }
}
],
messages = self .conversation_history
)
assistant_message = response.content[ 0 ].text
self .conversation_history.append({
"role" : "assistant" ,
"content" : assistant_message
})
# コスト節約量の追跡
cache_read = response.usage.cache_read_input_tokens
if cache_read:
# キャッシュヒット分は通常料金の約 10% なので、90% 節約
self .total_cache_savings += cache_read * 0.9
print ( f "💰 キャッシュ節約: { cache_read } トークン" )
return assistant_message
# 使用例
client = AnthropicVertex( project_id = "your-project" , region = "asia-southeast1" )
manager = CachedConversationManager(client, system_prompt)
print (manager.chat( "スマートウォッチの防水性能を教えてください" ))
print (manager.chat( "修理に出すにはどうすればいいですか?" ))
print ( f "合計節約トークン数(換算): { manager.total_cache_savings :.0f } " )
2. BigQuery 統合ログによるコンプライアンスとモニタリング
ログ記録の要件とアーキテクチャ
エンタープライズ環境では、AI システムへの入出力ログを保存することが求められるケースが増えています。個人情報保護・内部不正防止・品質監査など、様々な目的に対応するために、Vertex AI ではリクエスト・レスポンスを BigQuery に自動記録する機能があります。
⚠️ 注意点 :この機能はリージョナルエンドポイントでのみ利用可能です。グローバルエンドポイント(region="global")ではログ記録が動作しません。
BigQuery ログの有効化
# Vertex AI SDK でのログ設定(Python SDK ではなく、Cloud Console または REST API で設定)
# 設定パス: Vertex AI → Settings → Request/Response Logging
# ログの保存先 BigQuery テーブルスキーマ(自動作成):
# - request_time: TIMESTAMP
# - response_time: TIMESTAMP
# - model: STRING
# - input_tokens: INT64
# - output_tokens: INT64
# - request_payload: JSON
# - response_payload: JSON
# - user_id: STRING (カスタムヘッダーから取得)
カスタムログ実装(BigQuery クライアント直接利用)
リアルタイムのより詳細なログが必要な場合は、BigQuery クライアントを使って独自のログ記録システムを実装します。
from anthropic import AnthropicVertex
from google.cloud import bigquery
from datetime import datetime
import json
import uuid
class LoggedClaudeClient :
"""BigQuery ログ付き Claude クライアント"""
def __init__ (self, project_id: str , region: str , bq_dataset: str , bq_table: str ):
self .client = AnthropicVertex( project_id = project_id, region = region)
self .bq_client = bigquery.Client( project = project_id)
self .table_ref = f " { project_id } . { bq_dataset } . { bq_table } "
def create_message (
self,
messages: list ,
model: str = "claude-sonnet-4-6" ,
max_tokens: int = 1024 ,
user_id: str = None ,
session_id: str = None ,
** kwargs
):
request_id = str (uuid.uuid4())
request_time = datetime.utcnow()
try :
response = self .client.messages.create(
model = model,
max_tokens = max_tokens,
messages = messages,
** kwargs
)
# 成功ログを BigQuery に書き込み
self ._log_to_bq({
"request_id" : request_id,
"request_time" : request_time.isoformat(),
"response_time" : datetime.utcnow().isoformat(),
"model" : model,
"user_id" : user_id,
"session_id" : session_id,
"input_tokens" : response.usage.input_tokens,
"output_tokens" : response.usage.output_tokens,
"status" : "success" ,
"messages_json" : json.dumps(messages, ensure_ascii = False ),
"response_text" : response.content[ 0 ].text,
"error_message" : None
})
return response
except Exception as e:
# エラーログも記録
self ._log_to_bq({
"request_id" : request_id,
"request_time" : request_time.isoformat(),
"response_time" : datetime.utcnow().isoformat(),
"model" : model,
"user_id" : user_id,
"session_id" : session_id,
"input_tokens" : 0 ,
"output_tokens" : 0 ,
"status" : "error" ,
"messages_json" : json.dumps(messages, ensure_ascii = False ),
"response_text" : None ,
"error_message" : str (e)
})
raise
def _log_to_bq (self, row: dict ):
"""非同期的に BigQuery にログを書き込む"""
try :
errors = self .bq_client.insert_rows_json( self .table_ref, [row])
if errors:
print ( f "BigQuery ログエラー: { errors } " )
except Exception as e:
print ( f "BigQuery 書き込み失敗(本処理には影響なし): { e } " )
# 使用例
logged_client = LoggedClaudeClient(
project_id = "your-project" ,
region = "asia-southeast1" ,
bq_dataset = "claude_logs" ,
bq_table = "messages"
)
response = logged_client.create_message(
messages = [{ "role" : "user" , "content" : "売上レポートを要約してください" }],
user_id = "user_123" ,
session_id = "session_abc"
)
BigQuery でのコスト・品質分析クエリ
-- 日別コスト推移
SELECT
DATE (request_time) as date ,
model,
COUNT ( * ) as request_count,
SUM (input_tokens) as total_input_tokens,
SUM (output_tokens) as total_output_tokens,
-- Sonnet 4.6 の料金(2026年4月時点の概算)
ROUND ( SUM (input_tokens) * 3 / 1000000 , 2 ) as estimated_input_cost_usd,
ROUND ( SUM (output_tokens) * 15 / 1000000 , 2 ) as estimated_output_cost_usd
FROM `your-project.claude_logs.messages`
WHERE DATE (request_time) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY )
GROUP BY date , model
ORDER BY date DESC ;
-- ユーザー別利用状況(上位10名)
SELECT
user_id,
COUNT ( * ) as request_count,
SUM (input_tokens + output_tokens) as total_tokens,
AVG (output_tokens) as avg_output_tokens
FROM `your-project.claude_logs.messages`
WHERE status = 'success'
GROUP BY user_id
ORDER BY total_tokens DESC
LIMIT 10 ;
3. マルチモーダル処理:画像・PDF・ドキュメント解析
画像解析の実装
Claude は画像を直接理解できます。Vertex AI 経由でも同様の機能が利用可能です。
import base64
from anthropic import AnthropicVertex
client = AnthropicVertex( project_id = "your-project" , region = "global" )
def analyze_image_from_file (image_path: str , question: str ) -> str :
"""ローカル画像ファイルを解析する"""
with open (image_path, "rb" ) as f:
image_data = base64.standard_b64encode(f.read()).decode( "utf-8" )
# 画像形式の判定
if image_path.endswith( ".png" ):
media_type = "image/png"
elif image_path.endswith(( ".jpg" , ".jpeg" )):
media_type = "image/jpeg"
elif image_path.endswith( ".webp" ):
media_type = "image/webp"
else :
media_type = "image/jpeg"
response = client.messages.create(
model = "claude-sonnet-4-6" ,
max_tokens = 2048 ,
messages = [
{
"role" : "user" ,
"content" : [
{
"type" : "image" ,
"source" : {
"type" : "base64" ,
"media_type" : media_type,
"data" : image_data
}
},
{
"type" : "text" ,
"text" : question
}
]
}
]
)
return response.content[ 0 ].text
# 使用例
result = analyze_image_from_file(
"monthly_report.png" ,
"この売上グラフから読み取れる主要な傾向と、改善が必要な領域を特定してください。"
)
print (result)
Google Cloud Storage からの画像処理
def analyze_gcs_image (gcs_uri: str , question: str , client: AnthropicVertex) -> str :
"""GCS 上の画像を URL で参照して解析"""
# GCS の公開URLまたは署名付きURLを使用
# gs://bucket/path → https://storage.googleapis.com/bucket/path
public_url = gcs_uri.replace( "gs://" , "https://storage.googleapis.com/" )
response = client.messages.create(
model = "claude-sonnet-4-6" ,
max_tokens = 2048 ,
messages = [
{
"role" : "user" ,
"content" : [
{
"type" : "image" ,
"source" : {
"type" : "url" ,
"url" : public_url
}
},
{ "type" : "text" , "text" : question}
]
}
]
)
return response.content[ 0 ].text
複数画像の一括処理(バッチ解析)
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict
def batch_analyze_images (
client: AnthropicVertex,
image_tasks: List[Dict],
max_workers: int = 5
) -> List[Dict]:
"""複数画像を並列処理で効率的に解析"""
results = []
def process_single (task):
try :
result = analyze_image_from_file(
task[ "image_path" ],
task[ "question" ]
)
return { "id" : task[ "id" ], "result" : result, "status" : "success" }
except Exception as e:
return { "id" : task[ "id" ], "error" : str (e), "status" : "error" }
with ThreadPoolExecutor( max_workers = max_workers) as executor:
futures = {executor.submit(process_single, task): task for task in image_tasks}
for future in as_completed(futures):
results.append(future.result())
return results
# 使用例:月次レポート画像を一括解析
tasks = [
{ "id" : "jan" , "image_path" : "reports/jan.png" , "question" : "月次KPIを抽出してください" },
{ "id" : "feb" , "image_path" : "reports/feb.png" , "question" : "月次KPIを抽出してください" },
{ "id" : "mar" , "image_path" : "reports/mar.png" , "question" : "月次KPIを抽出してください" },
]
results = batch_analyze_images(client, tasks)
for r in results:
print ( f " { r[ 'id' ] } : { r.get( 'result' , r.get( 'error' ))[: 100 ] } ..." )
4. ツール呼び出しとエージェント設計
ツール定義とオーケストレーション
Claude のツール呼び出し(Function Calling)を使うと、外部API・データベース・社内システムと連携したエージェントを構築できます。
from anthropic import AnthropicVertex
import json
import requests
client = AnthropicVertex( project_id = "your-project" , region = "global" )
# ツールの定義
tools = [
{
"name" : "get_product_info" ,
"description" : "製品IDから製品の詳細情報(価格・在庫・仕様)を取得します" ,
"input_schema" : {
"type" : "object" ,
"properties" : {
"product_id" : {
"type" : "string" ,
"description" : "製品ID(例: P001, P002)"
}
},
"required" : [ "product_id" ]
}
},
{
"name" : "check_inventory" ,
"description" : "指定した製品の現在の在庫数を確認します" ,
"input_schema" : {
"type" : "object" ,
"properties" : {
"product_id" : { "type" : "string" },
"warehouse" : {
"type" : "string" ,
"description" : "倉庫コード(tokyo, osaka, nagoya)" ,
"enum" : [ "tokyo" , "osaka" , "nagoya" ]
}
},
"required" : [ "product_id" ]
}
},
{
"name" : "create_order" ,
"description" : "発注を作成します。在庫確認後に使用してください。" ,
"input_schema" : {
"type" : "object" ,
"properties" : {
"product_id" : { "type" : "string" },
"quantity" : { "type" : "integer" },
"customer_id" : { "type" : "string" }
},
"required" : [ "product_id" , "quantity" , "customer_id" ]
}
}
]
# ツール実行関数(実際の処理)
def execute_tool (tool_name: str , tool_input: dict ) -> str :
"""ツール名と入力に基づいて実際の処理を実行"""
if tool_name == "get_product_info" :
# 実際は社内 API を呼び出す
return json.dumps({
"product_id" : tool_input[ "product_id" ],
"name" : "スマートウォッチ Pro X" ,
"price" : 38000 ,
"spec" : { "battery" : "7days" , "waterproof" : "5ATM" }
}, ensure_ascii = False )
elif tool_name == "check_inventory" :
return json.dumps({
"product_id" : tool_input[ "product_id" ],
"warehouse" : tool_input.get( "warehouse" , "tokyo" ),
"stock" : 42 ,
"available" : True
})
elif tool_name == "create_order" :
order_id = f "ORD- { tool_input[ 'product_id' ] } - { tool_input[ 'customer_id' ] } "
return json.dumps({
"order_id" : order_id,
"status" : "created" ,
"estimated_delivery" : "2026-04-10"
})
return json.dumps({ "error" : f "未知のツール: { tool_name } " })
def run_agent (user_message: str ) -> str :
"""エージェントループの実行"""
messages = [{ "role" : "user" , "content" : user_message}]
while True :
response = client.messages.create(
model = "claude-sonnet-4-6" ,
max_tokens = 2048 ,
tools = tools,
messages = messages
)
# ツール呼び出しがない場合は終了
if response.stop_reason == "end_turn" :
return response.content[ 0 ].text
# ツール呼び出しを処理
if response.stop_reason == "tool_use" :
# アシスタントの応答をメッセージに追加
messages.append({ "role" : "assistant" , "content" : response.content})
# 全てのツール呼び出しを処理
tool_results = []
for content_block in response.content:
if content_block.type == "tool_use" :
tool_result = execute_tool(content_block.name, content_block.input)
tool_results.append({
"type" : "tool_result" ,
"tool_use_id" : content_block.id,
"content" : tool_result
})
# ツール結果をメッセージに追加して次のターンへ
messages.append({ "role" : "user" , "content" : tool_results})
# 使用例
result = run_agent(
"顧客ID C-789 のために製品 P001 を3個発注してください。"
"在庫確認をしてから発注を実行してください。"
)
print (result)
5. RAG(検索拡張生成)との統合
Vertex AI Search との連携
from anthropic import AnthropicVertex
from google.cloud import discoveryengine_v1alpha as discoveryengine
class RAGWithVertexSearch :
"""Vertex AI Search + Claude による RAG 実装"""
def __init__ (
self,
claude_project_id: str ,
search_project_id: str ,
data_store_id: str ,
location: str = "global"
):
self .claude = AnthropicVertex(
project_id = claude_project_id,
region = "asia-southeast1"
)
self .search_client = discoveryengine.SearchServiceClient()
self .search_project = search_project_id
self .data_store_id = data_store_id
self .location = location
def search_documents (self, query: str , top_k: int = 5 ) -> list :
"""Vertex AI Search で関連ドキュメントを検索"""
serving_config = (
f "projects/ { self .search_project } /locations/ { self .location } "
f "/dataStores/ { self .data_store_id } /servingConfigs/default_config"
)
request = discoveryengine.SearchRequest(
serving_config = serving_config,
query = query,
page_size = top_k,
)
response = self .search_client.search(request)
documents = []
for result in response.results:
doc = result.document
documents.append({
"id" : doc.id,
"title" : doc.derived_struct_data.get( "title" , "" ),
"snippet" : doc.derived_struct_data.get( "snippets" , [{}])[ 0 ].get( "snippet" , "" ),
"link" : doc.derived_struct_data.get( "link" , "" )
})
return documents
def answer_with_rag (self, question: str ) -> dict :
"""RAG を使って質問に回答"""
# 1. 関連ドキュメントを検索
docs = self .search_documents(question)
if not docs:
return { "answer" : "関連するドキュメントが見つかりませんでした。" , "sources" : []}
# 2. 検索結果をコンテキストに組み込む
context = " \n\n " .join([
f "【ドキュメント: { doc[ 'title' ] } 】 \n{ doc[ 'snippet' ] } "
for doc in docs
])
# 3. Claude で回答を生成
response = self .claude.messages.create(
model = "claude-sonnet-4-6" ,
max_tokens = 2048 ,
system = """あなたは社内ナレッジベースに基づいて質問に回答するアシスタントです。
提供されたコンテキストに基づいて正確に回答してください。
コンテキストに情報がない場合は、その旨を明示してください。""" ,
messages = [
{
"role" : "user" ,
"content" : f "以下のコンテキストを参照して質問に回答してください。 \n\n "
f "=== コンテキスト === \n{ context }\n\n "
f "=== 質問 === \n{ question } "
}
]
)
return {
"answer" : response.content[ 0 ].text,
"sources" : [{ "title" : d[ "title" ], "link" : d[ "link" ]} for d in docs]
}
6. 本番運用のための設計パターン
サーキットブレーカーパターン
from enum import Enum
from datetime import datetime, timedelta
import threading
class CircuitState ( Enum ):
CLOSED = "closed" # 正常
OPEN = "open" # 遮断中
HALF_OPEN = "half_open" # 試験中
class CircuitBreaker :
"""Claude API 呼び出し用サーキットブレーカー"""
def __init__ (
self,
failure_threshold: int = 5 ,
recovery_timeout: int = 60 ,
half_open_max_calls: int = 3
):
self .failure_threshold = failure_threshold
self .recovery_timeout = recovery_timeout
self .half_open_max_calls = half_open_max_calls
self .state = CircuitState. CLOSED
self .failure_count = 0
self .last_failure_time = None
self .half_open_calls = 0
self ._lock = threading.Lock()
def call (self, func, * args, ** kwargs):
with self ._lock:
if self .state == CircuitState. OPEN :
if datetime.now() - self .last_failure_time > timedelta( seconds = self .recovery_timeout):
self .state = CircuitState. HALF_OPEN
self .half_open_calls = 0
else :
raise Exception ( "サーキットブレーカーがオープン状態です。しばらく後に再試行してください。" )
try :
result = func( * args, ** kwargs)
with self ._lock:
if self .state == CircuitState. HALF_OPEN :
self .half_open_calls += 1
if self .half_open_calls >= self .half_open_max_calls:
self .state = CircuitState. CLOSED
self .failure_count = 0
elif self .state == CircuitState. CLOSED :
self .failure_count = 0
return result
except Exception as e:
with self ._lock:
self .failure_count += 1
self .last_failure_time = datetime.now()
if self .failure_count >= self .failure_threshold:
self .state = CircuitState. OPEN
print ( f "⚡ サーキットブレーカーがオープンしました: { e } " )
raise
# 使用例
breaker = CircuitBreaker( failure_threshold = 5 , recovery_timeout = 60 )
client = AnthropicVertex( project_id = "your-project" , region = "global" )
def safe_claude_call (messages):
return breaker.call(
client.messages.create,
model = "claude-sonnet-4-6" ,
max_tokens = 1024 ,
messages = messages
)
コスト管理:トークン予算の実装
class TokenBudgetManager :
"""組織・ユーザー単位のトークン予算管理"""
def __init__ (self, daily_budget_per_user: int = 100000 ):
self .daily_budget = daily_budget_per_user
self .usage_tracker = {} # user_id → {date: tokens}
def check_and_deduct (self, user_id: str , estimated_tokens: int ) -> bool :
"""予算チェックと消費記録"""
today = datetime.now().date().isoformat()
if user_id not in self .usage_tracker:
self .usage_tracker[user_id] = {}
current_usage = self .usage_tracker[user_id].get(today, 0 )
if current_usage + estimated_tokens > self .daily_budget:
remaining = self .daily_budget - current_usage
raise Exception (
f "本日のトークン予算( { self .daily_budget :, } )を超過します。"
f "残り予算: { remaining :, } トークン"
)
self .usage_tracker[user_id][today] = current_usage + estimated_tokens
return True
def get_usage_summary (self, user_id: str ) -> dict :
today = datetime.now().date().isoformat()
used = self .usage_tracker.get(user_id, {}).get(today, 0 )
return {
"user_id" : user_id,
"date" : today,
"tokens_used" : used,
"tokens_remaining" : self .daily_budget - used,
"usage_percentage" : round (used / self .daily_budget * 100 , 1 )
}
全体を振り返って:エンタープライズ導入のロードマップ
Vertex AI × Claude のエンタープライズ統合は、段階的に進めることをお勧めします。
フェーズ 1(〜2週間):基盤構築
Vertex AI API の有効化と IAM 設計
Model Garden でのモデル有効化
基本的な API 呼び出しとエラーハンドリングの実装
フェーズ 2(〜1ヶ月):コスト最適化
プロンプトキャッシングの導入(コスト削減効果が高いユースケースから優先)
トークン予算管理の実装
BigQuery ログによるコスト可視化
フェーズ 3(〜2ヶ月):高度な機能統合
マルチモーダル処理(画像・ドキュメント解析)
ツール呼び出し(エージェント機能)
RAG による社内ナレッジベース連携
フェーズ 4(継続的):スケールと最適化
サーキットブレーカーなどの信頼性パターン実装
Provisioned Throughput の検討
モデルバージョン管理と A/B テスト
本記事で紹介したパターンを組み合わせることで、Google Cloud の強固なインフラ上に、コスト効率の高い本番グレードの Claude 統合を実現できます。各チームの要件に合わせて、必要なコンポーネントから順番に取り入れていただければ幸いです。