NOTE(review): the Python file in this patch was edited after the patch was
generated. The "index" blob ids and the two f/butler/execute_task_steps__flow
hashes in workflows/wmill-lock.yaml still describe the earlier content and
presumably need to be regenerated before this is merged.

diff --git a/workflows/f/butler/execute_task_steps__flow/execute_butler_task_steps.lock b/workflows/f/butler/execute_task_steps__flow/execute_butler_task_steps.lock
new file mode 100644
index 0000000..3aacf1d
--- /dev/null
+++ b/workflows/f/butler/execute_task_steps__flow/execute_butler_task_steps.lock
@@ -0,0 +1 @@
+# py: 3.12
diff --git a/workflows/f/butler/execute_task_steps__flow/execute_butler_task_steps.py b/workflows/f/butler/execute_task_steps__flow/execute_butler_task_steps.py
new file mode 100644
index 0000000..d56bd22
--- /dev/null
+++ b/workflows/f/butler/execute_task_steps__flow/execute_butler_task_steps.py
@@ -0,0 +1,296 @@
+from __future__ import annotations
+
+import re
+import subprocess
+import time
+from typing import Any
+
+# Seconds to wait before the single retry of a transiently failing step.
+_TRANSIENT_RETRY_DELAY_SEC = 30
+# Number of lines kept from the head (and tail) of a step's output.
+_EVIDENCE_LINES = 5
+
+
+def main(
+    task_contract: dict[str, Any],
+    steps: list[dict[str, Any]],
+    context: dict[str, Any] | None = None,
+) -> dict[str, Any]:
+    """Standalone Windmill runner flow for Butler delegated step execution.
+
+    This file is intentionally self-contained so it can be pasted or synced to
+    Windmill without requiring the Butler repository on the worker.
+
+    Steps run in order and execution stops at the first step whose exit code
+    is non-zero. A failing ``cmd``/``check`` step that looks transient is
+    retried once after a fixed delay.
+
+    Args:
+        task_contract: Serialized task contract; only
+            ``constraints.max_minutes`` is read, as the per-step timeout.
+        steps: Step dicts carrying ``kind`` and ``value``.
+        context: Optional execution context; shallow-copied before use.
+
+    Returns:
+        A dict with ``ok``, ``summary``, ``step_results`` and ``evidence``,
+        plus ``failed_step_index`` when a step failed.
+    """
+    timeout_sec = _resolve_timeout_sec(task_contract)
+    resolved_context = dict(context or {})
+    results: list[dict[str, Any]] = []
+
+    for idx, raw_step in enumerate(steps):
+        result = _execute_step(raw_step, idx, timeout_sec, resolved_context)
+
+        # Normalize the kind the same way _execute_step does so a padded kind
+        # such as " cmd " is still eligible for the retry.
+        kind = str(raw_step.get("kind", "")).strip()
+        if kind in {"cmd", "check"} and result["exit_code"] != 0:
+            err_type = _classify_error(
+                str(raw_step.get("value", "") or ""),
+                result["stderr"] or result["stdout"],
+            )
+            if err_type == "transient":
+                time.sleep(_TRANSIENT_RETRY_DELAY_SEC)
+                result = _execute_step(raw_step, idx, timeout_sec, resolved_context)
+
+        results.append(result)
+
+        if result["exit_code"] != 0:
+            evidence = _build_evidence(results)
+            evidence["ok"] = False
+            return {
+                "ok": False,
+                "summary": _failure_summary(raw_step, result),
+                "failed_step_index": idx,
+                "step_results": results,
+                "evidence": evidence,
+            }
+
+    evidence = _build_evidence(results)
+    evidence["ok"] = True
+    return {
+        "ok": True,
+        "summary": f"Executed {len(results)} step(s) successfully.",
+        "step_results": results,
+        "evidence": evidence,
+    }
+
+
+def _resolve_timeout_sec(task_contract: dict[str, Any]) -> int:
+    """Return the per-step timeout in seconds from ``constraints.max_minutes``.
+
+    Missing, null or unparsable values fall back to 60 seconds and the result
+    is never below one second. Fractional minutes are honoured (0.5 -> 30).
+    """
+    # ``or {}`` also covers an explicit ``"constraints": null``, which
+    # ``.get("constraints", {})`` would hand back as None.
+    constraints = task_contract.get("constraints") or {}
+    max_minutes = constraints.get("max_minutes", 1)
+    try:
+        return max(1, int(float(max_minutes) * 60))
+    except (TypeError, ValueError, OverflowError):
+        # ValueError: non-numeric text or NaN; OverflowError: infinity.
+        return 60
+
+
+def _execute_step(
+    step: dict[str, Any],
+    step_index: int,
+    timeout_sec: int,
+    context: dict[str, Any],
+) -> dict[str, Any]:
+    """Run one step and return its result record.
+
+    ``wait`` sleeps for the parsed duration, ``mcp_call`` is delegated to
+    ``_execute_mcp_call`` and every other kind runs ``value`` as a shell
+    command bounded by ``timeout_sec``; a timeout yields exit code 124.
+    """
+    kind = str(step.get("kind", "")).strip()
+    value = str(step.get("value", "") or "")
+
+    if kind == "wait":
+        started = time.perf_counter()
+        try:
+            seconds = _parse_wait_seconds(value)
+        except ValueError as exc:
+            # Report a malformed wait as a failed step so the caller still
+            # receives the results of the steps that already ran.
+            return _step_result(step_index, kind, value, 2, "", str(exc), 0)
+        time.sleep(seconds)
+        duration_ms = int((time.perf_counter() - started) * 1000)
+        return _step_result(step_index, kind, value, 0, "", "", duration_ms)
+
+    if kind == "mcp_call":
+        return _execute_mcp_call(step, step_index, timeout_sec, context)
+
+    started = time.perf_counter()
+    try:
+        # NOTE(review): value is executed through the shell, so steps must
+        # only ever come from a trusted source.
+        proc = subprocess.run(
+            value,
+            shell=True,
+            capture_output=True,
+            timeout=timeout_sec,
+            text=True,
+        )
+    except subprocess.TimeoutExpired as exc:
+        duration_ms = int((time.perf_counter() - started) * 1000)
+        return _step_result(
+            step_index,
+            kind,
+            value,
+            124,
+            # Partial output on timeout is bytes even with text=True.
+            _to_text(exc.stdout),
+            f"timeout after {timeout_sec}s",
+            duration_ms,
+        )
+
+    duration_ms = int((time.perf_counter() - started) * 1000)
+    return _step_result(
+        step_index,
+        kind,
+        value,
+        proc.returncode,
+        proc.stdout,
+        proc.stderr,
+        duration_ms,
+    )
+
+
+def _to_text(data: str | bytes | None) -> str:
+    """Return captured process output as text; ``None`` becomes ``""``."""
+    if data is None:
+        return ""
+    if isinstance(data, bytes):
+        return data.decode(errors="replace")
+    return data
+
+
+def _execute_mcp_call(
+    step: dict[str, Any],
+    step_index: int,
+    timeout_sec: int,
+    context: dict[str, Any],
+) -> dict[str, Any]:
+    """Placeholder for future Windmill-side MCP execution.
+
+    The first real connectivity test uses `check` steps, so we keep the
+    deployment artifact dependency-free for now and fail explicitly if a flow
+    attempts `mcp_call`.
+    """
+    _ = timeout_sec, context
+    server = str(step.get("server", "") or "").strip()
+    tool = str(step.get("tool", "") or "").strip()
+    return _step_result(
+        step_index,
+        "mcp_call",
+        tool,
+        1,
+        "",
+        f"mcp_call is not supported in the standalone Windmill runner yet (server={server}, tool={tool})",
+        0,
+    )
+
+
+def _step_result(
+    step_index: int,
+    kind: str,
+    value: str,
+    exit_code: int,
+    stdout: str,
+    stderr: str,
+    duration_ms: int,
+) -> dict[str, Any]:
+    """Build the result record stored for one executed step."""
+    return {
+        "step_index": step_index,
+        "kind": kind,
+        "value": value,
+        "exit_code": exit_code,
+        "stdout": stdout,
+        "stderr": stderr,
+        "duration_ms": duration_ms,
+    }
+
+
+def _build_evidence(results: list[dict[str, Any]]) -> dict[str, Any]:
+    """Condense step results into commands, leading output and error lines.
+
+    Per result, the first ``_EVIDENCE_LINES`` stdout lines are kept; stderr
+    contributes its head plus its tail, without repeating a line when the two
+    windows would overlap.
+    """
+    executed_commands = [str(result.get("value", "")) for result in results]
+    key_outputs: list[str] = []
+    error_lines: list[str] = []
+
+    for result in results:
+        stdout = str(result.get("stdout", "") or "")
+        stderr = str(result.get("stderr", "") or "")
+        if stdout:
+            key_outputs.extend(stdout.splitlines()[:_EVIDENCE_LINES])
+        if stderr:
+            lines = stderr.splitlines()
+            error_lines.extend(lines[:_EVIDENCE_LINES])
+            # Start the tail after the head so a 6-9 line stderr is not
+            # reported with duplicated lines.
+            tail_start = max(_EVIDENCE_LINES, len(lines) - _EVIDENCE_LINES)
+            error_lines.extend(lines[tail_start:])
+
+    return {
+        "executed_commands": executed_commands,
+        "key_outputs": key_outputs,
+        "error_head_tail": "\n".join(error_lines) if error_lines else None,
+    }
+
+
+def _failure_summary(step: dict[str, Any], result: dict[str, Any]) -> str:
+    """Return the failure reason for a step: stderr, else stdout, else a default."""
+    kind = str(step.get("kind", "") or "")
+    stderr = str(result.get("stderr", "") or "")
+    stdout = str(result.get("stdout", "") or "")
+    if kind == "mcp_call":
+        return stderr or stdout or "mcp_call failed."
+    return stderr or stdout or f"{kind} step failed."
+
+
+def _classify_error(command: str, output: str) -> str:
+    """Return ``"transient"`` if the command or output holds a retry marker.
+
+    Matching is a case-insensitive substring search; anything without a
+    marker is ``"permanent"``.
+    """
+    lowered = (command + "\n" + output).lower()
+    transient_markers = [
+        "timeout",
+        "timed out",
+        "temporarily unavailable",
+        "connection reset",
+        "connection aborted",
+        "connection refused",
+        "503",
+        "502",
+        "rate limit",
+    ]
+    for marker in transient_markers:
+        if marker in lowered:
+            return "transient"
+    return "permanent"
+
+
+def _parse_wait_seconds(value: str) -> float:
+    """Parse a wait duration such as ``"5"``, ``"2.5"`` or ``"10s"``.
+
+    Raises:
+        ValueError: If ``value`` is not a non-negative decimal number with an
+            optional ``s`` suffix.
+    """
+    normalized = value.strip().lower()
+    if re.fullmatch(r"\d+(\.\d+)?s", normalized):
+        return float(normalized[:-1])
+    if re.fullmatch(r"\d+(\.\d+)?", normalized):
+        return float(normalized)
+    raise ValueError(f"Invalid wait value: {value}")
diff --git a/workflows/f/butler/execute_task_steps__flow/flow.yaml b/workflows/f/butler/execute_task_steps__flow/flow.yaml
new file mode 100644
index 0000000..21864c0
--- /dev/null
+++ b/workflows/f/butler/execute_task_steps__flow/flow.yaml
@@ -0,0 +1,47 @@
+summary: Butler generic runner - delegated step execution
+description: >-
+  Receives a serialized TaskContract and resolved step list from Butler,
+  executes steps server-side with Butler-compatible semantics
+  (cmd/check/wait/retry), and returns ok/summary/step_results/evidence.
+value:
+  modules:
+    - id: a
+      summary: Execute Butler task steps
+      value:
+        type: rawscript
+        content: '!inline execute_butler_task_steps.py'
+        input_transforms:
+          context:
+            type: javascript
+            expr: flow_input.context
+          steps:
+            type: javascript
+            expr: flow_input.steps
+          task_contract:
+            type: javascript
+            expr: flow_input.task_contract
+        lock: '!inline execute_butler_task_steps.lock'
+        language: python3
+schema:
+  $schema: 'https://json-schema.org/draft/2020-12/schema'
+  type: object
+  order:
+    - task_contract
+    - steps
+    - context
+  properties:
+    context:
+      type: object
+      description: 'Execution context (target, payload)'
+      default: {}
+    steps:
+      type: array
+      description: Resolved SOP step list
+      items:
+        type: object
+    task_contract:
+      type: object
+      description: Serialized Butler TaskContract
+  required:
+    - task_contract
+    - steps
diff --git a/workflows/wmill-lock.yaml b/workflows/wmill-lock.yaml
index aa05c85..4ece79f 100644
--- a/workflows/wmill-lock.yaml
+++ b/workflows/wmill-lock.yaml
@@ -5,6 +5,8 @@ locks:
   'f/app_custom/system_heartbeat__flow+step2:_データ検証.py': d7f4e6e04ed116ba3836cb32793a0187a69359a3f2a807b533030b01d42bed39
   'f/app_custom/system_heartbeat__flow+step3:_httpヘルスチェック.py': 5d3bce0ddb4f521444bf01bc80670e7321933ad09f935044f4d6123c658ca7a8
   'f/app_custom/system_heartbeat__flow+step4:_年度判定_&_最終レポート.py': 6889bfac9a629fa42cf0505cbc945ba3782c59e1697b8493ce6101ef5ffa8b32
+  f/butler/execute_task_steps__flow+__flow_hash: 4b331a51d9f4bd6fbfc4714a859a08df86184f81fd902a382725541c002bdca8
+  f/butler/execute_task_steps__flow+execute_butler_task_steps.py: 90e90680a89ff3e7bd05d6c32513e9893b0c2064ae1c9e3dc3e2f3e05bad2166
   f/dev/hello_world__flow+__flow_hash: 08a256433d5978b05d08e2ba6cfa8e4324c23be4875c9775777d683f32c6015e
   f/dev/hello_world__flow+a.py: 63bf18351b5b0e81067254a03c9811e6bb388c890ad72e18092ac5ec2690a456
   f/dev/konnnichiha__flow+__flow_hash: 0d40e9e9fe2cf6944028d671b6facb9e0598d41abc3682993d5339800188b8f1