Files
windmill_workflow/autonomous_windmill/windmill_client.py

51 lines
1.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Windmill REST API の薄いラッパー。MCP は使わず直接 HTTPS で叩く。"""
import httpx
from config import WINDMILL_URL, WINDMILL_TOKEN, WINDMILL_WORKSPACE
def _headers() -> dict:
    """Build the HTTP headers sent with every Windmill API request.

    Returns:
        A dict with a single ``Authorization`` entry carrying
        ``WINDMILL_TOKEN`` as a bearer token.
    """
    bearer = f"Bearer {WINDMILL_TOKEN}"
    return {"Authorization": bearer}
def _api(path: str) -> str:
    """Return the absolute URL of a workspace-scoped Windmill endpoint.

    Args:
        path: Endpoint path relative to ``/api/w/<workspace>/``,
            for example ``"flows/create"``.

    Returns:
        ``WINDMILL_URL`` joined with ``/api/w/{WINDMILL_WORKSPACE}/{path}``.
    """
    # A base URL configured with a trailing slash would otherwise yield
    # "...//api/w/...", so normalise it before joining.
    base = WINDMILL_URL.rstrip("/")
    return f"{base}/api/w/{WINDMILL_WORKSPACE}/{path}"
def flow_exists(path: str) -> bool:
    """Report whether a flow can be fetched at *path*.

    Issues a GET against ``flows/get/{path}``.

    Args:
        path: Flow path inside the workspace.

    Returns:
        True only when the response status is 200; every other status
        (including error statuses) is reported as False.
    """
    url = _api(f"flows/get/{path}")
    status = httpx.get(url, headers=_headers(), timeout=30).status_code
    return status == 200
def create_flow(path: str, summary: str, value: dict) -> None:
    """Create a new flow by POSTing to ``flows/create``.

    Args:
        path: Flow path inside the workspace.
        summary: Short summary stored with the flow.
        value: Flow definition sent as the ``value`` field.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status
            (raised by ``raise_for_status()``).
    """
    # The description is always sent as an empty string.
    body = dict(path=path, summary=summary, description="", value=value)
    url = _api("flows/create")
    response = httpx.post(url, headers=_headers(), json=body, timeout=30)
    response.raise_for_status()
def update_flow(path: str, summary: str, value: dict) -> None:
    """Overwrite an existing flow by POSTing to ``flows/update/{path}``.

    Args:
        path: Flow path inside the workspace; used both in the URL and
            in the request body.
        summary: Short summary stored with the flow.
        value: Flow definition sent as the ``value`` field.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status
            (raised by ``raise_for_status()``).
    """
    # The correct endpoint is /flows/update/{path}; /flows/edit/ returns 404.
    url = _api(f"flows/update/{path}")
    body = dict(path=path, summary=summary, description="", value=value)
    response = httpx.post(url, headers=_headers(), json=body, timeout=30)
    response.raise_for_status()
def run_flow(path: str) -> str:
    """Run the flow at *path* and return the resulting job_id.

    POSTs an empty JSON object to ``jobs/run/f/{path}``.

    Args:
        path: Flow path inside the workspace.

    Returns:
        The response body with surrounding whitespace and any enclosing
        double quotes removed.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status
            (raised by ``raise_for_status()``).
    """
    url = _api(f"jobs/run/f/{path}")
    response = httpx.post(url, headers=_headers(), json={}, timeout=30)
    response.raise_for_status()
    raw_id = response.text.strip()
    # Drop the quotes in case the id arrives as a JSON-encoded string.
    return raw_id.strip('"')
def get_job(job_id: str) -> dict:
    """Fetch the state of a job.

    The ``success`` field of the result reads: True = succeeded,
    False = failed, None = still running.

    Args:
        job_id: Identifier of the job to look up.

    Returns:
        The decoded JSON body of the ``jobs_u/get/{job_id}`` response.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status
            (raised by ``raise_for_status()``).
    """
    url = _api(f"jobs_u/get/{job_id}")
    response = httpx.get(url, headers=_headers(), timeout=30)
    response.raise_for_status()
    job = response.json()
    return job
def get_job_logs(job_id: str) -> str:
    """Fetch the log text of a job, best-effort.

    Args:
        job_id: Identifier of the job whose logs are requested.

    Returns:
        The body of the ``jobs_u/getlogs/{job_id}`` response when its
        status is 200; otherwise an empty string (no exception is raised
        for non-200 statuses).
    """
    url = _api(f"jobs_u/getlogs/{job_id}")
    response = httpx.get(url, headers=_headers(), timeout=30)
    # Any non-200 answer is treated as "no logs available".
    if response.status_code != 200:
        return ""
    return response.text