# Source file: mt/tools/plot_scripts/results_inference_timeline.py
# Repository viewer metadata: 305 lines, 11 KiB, Python; last change 2025-09-15 11:21:30 +02:00
import json
import pickle
import shutil
from datetime import datetime
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
# =========================
# User-configurable params
# =========================
# Everything in this section is meant to be edited by hand before running the script.
# Single experiment to plot: the stem of its .bag file (e.g. "3_smoke_human_walking_2023-01-23").
# The same string is the filename prefix of the per-method score files.
EXPERIMENT_NAME = "3_smoke_human_walking_2023-01-23"
# Directory that contains {EXPERIMENT_NAME}_{method}_scores.npy for methods in {"deepsad","ocsvm","isoforest"}.
# Adjust this to wherever the per-method scores are saved.
methods_scores_path = Path(
"/home/fedex/mt/projects/thesis-kowalczyk-jan/Deep-SAD-PyTorch/infer/DeepSAD/test/inference"
)
# Root data path containing the .bag files that were used to build the cached stats.
# Only the file names and sizes are read here (to rebuild the experiment ordering).
all_data_path = Path("/home/fedex/mt/data/subter")
# Output base directory (a timestamped subfolder is created here, copied to "latest/" and moved into "archive/").
output_path = Path("/home/fedex/mt/plots/results_inference_timeline")
# Cache directory: must contain missing_points.pkl and particles_near_sensor_counts_500.pkl;
# manually_labeled_anomaly_frames.json is optional. Same location as the output base directory.
cache_path = output_path
# Assumed number of points in one LiDAR frame; used as the denominator when converting
# per-frame counts to percent. NOTE(review): presumably 32 rows x 2048 columns; confirm against the sensor.
data_resolution = 32 * 2048
# Frames per second; frame indices are divided by this to get the x-axis time in seconds.
FPS = 10.0
# Whether to try to align the score sign so that higher = more degraded.
# If manual labels exist for this experiment, alignment compares the mean score inside the
# labeled anomaly window with the mean outside it; without labels the scores are left as-is.
ALIGN_SCORE_DIRECTION = True
# =========================
# Setup output folders
# =========================
# One timestamped folder per run, plus the fixed "latest" and "archive" folders.
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name
# Create every folder up front; already-existing folders are left untouched.
for _required_folder in (
    output_path,
    output_datetime_path,
    latest_folder_path,
    archive_folder_path,
):
    _required_folder.mkdir(exist_ok=True, parents=True)
# =========================
# Discover experiments to reconstruct indices consistent with caches
# =========================
if not all_data_path.exists():
    raise FileNotFoundError(f"all_data_path does not exist: {all_data_path}")


def _bag_file_size(bag_path):
    """Return the size in bytes of ``bag_path`` (sort key)."""
    return bag_path.stat().st_size


# List the directory once so both groups are derived from the same snapshot.
_bag_files = [entry for entry in all_data_path.iterdir() if entry.suffix == ".bag"]
# Files with "smoke" in their name are the anomaly experiments; everything else is normal.
# Sort by filesize to match the ordering used when the caches were generated.
anomaly_experiment_paths = sorted(
    (bag for bag in _bag_files if "smoke" in bag.name), key=_bag_file_size
)
normal_experiment_paths = sorted(
    (bag for bag in _bag_files if "smoke" not in bag.name), key=_bag_file_size
)

# Locate the requested experiment: anomaly group first, then the normal group.
# The position inside its group is the index into the cached per-experiment lists.
exp_path = None
exp_is_anomaly = None
for _group_is_anomaly, _group in (
    (True, anomaly_experiment_paths),
    (False, normal_experiment_paths),
):
    for _position, _candidate in enumerate(_group):
        if _candidate.stem == EXPERIMENT_NAME:
            exp_path = _candidate
            exp_is_anomaly = _group_is_anomaly
            exp_index = _position
            break
    if exp_path is not None:
        break
if exp_path is None:
    raise FileNotFoundError(
        f"Experiment '{EXPERIMENT_NAME}' not found as a .bag in {all_data_path}"
    )
# =========================
# Load cached statistical data
# =========================
missing_points_cache = cache_path / "missing_points.pkl"
near_sensor_cache = cache_path / "particles_near_sensor_counts_500.pkl"
# Verify that both caches are present before reading either of them.
if not missing_points_cache.exists():
    raise FileNotFoundError(f"Missing points cache not found: {missing_points_cache}")
if not near_sensor_cache.exists():
    raise FileNotFoundError(f"Near-sensor cache not found: {near_sensor_cache}")
# Each cache unpacks into a (normal, anomaly) pair of per-experiment sequences.
# NOTE: pickle executes code on load; only point cache_path at caches you generated yourself.
with missing_points_cache.open("rb") as _cache_file:
    missing_points_normal, missing_points_anomaly = pickle.load(_cache_file)
with near_sensor_cache.open("rb") as _cache_file:
    near_sensor_normal, near_sensor_anomaly = pickle.load(_cache_file)
# Pick the group the experiment belongs to, then its entry within that group.
_missing_points_group = (
    missing_points_anomaly if exp_is_anomaly else missing_points_normal
)
_near_sensor_group = near_sensor_anomaly if exp_is_anomaly else near_sensor_normal
missing_points_series = np.asarray(_missing_points_group[exp_index], dtype=float)
near_sensor_series = np.asarray(_near_sensor_group[exp_index], dtype=float)
# Convert counts to percentages of total points
missing_points_pct = (missing_points_series / data_resolution) * 100.0
near_sensor_pct = (near_sensor_series / data_resolution) * 100.0
# =========================
# Load manual anomaly frame borders (optional; used for sign alignment + vertical markers)
# =========================
manually_labeled_anomaly_frames = {}
labels_json_path = cache_path / "manually_labeled_anomaly_frames.json"
if labels_json_path.exists():
    with open(labels_json_path, "r") as frame_borders_file:
        manually_labeled_anomaly_frames_json = json.load(frame_borders_file)
    # Map each labeled filename to its (begin_frame, end_frame) pair; a missing border becomes None.
    manually_labeled_anomaly_frames = {
        _entry["filename"]: (
            _entry.get("semi_target_begin_frame"),
            _entry.get("semi_target_end_frame"),
        )
        for _entry in manually_labeled_anomaly_frames_json.get("files", [])
    }
# The JSON is keyed by .npy filenames, so derive this experiment's key from the .bag path.
exp_npy_filename = exp_path.with_suffix(".npy").name
# Fall back to (None, None) when the experiment has no manual labels.
anomaly_window = manually_labeled_anomaly_frames.get(exp_npy_filename, (None, None))
# =========================
# Load method scores and z-score normalize per method
# =========================
def zscore_1d(x: np.ndarray, eps=1e-12):
    """Standardize a score series to zero mean and unit (population) variance.

    Args:
        x: Array-like of scores; it is converted to a float array.
        eps: Standard deviations below this threshold are treated as zero.

    Returns:
        A float array with the same shape as ``x``. It is all zeros when the
        series is (near-)constant and empty when ``x`` is empty.
    """
    x = np.asarray(x, dtype=float)
    if x.size == 0:
        # np.mean/np.std on an empty array emit RuntimeWarnings and produce NaN;
        # there is nothing to normalize, so return the empty array directly.
        return np.zeros_like(x)
    mu = np.mean(x)
    sigma = np.std(x, ddof=0)
    if sigma < eps:
        # Constant series: avoid dividing by (almost) zero.
        return np.zeros_like(x)
    return (x - mu) / sigma
def maybe_align_direction(z: np.ndarray, window):
    """Flip the sign of ``z`` so the labeled anomaly window scores higher than the rest.

    Args:
        z: 1-D score series.
        window: ``(start, end)`` frame borders of the labeled anomaly window,
            used as the half-open slice ``z[start:end]``. Either may be None.

    Returns:
        ``-z`` when the mean inside the window is lower than the mean outside
        it, otherwise ``z`` itself. ``z`` is also returned unchanged whenever
        the direction cannot be determined: no labels, an empty window after
        clamping to the series, a window covering the whole series, or a
        non-finite mean on either side.
    """
    start, end = window
    if start is None or end is None:
        return z  # no labels: leave as-is
    # Clamp the window to the valid index range of the series.
    start = int(max(0, start))
    end = int(min(len(z), end))
    if end <= start:
        return z  # empty window
    if start == 0 and end == len(z):
        return z  # nothing outside the window to compare against
    inside_mean = float(np.mean(z[start:end]))
    # Everything except [start:end]; at least one of the two slices is non-empty here.
    outside_mean = float(np.mean(np.concatenate((z[:start], z[end:]))))
    if not (np.isfinite(inside_mean) and np.isfinite(outside_mean)):
        # A NaN mean makes the comparison below false, which would flip the
        # sign without any evidence for it.
        return z
    return z if inside_mean >= outside_mean else -z
methods = ["deepsad", "ocsvm", "isoforest"]
method_scores = {}
method_zscores = {}
if not methods_scores_path.exists():
    raise FileNotFoundError(
        f"Methods scores path does not exist: {methods_scores_path}"
    )
# Pass 1: load the raw scores of every method as flat float arrays.
for m in methods:
    file_path = methods_scores_path / f"{EXPERIMENT_NAME}_{m}_scores.npy"
    if not file_path.exists():
        raise FileNotFoundError(f"Missing scores file for method '{m}': {file_path}")
    method_scores[m] = np.asarray(np.load(file_path), dtype=float).reshape(-1)
# All series are plotted against one time axis, so they must share one length.
# Truncating per method inside the loading loop would leave earlier methods
# longer than the stats whenever a later method is shorter; determine the
# common length across everything first and truncate once.
num_frames = min(
    len(missing_points_pct),
    len(near_sensor_pct),
    *(len(scores) for scores in method_scores.values()),
)
missing_points_pct = missing_points_pct[:num_frames]
near_sensor_pct = near_sensor_pct[:num_frames]
# Pass 2: normalize each (truncated) series and optionally align its direction.
for m in methods:
    s = method_scores[m][:num_frames]
    z = zscore_1d(s)
    if ALIGN_SCORE_DIRECTION:
        z = maybe_align_direction(z, anomaly_window)
    method_scores[m] = s
    method_zscores[m] = z
# Common time axis in seconds
t = np.arange(num_frames) / FPS
# =========================
# Plots: method z-scores vs. one cached statistic (%)
# =========================
def _plot_zscores_vs_stat(
    stat_pct, stat_label, stat_axis_label, title_suffix, file_suffix
):
    """Plot all method z-scores against one statistic on a twin y-axis and save the figure.

    Args:
        stat_pct: Per-frame statistic in percent, same length as the time axis ``t``.
        stat_label: Legend label of the statistic line.
        stat_axis_label: Label of the right-hand y-axis.
        title_suffix: Second title line (below the experiment name).
        file_suffix: Output file is ``{EXPERIMENT_NAME}_{file_suffix}.png``
            inside ``output_datetime_path``.
    """
    fig, ax_z = plt.subplots(figsize=(14, 6), constrained_layout=True)
    ax_stat = ax_z.twinx()
    # z-scores on the left axis
    for m in methods:
        ax_z.plot(t, method_zscores[m], label=f"{m} (z)", alpha=0.9)
    # statistic on the right axis
    # NOTE(review): the twin axis restarts the color cycle, so this line shares
    # its color with the first method; only the dashed style tells them apart.
    ax_stat.plot(t, stat_pct, linestyle="--", alpha=0.7, label=stat_label)
    # vertical markers for the anomaly window, if labeled and inside the plotted range
    start, end = anomaly_window
    if start is not None and end is not None and 0 <= start < end <= num_frames:
        for border_frame in (start, end):
            ax_z.axvline(x=border_frame / FPS, linestyle=":", alpha=0.6)
    ax_z.set_xlabel("Time (s)")
    ax_z.set_ylabel("Anomaly score (z-score, ↑ = more degraded)")
    ax_stat.set_ylabel(stat_axis_label)
    ax_z.set_title(f"{EXPERIMENT_NAME}\n{title_suffix}")
    # Build one combined legend from both axes.
    z_handles, z_labels = ax_z.get_legend_handles_labels()
    stat_handles, stat_labels = ax_stat.get_legend_handles_labels()
    ax_z.legend(z_handles + stat_handles, z_labels + stat_labels, loc="upper right")
    ax_z.grid(True, alpha=0.3)
    fig.savefig(output_datetime_path / f"{EXPERIMENT_NAME}_{file_suffix}.png", dpi=150)
    plt.close(fig)


# Plot 1: Missing points (%) vs. method z-scores
_plot_zscores_vs_stat(
    missing_points_pct,
    stat_label="Missing points (%)",
    stat_axis_label="Missing points (%)",
    title_suffix="Degradation vs. Missing Points",
    file_suffix="zscores_vs_missing_points",
)
# Plot 2: Near-sensor (%) vs. method z-scores
_plot_zscores_vs_stat(
    near_sensor_pct,
    stat_label="Near-sensor <0.5m (%)",
    stat_axis_label="Near-sensor points (%)",
    title_suffix="Degradation vs. Near-Sensor Points (<0.5 m)",
    file_suffix="zscores_vs_near_sensor",
)
# =========================
# Preserve latest/, archive/, copy script
# =========================
# Start from an empty latest folder so it mirrors exactly this run.
shutil.rmtree(latest_folder_path, ignore_errors=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
# Copy the contents of this run's output folder to the latest folder.
for produced_file in output_datetime_path.iterdir():
    shutil.copy2(produced_file, latest_folder_path)
# Copy this python script next to the plots to preserve the code used.
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)
# Move the timestamped folder into the archive. Plain strings are passed because
# shutil.move only accepts a path-like source since Python 3.9; on older versions
# a Path source moved into an existing directory raises AttributeError.
shutil.move(str(output_datetime_path), str(archive_folder_path))
print("Done. Plots saved and archived.")