深夜2時に動くはずだったジョブが、朝になっても何も生み出していない。ログをたどると、リモートMCPのツール呼び出しが一度だけ失敗していて、原因はアクセストークンの期限切れでした。対話的に使っているときは、トークンが切れても画面に認可ダイアログが出て、ブラウザでボタンを一度押せば何事もなく続きます。けれど無人で回しているスケジュール実行には、そのボタンを押す人がいません。
私自身、個人開発で複数サイトの自動投稿を回しているのですが、リモートMCPコネクタをつないだ直後はうまく動くのに、数十時間後に静かに止まる、という経験を何度かしました。コネクタの設定そのものは正しいのです。崩れているのは「トークンの寿命」を誰も管理していなかった、その一点でした。
ここからは、リモートMCPコネクタのOAuthトークンを「自分が管理する状態」として持ち、実行前と実行中に先読みで更新して、無人実行を止めないための設計を、動くコードと一緒に組み立てていきます。
なぜ対話実行ではトークン切れが見えないのか
OAuthのアクセストークンは、たいてい数十分から数時間で失効する短命のものです。これに対してリフレッシュトークンは長命で、これを使えば新しいアクセストークンを無人でも取り直せます。
対話的なクライアントは、この更新を裏側で自動的に行ってくれます。アクセストークンが切れていれば、リフレッシュトークンで黙って取り直し、それも失効していれば、ブラウザを開いて「認可しますか?」と人間に尋ねます。つまり「人間がいつでも認可ボタンを押せる」という前提が、トークン切れを目に見えない問題に変えているわけです。
無人実行ではこの前提が崩れます。ブラウザを開いても押す人がいないので、アクセストークンもリフレッシュトークンも切れた瞬間に、そのジョブは前に進めなくなります。しかも失効は「401」として返ってきて、MCPクライアントの層では汎用的なツールエラーに丸められがちです。「ツールが失敗した」としか見えず、本当の理由が「認可の期限切れ」だと気づくのが遅れます。
ですから無人運用では、トークンの更新をクライアント任せにせず、自分でライフサイクルを握る必要があります。順番に組み立てていきましょう。
トークンを「期限つきの状態」として持つ
最初の一歩は、アクセストークン・リフレッシュトークン・失効時刻をひとまとめにして永続化することです。失効時刻を持っていないと「いつ切れるか」が分からず、先読み更新ができません。
import json
import os
import time
import tempfile
from dataclasses import dataclass, asdict
from pathlib import Path
@dataclass
class TokenSet:
access_token: str
refresh_token: str
expires_at: float # UNIX秒。アクセストークンが失効する絶対時刻
def expires_in(self) -> float:
return self.expires_at - time.time()
class TokenStore:
"""トークンをJSONで永続化する。書き込みは原子的に行う。"""
def __init__(self, path: str):
self.path = Path(path)
def load(self) -> TokenSet:
data = json.loads(self.path.read_text(encoding="utf-8"))
return TokenSet(**data)
def save(self, tokens: TokenSet) -> None:
# 同一ディレクトリに一時ファイルを書いてからrenameする。
# 途中でプロセスが落ちても、半端なファイルで上書きしないため。
d = self.path.parent
d.mkdir(parents=True, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=d, suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(asdict(tokens), f)
os.replace(tmp, self.path)
os.chmod(self.path, 0o600) # トークンファイルは本人だけが読めるように
finally:
if os.path.exists(tmp):
os.unlink(tmp)
ここで地味に大事なのは、保存を原子的(一時ファイル+rename)にしている点です。トークンの保存中にプロセスが落ちると、ファイルが半端な状態で残り、次回の読み込みで壊れます。更新は無人実行のたびに起きるので、ここが壊れると静かな停止の温床になります。chmod 0o600 でファイル権限を絞っているのも、トークンは秘密情報だからです。
プロバイダによってトークンエンドポイントが返すのは expires_at(絶対時刻)ではなく expires_in(残り秒数)であることが多いので、受け取った瞬間に time.time() + expires_in で絶対時刻へ変換して保存します。残り秒数のまま持つと、いつ取得したかを別途覚えておく必要があり、ずれの原因になります。
実行前に「スキューを見て」先読み更新する
トークンが「いま切れているか」だけで判断すると、判定した直後・呼び出す前の数秒で切れてしまう競合が起きます。これを避けるため、失効まで一定の余裕(スキュー)を切ったら、まだ有効でも先回りして更新します。
import requests
TOKEN_ENDPOINT = "https://example-mcp-provider.com/oauth/token"
CLIENT_ID = os.environ["MCP_OAUTH_CLIENT_ID"]
CLIENT_SECRET = os.environ["MCP_OAUTH_CLIENT_SECRET"]
# 失効の120秒前には更新する。実行が長いほど大きめに取る。
REFRESH_SKEW_SECONDS = 120
class ReauthRequired(Exception):
"""リフレッシュトークンまで失効し、人間の再認可が要る状態。"""
def refresh_tokens(store: TokenStore, current: TokenSet) -> TokenSet:
resp = requests.post(
TOKEN_ENDPOINT,
data={
"grant_type": "refresh_token",
"refresh_token": current.refresh_token,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
},
timeout=15,
)
# invalid_grant はリフレッシュトークン自体の失効。再認可が要る。
if resp.status_code == 400 and "invalid_grant" in resp.text:
raise ReauthRequired("refresh token is no longer valid")
resp.raise_for_status()
payload = resp.json()
new_tokens = TokenSet(
access_token=payload["access_token"],
# ローテーションで新しいrefresh_tokenが返れば差し替える。
# 返らないプロバイダもあるので、その場合は既存を引き継ぐ。
refresh_token=payload.get("refresh_token", current.refresh_token),
expires_at=time.time() + payload["expires_in"],
)
store.save(new_tokens)
return new_tokens
def ensure_fresh_token(store: TokenStore) -> TokenSet:
tokens = store.load()
if tokens.expires_in() <= REFRESH_SKEW_SECONDS:
tokens = refresh_tokens(store, tokens)
return tokens
refresh_token=payload.get("refresh_token", current.refresh_token) の一行が、運用で効いてきます。多くのプロバイダはリフレッシュトークンのローテーションを採用していて、更新のたびに新しいリフレッシュトークンを返し、古いものを失効させます。新しい方を保存し損ねると、次回の更新で invalid_grant になり、そこから無人では復旧できなくなります。
スキューの値は、ジョブの長さから逆算します。1回の呼び出しで終わる軽い処理なら120秒で十分ですが、数十分回り続けるジョブでは、開始時に有効でも途中で切れます。次の節で、その実行中の失効に備えます。
実行中に切れる長時間ジョブへの備え
開始前に更新しても、長時間ジョブでは実行の途中でアクセストークンが失効します。対策は二段構えです。まず、トークンを必要とする呼び出しの直前で毎回 ensure_fresh_token を通す。そのうえで、それでもすり抜けて401が返った場合に、一度だけ強制更新してリトライします。
def call_mcp_tool(store: TokenStore, tool_name: str, arguments: dict) -> dict:
tokens = ensure_fresh_token(store)
def _do(access_token: str) -> requests.Response:
return requests.post(
"https://example-mcp-provider.com/mcp/call",
headers={"Authorization": f"Bearer {access_token}"},
json={"name": tool_name, "arguments": arguments},
timeout=30,
)
resp = _do(tokens.access_token)
# 先読みしても、サーバ側の時計ずれ等で401が返ることはある。
# その場合のみ強制更新して一度だけ再試行する。
if resp.status_code == 401:
tokens = refresh_tokens(store, tokens)
resp = _do(tokens.access_token)
resp.raise_for_status()
return resp.json()
ポイントは、401でのリトライを「一度だけ」に限ることです。更新したトークンでもう一度401が返るなら、それは期限切れではなく権限不足やスコープ違いなど別の問題です。無限に更新とリトライを繰り返すと、トークンエンドポイントへの負荷だけが増え、レート制限に当たって本当に必要な更新まで弾かれます。一度試して駄目なら、素直に例外を上げて上位で扱います。
実行中の更新で見落としがちなのが、複数のジョブが同じトークンストアを共有しているときの競合です。二つのプロセスが同時に「切れそうだ」と判断して同時に更新をかけると、ローテーションで片方のリフレッシュトークンが先に失効し、もう片方が invalid_grant で落ちます。同一ホストで並行実行するなら、更新処理をファイルロックで囲って、同時に走らないようにします。
import fcntl
from contextlib import contextmanager
@contextmanager
def refresh_lock(lock_path: str):
with open(lock_path, "w") as lf:
fcntl.flock(lf, fcntl.LOCK_EX) # 更新が終わるまで他プロセスは待つ
try:
yield
finally:
fcntl.flock(lf, fcntl.LOCK_UN)
refresh_tokens の呼び出しをこのロックで囲み、ロック取得後に改めて store.load() で最新の失効時刻を読み直してから更新の要否を判断すれば、待っている間に別プロセスが済ませた更新を二重に行わずに済みます。
リフレッシュ自体が失敗したら、黙って止めない
ここがいちばん大切な設計判断です。リフレッシュトークンまで失効した ReauthRequired は、無人では自力で復旧できません。このとき何もせず例外で落ちるだけだと、ジョブは「失敗」とすら記録されないまま、次の実行を待つ間ずっと止まり続けます。
無人運用では、復旧できないことそのものより、「気づけないこと」が損失を生みます。ですから撤退するときも、人間が後から拾える構造化された痕跡を残します。
import sys
import datetime
def run_job_with_token_guard(store: TokenStore, run_job) -> int:
try:
run_job(store) # ジョブ本体。内部でcall_mcp_toolを使う
return 0
except ReauthRequired as e:
record = {
"event": "mcp_reauth_required",
"connector": "example-mcp-provider",
"detail": str(e),
"ts": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"action": "skipped_run; human re-authorization needed",
}
# 構造化ログを「成功扱いの標準出力」ではなく標準エラーに出す。
print(json.dumps(record, ensure_ascii=False), file=sys.stderr)
notify_owner(record) # メール/通知など、自分が必ず見る経路へ
# 終了コードで「正常終了ではない」と明示する。
return 75 # EX_TEMPFAIL 相当: 一時的失敗。後で再認可すれば回復する
ここで意識しているのは、次の三点です。
- 復旧不能の状態を、一般的なエラーと区別できる固有の例外(
ReauthRequired)にしておくこと。
- 痕跡を標準出力ではなく標準エラーと通知に出して、「成功」と取り違えられない経路に流すこと。
- 終了コードで一時的失敗を表明し、後から再認可すれば回復する種類の停止だと、人間にもスケジューラにも伝えること。
notify_owner の中身は何でも構いません。自分が必ず一日のうちに目を通す経路、たとえば日次のサマリーやメッセージ通知に一行流れれば十分です。大事なのは、再認可が要る状態が「黙って溜まらない」ことです。
運用で踏みやすい落とし穴
設計が固まっても、運用ではいくつか足をすくわれる点があります。私が実際にぶつかったものを挙げておきます。
| 落とし穴 | 症状 | 対処 |
| リフレッシュトークンの保存漏れ | 数回は動くが、ある時点から必ず invalid_grant | 更新応答の refresh_token を毎回保存する。ローテーション前提で設計する |
| サーバとの時計ずれ | 先読みしているのに 401 が散発する | スキューを大きめに取り、401時の一度きり強制更新を併用する |
| 並行実行の競合 | 片方のジョブだけが invalid_grant で落ちる | 更新処理をファイルロックで囲み、ロック後に失効時刻を読み直す |
| トークンファイルの破損 | JSON の読み込みで例外、全ジョブが起動直後に停止 | 保存を一時ファイル+rename の原子的書き込みにする |
とりわけ時計ずれは見つけにくい落とし穴です。手元の時計とプロバイダ側の時計が数十秒ずれていると、こちらは「まだ有効」と思っているのに相手は「もう失効」と判断します。先読みのスキューを少し大きめに取っておくと、このずれを吸収できます。先読みと「401時の一度きり強制更新」は、どちらか一方ではなく両方そろえて初めて、ずれにも実行中失効にも強くなります。
まず試す一歩
いま無人で回しているリモートMCPの処理があるなら、私はまずトークンの失効時刻をログに一行出すところから始めることをお勧めします。ensure_fresh_token の中で、更新したかどうかと残り秒数を記録するだけで十分です。数日ぶんのログを眺めると、自分のジョブがどのくらいの周期でトークンを更新しているか、そしてどこに先読みのスキューを置くべきかが、数字で見えてきます。そこからリフレッシュとロックを足していけば、深夜に静かに止まる類の停止は、ずいぶん減らせるはずです。
同じように無人運用でトークン切れに悩んでいる方の参考になれば幸いです。お読みいただきありがとうございました。