週に一度、同じ手順でCSVを開いてpivot tableを作り、グラフをコピーしてスライドに貼る——そういう作業を毎週繰り返していませんか。
実は、その作業のほぼすべてをCoworkとPythonで自動化できます。月曜の朝にメールボックスを開いたら、前週のKPIレポートがすでにSlackに届いています。そんな状態を、このガイドで作り上げます。
単なる「Pythonスクリプト入門」ではありません。Coworkのスケジュールタスク・bash環境・Claude AIの分析力を組み合わせて、実務レベルで動くデータパイプラインを構築するための、実践的な設計ガイドです。
なぜCoworkでデータ分析を自動化するのか
「Pythonが使えるなら、サーバーでcronを回せばいい」と思う方もいるかもしれません。それは正しいのですが、Coworkにはcronにない強みがあります。
Claude AIとの統合です。
単にデータを集計してグラフにするだけなら、cronで十分です。しかしCoworkでは、集計結果をClaude AIに渡して「先週と比べて何が変わったか」「異常値の原因として考えられること」を自然言語で分析させ、それをレポートの冒頭に差し込む——という処理が、追加のインフラなしで実現できます。
もう一つの強みはMacのローカル環境を使えることです。Coworkはデスクトップアプリとして動作するため、ローカルに保存されているCSVファイルや、ネットワーク上の社内ファイルサーバーへのアクセスも、特別な設定なしに行えます。
このガイドでは次のパイプラインを構築します:
- ローカルCSVまたはGoogle SheetsからデータをPythonで取得
- pandasで集計・前処理
- Matplotlibでグラフ生成(PNG出力)
- Claude AIが前週との差分を分析してコメントを生成
- markdownレポートとして保存 + Slack/メール通知
- Coworkのスケジュールタスクで毎週月曜朝に自動実行
Step 1:Coworkのbash環境でPythonを準備する
CoworkはMacOSのシェル環境に直接アクセスできます。まず現在のPython環境を確認し、必要なライブラリを整えましょう。
Coworkで新しい会話を開始し、次のように指示してください:
bash環境を確認してください。python3 --version、pip3 --version、そして
pandas・matplotlib・openpyxl がインストールされているか確認してください。
Claudeは以下のようなコマンドを実行します:
# Python環境の確認
python3 --version
pip3 --version
# 必要なライブラリの確認
python3 -c "import pandas; print('pandas:', pandas.__version__)" 2>/dev/null || echo "pandas: NOT INSTALLED"
python3 -c "import matplotlib; print('matplotlib:', matplotlib.__version__)" 2>/dev/null || echo "matplotlib: NOT INSTALLED"
python3 -c "import openpyxl; print('openpyxl:', openpyxl.__version__)" 2>/dev/null || echo "openpyxl: NOT INSTALLED"
ライブラリが不足している場合、Claudeに「不足しているライブラリをインストールしてください」と伝えれば、pip3 install pandas matplotlib openpyxl を実行してくれます。
仮想環境の作成(オプション)
プロジェクト専用の仮想環境を作りたい場合は次のように指示します:
~/data-analysis-automation というディレクトリに仮想環境を作成し、
pandas・matplotlib・openpyxl・requests・slack-sdk をインストールしてください。
Claudeが実行するコマンド:
# プロジェクトディレクトリと仮想環境の作成
mkdir -p ~/data-analysis-automation
cd ~/data-analysis-automation
python3 -m venv .venv
# ライブラリのインストール
.venv/bin/pip install pandas matplotlib openpyxl requests slack-sdk gspread google-auth
# requirements.txtの生成(再現性のために)
.venv/bin/pip freeze > requirements.txt
echo "✅ 環境構築完了"
Step 2:データ取得スクリプトの設計
データソースによってスクリプトの書き方が変わります。ここでは最も一般的なパターン——CSVファイルとGoogle Sheets——の両方を解説します。
パターンA:ローカルCSVからのデータ取得
売上管理やアクセス解析のデータをCSVで書き出しているケースに対応します。
# data_loader.py
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def load_weekly_data(data_dir: str, weeks_back: int = 2) -> pd.DataFrame:
"""
指定ディレクトリから過去N週分のCSVを読み込んで結合する。
ファイル名形式: sales_YYYY-MM-DD.csv
"""
data_path = Path(data_dir)
today = datetime.now()
dfs = []
for i in range(weeks_back * 7):
target_date = today - timedelta(days=i)
filename = f"sales_{target_date.strftime('%Y-%m-%d')}.csv"
filepath = data_path / filename
if filepath.exists():
try:
df = pd.read_csv(filepath, encoding='utf-8-sig')
df['source_file'] = filename
df['record_date'] = target_date.strftime('%Y-%m-%d')
dfs.append(df)
logger.info(f"読み込み成功: {filename} ({len(df)}行)")
except Exception as e:
logger.warning(f"読み込み失敗: {filename} - {e}")
if not dfs:
raise FileNotFoundError(f"{data_path} に過去{weeks_back}週分のCSVが見つかりません")
combined = pd.concat(dfs, ignore_index=True)
logger.info(f"合計 {len(combined)} 行を読み込みました")
return combined
def validate_dataframe(df: pd.DataFrame, required_columns: list) -> bool:
"""
必須カラムが存在し、データが空でないことを確認する。
本番運用では必ずバリデーションを挟む。
"""
missing = [col for col in required_columns if col not in df.columns]
if missing:
logger.error(f"必須カラムが不足: {missing}")
return False
if df.empty:
logger.error("データフレームが空です")
return False
# Nullチェック
null_counts = df[required_columns].isnull().sum()
if null_counts.any():
logger.warning(f"Null値が含まれるカラム:\n{null_counts[null_counts > 0]}")
return True
パターンB:Google Sheetsからのデータ取得
スプレッドシートをデータソースにするケースでは、Google Sheets APIを使います。
# sheets_loader.py
import gspread
from google.oauth2.service_account import Credentials
import pandas as pd
import logging
from pathlib import Path
logger = logging.getLogger(__name__)
SCOPES = [
'https://www.googleapis.com/auth/spreadsheets.readonly',
'https://www.googleapis.com/auth/drive.readonly'
]
def get_sheets_client(credentials_path: str) -> gspread.Client:
"""
サービスアカウントの認証情報からGoogle Sheetsクライアントを作成する。
credentials.jsonは ~/data-analysis-automation/credentials/ に保管する。
"""
creds = Credentials.from_service_account_file(
credentials_path,
scopes=SCOPES
)
return gspread.authorize(creds)
def load_sheet_as_dataframe(
client: gspread.Client,
spreadsheet_id: str,
sheet_name: str,
header_row: int = 1
) -> pd.DataFrame:
"""
Google SheetsのシートをDataFrameとして読み込む。
Args:
spreadsheet_id: スプレッドシートのID(URLの /d/ と /edit の間の文字列)
sheet_name: シート名
header_row: ヘッダー行番号(1始まり)
"""
try:
spreadsheet = client.open_by_key(spreadsheet_id)
worksheet = spreadsheet.worksheet(sheet_name)
data = worksheet.get_all_records(head=header_row)
df = pd.DataFrame(data)
logger.info(f"Google Sheets読み込み完了: {sheet_name} ({len(df)}行)")
return df
except gspread.exceptions.APIError as e:
logger.error(f"Sheets API エラー: {e}")
raise
except gspread.exceptions.WorksheetNotFound:
logger.error(f"シート '{sheet_name}' が見つかりません")
raise
注意点: Google Sheets APIの認証情報(credentials.json)は、Coworkからアクセス可能なローカルパスに保存します。環境変数として管理するか、Coworkのスケジュールタスク実行前に確認するチェックを入れると安全です。
Step 3:pandasによる集計処理
データが読み込めたら、集計・加工を行います。ここでは実務でよく使うパターンを示します。
# aggregator.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
def compute_weekly_summary(df: pd.DataFrame, date_col: str, value_col: str) -> dict:
"""
今週と先週の集計値を計算し、変化率も含めて返す。
Returns:
{
'this_week': {'total': ..., 'daily_avg': ..., 'max': ...},
'last_week': {'total': ..., 'daily_avg': ..., 'max': ...},
'change_rate': 0.12 # +12%の場合
}
"""
# 日付型に変換
df[date_col] = pd.to_datetime(df[date_col])
today = datetime.now().date()
# 週の始まりを月曜日に設定
this_week_start = today - timedelta(days=today.weekday() + 7)
this_week_end = this_week_start + timedelta(days=6)
last_week_start = this_week_start - timedelta(days=7)
last_week_end = this_week_start - timedelta(days=1)
this_week_df = df[
(df[date_col].dt.date >= this_week_start) &
(df[date_col].dt.date <= this_week_end)
]
last_week_df = df[
(df[date_col].dt.date >= last_week_start) &
(df[date_col].dt.date <= last_week_end)
]
def summarize(week_df: pd.DataFrame) -> dict:
if week_df.empty:
return {'total': 0, 'daily_avg': 0, 'max': 0, 'min': 0}
values = week_df[value_col].dropna()
return {
'total': float(values.sum()),
'daily_avg': float(values.mean()),
'max': float(values.max()),
'min': float(values.min()),
'count': len(values)
}
this_week_stats = summarize(this_week_df)
last_week_stats = summarize(last_week_df)
# 変化率の計算(ゼロ除算を防ぐ)
if last_week_stats['total'] \!= 0:
change_rate = (this_week_stats['total'] - last_week_stats['total']) / last_week_stats['total']
else:
change_rate = None
return {
'this_week': this_week_stats,
'last_week': last_week_stats,
'change_rate': change_rate,
'period': {
'this_week': f"{this_week_start} 〜 {this_week_end}",
'last_week': f"{last_week_start} 〜 {last_week_end}"
}
}
def detect_anomalies(df: pd.DataFrame, value_col: str, threshold: float = 2.0) -> pd.DataFrame:
"""
平均±N標準偏差を超える値を異常値として検出する。
threshold=2.0 はおよそ95%信頼区間の外側を対象にする。
これを入れることで「先週急に売上が落ちたのはデータ異常では?」という
ヒューリスティックな確認をスキップできる。
"""
mean = df[value_col].mean()
std = df[value_col].std()
lower = mean - threshold * std
upper = mean + threshold * std
anomalies = df[(df[value_col] < lower) | (df[value_col] > upper)].copy()
anomalies['anomaly_type'] = np.where(
anomalies[value_col] > upper, 'HIGH', 'LOW'
)
if not anomalies.empty:
logger.warning(f"異常値を {len(anomalies)} 件検出しました")
return anomalies
Step 4:Matplotlibでグラフを自動生成する
レポートに添付するグラフを生成します。日本語フォントの設定がPoC段階でよく詰まるポイントなので、最初に確実に動く設定を示します。
# chart_generator.py
import matplotlib
matplotlib.use('Agg') # GUIなし環境(スケジュールタスク)では必ず先頭で設定する
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib import rcParams
import pandas as pd
from pathlib import Path
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
# 日本語フォント設定(macOS)
# Coworkはmacを使用しているため、ヒラギノが使える
rcParams['font.family'] = ['Hiragino Sans', 'Hiragino Kaku Gothic Pro', 'sans-serif']
rcParams['axes.unicode_minus'] = False # マイナス記号の文字化けを防ぐ
def generate_weekly_trend_chart(
df: pd.DataFrame,
date_col: str,
value_col: str,
title: str,
output_dir: str,
filename: str = "weekly_trend.png"
) -> str:
"""
週次トレンドの折れ線グラフを生成してPNGとして保存する。
Returns:
保存したファイルのパス
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
fig, ax = plt.subplots(figsize=(12, 5))
df = df.copy()
df[date_col] = pd.to_datetime(df[date_col])
df_sorted = df.sort_values(date_col)
daily = df_sorted.groupby(df_sorted[date_col].dt.date)[value_col].sum().reset_index()
daily.columns = ['date', 'value']
daily['date'] = pd.to_datetime(daily['date'])
ax.plot(daily['date'], daily['value'], marker='o', linewidth=2, color='#2563eb', markersize=6)
ax.fill_between(daily['date'], daily['value'], alpha=0.1, color='#2563eb')
ax.xaxis.set_major_formatter(mdates.DateFormatter('%m/%d'))
ax.xaxis.set_major_locator(mdates.DayLocator(interval=1))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')
ax.set_title(title, fontsize=14, fontweight='bold', pad=15)
ax.set_xlabel('日付', fontsize=11)
ax.set_ylabel(value_col, fontsize=11)
ax.grid(True, alpha=0.3)
# 最高値・最低値にアノテーション
max_idx = daily['value'].idxmax()
min_idx = daily['value'].idxmin()
ax.annotate(
f"最大: {daily.loc[max_idx, 'value']:,.0f}",
xy=(daily.loc[max_idx, 'date'], daily.loc[max_idx, 'value']),
xytext=(10, 10), textcoords='offset points',
fontsize=9, color='#dc2626',
arrowprops=dict(arrowstyle='->', color='#dc2626')
)
plt.tight_layout()
save_path = output_path / filename
fig.savefig(save_path, dpi=150, bbox_inches='tight')
plt.close(fig) # メモリリーク防止のために必ずclose
logger.info(f"グラフ保存: {save_path}")
return str(save_path)
def generate_comparison_bar_chart(
summary: dict,
metric_name: str,
output_dir: str,
filename: str = "comparison.png"
) -> str:
"""
今週vs先週の比較棒グラフを生成する。
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
fig, ax = plt.subplots(figsize=(8, 5))
categories = ['先週', '今週']
values = [summary['last_week']['total'], summary['this_week']['total']]
colors = ['#94a3b8', '#2563eb']
bars = ax.bar(categories, values, color=colors, width=0.5, edgecolor='white', linewidth=1.5)
# 変化率を表示
if summary['change_rate'] is not None:
rate = summary['change_rate'] * 100
arrow = "↑" if rate > 0 else "↓"
color = "#16a34a" if rate > 0 else "#dc2626"
ax.text(
1, values[1] + max(values) * 0.02,
f"{arrow} {abs(rate):.1f}%",
ha='center', va='bottom', fontsize=14,
fontweight='bold', color=color
)
# 棒の上に数値を表示
for bar, val in zip(bars, values):
ax.text(
bar.get_x() + bar.get_width() / 2,
bar.get_height() + max(values) * 0.01,
f"{val:,.0f}",
ha='center', va='bottom', fontsize=11, fontweight='bold'
)
ax.set_title(f"{metric_name} 週次比較", fontsize=14, fontweight='bold', pad=15)
ax.set_ylabel(metric_name, fontsize=11)
ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{x:,.0f}'))
ax.grid(True, axis='y', alpha=0.3)
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
plt.tight_layout()
save_path = output_path / filename
fig.savefig(save_path, dpi=150, bbox_inches='tight')
plt.close(fig)
logger.info(f"比較グラフ保存: {save_path}")
return str(save_path)
Step 5:CoworkのClaude AIでデータを自動分析する
ここがCoworkを使う最大のメリットです。集計結果をClaude AIに渡して、人間が読んで意味のある分析コメントを生成します。
スケジュールタスクのプロンプトにこの分析ロジックを組み込みます:
以下のデータ分析結果を読んで、ビジネス上の意味を3〜5文で日本語で説明してください。
専門用語は使わず、経営者・マーケターが読んで即座に意味が分かる言葉で書いてください。
【集計期間】
今週: {this_week_period}
先週: {last_week_period}
【主要KPI】
- 今週の合計売上: {this_week_total}
- 先週の合計売上: {last_week_total}
- 前週比: {change_rate}%
- 今週の日次平均: {this_week_avg}
【異常値情報】
{anomaly_info}
分析結果を Markdownの引用ブロック(>)形式で出力してください。
Coworkのスケジュールタスク内でこのプロンプトを実行し、返答をレポートのヘッダー部分に差し込みます。
Step 6:レポートの自動生成とSlack通知
分析結果・グラフを1つのMarkdownレポートとしてまとめ、Slackに送信します。
# reporter.py
import json
import logging
import requests
from datetime import datetime
from pathlib import Path
logger = logging.getLogger(__name__)
def generate_markdown_report(
summary: dict,
ai_comment: str,
chart_paths: list,
output_dir: str
) -> str:
"""
Markdownレポートを生成して保存する。
Returns:
レポートファイルのパス
"""
today = datetime.now().strftime('%Y-%m-%d')
report_path = Path(output_dir) / f"weekly_report_{today}.md"
change_emoji = "📈" if (summary['change_rate'] or 0) > 0 else "📉"
change_text = "増加" if (summary['change_rate'] or 0) > 0 else "減少"
change_rate_str = f"{abs((summary['change_rate'] or 0) * 100):.1f}%"
content = f"""# 週次レポート ({today})
## AI分析サマリー
{ai_comment}
---
## KPI サマリー
| 指標 | 今週 | 先週 | 前週比 |
|------|------|------|--------|
| 合計 | {summary['this_week']['total']:,.0f} | {summary['last_week']['total']:,.0f} | {change_emoji} {change_rate_str} {change_text} |
| 日次平均 | {summary['this_week']['daily_avg']:,.1f} | {summary['last_week']['daily_avg']:,.1f} | — |
| 最大値 | {summary['this_week']['max']:,.0f} | {summary['last_week']['max']:,.0f} | — |
**集計期間**
- 今週: {summary['period']['this_week']}
- 先週: {summary['period']['last_week']}
---
## 生成されたグラフ
"""
for path in chart_paths:
content += f"- `{path}`\n"
content += f"""
---
*このレポートはCowork + Python データ分析パイプラインによって自動生成されました*
*生成日時: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
"""
report_path.write_text(content, encoding='utf-8')
logger.info(f"レポート保存: {report_path}")
return str(report_path)
def notify_slack(
webhook_url: str,
summary: dict,
ai_comment: str,
report_path: str
) -> bool:
"""
SlackのIncoming Webhookでレポートのサマリーを通知する。
全文ではなくサマリーのみ送り、詳細はレポートファイルを参照させる。
"""
change_rate = summary.get('change_rate')
if change_rate is not None:
change_text = f"{'📈' if change_rate > 0 else '📉'} {abs(change_rate * 100):.1f}% {'増' if change_rate > 0 else '減'}"
else:
change_text = "比較データなし"
payload = {
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": f"📊 週次レポート ({datetime.now().strftime('%Y/%m/%d')})"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f"*今週の合計*\n{summary['this_week']['total']:,.0f}"
},
{
"type": "mrkdwn",
"text": f"*前週比*\n{change_text}"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*AI分析:*\n{ai_comment[:300]}..."
}
},
{
"type": "context",
"elements": [
{
"type": "mrkdwn",
"text": f"詳細レポート: `{report_path}`"
}
]
}
]
}
try:
response = requests.post(
webhook_url,
json=payload,
timeout=10
)
response.raise_for_status()
logger.info("Slack通知送信成功")
return True
except requests.exceptions.RequestException as e:
logger.error(f"Slack通知失敗: {e}")
return False # 通知失敗でもレポート生成は成功とみなす
Step 7:Coworkスケジュールタスクに組み込む
ここまでのスクリプトを1つのエントリーポイントとして統合し、Coworkのスケジュールタスクから呼び出せるようにします。
# main_pipeline.py
#\!/usr/bin/env python3
"""
週次データ分析パイプライン
Coworkのスケジュールタスクから呼び出される。
毎週月曜 7:00 JST に実行。
"""
import sys
import logging
from pathlib import Path
from datetime import datetime
# ロギング設定(ファイルとコンソールの両方に出力)
log_dir = Path.home() / "data-analysis-automation" / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / f"pipeline_{datetime.now().strftime('%Y%m%d')}.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# 設定(環境に合わせて変更する)
CONFIG = {
"data_dir": str(Path.home() / "data-analysis-automation" / "data"),
"output_dir": str(Path.home() / "data-analysis-automation" / "output"),
"date_col": "date",
"value_col": "sales",
"metric_name": "売上",
"slack_webhook_url": "YOUR_SLACK_WEBHOOK_URL", # 環境変数から読むのが推奨
}
def run_pipeline():
"""パイプラインのメイン処理。エラーが発生しても途中で止めず、ログに記録して続行する。"""
logger.info("=== 週次データ分析パイプライン 開始 ===")
results = {
"data_loaded": False,
"summary_computed": False,
"charts_generated": False,
"report_generated": False,
"slack_notified": False
}
try:
from data_loader import load_weekly_data, validate_dataframe
from aggregator import compute_weekly_summary, detect_anomalies
from chart_generator import generate_weekly_trend_chart, generate_comparison_bar_chart
from reporter import generate_markdown_report, notify_slack
# Step 1: データ読み込み
logger.info("Step 1: データ読み込み")
df = load_weekly_data(CONFIG["data_dir"], weeks_back=2)
required_columns = [CONFIG["date_col"], CONFIG["value_col"]]
if not validate_dataframe(df, required_columns):
raise ValueError("データバリデーション失敗")
results["data_loaded"] = True
# Step 2: 集計
logger.info("Step 2: 週次集計")
summary = compute_weekly_summary(df, CONFIG["date_col"], CONFIG["value_col"])
anomalies = detect_anomalies(df, CONFIG["value_col"])
results["summary_computed"] = True
# Step 3: グラフ生成
logger.info("Step 3: グラフ生成")
chart_paths = []
trend_path = generate_weekly_trend_chart(
df, CONFIG["date_col"], CONFIG["value_col"],
f"{CONFIG['metric_name']} 週次トレンド",
CONFIG["output_dir"], "weekly_trend.png"
)
chart_paths.append(trend_path)
comparison_path = generate_comparison_bar_chart(
summary, CONFIG["metric_name"],
CONFIG["output_dir"], "weekly_comparison.png"
)
chart_paths.append(comparison_path)
results["charts_generated"] = True
# Step 4: AI分析コメント(Coworkから呼ばれる場合はここをClaudeに任せる)
anomaly_info = f"{len(anomalies)}件の異常値を検出" if not anomalies.empty else "異常値なし"
ai_comment = f"> 今週の{CONFIG['metric_name']}は前週比{(summary['change_rate'] or 0) * 100:.1f}%の変動がありました。{anomaly_info}。詳細はグラフをご確認ください。"
# Step 5: レポート生成
logger.info("Step 5: レポート生成")
report_path = generate_markdown_report(summary, ai_comment, chart_paths, CONFIG["output_dir"])
results["report_generated"] = True
# Step 6: Slack通知
logger.info("Step 6: Slack通知")
if CONFIG["slack_webhook_url"] \!= "YOUR_SLACK_WEBHOOK_URL":
notify_slack(CONFIG["slack_webhook_url"], summary, ai_comment, report_path)
results["slack_notified"] = True
else:
logger.warning("Slack webhook URL未設定 — 通知をスキップ")
logger.info(f"=== パイプライン完了 ===\n結果: {results}")
return True
except Exception as e:
logger.error(f"パイプライン失敗: {e}", exc_info=True)
logger.info(f"途中結果: {results}")
return False
if __name__ == "__main__":
success = run_pipeline()
sys.exit(0 if success else 1)
Coworkスケジュールタスクの設定
Coworkで「スケジュール」から新しいタスクを作成し、次のプロンプトを設定します:
毎週月曜日 7:00 に週次データ分析レポートを生成してください。
bash: cd ~/data-analysis-automation && .venv/bin/python main_pipeline.py
実行後、ログファイル(~/data-analysis-automation/logs/pipeline_YYYYMMDD.log)の末尾を確認し、
成功か失敗かを報告してください。失敗した場合は、エラーメッセージを要約してください。
Cron設定: 0 7 * * 1(毎週月曜 7:00 JST)
よくある間違いと落とし穴
実際に本番運用すると必ず当たるポイントを3つ挙げます。
1. Matplotlibが「NSException」でクラッシュする
スケジュールタスク(GUI非表示環境)でMatplotlibを実行すると、macのDisplayを要求してクラッシュすることがあります。必ず matplotlib.use('Agg') をスクリプトの冒頭(importより前)に記述することで、バックエンドをAgg(非GUI)に切り替えてください。これを忘れると、手動実行では動くのにスケジュール実行では失敗するという謎のバグに陥ります。
2. 日本語フォントが豆腐(□□□)になる
rcParams['font.family'] を設定していないと、グラフ上の日本語が豆腐になります。macOSには「Hiragino Sans」が標準搭載されているので、それを指定します。ただし、フォント名の大文字小文字に注意してください。'hiragino sans'(小文字)では認識されないことがあります。
3. pandasのSettingWithCopyWarning
# ❌ こう書くとSettingWithCopyWarningが発生する
df_filtered = df[df['status'] == 'active']
df_filtered['new_col'] = 'value' # 警告が出る
# ✅ .copy()を使って明示的にコピーを作る
df_filtered = df[df['status'] == 'active'].copy()
df_filtered['new_col'] = 'value' # 警告なし
これは警告に留まることも多いですが、意図しない挙動につながることがあります。DataFrameをフィルタリングした後に新しいカラムを追加する場合は、常に .copy() を使う習慣をつけましょう。
Google Sheets への書き戻し
生成した集計結果をGoogle Sheetsに書き戻して、管理者がブラウザで確認できるようにするパターンもよく使われます。
def write_summary_to_sheet(client, spreadsheet_id: str, sheet_name: str, summary: dict):
"""
集計結果をGoogle Sheetsの指定シートに書き戻す。
既存データがある場合は末尾に追記する。
"""
worksheet = client.open_by_key(spreadsheet_id).worksheet(sheet_name)
today = datetime.now().strftime('%Y-%m-%d')
change_rate = summary.get('change_rate')
row = [
today,
summary['this_week']['total'],
summary['last_week']['total'],
f"{change_rate * 100:.2f}%" if change_rate is not None else "N/A",
summary['this_week']['daily_avg'],
summary['this_week']['max'],
]
worksheet.append_row(row, value_input_option='USER_ENTERED')
logger.info(f"Google Sheets書き戻し完了: {today}")
全体を振り返って:自動化によって生まれる時間を何に使うか
このガイドで作ったパイプラインが動き始めると、毎週30〜60分かかっていたレポート作業がゼロになります。
大切なのは、その空いた時間をどう使うかです。
データを見て「何かおかしい」と気づく判断、次の施策を考える思考——それはまだAIには代替できません。CoworkのClaude AIが分析コメントを出してくれても、最終的に「だから何をするか」を決めるのは人間です。
自動化の目的は、人間がより重要な思考に集中できるようにすることです。このパイプラインをそのための土台として活用してください。