from __future__ import annotations import json import pickle from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import numpy as np import polars as pl from diff_df import recursive_diff_frames from polars.testing import assert_frame_equal # ------------------------------------------------------------ # Config you can tweak # ------------------------------------------------------------ MODELS = ["deepsad", "isoforest", "ocsvm"] EVALS = ["exp_based", "manual_based"] SCHEMA_STATIC = { # identifiers / dims "network": pl.Utf8, # e.g. "LeNet", "efficient" "latent_dim": pl.Int32, "semi_normals": pl.Int32, "semi_anomalous": pl.Int32, "model": pl.Utf8, # "deepsad" | "isoforest" | "ocsvm" "eval": pl.Utf8, # "exp_based" | "manual_based" "fold": pl.Int32, # metrics "auc": pl.Float64, "ap": pl.Float64, # per-sample scores: list of (idx, label, score) "scores": pl.List( pl.Struct( { "sample_idx": pl.Int32, # dataloader idx "orig_label": pl.Int8, # {-1,0,1} "score": pl.Float64, # anomaly score } ) ), # curves (normalized) "roc_curve": pl.Struct( { "fpr": pl.List(pl.Float64), "tpr": pl.List(pl.Float64), "thr": pl.List(pl.Float64), } ), "prc_curve": pl.Struct( { "precision": pl.List(pl.Float64), "recall": pl.List(pl.Float64), "thr": pl.List(pl.Float64), # may be len(precision)-1 } ), # deepsad-only per-eval arrays (None for other models) "sample_indices": pl.List(pl.Int32), "sample_labels": pl.List(pl.Int8), "valid_mask": pl.List(pl.Boolean), # timings / housekeeping "train_time": pl.Float64, "test_time": pl.Float64, "folder": pl.Utf8, "k_fold_num": pl.Int32, "config_json": pl.Utf8, # full config.json as string (for reference) } # Pretraining-only (AE) schema # Pretraining-only (AE) schema — lighter defaults PRETRAIN_SCHEMA = { # identifiers / dims "network": pl.Utf8, # e.g. "LeNet", "efficient" "latent_dim": pl.Int32, "semi_normals": pl.Int32, "semi_anomalous": pl.Int32, "model": pl.Utf8, # always "ae" "fold": pl.Int32, # timings and optimization "train_time": pl.Float64, "test_time": pl.Float64, "loss": pl.Float64, # per-sample arrays (as lists) "indices": pl.List(pl.Int32), "labels_exp_based": pl.List(pl.Int32), "labels_manual_based": pl.List(pl.Int32), "semi_targets": pl.List(pl.Int32), "file_ids": pl.List(pl.Int32), "frame_ids": pl.List(pl.Int32), "scores": pl.List(pl.Float32), # <— use Float32 to match source and save space # file id -> name mapping from the result dict "file_names": pl.List(pl.Struct({"file_id": pl.Int32, "name": pl.Utf8})), # housekeeping "folder": pl.Utf8, "k_fold_num": pl.Int32, "config_json": pl.Utf8, # full config.json as string (for reference) } SCHEMA_INFERENCE = { # identifiers / dims "experiment": pl.Utf8, # e.g. "2_static_no_artifacts_illuminated_2023-01-23-001" "network": pl.Utf8, # e.g. "LeNet", "efficient" "latent_dim": pl.Int32, "semi_normals": pl.Int32, "semi_anomalous": pl.Int32, "model": pl.Utf8, # "deepsad" | "isoforest" | "ocsvm" # metrics "scores": pl.List(pl.Float64), # timings / housekeeping "folder": pl.Utf8, "config_json": pl.Utf8, # full config.json as string (for reference) } # ------------------------------------------------------------ # Helpers: curve/scores normalizers (tuples/ndarrays -> dict/list) # ------------------------------------------------------------ def _tolist(x): if x is None: return None if isinstance(x, np.ndarray): return x.tolist() if isinstance(x, (list, tuple)): return list(x) # best-effort scalar wrap try: return [x] except Exception: return None def normalize_float_list(a) -> Optional[List[float]]: if a is None: return None if isinstance(a, np.ndarray): a = a.tolist() return [None if x is None else float(x) for x in a] def normalize_file_names(d) -> Optional[List[dict]]: """ Convert the 'file_names' dict (keys like numpy.int64 -> str) to a list[ {file_id:int, name:str} ], sorted by file_id. """ if not isinstance(d, dict): return None out: List[dict] = [] for k, v in d.items(): try: file_id = int(k) except Exception: # keys are printed as np.int64 in the structure; best-effort cast continue out.append({"file_id": file_id, "name": str(v)}) out.sort(key=lambda x: x["file_id"]) return out def normalize_roc(obj: Any) -> Optional[dict]: if obj is None: return None fpr = tpr = thr = None if isinstance(obj, (tuple, list)): if len(obj) >= 2: fpr, tpr = _tolist(obj[0]), _tolist(obj[1]) if len(obj) >= 3: thr = _tolist(obj[2]) elif isinstance(obj, dict): fpr = _tolist(obj.get("fpr") or obj.get("x")) tpr = _tolist(obj.get("tpr") or obj.get("y")) thr = _tolist(obj.get("thr") or obj.get("thresholds")) else: return None if fpr is None or tpr is None: return None return {"fpr": fpr, "tpr": tpr, "thr": thr} def normalize_prc(obj: Any) -> Optional[dict]: if obj is None: return None precision = recall = thr = None if isinstance(obj, (tuple, list)): if len(obj) >= 2: precision, recall = _tolist(obj[0]), _tolist(obj[1]) if len(obj) >= 3: thr = _tolist(obj[2]) elif isinstance(obj, dict): precision = _tolist(obj.get("precision") or obj.get("y")) recall = _tolist(obj.get("recall") or obj.get("x")) thr = _tolist(obj.get("thr") or obj.get("thresholds")) else: return None if precision is None or recall is None: return None return {"precision": precision, "recall": recall, "thr": thr} def normalize_scores_to_struct(seq) -> Optional[List[dict]]: """ Input: list of (idx, label, score) tuples (as produced in your test()). Output: list of dicts with keys sample_idx, orig_label, score. """ if seq is None: return None if isinstance(seq, np.ndarray): seq = seq.tolist() if not isinstance(seq, (list, tuple)): return None out: List[dict] = [] for item in seq: if isinstance(item, (list, tuple)) and len(item) >= 3: idx, lab, sc = item[0], item[1], item[2] out.append( { "sample_idx": None if idx is None else int(idx), "orig_label": None if lab is None else int(lab), "score": None if sc is None else float(sc), } ) else: # fallback: single numeric -> score sc = ( float(item) if isinstance(item, (int, float, np.integer, np.floating)) else None ) out.append({"sample_idx": None, "orig_label": None, "score": sc}) return out def normalize_int_list(a) -> Optional[List[int]]: if a is None: return None if isinstance(a, np.ndarray): a = a.tolist() return list(a) def normalize_bool_list(a) -> Optional[List[bool]]: if a is None: return None if isinstance(a, np.ndarray): a = a.tolist() return [bool(x) for x in a] # ------------------------------------------------------------ # Low-level: read one experiment folder # ------------------------------------------------------------ def read_config(exp_dir: Path, k_fold_required: bool = True) -> dict: cfg = exp_dir / "config.json" with cfg.open("r") as f: c = json.load(f) if k_fold_required and not c.get("k_fold"): raise ValueError(f"{exp_dir.name}: not trained as k-fold") return c def read_pickle(p: Path) -> Any: with p.open("rb") as f: return pickle.load(f) # ------------------------------------------------------------ # Extractors for each model # ------------------------------------------------------------ counting = { (label_method, eval_method): [] for label_method in ["exp_based", "manual_based"] for eval_method in ["roc", "prc"] } def rows_from_deepsad(data: dict, evals: List[str]) -> Dict[str, dict]: """ deepsad under data['test'][eval], with extra per-eval arrays and AP present. """ out: Dict[str, dict] = {} test = data.get("test", {}) for ev in evals: evd = test.get(ev) if not isinstance(evd, dict): continue counting[(ev, "roc")].append(len(evd["roc"][0])) counting[(ev, "prc")].append(len(evd["prc"][0])) out[ev] = { "auc": float(evd["auc"]) if "auc" in evd and evd["auc"] is not None else None, "roc": normalize_roc(evd.get("roc")), "prc": normalize_prc(evd.get("prc")), "ap": float(evd["ap"]) if "ap" in evd and evd["ap"] is not None else None, "scores": normalize_scores_to_struct(evd.get("scores")), "sample_indices": normalize_int_list(evd.get("indices")), "sample_labels": normalize_int_list(evd.get("labels")), "valid_mask": normalize_bool_list(evd.get("valid_mask")), "train_time": data.get("train", {}).get("time"), "test_time": test.get("time"), } return out def rows_from_isoforest(data: dict, evals: List[str]) -> Dict[str, dict]: """ Keys: test_auc_, test_roc_, test_prc_, test_ap_, test_scores_. """ out: Dict[str, dict] = {} for ev in evals: auc = data.get(f"test_auc_{ev}") if auc is None: continue out[ev] = { "auc": float(auc), "roc": normalize_roc(data.get(f"test_roc_{ev}")), "prc": normalize_prc(data.get(f"test_prc_{ev}")), "ap": float(data.get(f"test_ap_{ev}")) if data.get(f"test_ap_{ev}") is not None else None, "scores": normalize_scores_to_struct(data.get(f"test_scores_{ev}")), "sample_indices": None, "sample_labels": None, "valid_mask": None, "train_time": data.get("train_time"), "test_time": data.get("test_time"), } return out def rows_from_ocsvm_default(data: dict, evals: List[str]) -> Dict[str, dict]: """ Default OCSVM only (ignore linear variant entirely). """ out: Dict[str, dict] = {} for ev in evals: auc = data.get(f"test_auc_{ev}") if auc is None: continue out[ev] = { "auc": float(auc), "roc": normalize_roc(data.get(f"test_roc_{ev}")), "prc": normalize_prc(data.get(f"test_prc_{ev}")), "ap": float(data.get(f"test_ap_{ev}")) if data.get(f"test_ap_{ev}") is not None else None, "scores": normalize_scores_to_struct(data.get(f"test_scores_{ev}")), "sample_indices": None, "sample_labels": None, "valid_mask": None, "train_time": data.get("train_time"), "test_time": data.get("test_time"), } return out # ------------------------------------------------------------ # Build the Polars DataFrame # ------------------------------------------------------------ def load_results_dataframe(root: Path, allow_cache: bool = True) -> pl.DataFrame: """ Walks experiment subdirs under `root`. For each (model, fold) it adds rows: Columns (SCHEMA_STATIC): network, latent_dim, semi_normals, semi_anomalous, model, eval, fold, auc, ap, scores{sample_idx,orig_label,score}, roc_curve{fpr,tpr,thr}, prc_curve{precision,recall,thr}, sample_indices, sample_labels, valid_mask, train_time, test_time, folder, k_fold_num """ if allow_cache: cache = root / "results_cache.parquet" if cache.exists(): try: df = pl.read_parquet(cache) print(f"[info] loaded cached results frame from {cache}") return df except Exception as e: print(f"[warn] failed to load cache {cache}: {e}") rows: List[dict] = [] exp_dirs = [p for p in root.iterdir() if p.is_dir()] for exp_dir in sorted(exp_dirs): try: cfg = read_config(exp_dir) cfg_json = json.dumps(cfg, sort_keys=True) except Exception as e: print(f"[warn] skipping {exp_dir.name}: {e}") continue network = cfg.get("net_name") latent_dim = int(cfg.get("latent_space_dim")) semi_normals = int(cfg.get("num_known_normal")) semi_anomalous = int(cfg.get("num_known_outlier")) k = int(cfg.get("k_fold_num")) for model in MODELS: for fold in range(k): pkl = exp_dir / f"results_{model}_{fold}.pkl" if not pkl.exists(): continue try: data = read_pickle(pkl) except Exception as e: print(f"[warn] failed to read {pkl.name}: {e}") continue if model == "deepsad": per_eval = rows_from_deepsad(data, EVALS) # eval -> dict elif model == "isoforest": per_eval = rows_from_isoforest(data, EVALS) # eval -> dict elif model == "ocsvm": per_eval = rows_from_ocsvm_default(data, EVALS) # eval -> dict else: per_eval = {} for ev, vals in per_eval.items(): rows.append( { "network": network, "latent_dim": latent_dim, "semi_normals": semi_normals, "semi_anomalous": semi_anomalous, "model": model, "eval": ev, "fold": fold, "auc": vals["auc"], "ap": vals["ap"], "scores": vals["scores"], "roc_curve": vals["roc"], "prc_curve": vals["prc"], "sample_indices": vals.get("sample_indices"), "sample_labels": vals.get("sample_labels"), "valid_mask": vals.get("valid_mask"), "train_time": vals["train_time"], "test_time": vals["test_time"], "folder": str(exp_dir), "k_fold_num": k, "config_json": cfg_json, } ) # If empty, return a typed empty frame if not rows: return pl.DataFrame(schema=SCHEMA_STATIC) df = pl.DataFrame(rows, schema=SCHEMA_STATIC) # Cast to efficient dtypes (categoricals etc.) – no extra sanitation df = df.with_columns( pl.col("network", "model", "eval").cast(pl.Categorical), pl.col( "latent_dim", "semi_normals", "semi_anomalous", "fold", "k_fold_num" ).cast(pl.Int32), pl.col("auc", "ap", "train_time", "test_time").cast(pl.Float64), # NOTE: no cast on 'scores' here; it's already List(Struct) per schema. ) if allow_cache: try: df.write_parquet(cache) print(f"[info] cached results frame to {cache}") except Exception as e: print(f"[warn] failed to write cache {cache}: {e}") return df def load_pretraining_results_dataframe( root: Path, allow_cache: bool = True, keep_file_names: bool = False, # <— drop file_names by default; they’re repeated parquet_compression: str = "zstd", parquet_compression_level: int = 7, # <— stronger compression than default ) -> pl.DataFrame: """ Loads only AE pretraining results: files named `results_ae_.pkl`. Produces one row per (experiment, fold, split). By default we: - include only the TEST split (include_train=False) - store scores as Float32 - drop the repeated file_names mapping to save space - write Parquet with zstd(level=7) """ if allow_cache: cache = root / "pretraining_results_cache.parquet" if cache.exists(): try: df = pl.read_parquet(cache) print(f"[info] loaded cached pretraining frame from {cache}") return df except Exception as e: print(f"[warn] failed to load pretraining cache {cache}: {e}") rows: List[dict] = [] exp_dirs = [p for p in root.iterdir() if p.is_dir()] for exp_dir in sorted(exp_dirs): try: cfg = read_config(exp_dir) cfg_json = json.dumps(cfg, sort_keys=True) except Exception as e: print(f"[warn] skipping {exp_dir.name} (pretraining): {e}") continue network = cfg.get("net_name") latent_dim = int(cfg.get("latent_space_dim")) semi_normals = int(cfg.get("num_known_normal")) semi_anomalous = int(cfg.get("num_known_outlier")) k = int(cfg.get("k_fold_num")) for fold in range(k): pkl = exp_dir / f"results_ae_{fold}.pkl" if not pkl.exists(): continue try: data = read_pickle(pkl) # expected: {"train": {...}, "test": {...}} except Exception as e: print(f"[warn] failed to read {pkl.name}: {e}") continue train_time = data.get("train", {}).get("time") data = data.get("test", {}) rows.append( { "network": network, "latent_dim": latent_dim, "semi_normals": semi_normals, "semi_anomalous": semi_anomalous, "model": "ae", "fold": fold, "train_time": train_time, "test_time": data.get("time"), "loss": float(data.get("loss")) if data.get("loss") is not None else None, # ints as Int32, scores as Float32 to save space "indices": normalize_int_list(data.get("indices")), "labels_exp_based": normalize_int_list( data.get("labels_exp_based") ), "labels_manual_based": normalize_int_list( data.get("labels_manual_based") ), "semi_targets": normalize_int_list(data.get("semi_targets")), "file_ids": normalize_int_list(data.get("file_ids")), "frame_ids": normalize_int_list(data.get("frame_ids")), "scores": ( None if data.get("scores") is None else [ float(x) for x in ( data["scores"].tolist() if isinstance(data["scores"], np.ndarray) else data["scores"] ) ] ), "file_names": normalize_file_names(data.get("file_names")) if keep_file_names else None, "folder": str(exp_dir), "k_fold_num": k, "config_json": cfg_json, } ) if not rows: return pl.DataFrame(schema=PRETRAIN_SCHEMA) df = pl.DataFrame(rows, schema=PRETRAIN_SCHEMA) # Cast/optimize a bit (categoricals, ints, floats) df = df.with_columns( pl.col("network", "model").cast(pl.Categorical), pl.col( "latent_dim", "semi_normals", "semi_anomalous", "fold", "k_fold_num" ).cast(pl.Int32), pl.col("test_time", "train_time", "loss").cast(pl.Float64), pl.col("scores").cast(pl.List(pl.Float32)), # ensure downcast took ) if allow_cache: try: cache = root / "pretraining_results_cache.parquet" df.write_parquet( cache, compression=parquet_compression, compression_level=parquet_compression_level, statistics=True, ) print( f"[info] cached pretraining frame to {cache} " f"({parquet_compression}, level={parquet_compression_level})" ) except Exception as e: print(f"[warn] failed to write pretraining cache {cache}: {e}") return df def load_inference_results_dataframe( root: Path, allow_cache: bool = True, models: List[str] = MODELS, ) -> pl.DataFrame: """Load inference results from experiment folders. Args: root: Path to root directory containing experiment folders allow_cache: Whether to use/create cache file models: List of models to look for scores Returns: pl.DataFrame: DataFrame containing inference results """ if allow_cache: cache = root / "inference_results_cache.parquet" if cache.exists(): try: df = pl.read_parquet(cache) print(f"[info] loaded cached inference frame from {cache}") return df except Exception as e: print(f"[warn] failed to load inference cache {cache}: {e}") rows: List[dict] = [] exp_dirs = [p for p in root.iterdir() if p.is_dir()] for exp_dir in sorted(exp_dirs): try: # Load and validate config cfg = read_config(exp_dir, k_fold_required=False) cfg_json = json.dumps(cfg, sort_keys=True) # Extract config values network = cfg.get("net_name") latent_dim = int(cfg.get("latent_space_dim")) semi_normals = int(cfg.get("num_known_normal")) semi_anomalous = int(cfg.get("num_known_outlier")) # Process each model's scores inference_dir = exp_dir / "inference" if not inference_dir.exists(): print(f"[warn] no inference directory for {exp_dir.name}") continue # Find all unique experiments in this folder's inference files score_files = list(inference_dir.glob("*_scores.npy")) if not score_files: print(f"[warn] no score files in {inference_dir}") continue # Extract unique experiment names from score files # Format: {experiment}_{model}_scores.npy experiments = set() for score_file in score_files: exp_name = score_file.stem.rsplit("_", 2)[0] experiments.add(exp_name) # Load scores for each experiment and model for experiment in sorted(experiments): for model in models: score_file = inference_dir / f"{experiment}_{model}_scores.npy" if not score_file.exists(): print(f"[warn] missing score file for {experiment}, {model}") continue try: scores = np.load(score_file) rows.append( { "experiment": experiment, "network": network, "latent_dim": latent_dim, "semi_normals": semi_normals, "semi_anomalous": semi_anomalous, "model": model, "scores": scores.tolist(), "folder": str(exp_dir), "config_json": cfg_json, } ) except Exception as e: print( f"[warn] failed to load scores for {experiment}, {model}: {e}" ) continue except Exception as e: print(f"[warn] skipping {exp_dir.name}: {e}") continue # If empty, return a typed empty frame if not rows: return pl.DataFrame(schema=SCHEMA_INFERENCE) df = pl.DataFrame(rows, schema=SCHEMA_INFERENCE) # Optimize datatypes df = df.with_columns( [ pl.col("experiment", "network", "model").cast(pl.Categorical), pl.col("latent_dim", "semi_normals", "semi_anomalous").cast(pl.Int32), ] ) # Cache if enabled if allow_cache: try: df.write_parquet(cache) print(f"[info] cached inference frame to {cache}") except Exception as e: print(f"[warn] failed to write cache {cache}: {e}") return df def main(): inference_root = Path("/home/fedex/mt/results/inference/copy") df_inference = load_inference_results_dataframe(inference_root, allow_cache=True) exit(0) root = Path("/home/fedex/mt/results/copy") df1 = load_results_dataframe(root, allow_cache=True) exit(0) retest_root = Path("/home/fedex/mt/results/copy/retest_nodrop") df2 = load_results_dataframe(retest_root, allow_cache=False).drop("folder") # exact schema & shape first (optional but helpful messages) assert df1.shape == df2.shape, f"Shape differs: {df1.shape} vs {df2.shape}" assert set(df1.columns) == set(df2.columns), ( f"Column sets differ: {df1.columns} vs {df2.columns}" ) # allow small float diffs, ignore column order differences if you want df1_sorted = df1.select(sorted(df1.columns)) df2_sorted = df2.select(sorted(df2.columns)) # Optionally pre-align/sort both frames by a stable key before diffing. summary, leaves = recursive_diff_frames( df1, df2, ignore=["timestamp"], # columns to ignore float_atol=0.1, # absolute tolerance for floats float_rtol=0.0, # relative tolerance for floats max_rows_per_column=20, # limit expansion per column max_leafs_per_row=200, # cap leaves per row ) pl.Config.set_fmt_table_cell_list_len(100) pl.Config.set_tbl_rows(100) print(summary) # which columns differ & how many rows print(leaves) # exact nested paths + scalar diffs # check_exact=False lets us use atol/rtol for floats assert_frame_equal( df1_sorted, df2_sorted, check_exact=False, atol=0.1, # absolute tolerance for floats rtol=0.0, # relative tolerance (set if you want % based) check_dtypes=True, # set False if you only care about values ) print("DataFrames match within tolerance ✅") # df_pre = load_pretraining_results_dataframe(root, allow_cache=True) # print("pretraining:", df_pre.shape, df_pre.head()) if __name__ == "__main__": main()