"""Command-line entry point for Deep SAD experiments.

Supports four actions: ``train``, ``infer``, ``ae_elbow_test`` and ``retest``.
"""

import logging
import pickle
import random
from pathlib import Path

import click
import numpy as np
import torch

from baselines.isoforest import IsoForest
from baselines.ocsvm import OCSVM
from datasets.main import load_dataset
from DeepSAD import DeepSAD
from utils.config import Config
from utils.visualization.plot_images_grid import plot_images_grid


def _resolve_fold_passes(k_fold, k_fold_num):
    """Return the fold indices to iterate over, or ``[None]`` for a single pass.

    :param k_fold: Truthy when k-fold cross-validation is requested.
    :param k_fold_num: Number of folds; must be a positive integer when
        ``k_fold`` is truthy.
    :raises click.UsageError: If k-fold is requested without a positive fold count.
    """
    if not k_fold:
        return [None]
    if k_fold_num is None or k_fold_num < 1:
        raise click.UsageError(
            "k-fold cross-validation requires a positive --k_fold_num."
        )
    return range(k_fold_num)


################################################################################
# Settings
################################################################################
@click.command()
@click.argument(
    "action",
    type=click.Choice(
        [
            "train",
            "infer",
            "ae_elbow_test",
            "retest",
        ]
    ),
)
@click.argument(
    "dataset_name",
    type=click.Choice(
        [
            "mnist",
            "elpv",
            "subter",
            "subtersplit",
            "fmnist",
            "cifar10",
            "arrhythmia",
            "cardio",
            "satellite",
            "satimage-2",
            "shuttle",
            "thyroid",
        ]
    ),
)
@click.argument(
    "net_name",
    type=click.Choice(
        [
            "mnist_LeNet",
            "elpv_LeNet",
            "subter_LeNet",
            "subter_efficient",
            "subter_LeNet_Split",
            "fmnist_LeNet",
            "cifar10_LeNet",
            "arrhythmia_mlp",
            "cardio_mlp",
            "satellite_mlp",
            "satimage-2_mlp",
            "shuttle_mlp",
            "thyroid_mlp",
        ]
    ),
)
@click.argument("xp_path", type=click.Path(exists=True))
@click.argument("data_path", type=click.Path(exists=True))
@click.option(
    "--k_fold",
    type=bool,
    default=False,
    help="Use k-fold cross-validation for training (default: False).",
)
@click.option(
    "--k_fold_num",
    type=int,
    default=None,
    help="Number of folds for k-fold cross-validation (default: None).",
)
@click.option(
    "--num_known_normal",
    type=int,
    default=0,
    help="Number of max known normal samples (semi-supervised-setting) (default: 0).",
)
@click.option(
    "--num_known_outlier",
    type=int,
    default=0,
    help="Number of max known outlier samples (semi-supervised-setting) (default: 0).",
)
@click.option(
    "--load_config",
    type=click.Path(exists=True),
    default=None,
    help="Config JSON-file path (default: None).",
)
@click.option(
    "--load_model",
    type=click.Path(exists=True),
    default=None,
    help="Model file path (default: None).",
)
@click.option(
    "--eta",
    type=float,
    default=1.0,
    help="Deep SAD hyperparameter eta (must be 0 < eta).",
)
@click.option(
    "--ratio_known_normal",
    type=float,
    default=0.0,
    help="Ratio of known (labeled) normal training examples.",
)
@click.option(
    "--ratio_known_outlier",
    type=float,
    default=0.0,
    help="Ratio of known (labeled) anomalous training examples.",
)
@click.option(
    "--ratio_pollution",
    type=float,
    default=0.0,
    help="Pollution ratio of unlabeled training data with unknown (unlabeled) anomalies.",
)
@click.option(
    "--device",
    type=str,
    default="cuda",
    help='Computation device to use ("cpu", "cuda", "cuda:2", etc.).',
)
@click.option(
    "--seed", type=int, default=-1, help="Set seed. If -1, use randomization."
)
@click.option(
    "--optimizer_name",
    type=click.Choice(["adam"]),
    default="adam",
    help="Name of the optimizer to use for Deep SAD network training.",
)
@click.option(
    "--lr",
    type=float,
    default=0.001,
    help="Initial learning rate for Deep SAD network training. Default=0.001",
)
@click.option(
    "--train_test_split",
    type=float,
    default=0.7,
    help="Ratio of training data in the train-test split (default: 0.7).",
)
@click.option("--n_epochs", type=int, default=50, help="Number of epochs to train.")
@click.option(
    "--lr_milestone",
    type=int,
    default=[0],
    multiple=True,
    help="Lr scheduler milestones at which lr is multiplied by 0.1. Can be multiple and must be increasing.",
)
@click.option(
    "--batch_size", type=int, default=128, help="Batch size for mini-batch training."
)
@click.option(
    "--weight_decay",
    type=float,
    default=1e-6,
    help="Weight decay (L2 penalty) hyperparameter for Deep SAD objective.",
)
@click.option(
    "--latent_space_dim",
    type=int,
    default=128,
    help="Dimensionality of the latent space for the autoencoder.",
)
@click.option(
    "--pretrain",
    type=bool,
    default=True,
    help="Pretrain neural network parameters via autoencoder.",
)
@click.option(
    "--ae_optimizer_name",
    type=click.Choice(["adam"]),
    default="adam",
    help="Name of the optimizer to use for autoencoder pretraining.",
)
@click.option(
    "--ae_lr",
    type=float,
    default=0.001,
    help="Initial learning rate for autoencoder pretraining. Default=0.001",
)
@click.option(
    "--ae_n_epochs",
    type=int,
    default=100,
    help="Number of epochs to train autoencoder.",
)
@click.option(
    "--ae_lr_milestone",
    type=int,
    default=[0],
    multiple=True,
    help="Lr scheduler milestones at which lr is multiplied by 0.1. Can be multiple and must be increasing.",
)
@click.option(
    "--ae_batch_size",
    type=int,
    default=128,
    help="Batch size for mini-batch autoencoder training.",
)
@click.option(
    "--ae_weight_decay",
    type=float,
    default=1e-6,
    help="Weight decay (L2 penalty) hyperparameter for autoencoder objective.",
)
@click.option(
    "--num_threads",
    type=int,
    default=0,
    help="Number of threads used for parallelizing CPU operations. 0 means that all resources are used.",
)
@click.option(
    "--n_jobs_dataloader",
    type=int,
    default=0,
    help="Number of workers for data loading. 0 means that the data will be loaded in the main process.",
)
@click.option(
    "--normal_class",
    type=int,
    default=0,
    help="Specify the normal class of the dataset (all other classes are considered anomalous).",
)
@click.option(
    "--known_outlier_class",
    type=int,
    default=1,
    help="Specify the known outlier class of the dataset for semi-supervised anomaly detection.",
)
@click.option(
    "--n_known_outlier_classes",
    type=int,
    default=0,
    # Trailing spaces keep the concatenated sentences apart in the rendered help.
    help="Number of known outlier classes. "
    "If 0, no anomalies are known. "
    "If 1, outlier class as specified in --known_outlier_class option. "
    "If > 1, the specified number of outlier classes will be sampled at random.",
)
@click.option(
    "--ocsvm_kernel",
    type=click.Choice(["rbf", "linear", "poly"]),
    default="rbf",
    help="Kernel for the OC-SVM",
)
@click.option(
    "--ocsvm_nu",
    type=float,
    default=0.1,
    help="OC-SVM hyperparameter nu (must be 0 < nu <= 1).",
)
@click.option(
    "--isoforest_n_estimators",
    type=int,
    default=100,
    help="Set the number of base estimators in the ensemble (default: 100).",
)
@click.option(
    "--isoforest_max_samples",
    type=int,
    default=256,
    help="Set the number of samples drawn to train each base estimator (default: 256).",
)
@click.option(
    "--isoforest_contamination",
    type=float,
    default=0.1,
    help="Expected fraction of anomalies in the training set. (default: 0.1).",
)
@click.option(
    "--isoforest_n_jobs_model",
    type=int,
    default=-1,
    help="Number of jobs for model training.",
)
def main(
    action,
    dataset_name,
    net_name,
    xp_path,
    data_path,
    k_fold,
    k_fold_num,
    num_known_normal,
    num_known_outlier,
    load_config,
    load_model,
    eta,
    ratio_known_normal,
    ratio_known_outlier,
    ratio_pollution,
    device,
    seed,
    optimizer_name,
    lr,
    train_test_split,
    n_epochs,
    lr_milestone,
    batch_size,
    weight_decay,
    latent_space_dim,
    pretrain,
    ae_optimizer_name,
    ae_lr,
    ae_n_epochs,
    ae_lr_milestone,
    ae_batch_size,
    ae_weight_decay,
    num_threads,
    n_jobs_dataloader,
    normal_class,
    known_outlier_class,
    n_known_outlier_classes,
    ocsvm_kernel,
    ocsvm_nu,
    isoforest_n_estimators,
    isoforest_max_samples,
    isoforest_contamination,
    isoforest_n_jobs_model,
):
    """
    Deep SAD, a method for deep semi-supervised anomaly detection.

    :arg ACTION: What to run: train, infer, ae_elbow_test or retest.
    :arg DATASET_NAME: Name of the dataset to load.
    :arg NET_NAME: Name of the neural network to use.
    :arg XP_PATH: Export path for logging the experiment.
    :arg DATA_PATH: Root path of data.
    """
    # Get configuration.
    # NOTE: this must stay the first statement so that locals() contains only
    # the command-line parameters and no helper variables defined below.
    cfg = Config(locals().copy())

    # Set up logging (root logger plus a file handler inside the export path)
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    log_file = xp_path + "/log.txt"
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Print paths
    logger.info("Log file is %s" % log_file)
    logger.info("Data path is %s" % data_path)
    logger.info("Export path is %s" % xp_path)

    # Print experimental setup
    logger.info("Dataset: %s" % dataset_name)
    logger.info("Normal class: %d" % normal_class)
    logger.info("Ratio of labeled normal train samples: %.2f" % ratio_known_normal)
    logger.info("Ratio of labeled anomalous samples: %.2f" % ratio_known_outlier)
    logger.info("Pollution ratio of unlabeled train data: %.2f" % ratio_pollution)
    if n_known_outlier_classes == 1:
        logger.info("Known anomaly class: %d" % known_outlier_class)
    else:
        logger.info("Number of known anomaly classes: %d" % n_known_outlier_classes)
    logger.info("Network: %s" % net_name)

    # If specified, load experiment config from JSON-file
    if load_config:
        cfg.load_config(import_json=load_config)
        logger.info("Loaded configuration from %s." % load_config)

    # Print model configuration
    logger.info("Eta-parameter: %.2f" % cfg.settings["eta"])

    # Set seed
    if cfg.settings["seed"] != -1:
        random.seed(cfg.settings["seed"])
        np.random.seed(cfg.settings["seed"])
        torch.manual_seed(cfg.settings["seed"])
        torch.cuda.manual_seed(cfg.settings["seed"])
        torch.backends.cudnn.deterministic = True
        logger.info("Set seed to %d." % cfg.settings["seed"])

    # Default device to 'cpu' if cuda is not available
    if not torch.cuda.is_available():
        device = "cpu"
    # Set the number of threads used for parallelizing CPU operations
    if num_threads > 0:
        torch.set_num_threads(num_threads)
    logger.info("Computation device: %s" % device)
    logger.info("Number of threads: %d" % num_threads)
    logger.info("Number of dataloader workers: %d" % n_jobs_dataloader)

    if action == "train":
        # Fail early (before loading data) if k-fold is requested without folds.
        train_passes = _resolve_fold_passes(k_fold, k_fold_num)

        # Load data
        # TODO: pass num of folds
        dataset = load_dataset(
            dataset_name,
            data_path,
            normal_class,
            known_outlier_class,
            n_known_outlier_classes,
            ratio_known_normal,
            ratio_known_outlier,
            ratio_pollution,
            random_state=np.random.RandomState(cfg.settings["seed"]),
            k_fold_num=k_fold_num,
            num_known_normal=num_known_normal,
            num_known_outlier=num_known_outlier,
            split=train_test_split,
        )
        # Log random sample of known anomaly classes if more than 1 class
        if n_known_outlier_classes > 1:
            logger.info("Known anomaly classes: %s" % (dataset.known_outlier_classes,))

        # Switches selecting which models take part in this run
        train_isoforest = True
        train_ocsvm = True
        train_deepsad = True

        for fold_idx in train_passes:
            if fold_idx is None:
                logger.info("Single training without k-fold")
            else:
                logger.info(f"Fold {fold_idx + 1}/{k_fold_num}")

            # Output files carry the fold index only in k-fold mode
            suffix = "" if fold_idx is None else f"_{fold_idx}"
            # Set by pretraining or --load_model; required by the hybrid OC-SVM
            ae_model_path = None

            # Initialize Isolation Forest model
            if train_isoforest:
                Isoforest = IsoForest(
                    hybrid=False,
                    n_estimators=isoforest_n_estimators,
                    max_samples=isoforest_max_samples,
                    contamination=isoforest_contamination,
                    n_jobs=isoforest_n_jobs_model,
                    seed=seed,
                )

            # Initialize DeepSAD model and set neural network phi
            if train_deepsad:
                deepSAD = DeepSAD(latent_space_dim, cfg.settings["eta"])
                deepSAD.set_network(net_name)

            # If specified, load Deep SAD model (center c, network weights, and possibly autoencoder weights)
            if train_deepsad and load_model:
                deepSAD.load_model(
                    model_path=load_model, load_ae=True, map_location=device
                )
                logger.info("Loading model from %s." % load_model)

            logger.info("Pretraining: %s" % pretrain)
            if train_deepsad and pretrain:
                # Log pretraining details
                logger.info(
                    "Pretraining optimizer: %s" % cfg.settings["ae_optimizer_name"]
                )
                logger.info("Pretraining learning rate: %g" % cfg.settings["ae_lr"])
                logger.info("Pretraining epochs: %d" % cfg.settings["ae_n_epochs"])
                logger.info(
                    "Pretraining learning rate scheduler milestones: %s"
                    % (cfg.settings["ae_lr_milestone"],)
                )
                logger.info(
                    "Pretraining batch size: %d" % cfg.settings["ae_batch_size"]
                )
                logger.info(
                    "Pretraining weight decay: %g" % cfg.settings["ae_weight_decay"]
                )

                # Pretrain model on dataset (via autoencoder)
                deepSAD.pretrain(
                    dataset,
                    optimizer_name=cfg.settings["ae_optimizer_name"],
                    lr=cfg.settings["ae_lr"],
                    n_epochs=cfg.settings["ae_n_epochs"],
                    lr_milestones=cfg.settings["ae_lr_milestone"],
                    batch_size=cfg.settings["ae_batch_size"],
                    weight_decay=cfg.settings["ae_weight_decay"],
                    device=device,
                    n_jobs_dataloader=n_jobs_dataloader,
                    k_fold_idx=fold_idx,
                )

                # Save pretraining results
                deepSAD.save_ae_results(export_pkl=xp_path + f"/results_ae{suffix}.pkl")
                ae_model_path = xp_path + f"/model_ae{suffix}.tar"
                deepSAD.save_model(export_model=ae_model_path, save_ae=True)

            # Initialize OC-SVM model (after pretraining to use autoencoder features)
            if train_ocsvm:
                ocsvm = OCSVM(
                    kernel=ocsvm_kernel,
                    nu=ocsvm_nu,
                    hybrid=True,
                    latent_space_dim=latent_space_dim,
                )
                if load_model and not pretrain:
                    ae_model_path = load_model
                if ae_model_path is None:
                    # Previously this case crashed on an unbound variable.
                    logger.error(
                        "The hybrid OC-SVM needs autoencoder weights: "
                        "enable --pretrain or pass --load_model."
                    )
                    return
                ocsvm.load_ae(
                    net_name=net_name, model_path=ae_model_path, device=device
                )
                logger.info(
                    f"Loaded pretrained autoencoder for features from {ae_model_path}."
                )

            # Log training details
            logger.info("Training optimizer: %s" % cfg.settings["optimizer_name"])
            logger.info("Training learning rate: %g" % cfg.settings["lr"])
            logger.info("Training epochs: %d" % cfg.settings["n_epochs"])
            logger.info(
                "Training learning rate scheduler milestones: %s"
                % (cfg.settings["lr_milestone"],)
            )
            logger.info("Training batch size: %d" % cfg.settings["batch_size"])
            logger.info("Training weight decay: %g" % cfg.settings["weight_decay"])

            # Train models on dataset
            if train_deepsad:
                deepSAD.train(
                    dataset,
                    optimizer_name=cfg.settings["optimizer_name"],
                    lr=cfg.settings["lr"],
                    n_epochs=cfg.settings["n_epochs"],
                    lr_milestones=cfg.settings["lr_milestone"],
                    batch_size=cfg.settings["batch_size"],
                    weight_decay=cfg.settings["weight_decay"],
                    device=device,
                    n_jobs_dataloader=n_jobs_dataloader,
                    k_fold_idx=fold_idx,
                )

            if train_ocsvm:
                ocsvm.train(
                    dataset,
                    device=device,
                    n_jobs_dataloader=n_jobs_dataloader,
                    k_fold_idx=fold_idx,
                    batch_size=256,
                )

            if train_isoforest:
                Isoforest.train(
                    dataset,
                    device=device,
                    n_jobs_dataloader=n_jobs_dataloader,
                    k_fold_idx=fold_idx,
                )

            # Test models
            if train_deepsad:
                deepSAD.test(
                    dataset,
                    device=device,
                    n_jobs_dataloader=n_jobs_dataloader,
                    k_fold_idx=fold_idx,
                )

            if train_ocsvm:
                ocsvm.test(
                    dataset,
                    device=device,
                    n_jobs_dataloader=n_jobs_dataloader,
                    k_fold_idx=fold_idx,
                    batch_size=256,
                )

            if train_isoforest:
                Isoforest.test(
                    dataset,
                    device=device,
                    n_jobs_dataloader=n_jobs_dataloader,
                    k_fold_idx=fold_idx,
                )

            # Save results, model, and configuration
            if train_deepsad:
                deepSAD.save_results(
                    export_pkl=xp_path + f"/results_deepsad{suffix}.pkl"
                )
                deepSAD.save_model(
                    export_model=xp_path + f"/model_deepsad{suffix}.tar"
                )
            if train_ocsvm:
                ocsvm.save_results(export_pkl=xp_path + f"/results_ocsvm{suffix}.pkl")
                ocsvm.save_model(export_path=xp_path + f"/model_ocsvm{suffix}.pkl")
            if train_isoforest:
                Isoforest.save_results(
                    export_pkl=xp_path + f"/results_isoforest{suffix}.pkl"
                )
                Isoforest.save_model(
                    export_path=xp_path + f"/model_isoforest{suffix}.pkl"
                )

            cfg.save_config(export_json=xp_path + "/config.json")

    elif action == "infer":
        # Inference uses a deterministic, non-shuffled loader to preserve temporal order
        dataset = load_dataset(
            cfg.settings["dataset_name"],
            data_path,
            cfg.settings["normal_class"],
            cfg.settings["known_outlier_class"],
            cfg.settings["n_known_outlier_classes"],
            cfg.settings["ratio_known_normal"],
            cfg.settings["ratio_known_outlier"],
            cfg.settings["ratio_pollution"],
            random_state=np.random.RandomState(cfg.settings["seed"]),
            k_fold_num=False,
            inference=True,
        )

        # Log random sample of known anomaly classes if more than 1 class
        if n_known_outlier_classes > 1:
            logger.info("Known anomaly classes: %s" % (dataset.known_outlier_classes,))

        # --- Expect a model DIRECTORY (aligned with 'retest') ---
        if (
            (not load_model)
            or (not Path(load_model).exists())
            or (not Path(load_model).is_dir())
        ):
            logger.error(
                "For inference mode a model directory has to be loaded! "
                "Pass the --load_model option with the model directory path!"
            )
            return
        load_model = Path(load_model)

        # Resolve expected model artifacts (single-model / no k-fold suffixes)
        deepsad_model_path = load_model / "model_deepsad.tar"
        ae_model_path = load_model / "model_ae.tar"
        ocsvm_model_path = load_model / "model_ocsvm.pkl"
        isoforest_model_path = load_model / "model_isoforest.pkl"

        # Sanity check model files exist
        model_paths = [
            deepsad_model_path,
            ae_model_path,
            ocsvm_model_path,
            isoforest_model_path,
        ]
        missing = [p.name for p in model_paths if not p.exists() or not p.is_file()]
        if missing:
            logger.error(
                "The following model files do not exist in the provided model directory: "
                + ", ".join(missing)
            )
            return

        # Prepare output paths
        inf_dir = Path(xp_path) / "inference"
        inf_dir.mkdir(parents=True, exist_ok=True)

        # Output names are derived from the dataset root's stem (legacy naming)
        base_stem = Path(Path(dataset.root).stem)

        # DeepSAD outputs (keep legacy filenames for backward compatibility)
        deepsad_scores_path = inf_dir / Path(
            base_stem.stem + "_deepsad_scores"
        ).with_suffix(".npy")
        deepsad_outputs_path = inf_dir / Path(base_stem.stem + "_outputs").with_suffix(
            ".npy"
        )
        # The raw DeepSAD outputs are not written by default; flip this switch
        # to store them next to the scores.
        save_deepsad_outputs = False

        # Baselines
        ocsvm_scores_path = inf_dir / Path(
            base_stem.stem + "_ocsvm_scores"
        ).with_suffix(".npy")
        isoforest_scores_path = inf_dir / Path(
            base_stem.stem + "_isoforest_scores"
        ).with_suffix(".npy")

        # Common loader settings (n_jobs_dataloader is always bound: it is a parameter)
        _n_jobs = n_jobs_dataloader

        # ----------------- DeepSAD -----------------
        deepSAD = DeepSAD(cfg.settings["latent_space_dim"], cfg.settings["eta"])
        deepSAD.set_network(cfg.settings["net_name"])
        deepSAD.load_model(
            model_path=deepsad_model_path, load_ae=True, map_location=device
        )
        logger.info("Loaded DeepSAD model from %s.", deepsad_model_path)

        deepsad_scores, deepsad_all_outputs = deepSAD.inference(
            dataset, device=device, n_jobs_dataloader=_n_jobs
        )

        np.save(deepsad_scores_path, deepsad_scores)
        if save_deepsad_outputs:
            np.save(deepsad_outputs_path, deepsad_all_outputs)

        logger.info(
            "DeepSAD inference: median=%.6f mean=%.6f min=%.6f max=%.6f",
            float(np.median(deepsad_scores)),
            float(np.mean(deepsad_scores)),
            float(np.min(deepsad_scores)),
            float(np.max(deepsad_scores)),
        )

        # ----------------- OCSVM (hybrid) -----------------
        ocsvm_scores = None
        ocsvm = OCSVM(
            kernel=cfg.settings["ocsvm_kernel"],
            nu=cfg.settings["ocsvm_nu"],
            hybrid=True,
            latent_space_dim=cfg.settings["latent_space_dim"],
        )
        # load AE to build the feature extractor for hybrid OCSVM
        ocsvm.load_ae(
            net_name=cfg.settings["net_name"],
            model_path=ae_model_path,
            device=device,
        )
        ocsvm.load_model(import_path=ocsvm_model_path)

        ocsvm_scores = ocsvm.inference(
            dataset, device=device, n_jobs_dataloader=_n_jobs, batch_size=32
        )

        if ocsvm_scores is not None:
            np.save(ocsvm_scores_path, ocsvm_scores)
            logger.info(
                "OCSVM inference: median=%.6f mean=%.6f min=%.6f max=%.6f",
                float(np.median(ocsvm_scores)),
                float(np.mean(ocsvm_scores)),
                float(np.min(ocsvm_scores)),
                float(np.max(ocsvm_scores)),
            )
        else:
            logger.warning("OCSVM scores could not be determined; no array saved.")

        # ----------------- Isolation Forest -----------------
        isoforest_scores = None
        Isoforest = IsoForest(
            hybrid=False,
            n_estimators=cfg.settings["isoforest_n_estimators"],
            max_samples=cfg.settings["isoforest_max_samples"],
            contamination=cfg.settings["isoforest_contamination"],
            n_jobs=cfg.settings["isoforest_n_jobs_model"],
            seed=cfg.settings["seed"],
        )
        Isoforest.load_model(import_path=isoforest_model_path, device=device)
        isoforest_scores = Isoforest.inference(
            dataset, device=device, n_jobs_dataloader=_n_jobs
        )
        if isoforest_scores is not None:
            np.save(isoforest_scores_path, isoforest_scores)
            logger.info(
                "IsolationForest inference: median=%.6f mean=%.6f min=%.6f max=%.6f",
                float(np.median(isoforest_scores)),
                float(np.mean(isoforest_scores)),
                float(np.min(isoforest_scores)),
                float(np.max(isoforest_scores)),
            )
        else:
            logger.warning(
                "Isolation Forest scores could not be determined; no array saved."
            )

        # Final summary: only name files that were actually written
        logger.info(
            "Inference complete. Saved arrays to %s:\n"
            "  DeepSAD scores: %s\n"
            "  DeepSAD outputs: %s\n"
            "  OCSVM scores: %s\n"
            "  IsoForest scores: %s",
            inf_dir,
            deepsad_scores_path.name,
            deepsad_outputs_path.name if save_deepsad_outputs else "(not saved)",
            ocsvm_scores_path.name if ocsvm_scores is not None else "(not saved)",
            isoforest_scores_path.name
            if isoforest_scores is not None
            else "(not saved)",
        )

    elif action == "ae_elbow_test":
        # Fail early (before loading data) if k-fold is requested without folds.
        train_passes = _resolve_fold_passes(k_fold, k_fold_num)

        # Load data once
        dataset = load_dataset(
            dataset_name,
            data_path,
            normal_class,
            known_outlier_class,
            n_known_outlier_classes,
            ratio_known_normal,
            ratio_known_outlier,
            ratio_pollution,
            random_state=np.random.RandomState(cfg.settings["seed"]),
            k_fold_num=k_fold_num,
            split=train_test_split,
        )

        # Test dimensions
        ae_elbow_dims = [32, 64, 128, 256, 384, 512, 768, 1024]

        # Test each dimension
        for rep_dim in ae_elbow_dims:
            logger.info(f"Testing autoencoder with latent dimension: {rep_dim}")

            # Results dictionary for this dimension
            dim_results = {
                "dimension": rep_dim,
                "ae_results": {},
                "k_fold": k_fold,
                "k_fold_num": k_fold_num,
            }

            # For each fold
            for fold_idx in train_passes:
                if fold_idx is None:
                    logger.info(f"Dimension {rep_dim}: Single training without k-fold")
                else:
                    logger.info(
                        f"Dimension {rep_dim}: Fold {fold_idx + 1}/{k_fold_num}"
                    )

                # Initialize DeepSAD model with current dimension
                deepSAD = DeepSAD(rep_dim, cfg.settings["eta"])
                deepSAD.set_network(net_name)

                # Pretrain autoencoder with current dimension
                deepSAD.pretrain(
                    dataset,
                    optimizer_name=cfg.settings["ae_optimizer_name"],
                    lr=cfg.settings["ae_lr"],
                    n_epochs=cfg.settings["ae_n_epochs"],
                    lr_milestones=cfg.settings["ae_lr_milestone"],
                    batch_size=cfg.settings["ae_batch_size"],
                    weight_decay=cfg.settings["ae_weight_decay"],
                    device=device,
                    n_jobs_dataloader=n_jobs_dataloader,
                    k_fold_idx=fold_idx,
                )

                # Store results for this fold
                fold_key = "single" if fold_idx is None else f"fold_{fold_idx}"
                dim_results["ae_results"][fold_key] = deepSAD.ae_results

                logger.info(
                    f"Finished testing dimension {rep_dim} "
                    + (
                        f"fold {fold_idx + 1}/{k_fold_num}"
                        if fold_idx is not None
                        else "single pass"
                    )
                )

                # Clear some memory
                del deepSAD
                torch.cuda.empty_cache()

            # Save results for this dimension (includes all folds)
            results_filename = (
                f"ae_elbow_results_{net_name}_dim_{rep_dim}"
                + ("_kfold" if k_fold else "")
                + ".pkl"
            )
            results_path = Path(xp_path) / results_filename
            with open(results_path, "wb") as f:
                pickle.dump(dim_results, f)

            logger.info(
                f"Saved elbow test results for dimension {rep_dim} to {results_path}"
            )

    elif action == "retest":
        if (
            not load_model
            or not Path(load_model).exists()
            or not Path(load_model).is_dir()
        ):
            logger.error(
                "For retest mode a model directory has to be loaded! Pass the --load_model option with the model directory path!"
            )
            return
        load_model = Path(load_model)
        if not load_config:
            logger.error(
                "For retest mode a config has to be loaded! Pass the --load_config option with the config path!"
            )
            return

        # Fail early (before loading data) if the config enables k-fold without folds.
        train_passes = _resolve_fold_passes(
            cfg.settings["k_fold"], cfg.settings["k_fold_num"]
        )

        dataset = load_dataset(
            cfg.settings["dataset_name"],
            data_path,
            cfg.settings["normal_class"],
            cfg.settings["known_outlier_class"],
            cfg.settings["n_known_outlier_classes"],
            cfg.settings["ratio_known_normal"],
            cfg.settings["ratio_known_outlier"],
            cfg.settings["ratio_pollution"],
            random_state=np.random.RandomState(cfg.settings["seed"]),
            k_fold_num=cfg.settings["k_fold_num"],
            num_known_normal=cfg.settings["num_known_normal"],
            num_known_outlier=cfg.settings["num_known_outlier"],
            split=train_test_split,
        )

        # Switches selecting which models are re-tested
        retest_autoencoder = False
        retest_isoforest = True
        retest_ocsvm = True
        retest_deepsad = True

        for fold_idx in train_passes:
            if fold_idx is None:
                logger.info("Single train re-testing without k-fold")
            else:
                logger.info(f"Fold {fold_idx + 1}/{cfg.settings['k_fold_num']}")

            # Model and result files carry the fold index only in k-fold mode.
            # Each baseline loads its own artifact (the single-run paths for
            # isolation forest and OC-SVM used to be swapped).
            suffix = "" if fold_idx is None else f"_{fold_idx}"
            deepsad_model_path = load_model / f"model_deepsad{suffix}.tar"
            isoforest_model_path = load_model / f"model_isoforest{suffix}.pkl"
            ocsvm_model_path = load_model / f"model_ocsvm{suffix}.pkl"
            ae_model_path = load_model / f"model_ae{suffix}.tar"

            # Check which model files exist and which do not
            model_paths = [
                deepsad_model_path,
                isoforest_model_path,
                ocsvm_model_path,
                ae_model_path,
            ]
            missing_models = [
                path.name
                for path in model_paths
                if not path.exists() or not path.is_file()
            ]
            if missing_models:
                logger.error(
                    f"The following model files do not exist: {', '.join(missing_models)}. Please check the load_model path."
                )
                return

            # Initialize Isolation Forest model
            if retest_isoforest:
                Isoforest = IsoForest(
                    hybrid=False,
                    n_estimators=cfg.settings["isoforest_n_estimators"],
                    max_samples=cfg.settings["isoforest_max_samples"],
                    contamination=cfg.settings["isoforest_contamination"],
                    n_jobs=cfg.settings["isoforest_n_jobs_model"],
                    seed=cfg.settings["seed"],
                )
                Isoforest.load_model(import_path=isoforest_model_path, device=device)
                Isoforest.test(
                    dataset,
                    device=device,
                    n_jobs_dataloader=cfg.settings["n_jobs_dataloader"],
                    k_fold_idx=fold_idx,
                )

            if retest_autoencoder:
                # Initialize DeepSAD model and set neural network phi
                deepSAD = DeepSAD(cfg.settings["latent_space_dim"], cfg.settings["eta"])
                deepSAD.set_network(cfg.settings["net_name"])
                deepSAD.load_model(
                    model_path=ae_model_path, load_ae=True, map_location=device
                )
                logger.info("Loading model from %s." % load_model)
                # Save pretraining results
                deepSAD.save_ae_results(
                    export_pkl=load_model / f"results_ae_retest{suffix}.pkl"
                )
                del deepSAD

            # Initialize DeepSAD model and set neural network phi
            if retest_deepsad:
                deepSAD = DeepSAD(cfg.settings["latent_space_dim"], cfg.settings["eta"])
                deepSAD.set_network(cfg.settings["net_name"])
                deepSAD.load_model(
                    model_path=deepsad_model_path, load_ae=True, map_location=device
                )
                logger.info("Loading model from %s." % load_model)
                deepSAD.test(
                    dataset,
                    device=device,
                    n_jobs_dataloader=cfg.settings["n_jobs_dataloader"],
                    k_fold_idx=fold_idx,
                )

            if retest_ocsvm:
                ocsvm = OCSVM(
                    kernel=cfg.settings["ocsvm_kernel"],
                    nu=cfg.settings["ocsvm_nu"],
                    hybrid=True,
                    latent_space_dim=cfg.settings["latent_space_dim"],
                )
                ocsvm.load_ae(
                    net_name=cfg.settings["net_name"],
                    model_path=ae_model_path,
                    device=device,
                )
                ocsvm.load_model(import_path=ocsvm_model_path)
                ocsvm.test(
                    dataset,
                    device=device,
                    n_jobs_dataloader=cfg.settings["n_jobs_dataloader"],
                    k_fold_idx=fold_idx,
                    batch_size=32,
                )

            retest_output_path = load_model / "retest_output"
            retest_output_path.mkdir(parents=True, exist_ok=True)

            # Save results
            if retest_deepsad:
                deepSAD.save_results(
                    export_pkl=retest_output_path / f"results_deepsad{suffix}.pkl"
                )
            if retest_ocsvm:
                ocsvm.save_results(
                    export_pkl=retest_output_path / f"results_ocsvm{suffix}.pkl"
                )
            if retest_isoforest:
                Isoforest.save_results(
                    export_pkl=retest_output_path / f"results_isoforest{suffix}.pkl"
                )

    else:
        # Defensive fallback; click.Choice already restricts ACTION. This used
        # to be attached to the elbow-test loop as a for/else and fired after
        # every successful elbow test.
        logger.error(f"Unknown action: {action}")


if __name__ == "__main__":
    main()