#!/usr/bin/env python3 """Windmill MCP Server - Claude が Windmill を直接操作できるようにする""" import os import json import sys import httpx from mcp.server.fastmcp import FastMCP WINDMILL_URL = os.environ.get("WINDMILL_URL", "https://windmill.keinafarm.net") WINDMILL_TOKEN = os.environ.get("WINDMILL_TOKEN", "") WINDMILL_WORKSPACE = os.environ.get("WINDMILL_WORKSPACE", "admins") MCP_HOST = os.environ.get("MCP_HOST", "127.0.0.1") MCP_PORT = int(os.environ.get("MCP_PORT", "8001")) if not WINDMILL_TOKEN: print("Error: WINDMILL_TOKEN 環境変数が設定されていません", file=sys.stderr) sys.exit(1) mcp = FastMCP("windmill", host=MCP_HOST, port=MCP_PORT) def _headers() -> dict: return {"Authorization": f"Bearer {WINDMILL_TOKEN}"} def _api(path: str) -> str: return f"{WINDMILL_URL}/api/w/{WINDMILL_WORKSPACE}/{path}" @mcp.tool() def windmill_list_flows(per_page: int = 20) -> str: """Windmill のフロー一覧を取得する Args: per_page: 取得件数(最大100) """ resp = httpx.get( _api("flows/list"), headers=_headers(), params={"per_page": min(per_page, 100)}, timeout=30, ) resp.raise_for_status() flows = resp.json() if not flows: return "フローが見つかりませんでした" lines = [ f"- {f['path']}: {f.get('summary', '(概要なし)')}" for f in flows ] return "\n".join(lines) @mcp.tool() def windmill_get_flow(path: str) -> str: """指定したパスのフロー定義(スクリプト含む)を取得する Args: path: フローのパス (例: u/antigravity/git_sync) """ resp = httpx.get(_api(f"flows/get/{path}"), headers=_headers(), timeout=30) resp.raise_for_status() return json.dumps(resp.json(), indent=2, ensure_ascii=False) @mcp.tool() def windmill_run_flow(path: str, args: str = "{}") -> str: """フローをトリガーして実行する Args: path: フローのパス (例: u/antigravity/git_sync) args: JSON形式の入力引数 (例: {"key": "value"}) """ try: args_dict = json.loads(args) except json.JSONDecodeError as e: return f"Error: argsのJSON形式が不正です: {e}" resp = httpx.post( _api(f"jobs/run/f/{path}"), headers=_headers(), json=args_dict, timeout=30, ) resp.raise_for_status() job_id = resp.text.strip().strip('"') return ( f"フローを開始しました。\n" f"ジョブID: {job_id}\n" f"詳細URL: {WINDMILL_URL}/run/{job_id}?workspace={WINDMILL_WORKSPACE}" ) @mcp.tool() def windmill_list_recent_jobs( limit: int = 20, success_only: bool = False, failure_only: bool = False, script_path_filter: str = "", ) -> str: """最近のジョブ一覧を取得する Args: limit: 取得件数(最大100) success_only: Trueにすると成功ジョブのみ表示 failure_only: Trueにすると失敗ジョブのみ表示 script_path_filter: パスで絞り込む (例: u/antigravity/git_sync) """ params: dict = {"per_page": min(limit, 100)} if success_only: params["success"] = "true" if failure_only: params["success"] = "false" if script_path_filter: params["script_path_filter"] = script_path_filter resp = httpx.get(_api("jobs/list"), headers=_headers(), params=params, timeout=30) resp.raise_for_status() jobs = resp.json() if not jobs: return "ジョブが見つかりませんでした" lines = [] for j in jobs: success = j.get("success") if success is True: status = "[OK]" elif success is False: status = "[FAIL]" else: status = "[RUNNING]" path = j.get("script_path", "unknown") started = (j.get("started_at") or "")[:19] or "pending" job_id = j.get("id", "") lines.append(f"{status} [{started}] {path} (ID: {job_id})") return "\n".join(lines) @mcp.tool() def windmill_get_job_logs(job_id: str) -> str: """ジョブの詳細情報とログを取得する Args: job_id: ジョブのID(windmill_list_recent_jobs で確認できる) """ resp = httpx.get(_api(f"jobs_u/get/{job_id}"), headers=_headers(), timeout=30) resp.raise_for_status() job = resp.json() success = job.get("success") if success is True: state = "成功 [OK]" elif success is False: state = "失敗 [FAIL]" else: state = "実行中 [RUNNING]" result_parts = [ f"ジョブID: {job_id}", f"パス: {job.get('script_path', 'N/A')}", f"状態: {state}", f"開始: {job.get('started_at', 'N/A')}", f"終了: {job.get('created_at', 'N/A')}", ] log_resp = httpx.get( _api(f"jobs_u/getlogs/{job_id}"), headers=_headers(), timeout=30 ) if log_resp.status_code == 200: result_parts.append("\n--- ログ ---") result_parts.append(log_resp.text) result_val = job.get("result") if result_val is not None: result_parts.append("\n--- 実行結果 ---") result_parts.append( json.dumps(result_val, indent=2, ensure_ascii=False) if isinstance(result_val, (dict, list)) else str(result_val) ) return "\n".join(result_parts) @mcp.tool() def windmill_create_flow(path: str, summary: str, flow_definition: str, description: str = "") -> str: """新しいフローを作成する Args: path: フローのパス (例: u/admin/my_flow) summary: フローの概要 flow_definition: フローの定義 (JSON形式の文字列) description: フローの詳細説明 (省略可) """ try: flow_value = json.loads(flow_definition) except json.JSONDecodeError as e: return f"Error: flow_definitionのJSON形式が不正です: {e}" payload = { "path": path, "summary": summary, "description": description, "value": flow_value, } resp = httpx.post( _api("flows/create"), headers=_headers(), json=payload, timeout=30, ) resp.raise_for_status() return ( f"フローを作成しました。\n" f"パス: {path}\n" f"URL: {WINDMILL_URL}/flows/edit/{path}?workspace={WINDMILL_WORKSPACE}" ) @mcp.tool() def windmill_update_flow(path: str, summary: str, flow_definition: str, description: str = "") -> str: """既存のフローを更新する Args: path: フローのパス (例: u/admin/my_flow) summary: フローの概要 flow_definition: フローの定義 (JSON形式の文字列) description: フローの詳細説明 (省略可) """ try: flow_value = json.loads(flow_definition) except json.JSONDecodeError as e: return f"Error: flow_definitionのJSON形式が不正です: {e}" payload = { "path": path, "summary": summary, "description": description, "value": flow_value, } resp = httpx.post( _api(f"flows/edit/{path}"), headers=_headers(), json=payload, timeout=30, ) resp.raise_for_status() return ( f"フローを更新しました。\n" f"パス: {path}\n" f"URL: {WINDMILL_URL}/flows/edit/{path}?workspace={WINDMILL_WORKSPACE}" ) @mcp.tool() def windmill_create_script( path: str, language: str, content: str, summary: str = "", description: str = "" ) -> str: """新しいスクリプトを作成する(既存パスの場合は新バージョンを登録する) Args: path: スクリプトのパス (例: u/admin/my_script) language: 言語 (python3, deno, bun, bash など) content: スクリプトのソースコード summary: スクリプトの概要 (省略可) description: スクリプトの詳細説明 (省略可) """ payload = { "path": path, "language": language, "content": content, "summary": summary, "description": description, } resp = httpx.post( _api("scripts/create"), headers=_headers(), json=payload, timeout=30, ) resp.raise_for_status() hash_val = resp.text.strip().strip('"') return ( f"スクリプトを作成しました。\n" f"パス: {path}\n" f"ハッシュ: {hash_val}\n" f"URL: {WINDMILL_URL}/scripts/edit/{path}?workspace={WINDMILL_WORKSPACE}" ) @mcp.tool() def windmill_list_scripts(per_page: int = 20) -> str: """Windmill のスクリプト一覧を取得する Args: per_page: 取得件数(最大100) """ resp = httpx.get( _api("scripts/list"), headers=_headers(), params={"per_page": min(per_page, 100)}, timeout=30, ) resp.raise_for_status() scripts = resp.json() if not scripts: return "スクリプトが見つかりませんでした" lines = [ f"- {s['path']} [{s.get('language', '?')}]: {s.get('summary', '(概要なし)')}" for s in scripts ] return "\n".join(lines) @mcp.tool() def windmill_get_script(path: str) -> str: """指定したパスのスクリプトのソースコードを取得する Args: path: スクリプトのパス (例: u/antigravity/test_git_sync) """ resp = httpx.get(_api(f"scripts/get/{path}"), headers=_headers(), timeout=30) resp.raise_for_status() script = resp.json() result_parts = [ f"パス: {script.get('path', 'N/A')}", f"言語: {script.get('language', 'N/A')}", f"概要: {script.get('summary', 'N/A')}", "", "--- コード ---", script.get("content", "(コードなし)"), ] return "\n".join(result_parts) if __name__ == "__main__": transport = os.environ.get("MCP_TRANSPORT", "stdio") mcp.run(transport=transport)