Files

149 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
自律ループ制御。全体のオーケストレーションのみを行う。
Usage:
python controller.py <flow_name> <task_description>
Example:
python controller.py hello_world "print Hello World in Python"
"""
import json
import sys
from config import DEV_PATH_PREFIX, MAX_RETRIES, MAX_JSON_RETRIES
from state_manager import State, is_duplicate, register_hash
from validator import validate
from windmill_client import create_flow, update_flow, flow_exists, run_flow
from job_poller import poll_until_done, JobTimeout
from llm_interface import generate_flow, fix_flow
def _log(prefix: str, msg: str) -> None:
    """Write one progress line, ``"<prefix> <msg>"``, to stdout and flush it.

    Args:
        prefix: Tag placed in front of the message (e.g. a phase or attempt label).
        msg: The message text.
    """
    line = f"{prefix} {msg}"
    # Flush on every call so progress is visible immediately.
    print(line, flush=True)
def run(task_description: str, flow_name: str) -> bool:
    """Run the autonomous generate -> validate -> deploy -> execute loop.

    Each pass asks the LLM for a flow (a fresh one, or a fix of the last
    deployed flow), validates it, creates or updates it at
    ``DEV_PATH_PREFIX/flow_name``, runs it and waits for the job to finish.
    Failures consume one retry from a budget of ``MAX_RETRIES``; invalid JSON
    only consumes a retry after ``MAX_JSON_RETRIES`` consecutive failures.
    The loop stops immediately if ``is_duplicate`` reports that the validated
    flow was already seen.

    Args:
        task_description: Natural-language task handed to ``generate_flow``.
        flow_name: Last path segment of the flow; it is placed under
            ``DEV_PATH_PREFIX``.

    Returns:
        True if a job finished successfully, False otherwise (retries
        exhausted or a duplicate flow was produced).
    """
    # Path restriction: flows are only ever written under DEV_PATH_PREFIX
    # (enforced here, on the controller side).
    flow_path = f"{DEV_PATH_PREFIX}/{flow_name}"
    state = State(retry_count=MAX_RETRIES)
    is_first = True
    json_fail_count = 0

    _log("[START]", f"タスク: {task_description}")
    _log("[START]", f"フローパス: {flow_path}")

    while state.retry_count > 0:
        attempt = MAX_RETRIES - state.retry_count + 1
        prefix = f"[試行 {attempt}/{MAX_RETRIES}]"

        # -- 1. LLM generation ------------------------------------------
        _log(prefix, "フロー生成中...")
        # current_flow is only set after a successful create/update. If an
        # earlier attempt failed before that point there is nothing to fix,
        # and json.dumps would hand fix_flow an empty placeholder (``null``
        # for None) without the task description. Generate from the task
        # instead in that case.
        if is_first or not state.current_flow:
            raw = generate_flow(task_description)
        else:
            prev_json = json.dumps(state.current_flow, ensure_ascii=False)
            raw = fix_flow(prev_json, state.last_error or "")

        # -- 2 & 3. JSON validation -------------------------------------
        try:
            flow_dict = validate(raw)
        except ValueError as exc:
            _log(prefix, f"JSON検証: NG - {exc}")
            json_fail_count += 1
            if json_fail_count >= MAX_JSON_RETRIES:
                # Too many consecutive invalid outputs: charge one retry and
                # start counting JSON failures from zero again.
                _log(prefix, f"JSON検証 {MAX_JSON_RETRIES} 回連続失敗 → リトライ消費")
                state.retry_count -= 1
                state.last_error = str(exc)
                json_fail_count = 0
                is_first = False
            continue
        else:
            json_fail_count = 0
            _log(prefix, "JSON検証: OK")

        # -- 5. Hash comparison -----------------------------------------
        # Producing the same flow again means the loop is not making
        # progress, so stop right away instead of spending more retries.
        if is_duplicate(state, flow_dict):
            _log("[STOP]", "同一JSON検出 → 即停止")
            return False
        register_hash(state, flow_dict)

        # -- 6. create / update -----------------------------------------
        summary = flow_dict["summary"]
        value = flow_dict["value"]
        try:
            if flow_exists(flow_path):
                update_flow(flow_path, summary, value)
                _log(prefix, f"フロー更新: {flow_path}")
            else:
                create_flow(flow_path, summary, value)
                _log(prefix, f"フロー作成: {flow_path}")
        except Exception as exc:
            _log(prefix, f"フロー送信エラー: {exc}")
            state.retry_count -= 1
            state.last_error = str(exc)
            is_first = False
            continue

        # Record current_flow right after the API accepted it: before the
        # run, and never when the send failed.
        state.current_flow = flow_dict

        # -- 7. run -----------------------------------------------------
        try:
            job_id = run_flow(flow_path)
            state.job_id = job_id
            _log(prefix, f"ジョブ実行: {job_id}")
        except Exception as exc:
            _log(prefix, f"ジョブ起動エラー: {exc}")
            state.retry_count -= 1
            state.last_error = str(exc)
            is_first = False
            continue

        # -- 8. Wait for job completion ---------------------------------
        _log(prefix, "ジョブ完了待ち...")
        try:
            success, logs = poll_until_done(job_id)
        except JobTimeout as exc:
            _log(prefix, f"タイムアウト: {exc}")
            state.retry_count -= 1
            state.last_error = "タイムアウト"
            state.last_logs = None
            is_first = False
            continue
        state.last_logs = logs

        # -- 9. Status check --------------------------------------------
        if success:
            _log(prefix, "実行結果: SUCCESS")
            _log("[最終]", "状態: 成功")
            return True

        # Only the first 300 characters are printed; the full logs are kept
        # in last_error as input for the next fix_flow call.
        excerpt = logs[:300] if logs else "(ログなし)"
        _log(prefix, "実行結果: FAILURE")
        _log(prefix, f"エラー内容: {excerpt}")
        state.retry_count -= 1
        state.last_error = logs or "不明なエラー"
        is_first = False

    _log("[最終]", f"状態: {MAX_RETRIES} 回失敗で停止")
    return False
if __name__ == "__main__":
    # Command-line entry point: the first argument is the flow name, all
    # remaining arguments are joined with spaces to form the task description.
    cli_args = sys.argv[1:]
    if len(cli_args) < 2:
        print("Usage: python controller.py <flow_name> <task_description>")
        print("Example: python controller.py hello_world 'print Hello World in Python'")
        sys.exit(1)

    _flow_name, *task_words = cli_args
    _task = " ".join(task_words)

    # Exit status mirrors the loop outcome: 0 on success, 1 on failure.
    ok = run(_task, _flow_name)
    exit_code = 0 if ok else 1
    sys.exit(exit_code)