最初に Code Execution ツールでデータ集計を任せたとき、私は同じ CSV を毎リクエストでアップロードし直していました。抽出して、整形して、集計して、グラフにする。工程を分けてリクエストを重ねるたびに、前のリクエストで作ったはずの中間ファイルが消えていて、また一からやり直しになる。個人開発で複数サイトの数値を自動でまとめている身としては、この「毎回ゼロから」が地味に実行時間を食っていることに、請求の内訳を見て気づきました。
原因は単純で、リクエストごとに別のコンテナが立ち上がっていたからです。Code Execution のコンテナは container パラメータで明示的に再利用しない限り、リクエストをまたいで状態を共有しません。逆に言えば、前のレスポンスが返してきた container ID を次のリクエストへ渡すだけで、作ったファイルも展開済みのデータも、そのまま引き継げます。今日はこの「コンテナを跨いで状態を保つ」設計と、その裏にある課金の実像、そして避けて通れない失効の扱いを、実装コードと一緒に整理します。
コンテナIDを渡すだけで、作ったファイルが次のリクエストに残る
仕組み自体はとても素直です。レスポンスには container オブジェクトが含まれていて、その id を次のリクエストの container パラメータに指定すると、同じワークスペースが再利用されます。
import anthropic
client = anthropic.Anthropic() # APIキーは環境変数 ANTHROPIC_API_KEY から読み込まれます
# 1回目: ファイルを作って /tmp に保存する
response1 = client.messages.create(
model = "claude-opus-4-8" ,
max_tokens = 4096 ,
messages = [
{
"role" : "user" ,
"content" : "乱数を1つ生成して '/tmp/seed.txt' に書き込んでください" ,
}
],
tools = [{ "type" : "code_execution_20250825" , "name" : "code_execution" }],
)
# レスポンスから container ID を取り出す
container_id = response1.container.id
# 2回目: 同じコンテナを再利用して、さっき書いたファイルを読む
response2 = client.messages.create(
container = container_id, # ← これだけで状態が引き継がれます
model = "claude-opus-4-8" ,
max_tokens = 4096 ,
messages = [
{
"role" : "user" ,
"content" : "'/tmp/seed.txt' の値を読み、その2乗を計算してください" ,
}
],
tools = [{ "type" : "code_execution_20250825" , "name" : "code_execution" }],
)
print (response2)
ポイントは response1.container.id を保持して、2回目の container= に渡しているところだけです。これを忘れると、2回目は新しいコンテナで立ち上がり、/tmp/seed.txt は「そんなファイルはありません」になります。私が最初にハマったのは、まさにここでした。
何が「持ち越せて」何が「消える」のか
再利用と聞くと「全部そのまま続きから」と思いがちですが、引き継がれるものとそうでないものは分けて理解しておく必要があります。
ワークスペース上のファイルは残ります。/tmp や作業ディレクトリに書き出したものは、同じ container ID を指定する限りそのまま見えます。これが再利用の主役です。一方、会話のコンテキスト(messages 配列)は別物 です。コンテナを再利用しても、前のやり取りの内容を Claude が覚えているわけではありません。状態は「ファイルとして残っている」だけなので、2回目のリクエストでは「どのファイルに何が入っているか」を改めて伝える必要があります。
もう一つ注意したいのが、Python の REPL 上の変数(メモリ上の状態)です。これは基本のツールバージョンでは引き継がれません。インメモリの変数まで保持したい場合は、REPL 状態の永続化に対応した新しいツールバージョン(code_execution_20260120 以降、Opus 4.5+ / Sonnet 4.5+ で利用可)が必要になります。ただ実務では、変数を抱え込むより節目ごとに中間結果をファイルへ書き出す 方が、失効や再実行に強い設計になります。私はこの工程を、出力を必ずファイルへ落とす形に統一することを推奨します。実体験として、その方が失効や再実行にずっと強いからです。
対象 同じコンテナ再利用で引き継がれるか
ワークスペースに書き出したファイル(/tmp 等) 引き継がれる
Files API でマウントした入力ファイル コンテナ上に残る(再マウント不要)
Python の REPL 変数(メモリ状態) 基本版では引き継がれない(20260120 以降で対応)
会話の文脈(messages の中身) 引き継がれない(毎回伝え直す)
課金の実像 — 実行時間課金と「ファイル同梱の落とし穴」
再利用を語る前に、Code Execution の課金がトークンとは別建てだという点を押さえておきます。Web 検索や Web フェッチと併用する場合は追加料金がかかりませんが、単体で使うときは実行時間(コンテナの稼働時間)で課金 されます。ここに、知らないと損をする落とし穴がいくつかあります。
項目 内容
課金単位 コンテナの実行時間(トークンとは別建て)
最小課金 1リクエストあたり最低5分
無料枠 組織あたり月 1,550 時間
超過分 コンテナ1台あたり 1時間 0.05 ドル
見落としがちな点 リクエストにファイルを同梱すると、ツールが呼ばれなくてもプリロードのため実行時間が課金される
最後の行が一番大事です。入力ファイルを付けた時点で、Claude がコードを実行しなくても、コンテナへの読み込みのために実行時間が発生します。 「念のため毎回 CSV を同梱しておこう」という運用は、コードが走らない軽い質問でもコストを発生させ続けるということです。
ここでコンテナ再利用が効いてきます。大きな入力ファイルを毎リクエストで同梱し直す代わりに、最初の1回だけマウントして中間結果をワークスペースに残し、以降は container ID を渡して同梱を省く 。最小課金が5分単位であることを考えると、工程を細切れにして毎回新しいコンテナを立てるより、1つのコンテナで連続して片付ける方が無駄が少なくなります。私の集計ジョブでは、入力同梱を初回だけにしてからは、実行時間の内訳がはっきり減りました。
多段パイプラインを1つのコンテナで回す
抽象論だけだと掴みづらいので、実際に「抽出 → 整形 → 集計 → 出力」を1つのコンテナで通す形にしてみます。各工程の成果物はファイルとして残し、次の工程はそれを読む、という流れです。
import anthropic
client = anthropic.Anthropic()
TOOLS = [{ "type" : "code_execution_20250825" , "name" : "code_execution" }]
MODEL = "claude-opus-4-8"
def step (container_id, instruction):
"""同じコンテナを使い回しながら1工程を実行し、container ID を返す"""
kwargs = dict ( model = MODEL , max_tokens = 4096 , tools = TOOLS ,
messages = [{ "role" : "user" , "content" : instruction}])
if container_id:
kwargs[ "container" ] = container_id
resp = client.messages.create( ** kwargs)
return resp.container.id, resp
# 工程1: 入力を読み込み、抽出結果を中間ファイルに保存(初回だけ入力を同梱する想定)
cid, _ = step( None ,
"売上の生データを読み込み、必要列だけ抽出して '/tmp/extracted.parquet' に保存してください" )
# 工程2: 整形(前工程の中間ファイルを読む。入力の再同梱はしない)
cid, _ = step(cid,
"'/tmp/extracted.parquet' を読み、欠損補完と型変換をして '/tmp/clean.parquet' に保存してください" )
# 工程3: 集計
cid, _ = step(cid,
"'/tmp/clean.parquet' を月次で集計し '/tmp/summary.csv' に保存してください" )
# 工程4: 出力(グラフ生成)
cid, final = step(cid,
"'/tmp/summary.csv' から月次推移の折れ線グラフを作り 'report.png' として保存してください" )
print ( "pipeline container:" , cid)
step() が container_id を受け取って、あれば container= に渡すだけの薄いラッパーです。工程2以降は入力データを同梱していません。すでにコンテナの中に extracted.parquet があるからです。こうしておくと、各工程の指示は「どのファイルを読んで、どのファイルに書くか」だけに集中でき、Claude 側も迷いにくくなります。
設計上のコツは、工程の境界を必ずファイルで区切る ことです。中間結果がファイルとして残っていれば、ある工程だけ作り直したいときに、その工程の入力ファイルから再開できます。全部を1回のリクエストに詰め込むと、90秒のセル実行上限(後述)にぶつかったときに最初からやり直しになりがちです。
container_expired を握りつぶさない
再利用は便利ですが、コンテナは永遠には生きていません。**コンテナは作成から30日で失効します。**また、失効したコンテナを指定すると、ツール結果として container_expired エラーコードが返ります。長時間放置したジョブや、翌週に続きを動かすバッチでは、ここを必ず想定しておく必要があります。
やってはいけないのは、container_expired を握りつぶして「成功扱い」で先へ進むことです。中間ファイルが無いコンテナで集計を続ければ、空のレポートが静かに出来上がります。私が無人パイプラインで一番怖いのは、この「エラーにならずに間違った結果が出る」パターンです。
本番運用でこの失敗を回避する基本は、container ID を絶対の前提にしないことです。中間ファイルはあくまでキャッシュとみなし、失効していたらソース(元の入力)から作り直せる ようにしておきます。
import anthropic
client = anthropic.Anthropic()
TOOLS = [{ "type" : "code_execution_20250825" , "name" : "code_execution" }]
MODEL = "claude-opus-4-8"
def has_container_expired (resp):
"""レスポンス内に container_expired エラーが含まれていないか確認する"""
for block in resp.content:
content = getattr (block, "content" , None )
err = getattr (content, "error_code" , None )
if err == "container_expired" :
return True
return False
def run_summary (container_id, source_instruction, summary_instruction):
# まずは既存コンテナで続きをやろうとする
if container_id:
resp = client.messages.create(
container = container_id, model = MODEL , max_tokens = 4096 , tools = TOOLS ,
messages = [{ "role" : "user" , "content" : summary_instruction}],
)
if not has_container_expired(resp):
return resp.container.id, resp
# 失効していたら握りつぶさず、作り直しに倒す
print ( "container_expired を検知。ソースから再構築します。" )
# コンテナが無い/失効した場合は、元の入力から作り直す
resp = client.messages.create(
model = MODEL , max_tokens = 4096 , tools = TOOLS ,
messages = [{ "role" : "user" , "content" : source_instruction}],
)
return resp.container.id, resp
この形にしておけば、失効はエラーではなく「キャッシュミス」として扱えます。30日という猶予は長いようでいて、月をまたぐ運用や、たまにしか動かさないジョブでは普通に踏みます。container_expired を明示的に拾って分岐する一手間が、無音の劣化を防いでくれます。
90秒のセル上限と再利用を組み合わせる
もう一つ知っておきたいのが、各セルの実行には90秒の壁時計上限がある点です(超過すると detection_timeout 系の結果が返ります)。重い処理を1セルに詰め込むと、ここで切られます。
コンテナ再利用は、この上限とも相性が良い組み合わせです。重い処理を「90秒で終わる粒度」に分割し、各ステップの結果をファイルに書き出しながら、同じコンテナで続けていく。たとえば大量レコードの集計なら、チャンクごとに部分集計を /tmp/partial_0001.parquet のように保存し、最後にそれらを統合する、という分け方ができます。状態がコンテナ内のファイルとして残るからこそ、こうした分割が成立します。
# 概念例: 重い集計を90秒で切られない粒度に分けて同じコンテナで続ける
cid, _ = step( None ,
"巨大なログを 50万行ごとのチャンクに分け、各チャンクを部分集計して "
"'/tmp/partial_<連番>.parquet' に保存してください。1チャンクずつ進めて構いません" )
cid, _ = step(cid,
"'/tmp/partial_*.parquet' をすべて読み込んで統合し、"
"最終結果を '/tmp/final.parquet' に保存してください" )
分割した部分集計がコンテナ内に残るので、もし途中のチャンクで詰まっても、残っている partial_*.parquet から再開できます。すべてを1リクエストに押し込まないことが、結果的に堅牢さにつながります。
まず試すなら、いま毎リクエストで入力ファイルを同梱しているジョブを1つ選んで、初回だけマウント・以降は container.id の引き回しに変えてみてください。実行時間の内訳がどれだけ変わるか、請求の数字で確かめるのが一番確実です。私自身、再利用と失効ハンドリングを入れてからは、無人で回すパイプラインの安心感が一段変わりました。同じように多段処理を自動化している方の役に立てば嬉しいです。