from __future__ import annotations import json import pickle from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import numpy as np import polars as pl from polars.testing import assert_frame_equal from diff_df import recursive_diff_frames # ------------------------------------------------------------ # Config you can tweak # ------------------------------------------------------------ MODELS = ["deepsad", "isoforest", "ocsvm"] EVALS = ["exp_based", "manual_based"] SCHEMA_STATIC = { # identifiers / dims "network": pl.Utf8, # e.g. "LeNet", "efficient" "latent_dim": pl.Int32, "semi_normals": pl.Int32, "semi_anomalous": pl.Int32, "model": pl.Utf8, # "deepsad" | "isoforest" | "ocsvm" "eval": pl.Utf8, # "exp_based" | "manual_based" "fold": pl.Int32, # metrics "auc": pl.Float64, "ap": pl.Float64, # per-sample scores: list of (idx, label, score) "scores": pl.List( pl.Struct( { "sample_idx": pl.Int32, # dataloader idx "orig_label": pl.Int8, # {-1,0,1} "score": pl.Float64, # anomaly score } ) ), # curves (normalized) "roc_curve": pl.Struct( { "fpr": pl.List(pl.Float64), "tpr": pl.List(pl.Float64), "thr": pl.List(pl.Float64), } ), "prc_curve": pl.Struct( { "precision": pl.List(pl.Float64), "recall": pl.List(pl.Float64), "thr": pl.List(pl.Float64), # may be len(precision)-1 } ), # deepsad-only per-eval arrays (None for other models) "sample_indices": pl.List(pl.Int32), "sample_labels": pl.List(pl.Int8), "valid_mask": pl.List(pl.Boolean), # timings / housekeeping "train_time": pl.Float64, "test_time": pl.Float64, "folder": pl.Utf8, "k_fold_num": pl.Int32, "config_json": pl.Utf8, # full config.json as string (for reference) } # Pretraining-only (AE) schema # Pretraining-only (AE) schema — lighter defaults PRETRAIN_SCHEMA = { # identifiers / dims "network": pl.Utf8, # e.g. "LeNet", "efficient" "latent_dim": pl.Int32, "semi_normals": pl.Int32, "semi_anomalous": pl.Int32, "model": pl.Utf8, # always "ae" "fold": pl.Int32, "split": pl.Utf8, # "train" | "test" # timings and optimization "time": pl.Float64, "loss": pl.Float64, # per-sample arrays (as lists) "indices": pl.List(pl.Int32), "labels_exp_based": pl.List(pl.Int32), "labels_manual_based": pl.List(pl.Int32), "semi_targets": pl.List(pl.Int32), "file_ids": pl.List(pl.Int32), "frame_ids": pl.List(pl.Int32), "scores": pl.List(pl.Float32), # <— use Float32 to match source and save space # file id -> name mapping from the result dict "file_names": pl.List(pl.Struct({"file_id": pl.Int32, "name": pl.Utf8})), # housekeeping "folder": pl.Utf8, "k_fold_num": pl.Int32, "config_json": pl.Utf8, # full config.json as string (for reference) } # ------------------------------------------------------------ # Helpers: curve/scores normalizers (tuples/ndarrays -> dict/list) # ------------------------------------------------------------ def _tolist(x): if x is None: return None if isinstance(x, np.ndarray): return x.tolist() if isinstance(x, (list, tuple)): return list(x) # best-effort scalar wrap try: return [x] except Exception: return None def normalize_float_list(a) -> Optional[List[float]]: if a is None: return None if isinstance(a, np.ndarray): a = a.tolist() return [None if x is None else float(x) for x in a] def normalize_file_names(d) -> Optional[List[dict]]: """ Convert the 'file_names' dict (keys like numpy.int64 -> str) to a list[ {file_id:int, name:str} ], sorted by file_id. """ if not isinstance(d, dict): return None out: List[dict] = [] for k, v in d.items(): try: file_id = int(k) except Exception: # keys are printed as np.int64 in the structure; best-effort cast continue out.append({"file_id": file_id, "name": str(v)}) out.sort(key=lambda x: x["file_id"]) return out def normalize_roc(obj: Any) -> Optional[dict]: if obj is None: return None fpr = tpr = thr = None if isinstance(obj, (tuple, list)): if len(obj) >= 2: fpr, tpr = _tolist(obj[0]), _tolist(obj[1]) if len(obj) >= 3: thr = _tolist(obj[2]) elif isinstance(obj, dict): fpr = _tolist(obj.get("fpr") or obj.get("x")) tpr = _tolist(obj.get("tpr") or obj.get("y")) thr = _tolist(obj.get("thr") or obj.get("thresholds")) else: return None if fpr is None or tpr is None: return None return {"fpr": fpr, "tpr": tpr, "thr": thr} def normalize_prc(obj: Any) -> Optional[dict]: if obj is None: return None precision = recall = thr = None if isinstance(obj, (tuple, list)): if len(obj) >= 2: precision, recall = _tolist(obj[0]), _tolist(obj[1]) if len(obj) >= 3: thr = _tolist(obj[2]) elif isinstance(obj, dict): precision = _tolist(obj.get("precision") or obj.get("y")) recall = _tolist(obj.get("recall") or obj.get("x")) thr = _tolist(obj.get("thr") or obj.get("thresholds")) else: return None if precision is None or recall is None: return None return {"precision": precision, "recall": recall, "thr": thr} def normalize_scores_to_struct(seq) -> Optional[List[dict]]: """ Input: list of (idx, label, score) tuples (as produced in your test()). Output: list of dicts with keys sample_idx, orig_label, score. """ if seq is None: return None if isinstance(seq, np.ndarray): seq = seq.tolist() if not isinstance(seq, (list, tuple)): return None out: List[dict] = [] for item in seq: if isinstance(item, (list, tuple)) and len(item) >= 3: idx, lab, sc = item[0], item[1], item[2] out.append( { "sample_idx": None if idx is None else int(idx), "orig_label": None if lab is None else int(lab), "score": None if sc is None else float(sc), } ) else: # fallback: single numeric -> score sc = ( float(item) if isinstance(item, (int, float, np.integer, np.floating)) else None ) out.append({"sample_idx": None, "orig_label": None, "score": sc}) return out def normalize_int_list(a) -> Optional[List[int]]: if a is None: return None if isinstance(a, np.ndarray): a = a.tolist() return list(a) def normalize_bool_list(a) -> Optional[List[bool]]: if a is None: return None if isinstance(a, np.ndarray): a = a.tolist() return [bool(x) for x in a] # ------------------------------------------------------------ # Low-level: read one experiment folder # ------------------------------------------------------------ def read_config(exp_dir: Path) -> dict: cfg = exp_dir / "config.json" with cfg.open("r") as f: c = json.load(f) if not c.get("k_fold"): raise ValueError(f"{exp_dir.name}: not trained as k-fold") return c def read_pickle(p: Path) -> Any: with p.open("rb") as f: return pickle.load(f) # ------------------------------------------------------------ # Extractors for each model # ------------------------------------------------------------ counting = { (label_method, eval_method): [] for label_method in ["exp_based", "manual_based"] for eval_method in ["roc", "prc"] } def rows_from_deepsad(data: dict, evals: List[str]) -> Dict[str, dict]: """ deepsad under data['test'][eval], with extra per-eval arrays and AP present. """ out: Dict[str, dict] = {} test = data.get("test", {}) for ev in evals: evd = test.get(ev) if not isinstance(evd, dict): continue counting[(ev, "roc")].append(len(evd["roc"][0])) counting[(ev, "prc")].append(len(evd["prc"][0])) out[ev] = { "auc": float(evd["auc"]) if "auc" in evd and evd["auc"] is not None else None, "roc": normalize_roc(evd.get("roc")), "prc": normalize_prc(evd.get("prc")), "ap": float(evd["ap"]) if "ap" in evd and evd["ap"] is not None else None, "scores": normalize_scores_to_struct(evd.get("scores")), "sample_indices": normalize_int_list(evd.get("indices")), "sample_labels": normalize_int_list(evd.get("labels")), "valid_mask": normalize_bool_list(evd.get("valid_mask")), "train_time": data.get("train", {}).get("time"), "test_time": test.get("time"), } return out def rows_from_isoforest(data: dict, evals: List[str]) -> Dict[str, dict]: """ Keys: test_auc_, test_roc_, test_prc_, test_ap_, test_scores_. """ out: Dict[str, dict] = {} for ev in evals: auc = data.get(f"test_auc_{ev}") if auc is None: continue out[ev] = { "auc": float(auc), "roc": normalize_roc(data.get(f"test_roc_{ev}")), "prc": normalize_prc(data.get(f"test_prc_{ev}")), "ap": float(data.get(f"test_ap_{ev}")) if data.get(f"test_ap_{ev}") is not None else None, "scores": normalize_scores_to_struct(data.get(f"test_scores_{ev}")), "sample_indices": None, "sample_labels": None, "valid_mask": None, "train_time": data.get("train_time"), "test_time": data.get("test_time"), } return out def rows_from_ocsvm_default(data: dict, evals: List[str]) -> Dict[str, dict]: """ Default OCSVM only (ignore linear variant entirely). """ out: Dict[str, dict] = {} for ev in evals: auc = data.get(f"test_auc_{ev}") if auc is None: continue out[ev] = { "auc": float(auc), "roc": normalize_roc(data.get(f"test_roc_{ev}")), "prc": normalize_prc(data.get(f"test_prc_{ev}")), "ap": float(data.get(f"test_ap_{ev}")) if data.get(f"test_ap_{ev}") is not None else None, "scores": normalize_scores_to_struct(data.get(f"test_scores_{ev}")), "sample_indices": None, "sample_labels": None, "valid_mask": None, "train_time": data.get("train_time"), "test_time": data.get("test_time"), } return out # ------------------------------------------------------------ # Build the Polars DataFrame # ------------------------------------------------------------ def load_results_dataframe(root: Path, allow_cache: bool = True) -> pl.DataFrame: """ Walks experiment subdirs under `root`. For each (model, fold) it adds rows: Columns (SCHEMA_STATIC): network, latent_dim, semi_normals, semi_anomalous, model, eval, fold, auc, ap, scores{sample_idx,orig_label,score}, roc_curve{fpr,tpr,thr}, prc_curve{precision,recall,thr}, sample_indices, sample_labels, valid_mask, train_time, test_time, folder, k_fold_num """ if allow_cache: cache = root / "results_cache.parquet" if cache.exists(): try: df = pl.read_parquet(cache) print(f"[info] loaded cached results frame from {cache}") return df except Exception as e: print(f"[warn] failed to load cache {cache}: {e}") rows: List[dict] = [] exp_dirs = [p for p in root.iterdir() if p.is_dir()] for exp_dir in sorted(exp_dirs): try: cfg = read_config(exp_dir) cfg_json = json.dumps(cfg, sort_keys=True) except Exception as e: print(f"[warn] skipping {exp_dir.name}: {e}") continue network = cfg.get("net_name") latent_dim = int(cfg.get("latent_space_dim")) semi_normals = int(cfg.get("num_known_normal")) semi_anomalous = int(cfg.get("num_known_outlier")) k = int(cfg.get("k_fold_num")) for model in MODELS: for fold in range(k): pkl = exp_dir / f"results_{model}_{fold}.pkl" if not pkl.exists(): continue try: data = read_pickle(pkl) except Exception as e: print(f"[warn] failed to read {pkl.name}: {e}") continue if model == "deepsad": per_eval = rows_from_deepsad(data, EVALS) # eval -> dict elif model == "isoforest": per_eval = rows_from_isoforest(data, EVALS) # eval -> dict elif model == "ocsvm": per_eval = rows_from_ocsvm_default(data, EVALS) # eval -> dict else: per_eval = {} for ev, vals in per_eval.items(): rows.append( { "network": network, "latent_dim": latent_dim, "semi_normals": semi_normals, "semi_anomalous": semi_anomalous, "model": model, "eval": ev, "fold": fold, "auc": vals["auc"], "ap": vals["ap"], "scores": vals["scores"], "roc_curve": vals["roc"], "prc_curve": vals["prc"], "sample_indices": vals.get("sample_indices"), "sample_labels": vals.get("sample_labels"), "valid_mask": vals.get("valid_mask"), "train_time": vals["train_time"], "test_time": vals["test_time"], "folder": str(exp_dir), "k_fold_num": k, "config_json": cfg_json, } ) # If empty, return a typed empty frame if not rows: return pl.DataFrame(schema=SCHEMA_STATIC) df = pl.DataFrame(rows, schema=SCHEMA_STATIC) # Cast to efficient dtypes (categoricals etc.) – no extra sanitation df = df.with_columns( pl.col("network", "model", "eval").cast(pl.Categorical), pl.col( "latent_dim", "semi_normals", "semi_anomalous", "fold", "k_fold_num" ).cast(pl.Int32), pl.col("auc", "ap", "train_time", "test_time").cast(pl.Float64), # NOTE: no cast on 'scores' here; it's already List(Struct) per schema. ) if allow_cache: try: df.write_parquet(cache) print(f"[info] cached results frame to {cache}") except Exception as e: print(f"[warn] failed to write cache {cache}: {e}") return df def load_pretraining_results_dataframe( root: Path, allow_cache: bool = True, include_train: bool = False, # <— default: store only TEST to keep cache tiny keep_file_names: bool = False, # <— drop file_names by default; they’re repeated parquet_compression: str = "zstd", parquet_compression_level: int = 7, # <— stronger compression than default ) -> pl.DataFrame: """ Loads only AE pretraining results: files named `results_ae_.pkl`. Produces one row per (experiment, fold, split). By default we: - include only the TEST split (include_train=False) - store scores as Float32 - drop the repeated file_names mapping to save space - write Parquet with zstd(level=7) """ if allow_cache: cache = root / "pretraining_results_cache.parquet" if cache.exists(): try: df = pl.read_parquet(cache) print(f"[info] loaded cached pretraining frame from {cache}") return df except Exception as e: print(f"[warn] failed to load pretraining cache {cache}: {e}") rows: List[dict] = [] exp_dirs = [p for p in root.iterdir() if p.is_dir()] for exp_dir in sorted(exp_dirs): try: cfg = read_config(exp_dir) cfg_json = json.dumps(cfg, sort_keys=True) except Exception as e: print(f"[warn] skipping {exp_dir.name} (pretraining): {e}") continue network = cfg.get("net_name") latent_dim = int(cfg.get("latent_space_dim")) semi_normals = int(cfg.get("num_known_normal")) semi_anomalous = int(cfg.get("num_known_outlier")) k = int(cfg.get("k_fold_num")) # Only test split by default (include_train=False) splits = ("train", "test") if include_train else ("test",) for fold in range(k): pkl = exp_dir / f"results_ae_{fold}.pkl" if not pkl.exists(): continue try: data = read_pickle(pkl) # expected: {"train": {...}, "test": {...}} except Exception as e: print(f"[warn] failed to read {pkl.name}: {e}") continue for split in splits: splitd = data.get(split) if not isinstance(splitd, dict): continue rows.append( { "network": network, "latent_dim": latent_dim, "semi_normals": semi_normals, "semi_anomalous": semi_anomalous, "model": "ae", "fold": fold, "split": split, "time": float(splitd.get("time")) if splitd.get("time") is not None else None, "loss": float(splitd.get("loss")) if splitd.get("loss") is not None else None, # ints as Int32, scores as Float32 to save space "indices": normalize_int_list(splitd.get("indices")), "labels_exp_based": normalize_int_list( splitd.get("labels_exp_based") ), "labels_manual_based": normalize_int_list( splitd.get("labels_manual_based") ), "semi_targets": normalize_int_list(splitd.get("semi_targets")), "file_ids": normalize_int_list(splitd.get("file_ids")), "frame_ids": normalize_int_list(splitd.get("frame_ids")), "scores": ( None if splitd.get("scores") is None else [ float(x) for x in ( splitd["scores"].tolist() if isinstance(splitd["scores"], np.ndarray) else splitd["scores"] ) ] ), "file_names": normalize_file_names(splitd.get("file_names")) if keep_file_names else None, "folder": str(exp_dir), "k_fold_num": k, "config_json": cfg_json, } ) if not rows: return pl.DataFrame(schema=PRETRAIN_SCHEMA) df = pl.DataFrame(rows, schema=PRETRAIN_SCHEMA) # Cast/optimize a bit (categoricals, ints, floats) df = df.with_columns( pl.col("network", "model", "split").cast(pl.Categorical), pl.col( "latent_dim", "semi_normals", "semi_anomalous", "fold", "k_fold_num" ).cast(pl.Int32), pl.col("time", "loss").cast(pl.Float64), pl.col("scores").cast(pl.List(pl.Float32)), # ensure downcast took ) if allow_cache: try: cache = root / "pretraining_results_cache.parquet" df.write_parquet( cache, compression=parquet_compression, compression_level=parquet_compression_level, statistics=True, ) print( f"[info] cached pretraining frame to {cache} " f"({parquet_compression}, level={parquet_compression_level})" ) except Exception as e: print(f"[warn] failed to write pretraining cache {cache}: {e}") return df def main(): root = Path("/home/fedex/mt/results/copy") df1 = load_results_dataframe(root, allow_cache=True) exit(0) retest_root = Path("/home/fedex/mt/results/copy/retest_nodrop") df2 = load_results_dataframe(retest_root, allow_cache=False).drop("folder") # exact schema & shape first (optional but helpful messages) assert df1.shape == df2.shape, f"Shape differs: {df1.shape} vs {df2.shape}" assert set(df1.columns) == set(df2.columns), ( f"Column sets differ: {df1.columns} vs {df2.columns}" ) # allow small float diffs, ignore column order differences if you want df1_sorted = df1.select(sorted(df1.columns)) df2_sorted = df2.select(sorted(df2.columns)) # Optionally pre-align/sort both frames by a stable key before diffing. summary, leaves = recursive_diff_frames( df1, df2, ignore=["timestamp"], # columns to ignore float_atol=0.1, # absolute tolerance for floats float_rtol=0.0, # relative tolerance for floats max_rows_per_column=20, # limit expansion per column max_leafs_per_row=200, # cap leaves per row ) pl.Config.set_fmt_table_cell_list_len(100) pl.Config.set_tbl_rows(100) print(summary) # which columns differ & how many rows print(leaves) # exact nested paths + scalar diffs # check_exact=False lets us use atol/rtol for floats assert_frame_equal( df1_sorted, df2_sorted, check_exact=False, atol=0.1, # absolute tolerance for floats rtol=0.0, # relative tolerance (set if you want % based) check_dtypes=True, # set False if you only care about values ) print("DataFrames match within tolerance ✅") # df_pre = load_pretraining_results_dataframe(root, allow_cache=True) # print("pretraining:", df_pre.shape, df_pre.head()) if __name__ == "__main__": main()