import json
import pickle
import shutil
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np

# =========================
# User-configurable params
# =========================

# Single experiment to plot (stem of the .bag file, e.g. "3_smoke_human_walking_2023-01-23")
EXPERIMENT_NAME = "3_smoke_human_walking_2023-01-23"

# Directory that contains {EXPERIMENT_NAME}_{method}_scores.npy for methods in {"deepsad", "ocsvm", "isoforest"}
methods_scores_path = Path(
    "/home/fedex/mt/projects/thesis-kowalczyk-jan/Deep-SAD-PyTorch/infer/DeepSAD/test/inference"
)

# Root data path containing the .bag files used to build the cached stats
all_data_path = Path("/home/fedex/mt/data/subter")

# Output base directory (a timestamped subfolder is created here, then archived and mirrored to "latest/")
output_path = Path("/home/fedex/mt/plots/results_inference_timeline_smoothed")

# Cache (stats + labels) directory; reuses the output base directory
cache_path = output_path

# LiDAR frame resolution used to convert point counts to percentages
data_resolution = 32 * 2048

# Frames per second for the x-axis time
FPS = 10.0

# Whether to try to flip score sign so that higher = more degraded
ALIGN_SCORE_DIRECTION = True

# =========================
# Smoothing configuration
# =========================

# Options: "none", "moving_average", "gaussian", "ema"
SMOOTHING_METHOD = "ema"

# Moving-average window size in frames. Use an odd number for symmetry; <=1 disables.
MA_WINDOW = 11

# Gaussian sigma in frames. ~2-3 frames gives mild smoothing.
GAUSSIAN_SIGMA = 2.0

# Exponential-moving-average factor in (0, 1]; smaller = smoother. ~0.2 is a good start.
EMA_ALPHA = 0.1

# =========================
# Setup output folders
# =========================
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name

output_path.mkdir(exist_ok=True, parents=True)
output_datetime_path.mkdir(exist_ok=True, parents=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
archive_folder_path.mkdir(exist_ok=True, parents=True)

# =========================
# Discover experiments
# =========================
normal_experiment_paths, anomaly_experiment_paths = [], []
for bag_file_path in all_data_path.iterdir():
    if bag_file_path.suffix != ".bag":
        continue
    if "smoke" in bag_file_path.name:
        anomaly_experiment_paths.append(bag_file_path)
    else:
        normal_experiment_paths.append(bag_file_path)

# Sort by file size; the cached series below are assumed to follow this order.
normal_experiment_paths = sorted(normal_experiment_paths, key=lambda p: p.stat().st_size)
anomaly_experiment_paths = sorted(anomaly_experiment_paths, key=lambda p: p.stat().st_size)

# Find the requested experiment
exp_path, exp_is_anomaly = None, None
for p in anomaly_experiment_paths:
    if p.stem == EXPERIMENT_NAME:
        exp_path, exp_is_anomaly = p, True
        break
if exp_path is None:
    for p in normal_experiment_paths:
        if p.stem == EXPERIMENT_NAME:
            exp_path, exp_is_anomaly = p, False
            break
if exp_path is None:
    raise FileNotFoundError(f"Experiment '{EXPERIMENT_NAME}' not found")

exp_index = (
    anomaly_experiment_paths.index(exp_path)
    if exp_is_anomaly
    else normal_experiment_paths.index(exp_path)
)

# =========================
# Load cached statistical data
# =========================
with open(cache_path / "missing_points.pkl", "rb") as f:
    missing_points_normal, missing_points_anomaly = pickle.load(f)
with open(cache_path / "particles_near_sensor_counts_500.pkl", "rb") as f:
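    # Each cache file is assumed to hold a (normal, anomaly) pair of per-experiment
    # series, ordered to match the size-sorted experiment lists above.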
    near_sensor_normal, near_sensor_anomaly = pickle.load(f)

if exp_is_anomaly:
    missing_points_series = np.asarray(missing_points_anomaly[exp_index], dtype=float)
    near_sensor_series = np.asarray(near_sensor_anomaly[exp_index], dtype=float)
else:
    missing_points_series = np.asarray(missing_points_normal[exp_index], dtype=float)
    near_sensor_series = np.asarray(near_sensor_normal[exp_index], dtype=float)

missing_points_pct = (missing_points_series / data_resolution) * 100.0
near_sensor_pct = (near_sensor_series / data_resolution) * 100.0

# =========================
# Load manual anomaly frame borders
# =========================
manually_labeled_anomaly_frames = {}
labels_json_path = cache_path / "manually_labeled_anomaly_frames.json"
if labels_json_path.exists():
    with open(labels_json_path, "r") as f:
        labeled_json = json.load(f)
    for file in labeled_json.get("files", []):
        manually_labeled_anomaly_frames[file["filename"]] = (
            file.get("semi_target_begin_frame"),
            file.get("semi_target_end_frame"),
        )

exp_npy_filename = exp_path.with_suffix(".npy").name
anomaly_window = manually_labeled_anomaly_frames.get(exp_npy_filename, (None, None))

# =========================
# Load method scores and normalize
# =========================
def zscore_1d(x, eps=1e-12):
    """Standardize x to zero mean and unit variance; return zeros if x is constant."""
    mu, sigma = np.mean(x), np.std(x, ddof=0)
    return np.zeros_like(x) if sigma < eps else (x - mu) / sigma


def maybe_align_direction(z, window):
    """Flip the sign of z if the scores are lower inside the labeled anomaly window."""
    start, end = window
    if start is None or end is None or end <= start:
        return z
    inside_mean = np.mean(z[start:end])
    outside = np.concatenate([z[:start], z[end:]])
    outside_mean = np.mean(outside) if len(outside) else inside_mean
    return z if inside_mean >= outside_mean else -z


methods = ["deepsad", "ocsvm", "isoforest"]

# Load all score series first, then truncate everything to one common length so
# every curve shares the same time axis (truncating inside the loop would leave
# earlier methods' z-scores longer than the statistics series).
method_scores = {}
for m in methods:
    s = np.load(methods_scores_path / f"{EXPERIMENT_NAME}_{m}_scores.npy")
    method_scores[m] = np.asarray(s, dtype=float).ravel()

n = min(len(missing_points_pct), *(len(s) for s in method_scores.values()))
missing_points_pct = missing_points_pct[:n]
near_sensor_pct = near_sensor_pct[:n]

method_zscores = {}
for m, s in method_scores.items():
    z = zscore_1d(s[:n])
    if ALIGN_SCORE_DIRECTION:
        z = maybe_align_direction(z, anomaly_window)
    method_zscores[m] = z

# =========================
# Smoothing
# =========================
def moving_average(x, window):
    if window <= 1:
        return x
    if window % 2 == 0:
        window += 1  # force an odd window so the filter stays centered
    # mode="same" keeps the length; edges are implicitly zero-padded.
    return np.convolve(x, np.ones(window) / window, mode="same")


def gaussian_smooth(x, sigma):
    # Lazy import: scipy is only required when Gaussian smoothing is selected.
    from scipy.ndimage import gaussian_filter1d

    return gaussian_filter1d(x, sigma=sigma, mode="nearest") if sigma > 0 else x


def ema(x, alpha):
    """Causal exponential moving average: y[i] = alpha*x[i] + (1-alpha)*y[i-1]."""
    if len(x) == 0:
        return x
    y = np.empty_like(x, dtype=float)
    y[0] = x[0]
    for i in range(1, len(x)):
        y[i] = alpha * x[i] + (1 - alpha) * y[i - 1]
    return y


def apply_smoothing(x):
    m = SMOOTHING_METHOD.lower()
    if m == "none":
        return x
    if m == "moving_average":
        return moving_average(x, MA_WINDOW)
    if m == "gaussian":
        return gaussian_smooth(x, GAUSSIAN_SIGMA)
    if m == "ema":
        return ema(x, EMA_ALPHA)
    raise ValueError(f"Unknown SMOOTHING_METHOD: {SMOOTHING_METHOD}")


smoothed_z = {k: apply_smoothing(v) for k, v in method_zscores.items()}
smoothed_missing = apply_smoothing(missing_points_pct)
smoothed_near = apply_smoothing(near_sensor_pct)

# =========================
# Plot
# =========================
t = np.arange(len(missing_points_pct)) / FPS


def plot_series(y2, ylabel, fname, title_suffix):
    fig, axz = plt.subplots(figsize=(14, 6), constrained_layout=True)
    axy = axz.twinx()
    for m in methods:
        axz.plot(t, smoothed_z[m], label=f"{m} (z)")
    axy.plot(t, y2, linestyle="--", label=ylabel)
    start, end = anomaly_window
    # Check "is not None" rather than truthiness, so a window starting at frame 0 is still drawn.
    if start is not None and end is not None:
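        # Convert the labeled frame indices to seconds and mark the anomaly window.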
        axz.axvline(start / FPS, linestyle=":", alpha=0.6)
        axz.axvline(end / FPS, linestyle=":", alpha=0.6)
    axz.set_xlabel("Time (s)")
    axz.set_ylabel("Anomaly score (z)")
    axy.set_ylabel(ylabel)
    axz.set_title(f"{EXPERIMENT_NAME}\n{title_suffix}\nSmoothing: {SMOOTHING_METHOD}")
    # Merge the legends of both y-axes into a single box.
    lines1, labels1 = axz.get_legend_handles_labels()
    lines2, labels2 = axy.get_legend_handles_labels()
    axz.legend(lines1 + lines2, labels1 + labels2, loc="upper right")
    axz.grid(True, alpha=0.3)
    fig.savefig(output_datetime_path / fname, dpi=150)
    plt.close(fig)


plot_series(
    smoothed_missing,
    "Missing points (%)",
    f"{EXPERIMENT_NAME}_zscores_vs_missing.png",
    "Degradation vs Missing Points",
)
plot_series(
    smoothed_near,
    "Near-sensor points (%)",
    f"{EXPERIMENT_NAME}_zscores_vs_near.png",
    "Degradation vs Near-Sensor Points (<0.5m)",
)

# =========================
# Save & archive
# =========================
# Refresh "latest/" with this run's outputs, keep a copy of the script alongside
# the plots, then move the timestamped folder into the archive.
shutil.rmtree(latest_folder_path, ignore_errors=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
for f in output_datetime_path.iterdir():
    shutil.copy2(f, latest_folder_path)
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)
shutil.move(output_datetime_path, archive_folder_path)
print("Done. Plots saved and archived.")
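# Example usage (hypothetical script name; adjust the hard-coded paths above first):
#   python plot_inference_timeline_smoothed.py
# Outputs end up under {output_path}/archive/<timestamp>/, mirrored in {output_path}/latest/.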