Files
windmill/mcp/windmill_mcp.py
Akira da2466462e windmill_update_flow: エンドポイントを flows/edit → flows/update に修正
Windmill API の正しいフロー更新エンドポイントは /flows/update/{path}。
/flows/edit/{path} は 404 を返す(誤ったエンドポイント)。

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 02:30:56 +09:00

344 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Windmill MCP Server - Claude が Windmill を直接操作できるようにする"""
import os
import json
import sys
import httpx
from mcp.server.fastmcp import FastMCP
WINDMILL_URL = os.environ.get("WINDMILL_URL", "https://windmill.keinafarm.net")
WINDMILL_TOKEN = os.environ.get("WINDMILL_TOKEN", "")
WINDMILL_WORKSPACE = os.environ.get("WINDMILL_WORKSPACE", "admins")
MCP_HOST = os.environ.get("MCP_HOST", "127.0.0.1")
MCP_PORT = int(os.environ.get("MCP_PORT", "8001"))
if not WINDMILL_TOKEN:
print("Error: WINDMILL_TOKEN 環境変数が設定されていません", file=sys.stderr)
sys.exit(1)
mcp = FastMCP("windmill", host=MCP_HOST, port=MCP_PORT)
def _headers() -> dict:
return {"Authorization": f"Bearer {WINDMILL_TOKEN}"}
def _api(path: str) -> str:
return f"{WINDMILL_URL}/api/w/{WINDMILL_WORKSPACE}/{path}"
@mcp.tool()
def windmill_list_flows(per_page: int = 20) -> str:
"""Windmill のフロー一覧を取得する
Args:
per_page: 取得件数最大100
"""
resp = httpx.get(
_api("flows/list"),
headers=_headers(),
params={"per_page": min(per_page, 100)},
timeout=30,
)
resp.raise_for_status()
flows = resp.json()
if not flows:
return "フローが見つかりませんでした"
lines = [
f"- {f['path']}: {f.get('summary', '(概要なし)')}" for f in flows
]
return "\n".join(lines)
@mcp.tool()
def windmill_get_flow(path: str) -> str:
"""指定したパスのフロー定義(スクリプト含む)を取得する
Args:
path: フローのパス (例: u/antigravity/git_sync)
"""
resp = httpx.get(_api(f"flows/get/{path}"), headers=_headers(), timeout=30)
resp.raise_for_status()
return json.dumps(resp.json(), indent=2, ensure_ascii=False)
@mcp.tool()
def windmill_run_flow(path: str, args: str = "{}") -> str:
"""フローをトリガーして実行する
Args:
path: フローのパス (例: u/antigravity/git_sync)
args: JSON形式の入力引数 (例: {"key": "value"})
"""
try:
args_dict = json.loads(args)
except json.JSONDecodeError as e:
return f"Error: argsのJSON形式が不正です: {e}"
resp = httpx.post(
_api(f"jobs/run/f/{path}"),
headers=_headers(),
json=args_dict,
timeout=30,
)
resp.raise_for_status()
job_id = resp.text.strip().strip('"')
return (
f"フローを開始しました。\n"
f"ジョブID: {job_id}\n"
f"詳細URL: {WINDMILL_URL}/run/{job_id}?workspace={WINDMILL_WORKSPACE}"
)
@mcp.tool()
def windmill_list_recent_jobs(
limit: int = 20,
success_only: bool = False,
failure_only: bool = False,
script_path_filter: str = "",
) -> str:
"""最近のジョブ一覧を取得する
Args:
limit: 取得件数最大100
success_only: Trueにすると成功ジョブのみ表示
failure_only: Trueにすると失敗ジョブのみ表示
script_path_filter: パスで絞り込む (例: u/antigravity/git_sync)
"""
params: dict = {"per_page": min(limit, 100)}
if success_only:
params["success"] = "true"
if failure_only:
params["success"] = "false"
if script_path_filter:
params["script_path_filter"] = script_path_filter
resp = httpx.get(_api("jobs/list"), headers=_headers(), params=params, timeout=30)
resp.raise_for_status()
jobs = resp.json()
if not jobs:
return "ジョブが見つかりませんでした"
lines = []
for j in jobs:
success = j.get("success")
if success is True:
status = "[OK]"
elif success is False:
status = "[FAIL]"
else:
status = "[RUNNING]"
path = j.get("script_path", "unknown")
started = (j.get("started_at") or "")[:19] or "pending"
job_id = j.get("id", "")
lines.append(f"{status} [{started}] {path} (ID: {job_id})")
return "\n".join(lines)
@mcp.tool()
def windmill_get_job_logs(job_id: str) -> str:
"""ジョブの詳細情報とログを取得する
Args:
job_id: ジョブのIDwindmill_list_recent_jobs で確認できる)
"""
resp = httpx.get(_api(f"jobs_u/get/{job_id}"), headers=_headers(), timeout=30)
resp.raise_for_status()
job = resp.json()
success = job.get("success")
if success is True:
state = "成功 [OK]"
elif success is False:
state = "失敗 [FAIL]"
else:
state = "実行中 [RUNNING]"
result_parts = [
f"ジョブID: {job_id}",
f"パス: {job.get('script_path', 'N/A')}",
f"状態: {state}",
f"開始: {job.get('started_at', 'N/A')}",
f"終了: {job.get('created_at', 'N/A')}",
]
log_resp = httpx.get(
_api(f"jobs_u/getlogs/{job_id}"), headers=_headers(), timeout=30
)
if log_resp.status_code == 200:
result_parts.append("\n--- ログ ---")
result_parts.append(log_resp.text)
result_val = job.get("result")
if result_val is not None:
result_parts.append("\n--- 実行結果 ---")
result_parts.append(
json.dumps(result_val, indent=2, ensure_ascii=False)
if isinstance(result_val, (dict, list))
else str(result_val)
)
return "\n".join(result_parts)
@mcp.tool()
def windmill_create_flow(path: str, summary: str, flow_definition: str, description: str = "") -> str:
"""新しいフローを作成する
Args:
path: フローのパス (例: u/admin/my_flow)
summary: フローの概要
flow_definition: フローの定義 (JSON形式の文字列)
description: フローの詳細説明 (省略可)
"""
try:
flow_value = json.loads(flow_definition)
except json.JSONDecodeError as e:
return f"Error: flow_definitionのJSON形式が不正です: {e}"
payload = {
"path": path,
"summary": summary,
"description": description,
"value": flow_value,
}
resp = httpx.post(
_api("flows/create"),
headers=_headers(),
json=payload,
timeout=30,
)
resp.raise_for_status()
return (
f"フローを作成しました。\n"
f"パス: {path}\n"
f"URL: {WINDMILL_URL}/flows/edit/{path}?workspace={WINDMILL_WORKSPACE}"
)
@mcp.tool()
def windmill_update_flow(path: str, summary: str, flow_definition: str, description: str = "") -> str:
"""既存のフローを更新する
Args:
path: フローのパス (例: u/admin/my_flow)
summary: フローの概要
flow_definition: フローの定義 (JSON形式の文字列)
description: フローの詳細説明 (省略可)
"""
try:
flow_value = json.loads(flow_definition)
except json.JSONDecodeError as e:
return f"Error: flow_definitionのJSON形式が不正です: {e}"
payload = {
"path": path,
"summary": summary,
"description": description,
"value": flow_value,
}
resp = httpx.post(
_api(f"flows/update/{path}"),
headers=_headers(),
json=payload,
timeout=30,
)
resp.raise_for_status()
return (
f"フローを更新しました。\n"
f"パス: {path}\n"
f"URL: {WINDMILL_URL}/flows/edit/{path}?workspace={WINDMILL_WORKSPACE}"
)
@mcp.tool()
def windmill_create_script(
path: str, language: str, content: str, summary: str = "", description: str = ""
) -> str:
"""新しいスクリプトを作成する(既存パスの場合は新バージョンを登録する)
Args:
path: スクリプトのパス (例: u/admin/my_script)
language: 言語 (python3, deno, bun, bash など)
content: スクリプトのソースコード
summary: スクリプトの概要 (省略可)
description: スクリプトの詳細説明 (省略可)
"""
payload = {
"path": path,
"language": language,
"content": content,
"summary": summary,
"description": description,
}
resp = httpx.post(
_api("scripts/create"),
headers=_headers(),
json=payload,
timeout=30,
)
resp.raise_for_status()
hash_val = resp.text.strip().strip('"')
return (
f"スクリプトを作成しました。\n"
f"パス: {path}\n"
f"ハッシュ: {hash_val}\n"
f"URL: {WINDMILL_URL}/scripts/edit/{path}?workspace={WINDMILL_WORKSPACE}"
)
@mcp.tool()
def windmill_list_scripts(per_page: int = 20) -> str:
"""Windmill のスクリプト一覧を取得する
Args:
per_page: 取得件数最大100
"""
resp = httpx.get(
_api("scripts/list"),
headers=_headers(),
params={"per_page": min(per_page, 100)},
timeout=30,
)
resp.raise_for_status()
scripts = resp.json()
if not scripts:
return "スクリプトが見つかりませんでした"
lines = [
f"- {s['path']} [{s.get('language', '?')}]: {s.get('summary', '(概要なし)')}"
for s in scripts
]
return "\n".join(lines)
@mcp.tool()
def windmill_get_script(path: str) -> str:
"""指定したパスのスクリプトのソースコードを取得する
Args:
path: スクリプトのパス (例: u/antigravity/test_git_sync)
"""
resp = httpx.get(_api(f"scripts/get/{path}"), headers=_headers(), timeout=30)
resp.raise_for_status()
script = resp.json()
result_parts = [
f"パス: {script.get('path', 'N/A')}",
f"言語: {script.get('language', 'N/A')}",
f"概要: {script.get('summary', 'N/A')}",
"",
"--- コード ---",
script.get("content", "(コードなし)"),
]
return "\n".join(result_parts)
if __name__ == "__main__":
transport = os.environ.get("MCP_TRANSPORT", "stdio")
mcp.run(transport=transport)