#!/usr/bin/env python3 """ Grid-search launcher (2-GPU, fixed per-GPU batch sizes) • one job at a time per GPU • watchdog heartbeats + error pushes """ import datetime import itertools import queue import signal import subprocess import sys import threading import time from os import environ from pathlib import Path import GPUtil import requests # ─────────────── Watchdog / host configuration ──────────────── WATCHDOG_ROOT = "https://mt.the-k.at" BASIC_AUTH = ("jan", "jruuI3GZ@9Rnt7") HEARTBEAT_EVERY = 60 HOSTNAME = "gpu-node-01" OUT_FOLDER = Path("./grid_search") DONE_FOLDER = OUT_FOLDER / "done" FAILED_FOLDER = OUT_FOLDER / "failed" PING_URL, ERR_URL = f"{WATCHDOG_ROOT}/ping", f"{WATCHDOG_ROOT}/error" HEADERS = {"Content-Type": "application/json"} # ───────────────────── Hyper-parameter grid ───────────────────── PARAM_GRID = { "network_name": ["subter_LeNet", "subter_efficient"], "latent_space_dim": [32, 64, 128, 256, 512, 768, 1024], "semi_label": [(0, 0), (50, 10), (500, 100)], # (normal, anomaly) } # ────────────────────── FIXED PARAMETERS ────────────────────── BATCH_SIZE = 128 # fixed batch size per GPU EPOCHS_AE = 50 # autoencoder epochs EPOCHS_DEEPSAD = 120 # Deep SAD epochs CMD_TEMPLATE = ( "python src/main.py train subter {network_name} {exp_folder} " "~/jan/data --k_fold True --k_fold_num 5 " "--num_known_normal {num_known_normal} --num_known_outlier {num_known_anomaly} " "--lr 0.0001 --n_epochs {deepsad_epochs} --lr_milestone 50 --batch_size {batch_size} " "--weight_decay 0.5e-6 --latent_space_dim {latent_space_dim} " "--pretrain True --ae_lr 0.0001 --ae_n_epochs {ae_epochs} --ae_batch_size {batch_size} " "--ae_weight_decay 0.5e-3 --normal_class 0 --known_outlier_class 1 " "--n_known_outlier_classes 1 --seed 3 --device cuda:0" ) # ──────────────── global bookkeeping / queues ────────────────── STOP_EVT = threading.Event() JOB_TABLE = [] # [{ …, status }] # ─────────────────────── helper functions ────────────────────── def post_json(url, payload): try: requests.post( url, json=payload, headers=HEADERS, auth=BASIC_AUTH, timeout=5 ).raise_for_status() except Exception as exc: print(f"[warn] POST {url} failed: {exc}", file=sys.stderr) def gpu_stats(): stats = [] for g in GPUtil.getGPUs(): stats.append( { "id": g.id, "util": int(g.load * 100), "mem": {"used": int(g.memoryUsed), "total": int(g.memoryTotal)}, } ) return stats def summarise_jobs(): out = {"pending": 0, "running": 0, "ok": 0, "fail": 0} for j in JOB_TABLE: out[j["status"]] += 1 return out def heartbeater(): while not STOP_EVT.wait(HEARTBEAT_EVERY): post_json( PING_URL, {"host": HOSTNAME, "gpu": gpu_stats(), "status": summarise_jobs()} ) # ───────────────────────── worker logic ──────────────────────── def worker(device: str, q: "queue.Queue[dict]"): while True: job = q.get() if job is None: break # sentinel for shutdown wait_until_allowed_hours() run_job(job, device) q.task_done() def exp_folder_name(params): num_norm, num_anom = params["semi_label"] return ( f"{params['network_name']}_" f"latent{params['latent_space_dim']}_" f"n{num_norm}_a{num_anom}" ) def move_exp_folder(exp_folder: Path, target_folder: Path): """Move the experiment folder to the target folder.""" destination_folder = target_folder / exp_folder.name if destination_folder.exists(): destination_folder.unlink() target_folder.mkdir(parents=True, exist_ok=True) exp_folder.rename(target_folder / exp_folder.name) def already_done_set(): if not DONE_FOLDER.exists(): return set() return set([f.name for f in DONE_FOLDER.iterdir() if f.is_dir()]) def run_job(job: dict, device: str): num_norm, num_anom = job["params"]["semi_label"] exp_folder = job["exp_folder"] exp_folder.mkdir(parents=True, exist_ok=True) cmd = CMD_TEMPLATE.format( network_name=job["params"]["network_name"], exp_folder=exp_folder.as_posix(), num_known_normal=num_norm, num_known_anomaly=num_anom, latent_space_dim=job["params"]["latent_space_dim"], batch_size=BATCH_SIZE, ae_epochs=EPOCHS_AE, deepsad_epochs=EPOCHS_DEEPSAD, ) job.update({"device": device, "status": "running", "start": time.time()}) post_json( PING_URL, {"host": HOSTNAME, "msg": f"starting {exp_folder.name} on {device}"} ) print(f"EXEC [{device}] {cmd}", flush=True) try: env = environ.copy() env["CUDA_VISIBLE_DEVICES"] = device.split(":")[1] # "0" or "1" res = subprocess.run( cmd, shell=True, capture_output=True, text=True, env=env, check=False ) job.update({"end": time.time(), "returncode": res.returncode}) job["status"] = "ok" if res.returncode == 0 else "fail" if res.returncode == 0: move_exp_folder(exp_folder, DONE_FOLDER) else: move_exp_folder(exp_folder, FAILED_FOLDER) tail = (res.stderr or "no-stderr")[-500:] post_json( ERR_URL, { "host": HOSTNAME, "msg": f"rc {res.returncode} {exp_folder.name} → {tail}", }, ) print( f"ERRR [{device}] rc {res.returncode} {exp_folder.name} → {tail}", flush=True, ) except Exception as exc: job.update({"end": time.time(), "status": "fail", "returncode": -999}) move_exp_folder(exp_folder, FAILED_FOLDER) post_json( ERR_URL, {"host": HOSTNAME, "msg": f'exception "{exc}" in {exp_folder.name}'}, ) # ────────────────────── graceful shutdown ────────────────────── def shutdown(reason): post_json(ERR_URL, {"host": HOSTNAME, "msg": f"run stopped: {reason}"}) STOP_EVT.set() sys.exit(0) for sig in (signal.SIGINT, signal.SIGTERM): signal.signal(sig, lambda s, f, sig=sig: shutdown(signal.Signals(sig).name)) # ─────────────────────────── main() ──────────────────────────── def wait_until_allowed_hours(): """Sleep until the current time is between 22:00 and 04:00.""" while True: now = datetime.datetime.now() hour = now.hour # Allowed window: 22:00 (22) to 04:00 (4) if hour >= 22 or hour < 4: break # Calculate seconds until 22:00 today or 22:00 tomorrow if after 4am if hour < 22: next_allowed = now.replace(hour=22, minute=0, second=0, microsecond=0) else: # After 4am but before 22:00, so next allowed is tonight next_allowed = now.replace(hour=22, minute=0, second=0, microsecond=0) if next_allowed < now: next_allowed += datetime.timedelta(days=1) wait_seconds = (next_allowed - now).total_seconds() print(f"[info] Waiting until 22:00 to start new jobs ({int(wait_seconds/60)} min)...") time.sleep(min(wait_seconds, 60*10)) # Sleep in chunks of max 10 minutes def main(): threading.Thread(target=heartbeater, daemon=True).start() # two worker queues, one per GPU q0, q1 = queue.Queue(), queue.Queue() threading.Thread(target=worker, args=("cuda:0", q0), daemon=True).start() threading.Thread(target=worker, args=("cuda:1", q1), daemon=True).start() # build full parameter combinations as jobs keys = list(PARAM_GRID) jobs = [] for combo in itertools.product(*PARAM_GRID.values()): params = dict(zip(keys, combo)) exp_folder = OUT_FOLDER / exp_folder_name(params) job = { "params": params, "exp_folder": exp_folder, "status": "pending", "start": None, "end": None, "returncode": None, } jobs.append(job) # Check which jobs are already done and mark them done_exps = already_done_set() #print(f"Already done jobs found: {done_exps}") for job in jobs: if job["exp_folder"].name in done_exps: job["status"] = "ok" # Print summary of job statuses before starting n_done = sum(1 for job in jobs if job["status"] == "ok") n_pending = sum(1 for job in jobs if job["status"] != "ok") print(f"[info] {n_done} jobs already done, {n_pending} jobs pending.") # Add all jobs to global JOB_TABLE for bookkeeping JOB_TABLE.extend(jobs) # Only enqueue jobs that are not already done for job in jobs: if job["status"] == "ok": continue # Hardcode device assignment if job["params"]["network_name"] == "subter_LeNet": q1.put(job) # cuda:1 elif job["params"]["network_name"] == "subter_efficient": q0.put(job) # cuda:0 else: # fallback: load-balance (q0 if q0.qsize() <= q1.qsize() else q1).put(job) q0.join() q1.join() # wait for all jobs to drain q0.put(None) q1.put(None) # terminate workers STOP_EVT.set() post_json( PING_URL, { "host": HOSTNAME, "msg": "all jobs done", "gpu": gpu_stats(), "status": summarise_jobs(), }, ) print("[info] grid search completed") if __name__ == "__main__": main()