Auto-sync: 2026-03-13 06:30:01

This commit is contained in:
Windmill Bot
2026-03-13 06:30:01 +00:00
parent 280c12ccdf
commit d29dcc2b61
4 changed files with 269 additions and 0 deletions

View File

@@ -0,0 +1 @@
# py: 3.12

View File

@@ -0,0 +1,219 @@
from __future__ import annotations
import re
import subprocess
import time
from typing import Any
def main(
task_contract: dict[str, Any],
steps: list[dict[str, Any]],
context: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Standalone Windmill runner flow for Butler delegated step execution.
This file is intentionally self-contained so it can be pasted or synced to
Windmill without requiring the Butler repository on the worker.
"""
timeout_sec = _resolve_timeout_sec(task_contract)
resolved_context = dict(context or {})
results: list[dict[str, Any]] = []
for idx, raw_step in enumerate(steps):
result = _execute_step(raw_step, idx, timeout_sec, resolved_context)
if raw_step.get("kind") in {"cmd", "check"} and result["exit_code"] != 0:
err_type = _classify_error(str(raw_step.get("value", "")), result["stderr"] or result["stdout"])
if err_type == "transient":
time.sleep(30)
result = _execute_step(raw_step, idx, timeout_sec, resolved_context)
results.append(result)
if result["exit_code"] != 0:
evidence = _build_evidence(results)
evidence["ok"] = False
return {
"ok": False,
"summary": _failure_summary(raw_step, result),
"failed_step_index": idx,
"step_results": results,
"evidence": evidence,
}
evidence = _build_evidence(results)
evidence["ok"] = True
return {
"ok": True,
"summary": f"Executed {len(results)} step(s) successfully.",
"step_results": results,
"evidence": evidence,
}
def _resolve_timeout_sec(task_contract: dict[str, Any]) -> int:
constraints = task_contract.get("constraints", {})
max_minutes = constraints.get("max_minutes", 1)
try:
return max(1, int(max_minutes) * 60)
except (TypeError, ValueError):
return 60
def _execute_step(
step: dict[str, Any],
step_index: int,
timeout_sec: int,
context: dict[str, Any],
) -> dict[str, Any]:
kind = str(step.get("kind", "")).strip()
value = str(step.get("value", "") or "")
if kind == "wait":
started = time.perf_counter()
seconds = _parse_wait_seconds(value)
time.sleep(seconds)
duration_ms = int((time.perf_counter() - started) * 1000)
return _step_result(step_index, kind, value, 0, "", "", duration_ms)
if kind == "mcp_call":
return _execute_mcp_call(step, step_index, timeout_sec, context)
started = time.perf_counter()
try:
proc = subprocess.run(
value,
shell=True,
capture_output=True,
timeout=timeout_sec,
text=True,
)
duration_ms = int((time.perf_counter() - started) * 1000)
return _step_result(
step_index,
kind,
value,
proc.returncode,
proc.stdout,
proc.stderr,
duration_ms,
)
except subprocess.TimeoutExpired as exc:
duration_ms = int((time.perf_counter() - started) * 1000)
stdout = exc.stdout if isinstance(exc.stdout, str) else ""
return _step_result(
step_index,
kind,
value,
124,
stdout,
f"timeout after {timeout_sec}s",
duration_ms,
)
def _execute_mcp_call(
step: dict[str, Any],
step_index: int,
timeout_sec: int,
context: dict[str, Any],
) -> dict[str, Any]:
"""Placeholder for future Windmill-side MCP execution.
The first real connectivity test uses `check` steps, so we keep the
deployment artifact dependency-free for now and fail explicitly if a flow
attempts `mcp_call`.
"""
_ = timeout_sec, context
server = str(step.get("server", "") or "").strip()
tool = str(step.get("tool", "") or "").strip()
return _step_result(
step_index,
"mcp_call",
tool,
1,
"",
f"mcp_call is not supported in the standalone Windmill runner yet (server={server}, tool={tool})",
0,
)
def _step_result(
step_index: int,
kind: str,
value: str,
exit_code: int,
stdout: str,
stderr: str,
duration_ms: int,
) -> dict[str, Any]:
return {
"step_index": step_index,
"kind": kind,
"value": value,
"exit_code": exit_code,
"stdout": stdout,
"stderr": stderr,
"duration_ms": duration_ms,
}
def _build_evidence(results: list[dict[str, Any]]) -> dict[str, Any]:
executed_commands = [str(result.get("value", "")) for result in results]
key_outputs: list[str] = []
error_lines: list[str] = []
for result in results:
stdout = str(result.get("stdout", "") or "")
stderr = str(result.get("stderr", "") or "")
if stdout:
key_outputs.extend(stdout.splitlines()[:5])
if stderr:
lines = stderr.splitlines()
error_lines.extend(lines[:5])
if len(lines) > 5:
error_lines.extend(lines[-5:])
return {
"executed_commands": executed_commands,
"key_outputs": key_outputs,
"error_head_tail": "\n".join(error_lines) if error_lines else None,
}
def _failure_summary(step: dict[str, Any], result: dict[str, Any]) -> str:
kind = str(step.get("kind", "") or "")
stderr = str(result.get("stderr", "") or "")
stdout = str(result.get("stdout", "") or "")
if kind == "mcp_call":
return stderr or stdout or "mcp_call failed."
return stderr or stdout or f"{kind} step failed."
def _classify_error(command: str, output: str) -> str:
lowered = (command + "\n" + output).lower()
transient_markers = [
"timeout",
"timed out",
"temporarily unavailable",
"connection reset",
"connection aborted",
"connection refused",
"503",
"502",
"rate limit",
]
for marker in transient_markers:
if marker in lowered:
return "transient"
return "permanent"
def _parse_wait_seconds(value: str) -> float:
normalized = value.strip().lower()
if re.fullmatch(r"\d+(\.\d+)?s", normalized):
return float(normalized[:-1])
if re.fullmatch(r"\d+(\.\d+)?", normalized):
return float(normalized)
raise ValueError(f"Invalid wait value: {value}")

View File

@@ -0,0 +1,47 @@
summary: Butler generic runner - delegated step execution
description: >-
Receives a serialized TaskContract and resolved step list from Butler,
executes steps server-side with Butler-compatible semantics
(cmd/check/wait/retry), and returns ok/summary/step_results/evidence.
value:
modules:
- id: a
summary: Execute Butler task steps
value:
type: rawscript
content: '!inline execute_butler_task_steps.py'
input_transforms:
context:
type: javascript
expr: flow_input.context
steps:
type: javascript
expr: flow_input.steps
task_contract:
type: javascript
expr: flow_input.task_contract
lock: '!inline execute_butler_task_steps.lock'
language: python3
schema:
$schema: 'https://json-schema.org/draft/2020-12/schema'
type: object
order:
- task_contract
- steps
- context
properties:
context:
type: object
description: 'Execution context (target, payload)'
default: {}
steps:
type: array
description: Resolved SOP step list
items:
type: object
task_contract:
type: object
description: Serialized Butler TaskContract
required:
- task_contract
- steps

View File

@@ -5,6 +5,8 @@ locks:
'f/app_custom/system_heartbeat__flow+step2:_データ検証.py': d7f4e6e04ed116ba3836cb32793a0187a69359a3f2a807b533030b01d42bed39
'f/app_custom/system_heartbeat__flow+step3:_httpヘルスチェック.py': 5d3bce0ddb4f521444bf01bc80670e7321933ad09f935044f4d6123c658ca7a8
'f/app_custom/system_heartbeat__flow+step4:_年度判定_&_最終レポート.py': 6889bfac9a629fa42cf0505cbc945ba3782c59e1697b8493ce6101ef5ffa8b32
f/butler/execute_task_steps__flow+__flow_hash: 4b331a51d9f4bd6fbfc4714a859a08df86184f81fd902a382725541c002bdca8
f/butler/execute_task_steps__flow+execute_butler_task_steps.py: 90e90680a89ff3e7bd05d6c32513e9893b0c2064ae1c9e3dc3e2f3e05bad2166
f/dev/hello_world__flow+__flow_hash: 08a256433d5978b05d08e2ba6cfa8e4324c23be4875c9775777d683f32c6015e
f/dev/hello_world__flow+a.py: 63bf18351b5b0e81067254a03c9811e6bb388c890ad72e18092ac5ec2690a456
f/dev/konnnichiha__flow+__flow_hash: 0d40e9e9fe2cf6944028d671b6facb9e0598d41abc3682993d5339800188b8f1