import json
import pickle
import shutil
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np

# =========================
# User-configurable params
# =========================

# Single experiment to plot (stem of the .bag file, e.g. "3_smoke_human_walking_2023-01-23")
EXPERIMENT_NAME = "3_smoke_human_walking_2023-01-23"

# Directory that contains {EXPERIMENT_NAME}_{method}_scores.npy for methods in
# {"deepsad", "ocsvm", "isoforest"}. Adjust this to wherever you save your per-method scores.
methods_scores_path = Path(
    "/home/fedex/mt/projects/thesis-kowalczyk-jan/Deep-SAD-PyTorch/infer/DeepSAD/test/inference"
)

# Root data path containing the .bag files used to build the cached stats
all_data_path = Path("/home/fedex/mt/data/subter")

# Output base directory (a timestamped subfolder is created here, then archived and copied to "latest/")
output_path = Path("/home/fedex/mt/plots/results_inference_timeline")

# Cache (stats + labels) directory, same as the original script
cache_path = output_path

# Assumed LiDAR frame resolution used to convert point counts to percentages
data_resolution = 32 * 2048

# Frames per second for the x-axis time
FPS = 10.0

# Whether to try to align the score sign so that higher = more degraded.
# If manual labels exist for this experiment, alignment compares the anomaly-window
# mean against the mean outside the window.
ALIGN_SCORE_DIRECTION = True

# =========================
# Setup output folders
# =========================
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name
output_path.mkdir(exist_ok=True, parents=True)
output_datetime_path.mkdir(exist_ok=True, parents=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
archive_folder_path.mkdir(exist_ok=True, parents=True)

# =========================
# Discover experiments to reconstruct indices consistent with caches
# =========================
normal_experiment_paths, anomaly_experiment_paths = [], []
if not all_data_path.exists():
    raise FileNotFoundError(f"all_data_path does not exist: {all_data_path}")

for bag_file_path in all_data_path.iterdir():
    if bag_file_path.suffix != ".bag":
        continue
    if "smoke" in bag_file_path.name:
        anomaly_experiment_paths.append(bag_file_path)
    else:
        normal_experiment_paths.append(bag_file_path)

# Sort by file size to match the ordering used when the caches were generated
normal_experiment_paths = sorted(
    normal_experiment_paths, key=lambda p: p.stat().st_size
)
anomaly_experiment_paths = sorted(
    anomaly_experiment_paths, key=lambda p: p.stat().st_size
)

# Find the path for the requested experiment
exp_path = None
exp_is_anomaly = None
for p in anomaly_experiment_paths:
    if p.stem == EXPERIMENT_NAME:
        exp_path = p
        exp_is_anomaly = True
        break
if exp_path is None:
    for p in normal_experiment_paths:
        if p.stem == EXPERIMENT_NAME:
            exp_path = p
            exp_is_anomaly = False
            break
if exp_path is None:
    raise FileNotFoundError(
        f"Experiment '{EXPERIMENT_NAME}' not found as a .bag in {all_data_path}"
    )

# Get the index within the appropriate list
if exp_is_anomaly:
    exp_index = anomaly_experiment_paths.index(exp_path)
else:
    exp_index = normal_experiment_paths.index(exp_path)
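# NOTE (assumption, inferred from the unpickling below rather than stated anywhere):
# each cache file is expected to hold a 2-tuple (normal, anomaly), where each element
# is a list of per-frame count arrays, one entry per experiment, in the same
# filesize-sorted order reconstructed above. If your cache layout differs, the
# exp_index lookup below will silently pick the wrong series.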
# =========================
# Load cached statistical data
# =========================
missing_points_cache = cache_path / "missing_points.pkl"
near_sensor_cache = cache_path / "particles_near_sensor_counts_500.pkl"
if not missing_points_cache.exists():
    raise FileNotFoundError(f"Missing points cache not found: {missing_points_cache}")
if not near_sensor_cache.exists():
    raise FileNotFoundError(f"Near-sensor cache not found: {near_sensor_cache}")

with open(missing_points_cache, "rb") as f:
    missing_points_normal, missing_points_anomaly = pickle.load(f)
with open(near_sensor_cache, "rb") as f:
    near_sensor_normal, near_sensor_anomaly = pickle.load(f)

if exp_is_anomaly:
    missing_points_series = np.asarray(missing_points_anomaly[exp_index], dtype=float)
    near_sensor_series = np.asarray(near_sensor_anomaly[exp_index], dtype=float)
else:
    missing_points_series = np.asarray(missing_points_normal[exp_index], dtype=float)
    near_sensor_series = np.asarray(near_sensor_normal[exp_index], dtype=float)

# Convert counts to percentages of total points per frame
missing_points_pct = (missing_points_series / data_resolution) * 100.0
near_sensor_pct = (near_sensor_series / data_resolution) * 100.0

# =========================
# Load manual anomaly frame borders (optional; used for sign alignment + vertical markers)
# =========================
manually_labeled_anomaly_frames = {}
labels_json_path = cache_path / "manually_labeled_anomaly_frames.json"
if labels_json_path.exists():
    with open(labels_json_path, "r") as frame_borders_file:
        manually_labeled_anomaly_frames_json = json.load(frame_borders_file)
    for file in manually_labeled_anomaly_frames_json.get("files", []):
        manually_labeled_anomaly_frames[file["filename"]] = (
            file.get("semi_target_begin_frame", None),
            file.get("semi_target_end_frame", None),
        )

# The JSON uses .npy filenames (as in the original script); build this experiment's key.
exp_npy_filename = exp_path.with_suffix(".npy").name
anomaly_window = manually_labeled_anomaly_frames.get(exp_npy_filename, (None, None))

# =========================
# Load method scores and z-score normalize per method
# =========================
def zscore_1d(x: np.ndarray, eps: float = 1e-12) -> np.ndarray:
    """Standardize a 1-D array; return zeros if the standard deviation is ~0."""
    x = np.asarray(x, dtype=float)
    mu = np.mean(x)
    sigma = np.std(x, ddof=0)
    if sigma < eps:
        return np.zeros_like(x)
    return (x - mu) / sigma


def maybe_align_direction(z: np.ndarray, window):
    """Flip the sign so the anomaly-window mean exceeds the outside mean, if labels exist."""
    start, end = window
    if start is None or end is None:
        return z  # no labels -> leave as-is
    start = int(max(0, start))
    end = int(min(len(z), end))
    if end <= start:
        return z
    if start == 0 and end == len(z):
        return z  # window covers everything; nothing to compare against
    inside_mean = float(np.mean(z[start:end]))
    outside_parts = []
    if start > 0:
        outside_parts.append(z[:start])
    if end < len(z):
        outside_parts.append(z[end:])
    outside_mean = float(np.mean(np.concatenate(outside_parts)))
    return z if inside_mean >= outside_mean else -z


methods = ["deepsad", "ocsvm", "isoforest"]
if not methods_scores_path.exists():
    raise FileNotFoundError(
        f"Methods scores path does not exist: {methods_scores_path}"
    )

# Load all raw scores first so every method can be truncated to one common length.
# (Truncating the stats inside the per-method loop, as before, could leave earlier
# methods' z-scores longer than the final stats if file lengths disagree.)
raw_scores = {}
for m in methods:
    file_path = methods_scores_path / f"{EXPERIMENT_NAME}_{m}_scores.npy"
    if not file_path.exists():
        raise FileNotFoundError(f"Missing scores file for method '{m}': {file_path}")
    raw_scores[m] = np.asarray(np.load(file_path), dtype=float).reshape(-1)

# Lengths should match if everything was generated consistently; otherwise align
# scores and stats by truncating all series to the shortest one.
n = min(
    len(missing_points_pct),
    len(near_sensor_pct),
    *(len(s) for s in raw_scores.values()),
)
missing_points_pct = missing_points_pct[:n]
near_sensor_pct = near_sensor_pct[:n]

method_scores = {}
method_zscores = {}
for m in methods:
    s = raw_scores[m][:n]
    z = zscore_1d(s)
    if ALIGN_SCORE_DIRECTION:
        z = maybe_align_direction(z, anomaly_window)
    method_scores[m] = s
    method_zscores[m] = z
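# Optional sanity check (a small addition, not in the original script): print each
# method's series length and z-score range so a flipped sign, an all-constant score
# file, or a length mismatch is easy to spot before looking at the plots.
for m in methods:
    z = method_zscores[m]
    print(f"{m}: n={len(z)}, z in [{z.min():.2f}, {z.max():.2f}]")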
# Common time axis in seconds
num_frames = len(missing_points_pct)
t = np.arange(num_frames) / FPS

# =========================
# Plot 1: Missing points (%) vs. method z-scores
# =========================
fig1, axz1 = plt.subplots(figsize=(14, 6), constrained_layout=True)
axy1 = axz1.twinx()

# Plot z-scores on the left axis
for m in methods:
    axz1.plot(t, method_zscores[m], label=f"{m} (z)", alpha=0.9)

# Plot missing points (%) on the right axis
axy1.plot(t, missing_points_pct, linestyle="--", alpha=0.7, label="Missing points (%)")

# Vertical markers for the anomaly window, if available
start, end = anomaly_window
if start is not None and end is not None and 0 <= start < end <= num_frames:
    axz1.axvline(x=start / FPS, linestyle=":", alpha=0.6)
    axz1.axvline(x=end / FPS, linestyle=":", alpha=0.6)

axz1.set_xlabel("Time (s)")
axz1.set_ylabel("Anomaly score (z-score, ↑ = more degraded)")
axy1.set_ylabel("Missing points (%)")
axz1.set_title(f"{EXPERIMENT_NAME}\nDegradation vs. Missing Points")

# Build a combined legend across both axes
lines1, labels1 = axz1.get_legend_handles_labels()
lines2, labels2 = axy1.get_legend_handles_labels()
axz1.legend(lines1 + lines2, labels1 + labels2, loc="upper right")
axz1.grid(True, alpha=0.3)

fig1.savefig(
    output_datetime_path / f"{EXPERIMENT_NAME}_zscores_vs_missing_points.png", dpi=150
)
plt.close(fig1)

# =========================
# Plot 2: Near-sensor (%) vs. method z-scores
# =========================
fig2, axz2 = plt.subplots(figsize=(14, 6), constrained_layout=True)
axy2 = axz2.twinx()

for m in methods:
    axz2.plot(t, method_zscores[m], label=f"{m} (z)", alpha=0.9)

axy2.plot(t, near_sensor_pct, linestyle="--", alpha=0.7, label="Near-sensor <0.5 m (%)")

start, end = anomaly_window
if start is not None and end is not None and 0 <= start < end <= num_frames:
    axz2.axvline(x=start / FPS, linestyle=":", alpha=0.6)
    axz2.axvline(x=end / FPS, linestyle=":", alpha=0.6)

axz2.set_xlabel("Time (s)")
axz2.set_ylabel("Anomaly score (z-score, ↑ = more degraded)")
axy2.set_ylabel("Near-sensor points (%)")
axz2.set_title(f"{EXPERIMENT_NAME}\nDegradation vs. Near-Sensor Points (<0.5 m)")

lines1, labels1 = axz2.get_legend_handles_labels()
lines2, labels2 = axy2.get_legend_handles_labels()
axz2.legend(lines1 + lines2, labels1 + labels2, loc="upper right")
axz2.grid(True, alpha=0.3)

fig2.savefig(
    output_datetime_path / f"{EXPERIMENT_NAME}_zscores_vs_near_sensor.png", dpi=150
)
plt.close(fig2)

# =========================
# Preserve latest/, archive/, copy script
# =========================
# Replace the current latest/ folder with the new results
shutil.rmtree(latest_folder_path, ignore_errors=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
for file in output_datetime_path.iterdir():
    shutil.copy2(file, latest_folder_path)

# Copy this Python script to preserve the code used for these plots
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)

# Move the timestamped output folder into the archive
shutil.move(output_datetime_path, archive_folder_path)

print("Done. Plots saved and archived.")
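# Usage sketch (the filename below is hypothetical; __file__ above refers to whatever
# this script is saved as): set EXPERIMENT_NAME and the paths in the config section,
# make sure the two .pkl caches and the per-method .npy score files exist, then run
#   python plot_inference_timeline.py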