""" 自律ループ制御。全体のオーケストレーションのみを行う。 Usage: python controller.py Example: python controller.py hello_world "print Hello World in Python" """ import json import sys from config import DEV_PATH_PREFIX, MAX_RETRIES, MAX_JSON_RETRIES from state_manager import State, is_duplicate, register_hash from validator import validate from windmill_client import create_flow, update_flow, flow_exists, run_flow from job_poller import poll_until_done, JobTimeout from llm_interface import generate_flow, fix_flow def _log(prefix: str, msg: str) -> None: print(f"{prefix} {msg}", flush=True) def run(task_description: str, flow_name: str) -> bool: """ 自律ループを実行する。 Returns: True = 成功, False = 失敗 """ # パス制限: f/dev/* のみ(controller 側で強制) flow_path = f"{DEV_PATH_PREFIX}/{flow_name}" state = State(retry_count=MAX_RETRIES) is_first = True json_fail_count = 0 _log("[START]", f"タスク: {task_description}") _log("[START]", f"フローパス: {flow_path}") while state.retry_count > 0: attempt = MAX_RETRIES - state.retry_count + 1 prefix = f"[試行 {attempt}/{MAX_RETRIES}]" # ── 1. LLM 生成 ───────────────────────────────────────── _log(prefix, "フロー生成中...") if is_first: raw = generate_flow(task_description) else: prev_json = json.dumps(state.current_flow, ensure_ascii=False) raw = fix_flow(prev_json, state.last_error or "") # ── 2 & 3. JSON 検証 ───────────────────────────────────── try: flow_dict = validate(raw) json_fail_count = 0 _log(prefix, "JSON検証: OK") except ValueError as e: _log(prefix, f"JSON検証: NG - {e}") json_fail_count += 1 if json_fail_count >= MAX_JSON_RETRIES: _log(prefix, f"JSON検証 {MAX_JSON_RETRIES} 回連続失敗 → リトライ消費") state.retry_count -= 1 state.last_error = str(e) json_fail_count = 0 is_first = False continue # ── 5. ハッシュ比較 ────────────────────────────────────── if is_duplicate(state, flow_dict): _log("[STOP]", "同一JSON検出 → 即停止") return False register_hash(state, flow_dict) # ── 6. create / update ─────────────────────────────────── summary = flow_dict["summary"] value = flow_dict["value"] try: if flow_exists(flow_path): update_flow(flow_path, summary, value) _log(prefix, f"フロー更新: {flow_path}") else: create_flow(flow_path, summary, value) _log(prefix, f"フロー作成: {flow_path}") except Exception as e: _log(prefix, f"フロー送信エラー: {e}") state.retry_count -= 1 state.last_error = str(e) is_first = False continue # API 送信成功直後に current_flow を更新(run 前・失敗時は更新しない) state.current_flow = flow_dict # ── 7. run ────────────────────────────────────────────── try: job_id = run_flow(flow_path) state.job_id = job_id _log(prefix, f"ジョブ実行: {job_id}") except Exception as e: _log(prefix, f"ジョブ起動エラー: {e}") state.retry_count -= 1 state.last_error = str(e) is_first = False continue # ── 8. ジョブ完了待ち ──────────────────────────────────── _log(prefix, "ジョブ完了待ち...") try: success, logs = poll_until_done(job_id) except JobTimeout as e: _log(prefix, f"タイムアウト: {e}") state.retry_count -= 1 state.last_error = "タイムアウト" state.last_logs = None is_first = False continue state.last_logs = logs # ── 9. ステータス判定 ──────────────────────────────────── if success: _log(prefix, "実行結果: SUCCESS") _log("[最終]", "状態: 成功") return True else: excerpt = logs[:300] if logs else "(ログなし)" _log(prefix, "実行結果: FAILURE") _log(prefix, f"エラー内容: {excerpt}") state.retry_count -= 1 state.last_error = logs or "不明なエラー" is_first = False _log("[最終]", f"状態: {MAX_RETRIES} 回失敗で停止") return False if __name__ == "__main__": if len(sys.argv) < 3: print("Usage: python controller.py ") print("Example: python controller.py hello_world 'print Hello World in Python'") sys.exit(1) _flow_name = sys.argv[1] _task = " ".join(sys.argv[2:]) ok = run(_task, _flow_name) sys.exit(0 if ok else 1)