#!/usr/bin/env python3 """Windmill MCP Server - Claude が Windmill を直接操作できるようにする""" import os import json import sys import httpx from mcp.server.fastmcp import FastMCP WINDMILL_URL = os.environ.get("WINDMILL_URL", "https://windmill.keinafarm.net") WINDMILL_TOKEN = os.environ.get("WINDMILL_TOKEN", "") WINDMILL_WORKSPACE = os.environ.get("WINDMILL_WORKSPACE", "admins") if not WINDMILL_TOKEN: print("Error: WINDMILL_TOKEN 環境変数が設定されていません", file=sys.stderr) sys.exit(1) mcp = FastMCP("windmill") def _headers() -> dict: return {"Authorization": f"Bearer {WINDMILL_TOKEN}"} def _api(path: str) -> str: return f"{WINDMILL_URL}/api/w/{WINDMILL_WORKSPACE}/{path}" @mcp.tool() def windmill_list_flows(per_page: int = 20) -> str: """Windmill のフロー一覧を取得する Args: per_page: 取得件数(最大100) """ resp = httpx.get( _api("flows/list"), headers=_headers(), params={"per_page": min(per_page, 100)}, timeout=30, ) resp.raise_for_status() flows = resp.json() if not flows: return "フローが見つかりませんでした" lines = [ f"- {f['path']}: {f.get('summary', '(概要なし)')}" for f in flows ] return "\n".join(lines) @mcp.tool() def windmill_get_flow(path: str) -> str: """指定したパスのフロー定義(スクリプト含む)を取得する Args: path: フローのパス (例: u/antigravity/git_sync) """ resp = httpx.get(_api(f"flows/get/{path}"), headers=_headers(), timeout=30) resp.raise_for_status() return json.dumps(resp.json(), indent=2, ensure_ascii=False) @mcp.tool() def windmill_run_flow(path: str, args: str = "{}") -> str: """フローをトリガーして実行する Args: path: フローのパス (例: u/antigravity/git_sync) args: JSON形式の入力引数 (例: {"key": "value"}) """ try: args_dict = json.loads(args) except json.JSONDecodeError as e: return f"Error: argsのJSON形式が不正です: {e}" resp = httpx.post( _api(f"jobs/run/f/{path}"), headers=_headers(), json=args_dict, timeout=30, ) resp.raise_for_status() job_id = resp.text.strip().strip('"') return ( f"フローを開始しました。\n" f"ジョブID: {job_id}\n" f"詳細URL: {WINDMILL_URL}/run/{job_id}?workspace={WINDMILL_WORKSPACE}" ) @mcp.tool() def windmill_list_recent_jobs( limit: int = 20, success_only: bool = False, failure_only: bool = False, script_path_filter: str = "", ) -> str: """最近のジョブ一覧を取得する Args: limit: 取得件数(最大100) success_only: Trueにすると成功ジョブのみ表示 failure_only: Trueにすると失敗ジョブのみ表示 script_path_filter: パスで絞り込む (例: u/antigravity/git_sync) """ params: dict = {"per_page": min(limit, 100)} if success_only: params["success"] = "true" if failure_only: params["success"] = "false" if script_path_filter: params["script_path_filter"] = script_path_filter resp = httpx.get(_api("jobs/list"), headers=_headers(), params=params, timeout=30) resp.raise_for_status() jobs = resp.json() if not jobs: return "ジョブが見つかりませんでした" lines = [] for j in jobs: success = j.get("success") if success is True: status = "[OK]" elif success is False: status = "[FAIL]" else: status = "[RUNNING]" path = j.get("script_path", "unknown") started = (j.get("started_at") or "")[:19] or "pending" job_id = j.get("id", "") lines.append(f"{status} [{started}] {path} (ID: {job_id})") return "\n".join(lines) @mcp.tool() def windmill_get_job_logs(job_id: str) -> str: """ジョブの詳細情報とログを取得する Args: job_id: ジョブのID(windmill_list_recent_jobs で確認できる) """ resp = httpx.get(_api(f"jobs_u/get/{job_id}"), headers=_headers(), timeout=30) resp.raise_for_status() job = resp.json() success = job.get("success") if success is True: state = "成功 [OK]" elif success is False: state = "失敗 [FAIL]" else: state = "実行中 [RUNNING]" result_parts = [ f"ジョブID: {job_id}", f"パス: {job.get('script_path', 'N/A')}", f"状態: {state}", f"開始: {job.get('started_at', 'N/A')}", f"終了: {job.get('created_at', 'N/A')}", ] log_resp = httpx.get( _api(f"jobs_u/getlogs/{job_id}"), headers=_headers(), timeout=30 ) if log_resp.status_code == 200: result_parts.append("\n--- ログ ---") result_parts.append(log_resp.text) result_val = job.get("result") if result_val is not None: result_parts.append("\n--- 実行結果 ---") result_parts.append( json.dumps(result_val, indent=2, ensure_ascii=False) if isinstance(result_val, (dict, list)) else str(result_val) ) return "\n".join(result_parts) @mcp.tool() def windmill_list_scripts(per_page: int = 20) -> str: """Windmill のスクリプト一覧を取得する Args: per_page: 取得件数(最大100) """ resp = httpx.get( _api("scripts/list"), headers=_headers(), params={"per_page": min(per_page, 100)}, timeout=30, ) resp.raise_for_status() scripts = resp.json() if not scripts: return "スクリプトが見つかりませんでした" lines = [ f"- {s['path']} [{s.get('language', '?')}]: {s.get('summary', '(概要なし)')}" for s in scripts ] return "\n".join(lines) @mcp.tool() def windmill_get_script(path: str) -> str: """指定したパスのスクリプトのソースコードを取得する Args: path: スクリプトのパス (例: u/antigravity/test_git_sync) """ resp = httpx.get(_api(f"scripts/get/{path}"), headers=_headers(), timeout=30) resp.raise_for_status() script = resp.json() result_parts = [ f"パス: {script.get('path', 'N/A')}", f"言語: {script.get('language', 'N/A')}", f"概要: {script.get('summary', 'N/A')}", "", "--- コード ---", script.get("content", "(コードなし)"), ] return "\n".join(result_parts) if __name__ == "__main__": mcp.run(transport="stdio")