import json
import pickle
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from rich.progress import track
from scipy.stats import sem, t

# Models and evaluation ground truths to aggregate over.
models = ["deepsad", "isoforest", "ocsvm"]
evaluation_types = ["exp_based", "manual_based"]

parent_results_path = Path("/home/fedex/mt/results/done")
base_output_path = Path("/home/fedex/mt/results/tmp_plots")


def confidence_interval(data, confidence=0.95):
    """Return (mean, half-width) of the two-sided t-confidence interval.

    Uses the Student t-distribution with n - 1 degrees of freedom, so it
    needs at least two samples; with a single fold, sem() returns NaN.
    """
    n = len(data)
    mean = np.mean(data)
    std_err = sem(data)
    h = std_err * t.ppf((1 + confidence) / 2.0, n - 1)
    return mean, h
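

# Expected on-disk layout, reconstructed from what load_results_data() reads
# below (key names come from the code; the concrete values are illustrative
# assumptions, not actual results):
#
#   <parent_results_path>/<experiment>/config.json
#       {"net_name": "...", "num_known_normal": 50, "num_known_outlier": 50,
#        "latent_space_dim": 32, "k_fold": true, "k_fold_num": 5, ...}
#   <parent_results_path>/<experiment>/results_<model>_<fold_idx>.pkl
#       deepsad:           data["test"][<evaluation_type>]["roc" | "auc" | "prc"]
#       isoforest, ocsvm:  data["test_roc_<evaluation_type>"],
#                          data["test_auc_<evaluation_type>"],
#                          data["test_prc_<evaluation_type>"]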
def load_results_data(folder):
    experiment_data = {}

    json_config_path = folder / "config.json"
    with json_config_path.open("r") as f:
        config = json.load(f)

    try:
        net = config["net_name"]
        num_known_normal, num_known_anomalous = (
            config["num_known_normal"],
            config["num_known_outlier"],
        )
        semi_known_nums = (num_known_normal, num_known_anomalous)
        latent_dim = config["latent_space_dim"]
        exp_title = (
            f"{net} - {num_known_normal} normal, "
            f"{num_known_anomalous} anomalous, latent dim {latent_dim}"
        )
        if not config["k_fold"]:
            raise ValueError(f"{folder.name} was not trained as k-fold. Exiting...")
        k_fold_num = config["k_fold_num"]
    except KeyError as e:
        print(f"Missing key in config.json for experiment folder {folder.name}: {e}")
        raise

    experiment_data["exp_title"] = exp_title
    experiment_data["k_fold_num"] = k_fold_num
    experiment_data["semi_known_nums"] = semi_known_nums
    experiment_data["folder"] = folder
    experiment_data["net"] = net
    experiment_data["latent_dim"] = latent_dim

    roc_data = {}
    roc_auc_data = {}
    prc_data = {}
    for model in models:
        # Load every fold's pickled results for this model.
        for fold_idx in range(k_fold_num):
            results_file = folder / f"results_{model}_{fold_idx}.pkl"
            if not results_file.exists():
                print(
                    f"Expected results file {results_file.name} does not exist. Skipping..."
                )
                continue
            with results_file.open("rb") as f:
                data = pickle.load(f)
            try:
                if model == "deepsad":
                    test_results = data["test"]
                    for evaluation_type in evaluation_types:
                        eval_type_results = test_results[evaluation_type]
                        roc_data.setdefault(model, {}).setdefault(
                            evaluation_type, {}
                        )[fold_idx] = eval_type_results["roc"]
                        roc_auc_data.setdefault(model, {}).setdefault(
                            evaluation_type, {}
                        )[fold_idx] = eval_type_results["auc"]
                        prc_data.setdefault(model, {}).setdefault(
                            evaluation_type, {}
                        )[fold_idx] = eval_type_results["prc"]
                elif model in ["isoforest", "ocsvm"]:
                    # The shallow baselines store their metrics flat, keyed by
                    # evaluation type.
                    for evaluation_type in evaluation_types:
                        roc_data.setdefault(model, {}).setdefault(
                            evaluation_type, {}
                        )[fold_idx] = data[f"test_roc_{evaluation_type}"]
                        roc_auc_data.setdefault(model, {}).setdefault(
                            evaluation_type, {}
                        )[fold_idx] = data[f"test_auc_{evaluation_type}"]
                        prc_data.setdefault(model, {}).setdefault(
                            evaluation_type, {}
                        )[fold_idx] = data[f"test_prc_{evaluation_type}"]
            except KeyError as e:
                print(f"Missing key in results file {results_file.name}: {e}")
                raise

    experiment_data["roc_data"] = roc_data
    experiment_data["roc_auc_data"] = roc_auc_data
    experiment_data["prc_data"] = prc_data

    return experiment_data


def plot_roc_curve(experiment_data, output_path):
    try:
        k_fold_num = experiment_data["k_fold_num"]
        roc_data = experiment_data["roc_data"]
        roc_auc_data = experiment_data["roc_auc_data"]
        folder = experiment_data["folder"]
        exp_title = experiment_data["exp_title"]
    except KeyError as e:
        print(f"Missing key in experiment data: {e}")
        raise

    for evaluation_type in evaluation_types:
        plt.figure(figsize=(8, 6))
        for model in models:
            # Gather all folds' ROC data for this model and evaluation_type
            fold_rocs = []
            auc_scores = []
            for fold_idx in range(k_fold_num):
                try:
                    fpr, tpr, thresholds = roc_data[model][evaluation_type][fold_idx]
                    fold_rocs.append((fpr, tpr))
                    auc_scores.append(roc_auc_data[model][evaluation_type][fold_idx])
                except KeyError:
                    continue
            if not fold_rocs:
                print(
                    f"No ROC data for model {model}, evaluation {evaluation_type} in {folder.name}"
                )
                continue

            # Interpolate TPRs to a common FPR grid so folds can be averaged
            mean_fpr = np.linspace(0, 1, 100)
            interp_tprs = []
            for fpr, tpr in fold_rocs:
                interp_tpr = np.interp(mean_fpr, fpr, tpr)
                interp_tpr[0] = 0.0
                interp_tprs.append(interp_tpr)
            mean_tpr = np.mean(interp_tprs, axis=0)
            std_tpr = np.std(interp_tprs, axis=0)
            mean_tpr[-1] = 1.0

            # Mean and CI for AUC
            mean_auc, auc_ci = confidence_interval(auc_scores)

            # Plot mean ROC and the ±1 std band across folds
            plt.plot(
                mean_fpr,
                mean_tpr,
                label=f"{model} (AUC={mean_auc:.2f}±{auc_ci:.2f})",
            )
            plt.fill_between(
                mean_fpr,
                mean_tpr - std_tpr,
                mean_tpr + std_tpr,
                alpha=0.15,
            )

        plt.plot([0, 1], [0, 1], "k--", label="Chance")
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title(f"ROC Curve ({exp_title} - {evaluation_type})")
        plt.legend(loc="lower right")
        plt.tight_layout()
        plt.savefig(
            (output_path / f"roc_curve_{folder.name}_{evaluation_type}.png").as_posix()
        )
        plt.close()
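

# prc_data is collected by load_results_data() but never plotted in this
# script. The sketch below mirrors plot_roc_curve() for precision-recall
# curves, under the assumption that each stored "prc" entry is a
# (precision, recall, thresholds) tuple as returned by
# sklearn.metrics.precision_recall_curve; it is not wired into main(), so
# verify the stored format before calling it.
def plot_prc_curve(experiment_data, output_path):
    k_fold_num = experiment_data["k_fold_num"]
    prc_data = experiment_data["prc_data"]
    folder = experiment_data["folder"]
    exp_title = experiment_data["exp_title"]

    for evaluation_type in evaluation_types:
        plt.figure(figsize=(8, 6))
        for model in models:
            fold_prcs = []
            for fold_idx in range(k_fold_num):
                try:
                    precision, recall, _ = prc_data[model][evaluation_type][fold_idx]
                    fold_prcs.append((precision, recall))
                except KeyError:
                    continue
            if not fold_prcs:
                continue

            # Average precision over folds on a common recall grid; recall from
            # precision_recall_curve is decreasing, so reverse both arrays to
            # satisfy np.interp's requirement of increasing x-coordinates.
            mean_recall = np.linspace(0, 1, 100)
            interp_precisions = [
                np.interp(mean_recall, recall[::-1], precision[::-1])
                for precision, recall in fold_prcs
            ]
            mean_precision = np.mean(interp_precisions, axis=0)
            plt.plot(mean_recall, mean_precision, label=model)

        plt.xlabel("Recall")
        plt.ylabel("Precision")
        plt.title(f"PRC Curve ({exp_title} - {evaluation_type})")
        plt.legend(loc="lower left")
        plt.tight_layout()
        plt.savefig(
            (output_path / f"prc_curve_{folder.name}_{evaluation_type}.png").as_posix()
        )
        plt.close()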


def main():
    base_output_path.mkdir(exist_ok=True, parents=True)

    # Find all subfolders (skip files)
    subfolders = [f for f in parent_results_path.iterdir() if f.is_dir()]
    print(f"Found {len(subfolders)} subfolders in {parent_results_path}")

    all_experiments_data = []
    for folder in track(
        subfolders, description="[cyan]Loading data...", total=len(subfolders)
    ):
        all_experiments_data.append(load_results_data(folder))
    print("Data loading complete. Plotting ROC curves...")

    roc_curves_output_path = base_output_path / "roc_curves"
    roc_curves_output_path.mkdir(exist_ok=True, parents=True)
    for experiment_data in track(
        all_experiments_data,
        description="[green]Plotting ROC curves...",
        total=len(all_experiments_data),
    ):
        plot_roc_curve(experiment_data, roc_curves_output_path)


if __name__ == "__main__":
    main()
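
# Usage note: parent_results_path and base_output_path are hard-coded above for
# one specific machine; point them at your own results tree before running.
# Executing the script then writes one PNG per experiment folder and evaluation
# type to:
#   <base_output_path>/roc_curves/roc_curve_<experiment>_<evaluation_type>.png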