# mt/tools/plot_scripts/results_inference_timeline_smoothed.py
import json
import pickle
import shutil
from datetime import datetime
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import polars as pl
# =====================================
# User-configurable params
# =====================================
# Root directory that contains per-run outputs (scanned by load_inference_results_dataframe)
INFERENCE_ROOT = Path("/home/fedex/mt/results/inference/copy")
# Directory holding cached per-frame statistics and manual anomaly labels
CACHE_PATH = Path("/home/fedex/mt/plots/data_anomalies_timeline")
# Root data path containing .bag files to rebuild ordering (for stats mapping)
ALL_DATA_PATH = Path("/home/fedex/mt/data/subter")
# Output base directory (timestamped subfolder will be created here, then archived and copied to "latest/")
OUTPUT_PATH = Path("/home/fedex/mt/plots/results_inference_timeline_smoothed")
# Frames per second for x-axis time
FPS = 10.0
# ---- Smoothing: EMA only ----
EMA_ALPHA = 0.1  # EMA smoothing factor for model scores, in (0, 1]; smaller = smoother
STATS_EMA_ALPHA = 0.1  # EMA smoothing factor for the stats curves (absolute %); tune independently
# Whether to z-score per-curve for the model methods (recommended)
Z_SCORE_MODELS = True
# If some model's series is longer/shorter than others in a group, align to min length
ALIGN_TO_MIN_LENGTH = True
# Whether to flip each model's score sign (using the manually labeled window) so that higher = more anomalous
ALIGN_SCORE_DIRECTION = True
# LiDAR points per frame (for stats -> percent)
DATA_RESOLUTION = 32 * 2048
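# Worked example: 32 * 2048 = 65,536 points per frame, so 6,554 missing returns
# in one frame map to (6554 / 65536) * 100 ≈ 10.0% on the stats axes.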
# =====================================
# Setup output folders
# =====================================
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = OUTPUT_PATH / "latest"
archive_folder_path = OUTPUT_PATH / "archive"
output_datetime_path = OUTPUT_PATH / datetime_folder_name
OUTPUT_PATH.mkdir(exist_ok=True, parents=True)
archive_folder_path.mkdir(exist_ok=True, parents=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
output_datetime_path.mkdir(exist_ok=True, parents=True)
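# Resulting layout (illustrative):
#   OUTPUT_PATH/
#     <timestamp>/  <- this run's plots (moved into archive/ when the run ends)
#     archive/      <- all previous timestamped runs
#     latest/       <- copy of the most recent run's outputs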
# =====================================
# Load results into a Polars DataFrame
# =====================================
from load_results import load_inference_results_dataframe
df: pl.DataFrame = load_inference_results_dataframe(INFERENCE_ROOT)
# Sanity-check that the loader produced the expected schema
expected_cols = {
"experiment",
"network",
"latent_dim",
"semi_normals",
"semi_anomalous",
"model",
"scores",
"folder",
"config_json",
}
missing_cols = expected_cols - set(df.columns)
if missing_cols:
raise KeyError(f"DataFrame missing required columns: {sorted(missing_cols)}")
# =====================================
# Rebuild experiment → stats mapping from the .bag files on disk
# =====================================
def rebuild_experiment_index():
    """Scan ALL_DATA_PATH for .bag files; 'smoke' in the name marks an anomaly run.
    Returns (normal_paths, anomaly_paths, {stem: (is_anomaly, size_rank, path)})."""
    normals, anomalies = [], []
    if not ALL_DATA_PATH.exists():
        return [], [], {}
for bag in ALL_DATA_PATH.iterdir():
if bag.suffix != ".bag":
continue
if "smoke" in bag.name:
anomalies.append(bag)
else:
normals.append(bag)
normals = sorted(normals, key=lambda p: p.stat().st_size)
anomalies = sorted(anomalies, key=lambda p: p.stat().st_size)
mapping = {}
for i, p in enumerate(normals):
mapping[p.stem] = (False, i, p)
for i, p in enumerate(anomalies):
mapping[p.stem] = (True, i, p)
return normals, anomalies, mapping
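# Illustrative entry (hypothetical stem): exp_map["run_smoke_03"] ->
# (True, 0, Path(".../run_smoke_03.bag")), where 0 is the file's size rank
# within its subset; this rank is assumed to match the ordering of the cached
# stats lists loaded below.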
normal_paths, anomaly_paths, exp_map = rebuild_experiment_index()
# =====================================
# Load cached statistical data (+ manual labels)
# =====================================
missing_points_cache = CACHE_PATH / "missing_points.pkl"
near_sensor_cache = CACHE_PATH / "particles_near_sensor_counts_500.pkl"
labels_json_path = CACHE_PATH / "manually_labeled_anomaly_frames.json"
missing_points_normal = missing_points_anomaly = None
near_sensor_normal = near_sensor_anomaly = None
if missing_points_cache.exists():
with open(missing_points_cache, "rb") as f:
missing_points_normal, missing_points_anomaly = pickle.load(f)
if near_sensor_cache.exists():
with open(near_sensor_cache, "rb") as f:
near_sensor_normal, near_sensor_anomaly = pickle.load(f)
manual_windows = {}
if labels_json_path.exists():
with open(labels_json_path, "r") as f:
labeled_json = json.load(f)
for file in labeled_json.get("files", []):
manual_windows[file["filename"]] = (
file.get("semi_target_begin_frame"),
file.get("semi_target_end_frame"),
)
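# Assumed JSON shape, inferred from the keys read above (frame numbers are
# example values):
#   {"files": [{"filename": "<experiment>.npy",
#               "semi_target_begin_frame": 120,
#               "semi_target_end_frame": 480}]}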
def get_stats_for_experiment(exp_name: str):
"""
Returns:
missing_pct (np.ndarray) | None,
near_pct (np.ndarray) | None,
anomaly_window (tuple(start,end)) | (None,None)
"""
if exp_name not in exp_map:
return None, None, (None, None)
is_anomaly, idx, path = exp_map[exp_name]
missing = None
near = None
if missing_points_normal is not None and missing_points_anomaly is not None:
series = (
missing_points_anomaly[idx] if is_anomaly else missing_points_normal[idx]
)
missing = (np.asarray(series, dtype=float) / DATA_RESOLUTION) * 100.0
if near_sensor_normal is not None and near_sensor_anomaly is not None:
series = near_sensor_anomaly[idx] if is_anomaly else near_sensor_normal[idx]
near = (np.asarray(series, dtype=float) / DATA_RESOLUTION) * 100.0
npy_key = path.with_suffix(".npy").name
window = manual_windows.get(npy_key, (None, None))
return missing, near, window
# =====================================
# Helpers
# =====================================
def to_np(a):
"""Convert a Polars list cell to a 1D NumPy array of float."""
if a is None:
return None
return np.asarray(a, dtype=float).ravel()
def zscore_1d(x, eps=1e-12):
if x is None or len(x) == 0:
return x
mu = float(np.mean(x))
sigma = float(np.std(x, ddof=0))
return np.zeros_like(x) if sigma < eps else (x - mu) / sigma
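# e.g. zscore_1d(np.array([1.0, 2.0, 3.0])) -> [-1.2247, 0.0, 1.2247]; a
# constant series trips the sigma < eps guard and returns all zeros.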
def ema(x, alpha):
    """Exponential moving average: y[i] = alpha * x[i] + (1 - alpha) * y[i - 1]."""
if x is None or len(x) == 0:
return x
y = np.empty_like(x, dtype=float)
y[0] = x[0]
for i in range(1, len(x)):
y[i] = alpha * x[i] + (1 - alpha) * y[i - 1]
return y
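# Worked example with alpha = 0.5 on x = [0, 1, 1, 1]: y = [0, 0.5, 0.75, 0.875];
# each step moves alpha of the way toward the new sample. At alpha = 0.1 the
# effective averaging window is roughly 2/alpha - 1 = 19 frames (~1.9 s at FPS = 10).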
def apply_ema_models(x):
return ema(x, EMA_ALPHA)
def apply_ema_stats(x):
return ema(x, STATS_EMA_ALPHA)
def align_lengths(series_dict):
"""Truncate all series to the shortest available length."""
valid_lengths = [
len(v) for v in series_dict.values() if v is not None and len(v) > 0
]
if not valid_lengths:
return series_dict
min_len = min(valid_lengths)
return {k: (v[:min_len] if v is not None else None) for k, v in series_dict.items()}
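# e.g. curves of lengths 1203, 1198, and 1203 are all truncated to 1198 so the
# methods can share one time axis; None entries pass through unchanged.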
def maybe_align_direction(z: np.ndarray, window):
"""Flip sign so that the anomaly window mean is higher than the outside mean, if labels exist."""
if z is None:
return z
start, end = window
if start is None or end is None:
return z
start = int(max(0, start))
end = int(min(len(z), end))
    if end <= start:
        return z
    if start == 0 and end == len(z):
        # Window covers the whole series; there is nothing to compare against.
        return z
    inside_mean = float(np.mean(z[start:end]))
    outside_parts = []
    if start > 0:
        outside_parts.append(z[:start])
    if end < len(z):
        outside_parts.append(z[end:])
outside_mean = float(np.mean(np.concatenate(outside_parts)))
return z if inside_mean >= outside_mean else -z
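# Example: if the z-scored series averages -0.4 inside the labeled window but
# +0.1 outside it, the curve is flipped to -z so "more anomalous" plots upward.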
def safe_title(s: str) -> str:
return s.replace("_", " ")
# =====================================
# Model selection per group
# =====================================
group_cols = ["experiment", "latent_dim", "semi_normals", "semi_anomalous"]
def pick_rows(gdf: pl.DataFrame):
    """Pick one row per (network, model) combination of interest; None if absent."""
sel = {}
sel["DeepSAD (LeNet)"] = gdf.filter(
(pl.col("network") == "subter_LeNet") & (pl.col("model") == "deepsad")
)
sel["DeepSAD (efficient)"] = gdf.filter(
(pl.col("network") == "subter_efficient") & (pl.col("model") == "deepsad")
)
sel["OCSVM (LeNet)"] = gdf.filter(
(pl.col("network") == "subter_LeNet") & (pl.col("model") == "ocsvm")
)
sel["IsoForest (LeNet)"] = gdf.filter(
(pl.col("network") == "subter_LeNet") & (pl.col("model") == "isoforest")
)
chosen = {}
for k, dfk in sel.items():
        chosen[k] = dfk.row(0, named=True) if dfk.height > 0 else None
return chosen
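# Illustrative return value: {"DeepSAD (LeNet)": {<row dict>}, "OCSVM (LeNet)": None, ...};
# a None entry means that (network, model) pair is absent from this group.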
# =====================================
# Iterate groups and plot
# =====================================
plots_made = 0
for keys, g in df.group_by(group_cols, maintain_order=True):
experiment, latent_dim, semi_normals, semi_anomalous = keys
chosen = pick_rows(g)
# Extract series for models
curves_raw = {}
for label, row in chosen.items():
if row is None:
curves_raw[label] = None
continue
        curves_raw[label] = to_np(row["scores"])
# If nothing to plot, skip group
if all(v is None or len(v) == 0 for v in curves_raw.values()):
continue
# Stats for this experiment (absolute %; no z-scoring)
missing_pct, near_pct, anomaly_window = get_stats_for_experiment(experiment)
# Optionally align lengths among model curves
curves = curves_raw.copy()
if ALIGN_TO_MIN_LENGTH:
curves = align_lengths(curves)
# Prepare processed model curves: z-score (if enabled) + EMA smoothing
proc = {}
for k, v in curves.items():
if v is None:
continue
x = zscore_1d(v) if Z_SCORE_MODELS else v.astype(float)
if ALIGN_SCORE_DIRECTION and anomaly_window != (None, None):
x = maybe_align_direction(x, anomaly_window)
x = apply_ema_models(x)
proc[k] = x
if not proc:
continue
    # Common (post-alignment) curve length, used to truncate the stats series below
    any_len = len(next(iter(proc.values())))
# =========== Plot A: Scores-only (models z-scored; stats not shown) ===========
figA, axA = plt.subplots(figsize=(14, 6), constrained_layout=True)
    for label, y in proc.items():
        # proc never stores None, and each curve gets its own time axis so
        # unaligned lengths still plot correctly
        axA.plot(np.arange(len(y)) / FPS, y, label=label)
axA.set_xlabel("Time (s)")
axA.set_ylabel("Model anomaly score" + (" (z-score)" if Z_SCORE_MODELS else ""))
titleA = (
f"{safe_title(experiment)} | latent_dim={latent_dim}, "
f"semi_normals={semi_normals}, semi_anomalous={semi_anomalous}\n"
f"Smoothing: EMA(alpha={EMA_ALPHA})"
)
axA.set_title(titleA)
axA.grid(True, alpha=0.3)
axA.legend(loc="upper right")
fnameA = (
f"{experiment}_ld{latent_dim}_sn{semi_normals}_sa{semi_anomalous}"
f"_scores_EMA-{EMA_ALPHA}{'_z' if Z_SCORE_MODELS else ''}.png"
)
figA.savefig(output_datetime_path / fnameA, dpi=150)
plt.close(figA)
# =========== Plot B: Models (z-scored) + Missing Points (%) absolute ===========
if missing_pct is not None and len(missing_pct) > 0:
mp = missing_pct
if ALIGN_TO_MIN_LENGTH:
mp = mp[:any_len]
mp_s = apply_ema_stats(mp)
t_stats = np.arange(len(mp_s)) / FPS
figB, axB = plt.subplots(figsize=(14, 6), constrained_layout=True)
axBy = axB.twinx()
        for label, y in proc.items():
            axB.plot(np.arange(len(y)) / FPS, y, label=label)
axBy.plot(t_stats, mp_s, linestyle="--", label="Missing points (%)")
if anomaly_window != (None, None):
start, end = anomaly_window
if isinstance(start, int) and isinstance(end, int) and 0 <= start < end:
axB.axvline(start / FPS, linestyle=":", alpha=0.6)
axB.axvline(end / FPS, linestyle=":", alpha=0.6)
axB.set_xlabel("Time (s)")
axB.set_ylabel("Model anomaly score" + (" (z-score)" if Z_SCORE_MODELS else ""))
axBy.set_ylabel("Missing points (%)")
titleB = (
f"{safe_title(experiment)} | latent_dim={latent_dim}, "
f"semi_normals={semi_normals}, semi_anomalous={semi_anomalous}\n"
f"Models: EMA({EMA_ALPHA}) | Stats: EMA({STATS_EMA_ALPHA}) — + Missing points (absolute %)"
)
axB.set_title(titleB)
axB.grid(True, alpha=0.3)
lines1, labels1 = axB.get_legend_handles_labels()
lines2, labels2 = axBy.get_legend_handles_labels()
axB.legend(lines1 + lines2, labels1 + labels2, loc="upper right")
fnameB = (
f"{experiment}_ld{latent_dim}_sn{semi_normals}_sa{semi_anomalous}"
f"_scores_plus_missing_EMA-{EMA_ALPHA}_stats-{STATS_EMA_ALPHA}"
f"{'_z' if Z_SCORE_MODELS else ''}.png"
)
figB.savefig(output_datetime_path / fnameB, dpi=150)
plt.close(figB)
# =========== Plot C: Models (z-scored) + Near-sensor Points (%) absolute ===========
if near_pct is not None and len(near_pct) > 0:
ns = near_pct
if ALIGN_TO_MIN_LENGTH:
ns = ns[:any_len]
ns_s = apply_ema_stats(ns)
t_stats = np.arange(len(ns_s)) / FPS
figC, axC = plt.subplots(figsize=(14, 6), constrained_layout=True)
axCy = axC.twinx()
        for label, y in proc.items():
            axC.plot(np.arange(len(y)) / FPS, y, label=label)
axCy.plot(t_stats, ns_s, linestyle="--", label="Near-sensor <0.5m (%)")
if anomaly_window != (None, None):
start, end = anomaly_window
if isinstance(start, int) and isinstance(end, int) and 0 <= start < end:
axC.axvline(start / FPS, linestyle=":", alpha=0.6)
axC.axvline(end / FPS, linestyle=":", alpha=0.6)
axC.set_xlabel("Time (s)")
axC.set_ylabel("Model anomaly score" + (" (z-score)" if Z_SCORE_MODELS else ""))
axCy.set_ylabel("Near-sensor points (%)")
titleC = (
f"{safe_title(experiment)} | latent_dim={latent_dim}, "
f"semi_normals={semi_normals}, semi_anomalous={semi_anomalous}\n"
f"Models: EMA({EMA_ALPHA}) | Stats: EMA({STATS_EMA_ALPHA}) — + Near-sensor <0.5m (absolute %)"
)
axC.set_title(titleC)
axC.grid(True, alpha=0.3)
lines1, labels1 = axC.get_legend_handles_labels()
lines2, labels2 = axCy.get_legend_handles_labels()
axC.legend(lines1 + lines2, labels1 + labels2, loc="upper right")
fnameC = (
f"{experiment}_ld{latent_dim}_sn{semi_normals}_sa{semi_anomalous}"
f"_scores_plus_nearsensor_EMA-{EMA_ALPHA}_stats-{STATS_EMA_ALPHA}"
f"{'_z' if Z_SCORE_MODELS else ''}.png"
)
figC.savefig(output_datetime_path / fnameC, dpi=150)
plt.close(figC)
plots_made += 1
# =====================================
# Preserve latest/, archive/, copy script
# =====================================
# delete current latest folder
shutil.rmtree(latest_folder_path, ignore_errors=True)
# create new latest folder
latest_folder_path.mkdir(exist_ok=True, parents=True)
# copy contents of output folder to the latest folder
for file in output_datetime_path.iterdir():
shutil.copy2(file, latest_folder_path)
# copy this python script to preserve the code used
try:
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)
except Exception:
# If running interactively, fall back to saving the config snapshot
(output_datetime_path / "run_config.json").write_text(
json.dumps(
{
"INFERENCE_ROOT": str(INFERENCE_ROOT),
"CACHE_PATH": str(CACHE_PATH),
"ALL_DATA_PATH": str(ALL_DATA_PATH),
"FPS": FPS,
"EMA_ALPHA": EMA_ALPHA,
"STATS_EMA_ALPHA": STATS_EMA_ALPHA,
"Z_SCORE_MODELS": Z_SCORE_MODELS,
"ALIGN_TO_MIN_LENGTH": ALIGN_TO_MIN_LENGTH,
"ALIGN_SCORE_DIRECTION": ALIGN_SCORE_DIRECTION,
"timestamp": datetime_folder_name,
},
indent=2,
)
)
# move output date folder to archive
shutil.move(output_datetime_path, archive_folder_path)
print(f"Done. Plotted {plots_made} groups. Archived under: {archive_folder_path}")