Files
windmill_workflow/autonomous_windmill/state_manager.py

34 lines
1.0 KiB
Python

"""State オブジェクトの管理とハッシュ比較。"""
import json
import hashlib
from dataclasses import dataclass, field
from typing import Optional, Set
@dataclass
class State:
retry_count: int = 3
current_flow: Optional[dict] = None
last_logs: Optional[str] = None
last_error: Optional[str] = None
flow_hashes: Set[str] = field(default_factory=set)
job_id: Optional[str] = None
def _canonical(flow_dict: dict) -> str:
"""キー順を固定した JSON 文字列を返す(ハッシュ安定化)。"""
return json.dumps(flow_dict, sort_keys=True, separators=(',', ':'))
def compute_hash(flow_dict: dict) -> str:
return hashlib.sha256(_canonical(flow_dict).encode()).hexdigest()
def is_duplicate(state: State, flow_dict: dict) -> bool:
"""過去に同一の JSON を出力済みかどうかを判定する。"""
return compute_hash(flow_dict) in state.flow_hashes
def register_hash(state: State, flow_dict: dict) -> None:
state.flow_hashes.add(compute_hash(flow_dict))