Files
windmill_workflow/wm-api.sh

997 lines
29 KiB
Bash
Executable File

#!/usr/bin/env bash
# Windmill REST API helper script.
# Usage: ./wm-api.sh <command> [args...]
# Required env: WINDMILL_TOKEN (API bearer token).
# Optional env (defaults below): WINDMILL_URL, WINDMILL_WORKSPACE, STATE_DIR,
# REMOTE_INDEX_FILE, WORKFLOW_INDEX_FILE, WORKFLOWS_DIR, BACKUP_ROOT, FORCE_PUSH.
set -euo pipefail

# --- Configuration (all overridable via environment) ---
WINDMILL_URL="${WINDMILL_URL:-https://windmill.keinafarm.net}"
# SECURITY FIX: a real API token used to be hardcoded here as the default,
# committing a live credential to version control. The token must now come
# from the environment; the previously embedded token should be rotated.
WINDMILL_TOKEN="${WINDMILL_TOKEN:-}"
WINDMILL_WORKSPACE="${WINDMILL_WORKSPACE:-admins}"
STATE_DIR="${STATE_DIR:-state}"
REMOTE_INDEX_FILE="${REMOTE_INDEX_FILE:-${STATE_DIR}/remote_index.json}"
WORKFLOW_INDEX_FILE="${WORKFLOW_INDEX_FILE:-${STATE_DIR}/workflow_index.json}"
WORKFLOWS_DIR="${WORKFLOWS_DIR:-workflows}"
BACKUP_ROOT="${BACKUP_ROOT:-${STATE_DIR}/backups/workflows}"
FORCE_PUSH="${FORCE_PUSH:-0}"  # 1 = skip the push-workflow preflight check
API_BASE="${WINDMILL_URL}/api/w/${WINDMILL_WORKSPACE}"
AUTH_HEADER="Authorization: Bearer ${WINDMILL_TOKEN}"
require_arg() {
  # Abort with a usage message when a mandatory argument ($2) is absent.
  # $1: usage string to print on failure.
  local usage="$1"
  local value="${2:-}"
  if [ -n "$value" ]; then
    return 0
  fi
  echo "Usage: ${usage}"
  exit 1
}
ensure_parent_dir() {
  # Create the directory that will contain "$1", unless that directory is
  # the current one (plain filename with no path component).
  local parent
  parent="$(dirname "$1")"
  [ "$parent" = "." ] || mkdir -p "$parent"
}
safe_name() {
  # Flatten a Windmill path into a single filename: every "/" becomes "__".
  local raw="$1"
  printf '%s\n' "${raw//\//__}"
}
api_get() {
  # GET ${API_BASE}$1 with the bearer token; prints the response body.
  # NOTE(review): -k disables TLS certificate verification and stderr is
  # discarded, so connection/HTTP errors surface only as empty output —
  # callers must treat "" as failure. Consider --fail plus proper CA trust.
  curl -sk -H "${AUTH_HEADER}" "${API_BASE}$1" 2>/dev/null
}
api_post() {
  # POST a JSON body ($2) to ${API_BASE}$1; prints the response body.
  # NOTE(review): -k skips TLS verification and errors are silenced (see api_get).
  curl -sk -X POST -H "${AUTH_HEADER}" -H "Content-Type: application/json" -d "$2" "${API_BASE}$1" 2>/dev/null
}
api_put() {
  # PUT a JSON body ($2) to ${API_BASE}$1; prints the response body.
  # NOTE(review): -k skips TLS verification and errors are silenced (see api_get).
  curl -sk -X PUT -H "${AUTH_HEADER}" -H "Content-Type: application/json" -d "$2" "${API_BASE}$1" 2>/dev/null
}
api_delete() {
  # DELETE ${API_BASE}$1; prints the response body.
  # NOTE(review): -k skips TLS verification and errors are silenced (see api_get).
  curl -sk -X DELETE -H "${AUTH_HEADER}" "${API_BASE}$1" 2>/dev/null
}
json_pretty() {
  # Pretty-print the JSON document arriving on stdin.
  python3 -m json.tool
}
save_json_pretty() {
  # Read JSON from stdin and write it pretty-printed (2-space indent,
  # non-ASCII preserved) to the file "$1", creating parent dirs as needed.
  local target="$1"
  ensure_parent_dir "$target"
  python3 -c 'import json,sys; sys.stdout.write(json.dumps(json.load(sys.stdin), ensure_ascii=False, indent=2) + "\n")' > "$target"
}
python_json() {
  # Run an inline Python script supplied on stdin (typically a heredoc),
  # exposing the given shell arguments as sys.argv[1:].
  python3 - "$@"
}
path_to_script_file() {
  # Map a remote script path to its local TypeScript file under scripts/,
  # keeping only the final path segment.
  local leaf="${1##*/}"
  printf 'scripts/%s.ts\n' "$leaf"
}
path_to_flow_file() {
  # Map a remote flow path to its local JSON file under flows/,
  # keeping only the final path segment.
  local leaf="${1##*/}"
  printf 'flows/%s.flow.json\n' "$leaf"
}
workflow_dir_from_path() {
  # Local package directory for a flow: ${WORKFLOWS_DIR}/<full flow path>.
  printf '%s/%s\n' "${WORKFLOWS_DIR}" "$1"
}
schedule_file_for_path() {
  # Relative filename used to store one schedule inside a workflow package
  # (slashes flattened so the schedule path fits in a single file name).
  printf 'schedules/%s.json\n' "$(safe_name "$1")"
}
get_schedules_list_json() {
  # Fetch up to 1000 schedules in the workspace as a JSON array.
  api_get "/schedules/list?per_page=1000"
}
get_schedule_paths_for_flow() {
  # Print, one per line, the paths of all schedules attached to a flow.
  # $1: JSON array from /schedules/list, $2: flow path to match.
  local listing="$1"
  local wanted="$2"
  python_json "$listing" "$wanted" <<'PY'
import json
import sys

entries = json.loads(sys.argv[1])
target = sys.argv[2]
matches = [
    e["path"]
    for e in entries
    if e.get("is_flow") and e.get("script_path") == target and e.get("path")
]
for schedule_path in matches:
    print(schedule_path)
PY
}
pull_script() {
  # Download a script's source from Windmill and write it to a local file.
  # $1: remote script path, $2: output file (parent dirs created as needed).
  # Exits 1 when the API returns nothing (missing script or request failure).
  local script_path="$1"
  local outfile="$2"
  local data
  data="$(api_get "/scripts/get/p/${script_path}")"
  if [ -z "$data" ]; then
    echo "pull-script failed: empty response (${script_path})" >&2
    exit 1
  fi
  ensure_parent_dir "$outfile"
  python_json "$outfile" "$data" <<'PY'
import json
import pathlib
import sys

outfile = pathlib.Path(sys.argv[1])
obj = json.loads(sys.argv[2])
content = obj.get("content")
if content is None:
    raise SystemExit("content not found in script payload")
# Normalize to LF line endings so pulled files are stable across platforms.
outfile.write_text(content, encoding="utf-8", newline="\n")
PY
  echo "pulled script: ${script_path} -> ${outfile}"
}
push_script() {
  # Push a local source file to an existing remote script, preserving the
  # remote metadata (summary, schema, language, ...) and chaining parent_hash
  # so Windmill records this as a new version of the same script.
  # $1: remote script path, $2: local source file.
  local script_path="$1"
  local infile="$2"
  if [ ! -f "$infile" ]; then
    echo "push-script failed: file not found (${infile})" >&2
    exit 1
  fi
  local current payload
  # The current remote version supplies the metadata and the parent hash.
  current="$(api_get "/scripts/get/p/${script_path}")"
  if [ -z "$current" ]; then
    echo "push-script failed: empty response (${script_path})" >&2
    exit 1
  fi
  payload="$(python_json "$script_path" "$infile" "$current" <<'PY'
import json
import pathlib
import sys

script_path, infile = sys.argv[1], pathlib.Path(sys.argv[2])
remote = json.loads(sys.argv[3])
content = infile.read_text(encoding="utf-8")
payload = {
    "path": script_path,
    "parent_hash": remote.get("hash"),
    "summary": remote.get("summary") or "",
    "description": remote.get("description") or "",
    "content": content,
    "schema": remote.get("schema") or {"type": "object", "properties": {}, "required": []},
    "language": remote.get("language") or "bun",
    "kind": remote.get("kind") or "script",
    "lock": remote.get("lock") or "",
}
missing = [k for k in ("path", "parent_hash", "content", "schema", "language", "kind", "lock") if payload.get(k) is None]
if missing:
    raise SystemExit(f"missing required payload fields: {', '.join(missing)}")
print(json.dumps(payload, ensure_ascii=False))
PY
)"
  # /scripts/create with a parent_hash creates the next version of the script.
  api_post "/scripts/create" "$payload" | json_pretty
}
pull_flow() {
  # Download a flow definition and save a normalized subset as pretty JSON.
  # $1: remote flow path, $2: output file.
  local flow_path="$1"
  local outfile="$2"
  local data
  data="$(api_get "/flows/get/${flow_path}")"
  if [ -z "$data" ]; then
    echo "pull-flow failed: empty response (${flow_path})" >&2
    exit 1
  fi
  ensure_parent_dir "$outfile"
  python_json "$data" <<'PY' | save_json_pretty "$outfile"
import json
import sys

obj = json.loads(sys.argv[1])
# Keep only the fields push-flow needs; server-side metadata is dropped so
# local files diff cleanly and fingerprints stay stable.
out = {
    "path": obj.get("path"),
    "summary": obj.get("summary") or "",
    "description": obj.get("description") or "",
    "value": obj.get("value") or {},
    "schema": obj.get("schema") or {"type": "object", "properties": {}, "required": []},
}
print(json.dumps(out, ensure_ascii=False))
PY
  echo "pulled flow: ${flow_path} -> ${outfile}"
}
pull_schedule() {
  # Download a schedule and save a normalized subset as pretty JSON.
  # $1: remote schedule path, $2: output file (save_json_pretty creates dirs).
  local schedule_path="$1"
  local outfile="$2"
  local data
  data="$(api_get "/schedules/get/${schedule_path}")"
  if [ -z "$data" ]; then
    echo "pull-schedule failed: empty response (${schedule_path})" >&2
    exit 1
  fi
  python_json "$data" <<'PY' | save_json_pretty "$outfile"
import json
import sys

obj = json.loads(sys.argv[1])
# Shape mirrors the /schedules/create payload so the file can be pushed back.
out = {
    "path": obj.get("path"),
    "script_path": obj.get("script_path"),
    "is_flow": bool(obj.get("is_flow")),
    "schedule": obj.get("schedule"),
    "timezone": obj.get("timezone"),
    "enabled": bool(obj.get("enabled")),
    "args": obj.get("args") or {},
    "summary": obj.get("summary"),
    "description": obj.get("description") or "",
    "on_failure_times": obj.get("on_failure_times", 1),
    "on_failure_exact": bool(obj.get("on_failure_exact", False)),
    "on_recovery_times": obj.get("on_recovery_times", 1),
    "on_recovery_extra_args": obj.get("on_recovery_extra_args") or {},
    "on_success_extra_args": obj.get("on_success_extra_args") or {},
    "no_flow_overlap": bool(obj.get("no_flow_overlap", False)),
    "ws_error_handler_muted": bool(obj.get("ws_error_handler_muted", False)),
    "cron_version": obj.get("cron_version") or "v2",
}
print(json.dumps(out, ensure_ascii=False))
PY
  echo "pulled schedule: ${schedule_path} -> ${outfile}"
}
workflow_state_from_dir_json() {
  # Compute the local fingerprint state of a workflow package directory:
  # {"flow_path", "flow_fingerprint", "schedules": {path: {"fingerprint"}}}.
  # Fingerprints are sha256 over canonical JSON (sorted keys, compact
  # separators) and MUST stay byte-compatible with
  # workflow_state_from_remote_json — do not change one without the other.
  local workflow_dir="$1"
  python_json "$workflow_dir" <<'PY'
import glob
import hashlib
import json
import pathlib
import sys

workflow_dir = pathlib.Path(sys.argv[1])
flow_file = workflow_dir / "flow.json"
if not flow_file.exists():
    raise SystemExit(f"flow.json not found: {flow_file}")
flow_obj = json.loads(flow_file.read_text(encoding="utf-8"))
# Canonical form: sorted keys + compact separators -> stable hash.
flow_blob = json.dumps(flow_obj, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
flow_fp = hashlib.sha256(flow_blob.encode("utf-8")).hexdigest()
flow_path = flow_obj.get("path")
if not flow_path:
    raise SystemExit("flow.json missing path")
schedules = {}
for p in sorted(glob.glob(str(workflow_dir / "schedules" / "*.json"))):
    sobj = json.loads(pathlib.Path(p).read_text(encoding="utf-8"))
    # Only identity/trigger fields participate in the schedule fingerprint.
    key = {
        "path": sobj.get("path"),
        "script_path": sobj.get("script_path"),
        "is_flow": bool(sobj.get("is_flow")),
        "schedule": sobj.get("schedule"),
        "timezone": sobj.get("timezone"),
        "enabled": bool(sobj.get("enabled")),
    }
    if not key["path"]:
        raise SystemExit(f"schedule file missing path: {p}")
    blob = json.dumps(key, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    schedules[key["path"]] = {"fingerprint": hashlib.sha256(blob.encode("utf-8")).hexdigest()}
print(json.dumps({
    "flow_path": flow_path,
    "flow_fingerprint": flow_fp,
    "schedules": schedules,
}, ensure_ascii=False))
PY
}
workflow_state_from_remote_json() {
  # Compute the remote fingerprint state for a flow, mirroring
  # workflow_state_from_dir_json. Prints "{}" when the flow does not exist
  # remotely (callers then see empty fingerprints and treat it as changed).
  local flow_path="$1"
  local flow_payload schedules_json
  flow_payload="$(api_get "/flows/get/${flow_path}")"
  if [ -z "$flow_payload" ]; then
    echo "{}"
    return
  fi
  schedules_json="$(get_schedules_list_json)"
  python_json "$flow_payload" "$schedules_json" "$flow_path" <<'PY'
import hashlib
import json
import sys

flow_obj = json.loads(sys.argv[1])
schedules = json.loads(sys.argv[2])
flow_path = sys.argv[3]
# Normalize to the same subset pull_flow stores so local/remote hashes match.
flow_norm = {
    "path": flow_obj.get("path"),
    "summary": flow_obj.get("summary") or "",
    "description": flow_obj.get("description") or "",
    "value": flow_obj.get("value") or {},
    "schema": flow_obj.get("schema") or {"type": "object", "properties": {}, "required": []},
}
flow_blob = json.dumps(flow_norm, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
flow_fp = hashlib.sha256(flow_blob.encode("utf-8")).hexdigest()
out_sched = {}
for it in schedules:
    if not (it.get("is_flow") and it.get("script_path") == flow_path and it.get("path")):
        continue
    # Same identity/trigger fields as the local schedule fingerprint.
    key = {
        "path": it.get("path"),
        "script_path": it.get("script_path"),
        "is_flow": bool(it.get("is_flow")),
        "schedule": it.get("schedule"),
        "timezone": it.get("timezone"),
        "enabled": bool(it.get("enabled")),
    }
    blob = json.dumps(key, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    out_sched[key["path"]] = {"fingerprint": hashlib.sha256(blob.encode("utf-8")).hexdigest()}
print(json.dumps({
    "flow_path": flow_path,
    "flow_fingerprint": flow_fp,
    "schedules": out_sched,
}, ensure_ascii=False))
PY
}
push_flow() {
  # Replace a remote flow with the contents of a local JSON file.
  # WARNING: implemented as delete + create; if the create fails the flow is
  # gone until restored. push_workflow wraps this pattern with backup/verify.
  local json_file="$1"
  if [ ! -f "$json_file" ]; then
    echo "push-flow failed: file not found (${json_file})" >&2
    exit 1
  fi
  local flow_path payload
  # Extract the target path from the file itself; refuse files without one.
  flow_path="$(python_json "$json_file" <<'PY'
import json
import pathlib
import sys

obj = json.loads(pathlib.Path(sys.argv[1]).read_text(encoding="utf-8"))
path = obj.get("path")
if not path:
    raise SystemExit("path is required in flow json")
print(path)
PY
)"
  payload="$(cat "$json_file")"
  echo "push-flow: delete ${flow_path}"
  # Delete may fail when the flow does not exist yet; that is acceptable.
  api_delete "/flows/delete/${flow_path}" >/dev/null || true
  echo "push-flow: create ${flow_path}"
  api_post "/flows/create" "$payload" | json_pretty
}
update_workflow_index_entry() {
  # Record the current local state of one workflow package in the workflow
  # index file — the baseline used by preflight_workflow and status_workflow.
  local workflow_dir="$1"
  mkdir -p "${STATE_DIR}"
  local state_json now_utc tmp_file
  state_json="$(workflow_state_from_dir_json "$workflow_dir")"
  now_utc="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
  # Write to a temp file then rename so a crash cannot truncate the index.
  tmp_file="${WORKFLOW_INDEX_FILE}.tmp"
  python_json "$WORKFLOW_INDEX_FILE" "$state_json" "$now_utc" "$WINDMILL_WORKSPACE" "$workflow_dir" <<'PY' > "$tmp_file"
import json
import pathlib
import sys

index_path = pathlib.Path(sys.argv[1])
state = json.loads(sys.argv[2])
now_utc = sys.argv[3]
workspace = sys.argv[4]
workflow_dir = sys.argv[5]
# Load the existing index if present; otherwise start a fresh one.
if index_path.exists():
    idx = json.loads(index_path.read_text(encoding="utf-8"))
else:
    idx = {"synced_at": now_utc, "workspace": workspace, "workflows": {}}
idx["synced_at"] = now_utc
idx["workspace"] = workspace
wfs = idx.setdefault("workflows", {})
wfs[state["flow_path"]] = {
    "workflow_dir": workflow_dir,
    "flow_fingerprint": state["flow_fingerprint"],
    "schedules": state["schedules"],
}
print(json.dumps(idx, ensure_ascii=False, indent=2))
PY
  mv "$tmp_file" "$WORKFLOW_INDEX_FILE"
}
preflight_workflow() {
  # Guard for push-workflow: refuse to push when the remote flow no longer
  # matches the fingerprint recorded at the last pull (lost-update check).
  # Set FORCE_PUSH=1 to bypass.
  local flow_path="$1"
  if [ "${FORCE_PUSH}" = "1" ]; then
    echo "preflight skipped: FORCE_PUSH=1"
    return
  fi
  if [ ! -f "$WORKFLOW_INDEX_FILE" ]; then
    echo "preflight failed: ${WORKFLOW_INDEX_FILE} not found. run pull-workflow first." >&2
    exit 1
  fi
  local expected actual
  # Baseline fingerprint recorded in the workflow index (empty if untracked).
  expected="$(python_json "$WORKFLOW_INDEX_FILE" "$flow_path" <<'PY'
import json
import pathlib
import sys

idx = json.loads(pathlib.Path(sys.argv[1]).read_text(encoding="utf-8"))
print(((idx.get("workflows") or {}).get(sys.argv[2]) or {}).get("flow_fingerprint", ""))
PY
)"
  if [ -z "$expected" ]; then
    echo "preflight failed: no baseline for ${flow_path}. run pull-workflow first." >&2
    exit 1
  fi
  # Fingerprint of the flow as it exists on the server right now.
  actual="$(python_json "$(workflow_state_from_remote_json "$flow_path")" <<'PY'
import json
import sys

print(json.loads(sys.argv[1]).get("flow_fingerprint", ""))
PY
)"
  if [ -z "$actual" ] || [ "$actual" != "$expected" ]; then
    echo "preflight failed: remote flow changed. run pull-workflow ${flow_path} first." >&2
    exit 1
  fi
}
backup_workflow() {
  # Snapshot a flow and its schedules into a timestamped backup directory.
  # Prints the backup directory path — the only stdout output (pulls are
  # silenced) so callers can capture it with $(...).
  local flow_path="$1"
  local ts backup_dir schedules_json schedule_paths
  ts="$(date -u +"%Y%m%dT%H%M%SZ")"
  backup_dir="${BACKUP_ROOT}/${ts}/$(safe_name "$flow_path")"
  mkdir -p "${backup_dir}/schedules"
  pull_flow "$flow_path" "${backup_dir}/flow.json" >/dev/null
  schedules_json="$(get_schedules_list_json)"
  schedule_paths="$(get_schedule_paths_for_flow "$schedules_json" "$flow_path")"
  while IFS= read -r p; do
    [ -z "$p" ] && continue
    pull_schedule "$p" "${backup_dir}/$(schedule_file_for_path "$p")" >/dev/null
  done <<< "$schedule_paths"
  echo "$backup_dir"
}
restore_workflow_from_backup() {
  # Roll back a flow and its schedules to a snapshot made by backup_workflow.
  # $1: backup directory, $2: flow path.
  local backup_dir="$1"
  local flow_path="$2"
  local schedules_json schedule_paths f
  # Recreate the flow from the backup (delete first; failure is acceptable
  # when the flow no longer exists).
  api_delete "/flows/delete/${flow_path}" >/dev/null || true
  api_post "/flows/create" "$(cat "${backup_dir}/flow.json")" >/dev/null
  # Drop whatever schedules currently reference the flow...
  schedules_json="$(get_schedules_list_json)"
  schedule_paths="$(get_schedule_paths_for_flow "$schedules_json" "$flow_path")"
  while IFS= read -r p; do
    [ -z "$p" ] && continue
    api_delete "/schedules/delete/${p}" >/dev/null || true
  done <<< "$schedule_paths"
  # ...then recreate the ones captured in the backup.
  for f in "${backup_dir}/schedules/"*.json; do
    [ -f "$f" ] || continue
    api_post "/schedules/create" "$(cat "$f")" >/dev/null
  done
}
sync_workflow_schedules() {
  # Make remote schedules for a flow match the local package: delete every
  # schedule currently attached to the flow, then create one per local file.
  local flow_path="$1"
  local workflow_dir="$2"
  local schedules_json schedule_paths f
  schedules_json="$(get_schedules_list_json)"
  schedule_paths="$(get_schedule_paths_for_flow "$schedules_json" "$flow_path")"
  while IFS= read -r p; do
    [ -z "$p" ] && continue
    echo "push-workflow: delete schedule ${p}"
    api_delete "/schedules/delete/${p}" >/dev/null || true
  done <<< "$schedule_paths"
  for f in "${workflow_dir}/schedules/"*.json; do
    # The glob stays literal when no schedule files exist; skip non-files.
    [ -f "$f" ] || continue
    echo "push-workflow: create schedule $(basename "$f")"
    api_post "/schedules/create" "$(cat "$f")" >/dev/null
  done
}
verify_workflow_matches_remote() {
  # Print "ok" when the on-disk workflow package ($1) matches the remote
  # state of flow $2 (same flow fingerprint and identical schedule
  # fingerprints), otherwise print "mismatch".
  local dir_state remote_state
  dir_state="$(workflow_state_from_dir_json "$1")"
  remote_state="$(workflow_state_from_remote_json "$2")"
  python_json "$dir_state" "$remote_state" <<'PY'
import json
import sys

want = json.loads(sys.argv[1])
have = json.loads(sys.argv[2])
same_flow = want.get("flow_fingerprint") == have.get("flow_fingerprint")
same_sched = (want.get("schedules") or {}) == (have.get("schedules") or {})
print("ok" if same_flow and same_sched else "mismatch")
PY
}
pull_workflow_internal() {
  # Pull one flow plus its schedules into a workflow package directory and
  # refresh its manifest and workflow-index entry.
  # $1: flow path, $2: target directory, $3: optional pre-fetched schedules
  # JSON (lets pull_all_workflows reuse a single /schedules/list call).
  local flow_path="$1"
  local workflow_dir="$2"
  local schedules_json="${3:-}"
  mkdir -p "${workflow_dir}/schedules"
  pull_flow "$flow_path" "${workflow_dir}/flow.json" >/dev/null
  # Remove stale schedule files so server-side deletions propagate locally.
  rm -f "${workflow_dir}/schedules/"*.json 2>/dev/null || true
  if [ -z "$schedules_json" ]; then
    schedules_json="$(get_schedules_list_json)"
  fi
  local schedule_paths count
  schedule_paths="$(get_schedule_paths_for_flow "$schedules_json" "$flow_path")"
  count=0
  while IFS= read -r p; do
    [ -z "$p" ] && continue
    pull_schedule "$p" "${workflow_dir}/$(schedule_file_for_path "$p")" >/dev/null
    count=$((count + 1))
  done <<< "$schedule_paths"
  python_json "$flow_path" "$count" <<'PY' | save_json_pretty "${workflow_dir}/manifest.json"
import json
import sys

print(json.dumps({"flow_path": sys.argv[1], "schedule_count": int(sys.argv[2])}, ensure_ascii=False))
PY
  update_workflow_index_entry "$workflow_dir"
  echo "pulled workflow: ${flow_path} -> ${workflow_dir} (schedules=${count})"
}
pull_workflow() {
  # Pull one flow as a workflow package. $2 optionally overrides the target
  # directory; by default it is derived from the flow path.
  local flow_path="$1"
  local outdir="${2:-}"
  if [ -z "$outdir" ]; then
    outdir="$(workflow_dir_from_path "$flow_path")"
  fi
  pull_workflow_internal "$flow_path" "$outdir"
}
push_workflow() {
  # Safely replace a remote flow + its schedules with a local workflow package:
  # preflight (remote unchanged since last pull) -> backup -> delete/create
  # flow -> sync schedules -> verify; restores the backup on verify failure.
  local workflow_dir="$1"
  local flow_file="${workflow_dir}/flow.json"
  if [ ! -f "$flow_file" ]; then
    echo "push-workflow failed: flow.json not found (${flow_file})" >&2
    exit 1
  fi
  local flow_path backup_dir verify_result
  flow_path="$(python_json "$flow_file" <<'PY'
import json, pathlib, sys

obj = json.loads(pathlib.Path(sys.argv[1]).read_text(encoding="utf-8"))
print(obj.get("path", ""))
PY
)"
  [ -n "$flow_path" ] || { echo "push-workflow failed: path missing in flow.json" >&2; exit 1; }
  preflight_workflow "$flow_path"
  backup_dir="$(backup_workflow "$flow_path")"
  echo "backup saved: ${backup_dir}"
  # Replace the flow (delete may fail when it does not exist remotely).
  api_delete "/flows/delete/${flow_path}" >/dev/null || true
  api_post "/flows/create" "$(cat "$flow_file")" >/dev/null
  sync_workflow_schedules "$flow_path" "$workflow_dir"
  # Post-verify: remote must now match the local package exactly.
  verify_result="$(verify_workflow_matches_remote "$workflow_dir" "$flow_path")"
  if [ "$verify_result" != "ok" ]; then
    echo "post-verify failed, restoring backup..." >&2
    restore_workflow_from_backup "$backup_dir" "$flow_path" || true
    exit 1
  fi
  update_workflow_index_entry "$workflow_dir"
  echo "push-workflow completed: ${flow_path}"
}
pull_all() {
  # Bulk pull: every script to scripts/*.ts and every flow to
  # flows/*.flow.json, plus raw list snapshots under ${STATE_DIR},
  # then rebuild the baseline indexes.
  mkdir -p scripts flows "${STATE_DIR}"
  local scripts_json flows_json
  scripts_json="$(api_get "/scripts/list?per_page=1000")"
  flows_json="$(api_get "/flows/list?per_page=1000")"
  python_json "$scripts_json" <<'PY' > "${STATE_DIR}/scripts.list.json"
import json
import sys

print(json.dumps(json.loads(sys.argv[1]), ensure_ascii=False, indent=2))
PY
  python_json "$flows_json" <<'PY' > "${STATE_DIR}/flows.list.json"
import json
import sys

print(json.dumps(json.loads(sys.argv[1]), ensure_ascii=False, indent=2))
PY
  local script_paths flow_paths
  # Extract the path of every listed script / flow (one per line).
  script_paths="$(python_json "$scripts_json" <<'PY'
import json
import sys

items = json.loads(sys.argv[1])
for it in items:
    path = it.get("path")
    if path:
        print(path)
PY
)"
  flow_paths="$(python_json "$flows_json" <<'PY'
import json
import sys

items = json.loads(sys.argv[1])
for it in items:
    path = it.get("path")
    if path:
        print(path)
PY
)"
  while IFS= read -r p; do
    [ -z "$p" ] && continue
    pull_script "$p" "$(path_to_script_file "$p")"
  done <<< "$script_paths"
  while IFS= read -r p; do
    [ -z "$p" ] && continue
    pull_flow "$p" "$(path_to_flow_file "$p")"
  done <<< "$flow_paths"
  refresh_indexes
  echo "remote/workflow index updated."
}
build_remote_index() {
  # Build the remote baseline index JSON from raw list responses.
  # $1: scripts list JSON, $2: flows list JSON, $3: schedules list JSON,
  # $4: ISO-8601 sync timestamp. Prints the index (indent=2) to stdout.
  local scripts_payload="$1"
  local flows_payload="$2"
  local schedules_payload="$3"
  local stamp="$4"
  python_json "$scripts_payload" "$flows_payload" "$schedules_payload" "$stamp" <<'PY'
import json
import sys

scripts = json.loads(sys.argv[1])
flows = json.loads(sys.argv[2])
schedules = json.loads(sys.argv[3])

index = {
    "synced_at": sys.argv[4],
    "workspace": None,
    "scripts": {},
    "flows": {},
    "schedules": {},
}


def remember_workspace(entry):
    # First workspace_id encountered wins (scripts, then flows, schedules).
    if index["workspace"] is None:
        index["workspace"] = entry.get("workspace_id")


for entry in scripts:
    p = entry.get("path")
    if not p:
        continue
    remember_workspace(entry)
    index["scripts"][p] = {
        "hash": entry.get("hash"),
        "updated_at": entry.get("edited_at") or entry.get("created_at"),
    }

for entry in flows:
    p = entry.get("path")
    if not p:
        continue
    remember_workspace(entry)
    index["flows"][p] = {
        "updated_at": entry.get("edited_at") or entry.get("created_at"),
    }

for entry in schedules:
    p = entry.get("path")
    if not p:
        continue
    remember_workspace(entry)
    index["schedules"][p] = {
        "schedule": entry.get("schedule"),
        "timezone": entry.get("timezone"),
        "enabled": bool(entry.get("enabled")),
        "script_path": entry.get("script_path"),
        "is_flow": bool(entry.get("is_flow")),
        "updated_at": entry.get("edited_at"),
    }

print(json.dumps(index, ensure_ascii=False, indent=2))
PY
}
refresh_indexes() {
  # Re-fetch the list endpoints, snapshot them under ${STATE_DIR}, and
  # rebuild the remote baseline index used by status-remote.
  mkdir -p "${STATE_DIR}"
  local scripts_json flows_json schedules_json now_utc
  scripts_json="$(api_get "/scripts/list?per_page=1000")"
  flows_json="$(api_get "/flows/list?per_page=1000")"
  schedules_json="$(get_schedules_list_json)"
  now_utc="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
  printf '%s' "$scripts_json" | save_json_pretty "${STATE_DIR}/scripts.list.json"
  printf '%s' "$flows_json" | save_json_pretty "${STATE_DIR}/flows.list.json"
  printf '%s' "$schedules_json" | save_json_pretty "${STATE_DIR}/schedules.list.json"
  build_remote_index "$scripts_json" "$flows_json" "$schedules_json" "$now_utc" > "${REMOTE_INDEX_FILE}"
}
status_remote() {
  # Diff the current remote state against the saved baseline index and print
  # added (+), changed (~), and removed (-) entries per kind.
  mkdir -p "${STATE_DIR}"
  local scripts_json flows_json schedules_json now_utc
  scripts_json="$(api_get "/scripts/list?per_page=1000")"
  flows_json="$(api_get "/flows/list?per_page=1000")"
  schedules_json="$(get_schedules_list_json)"
  now_utc="$(date -u +"%Y-%m-%dT%H:%M:%SZ")"
  local current_index
  current_index="$(build_remote_index "$scripts_json" "$flows_json" "$schedules_json" "$now_utc")"
  # Keep the freshly computed index on disk for manual inspection.
  printf '%s\n' "$current_index" > "${STATE_DIR}/remote_index.current.json"
  if [ ! -f "${REMOTE_INDEX_FILE}" ]; then
    echo "No baseline index: ${REMOTE_INDEX_FILE}"
    echo "Run ./wm-api.sh pull-all first."
    exit 0
  fi
  python_json "${REMOTE_INDEX_FILE}" "${STATE_DIR}/remote_index.current.json" <<'PY'
import json
import pathlib
import sys

old = json.loads(pathlib.Path(sys.argv[1]).read_text(encoding="utf-8"))
new = json.loads(pathlib.Path(sys.argv[2]).read_text(encoding="utf-8"))


def diff(kind):
    # Compare one section (scripts/flows/schedules) of old vs new index.
    old_map = old.get(kind, {})
    new_map = new.get(kind, {})
    added = sorted(set(new_map) - set(old_map))
    removed = sorted(set(old_map) - set(new_map))
    changed = sorted(
        path for path in (set(new_map) & set(old_map))
        if new_map[path] != old_map[path]
    )
    return added, changed, removed


for kind in ("scripts", "flows", "schedules"):
    added, changed, removed = diff(kind)
    print(f"[{kind}]")
    if not (added or changed or removed):
        print(" no changes")
        continue
    for p in added:
        print(f" + {p}")
    for p in changed:
        print(f" ~ {p}")
    for p in removed:
        print(f" - {p}")
PY
}
pull_all_workflows() {
  # Pull every remote flow as a workflow package, reusing a single
  # /schedules/list response for all of them, then rebuild indexes.
  mkdir -p "${WORKFLOWS_DIR}"
  local flows_json schedules_json flow_paths flow_path
  flows_json="$(api_get "/flows/list?per_page=1000")"
  schedules_json="$(get_schedules_list_json)"
  flow_paths="$(python_json "$flows_json" <<'PY'
import json
import sys

for it in json.loads(sys.argv[1]):
    if it.get("path"):
        print(it["path"])
PY
)"
  while IFS= read -r flow_path; do
    [ -z "$flow_path" ] && continue
    pull_workflow_internal "$flow_path" "$(workflow_dir_from_path "$flow_path")" "$schedules_json"
  done <<< "$flow_paths"
  refresh_indexes
  echo "pulled all workflows into: ${WORKFLOWS_DIR}"
}
status_workflow() {
  # Report drift between the workflow-index baseline and the remote server,
  # for one flow ($1) or for every tracked flow when no argument is given.
  local flow_path="${1:-}"
  if [ ! -f "${WORKFLOW_INDEX_FILE}" ]; then
    echo "No baseline workflow index: ${WORKFLOW_INDEX_FILE}"
    echo "Run ./wm-api.sh pull-workflow <flow_path> first."
    exit 0
  fi
  local target_paths
  if [ -n "$flow_path" ]; then
    target_paths="$flow_path"
  else
    # No argument: check every flow recorded in the index, sorted.
    target_paths="$(python_json "$WORKFLOW_INDEX_FILE" <<'PY'
import json
import pathlib
import sys

idx = json.loads(pathlib.Path(sys.argv[1]).read_text(encoding="utf-8"))
for p in sorted((idx.get("workflows") or {}).keys()):
    print(p)
PY
)"
  fi
  while IFS= read -r fp; do
    [ -z "$fp" ] && continue
    python_json "$WORKFLOW_INDEX_FILE" "$fp" "$(workflow_state_from_remote_json "$fp")" <<'PY'
import json
import pathlib
import sys

idx = json.loads(pathlib.Path(sys.argv[1]).read_text(encoding="utf-8"))
flow_path = sys.argv[2]
remote = json.loads(sys.argv[3])
expected = (idx.get("workflows") or {}).get(flow_path) or {}
print(f"[workflow] {flow_path}")
if not expected:
    print(" status: untracked (no baseline)")
    raise SystemExit(0)
flow_ok = expected.get("flow_fingerprint") == remote.get("flow_fingerprint")
exp_sched = expected.get("schedules") or {}
rem_sched = remote.get("schedules") or {}
added = sorted(set(rem_sched) - set(exp_sched))
removed = sorted(set(exp_sched) - set(rem_sched))
changed = sorted(k for k in (set(exp_sched) & set(rem_sched)) if exp_sched[k] != rem_sched[k])
if flow_ok and not (added or removed or changed):
    print(" status: no changes")
else:
    print(" status: changed")
    if not flow_ok:
        print(" ~ flow")
    for k in added:
        print(f" + schedule {k}")
    for k in changed:
        print(f" ~ schedule {k}")
    for k in removed:
        print(f" - schedule {k}")
PY
  done <<< "$target_paths"
}
# --- Command dispatcher ---
# $1 selects the command; remaining arguments are command-specific.
case "${1:-help}" in
  whoami)
    api_get "/users/whoami" | json_pretty
    ;;
  # Listing commands: optional $2 overrides per_page (default 100).
  scripts|list-scripts)
    api_get "/scripts/list?per_page=${2:-100}" | json_pretty
    ;;
  flows|list-flows)
    api_get "/flows/list?per_page=${2:-100}" | json_pretty
    ;;
  schedules|list-schedules)
    api_get "/schedules/list?per_page=${2:-100}" | json_pretty
    ;;
  get-script)
    require_arg "$0 get-script <path>" "${2:-}"
    api_get "/scripts/get/p/$2" | json_pretty
    ;;
  get-flow)
    require_arg "$0 get-flow <path>" "${2:-}"
    api_get "/flows/get/$2" | json_pretty
    ;;
  get-schedule)
    require_arg "$0 get-schedule <path>" "${2:-}"
    api_get "/schedules/get/$2" | json_pretty
    ;;
  # Raw create/update commands take a ready-made JSON payload file.
  create-script)
    require_arg "$0 create-script <json-file>" "${2:-}"
    api_post "/scripts/create" "$(cat "$2")"
    ;;
  create-flow)
    require_arg "$0 create-flow <json-file>" "${2:-}"
    api_post "/flows/create" "$(cat "$2")"
    ;;
  update-flow)
    require_arg "$0 update-flow <path> <json-file>" "${2:-}"
    require_arg "$0 update-flow <path> <json-file>" "${3:-}"
    api_put "/flows/update/$2" "$(cat "$3")"
    ;;
  create-schedule)
    require_arg "$0 create-schedule <json-file>" "${2:-}"
    api_post "/schedules/create" "$(cat "$2")"
    ;;
  run-script)
    require_arg "$0 run-script <path> [json-args]" "${2:-}"
    # Job arguments default to an empty JSON object.
    local_args="${3:-{}}"
    api_post "/jobs/run/p/$2" "${local_args}"
    ;;
  run-flow)
    require_arg "$0 run-flow <path> [json-args]" "${2:-}"
    local_args="${3:-{}}"
    api_post "/jobs/run/f/$2" "${local_args}"
    ;;
  job-status)
    require_arg "$0 job-status <job-id>" "${2:-}"
    api_get "/jobs_u/get/$2" | json_pretty
    ;;
  job-result)
    require_arg "$0 job-result <job-id>" "${2:-}"
    api_get "/jobs_u/completed/get_result/$2" | json_pretty
    ;;
  pull-script)
    require_arg "$0 pull-script <path> <outfile>" "${2:-}"
    require_arg "$0 pull-script <path> <outfile>" "${3:-}"
    pull_script "$2" "$3"
    ;;
  push-script)
    require_arg "$0 push-script <path> <infile>" "${2:-}"
    require_arg "$0 push-script <path> <infile>" "${3:-}"
    push_script "$2" "$3"
    ;;
  pull-flow)
    require_arg "$0 pull-flow <path> <outfile>" "${2:-}"
    require_arg "$0 pull-flow <path> <outfile>" "${3:-}"
    pull_flow "$2" "$3"
    ;;
  push-flow)
    require_arg "$0 push-flow <json-file>" "${2:-}"
    push_flow "$2"
    ;;
  pull-schedule)
    require_arg "$0 pull-schedule <path> <outfile>" "${2:-}"
    require_arg "$0 pull-schedule <path> <outfile>" "${3:-}"
    pull_schedule "$2" "$3"
    ;;
  # Workflow-package commands refresh the baseline indexes afterwards.
  pull-workflow)
    require_arg "$0 pull-workflow <flow_path> [outdir]" "${2:-}"
    pull_workflow "$2" "${3:-}"
    refresh_indexes
    ;;
  push-workflow)
    require_arg "$0 push-workflow <workflow-dir>" "${2:-}"
    push_workflow "$2"
    refresh_indexes
    ;;
  pull-all)
    pull_all
    ;;
  pull-all-workflows)
    pull_all_workflows
    ;;
  status-remote)
    status_remote
    ;;
  status-workflow)
    status_workflow "${2:-}"
    ;;
  refresh-index)
    refresh_indexes
    ;;
  version)
    # Unauthenticated endpoint; prints the server version string.
    curl -sk "${WINDMILL_URL}/api/version" 2>/dev/null
    echo ""
    ;;
  help|*)
    cat <<'EOF'
Windmill REST API ヘルパー
使い方: ./wm-api.sh <command> [args...]
コマンド:
whoami - 現在のユーザー情報を表示
version - サーバーバージョンを表示
scripts|list-scripts [n] - スクリプト一覧を表示
flows|list-flows [n] - フロー一覧を表示
schedules|list-schedules [n] - スケジュール一覧を表示
get-script <path> - スクリプトの詳細を取得
get-flow <path> - フローの詳細を取得
get-schedule <path> - スケジュールの詳細を取得
create-script <file> - JSONファイルからスクリプトを作成
create-flow <file> - JSONファイルからフローを作成
update-flow <path> <file> - フローを更新
create-schedule <file> - JSONファイルからスケジュールを作成
run-script <path> [args] - スクリプトを実行
run-flow <path> [args] - フローを実行
job-status <id> - ジョブのステータスを確認
job-result <id> - ジョブの結果を取得
pull-script <path> <outfile> - スクリプトをローカルへ保存
push-script <path> <infile> - ローカルファイルをスクリプトへ反映
pull-flow <path> <outfile> - フローをローカルJSONへ保存
push-flow <json-file> - フローJSONを削除再作成で反映
pull-schedule <path> <outfile> - スケジュールをローカルJSONへ保存
pull-workflow <flow_path> [dir] - workflow package (flow+schedules) を取得
push-workflow <workflow-dir> - workflow package を安全反映
pull-all - scripts/flowsを一括pullしてstate更新
pull-all-workflows - flow全件をworkflow packageとしてpull
status-remote - remote_index基準で差分表示
status-workflow [flow_path] - workflow単位差分表示
refresh-index - remote/workflow index を再生成
EOF
    ;;
esac