# Source: mt/tools/plot_scripts/data_anomalies_timeline.py
# Snapshot: 2025-08-13 14:17:12 +02:00 (256 lines, 9.0 KiB, Python)
import json
import pickle
import shutil
from datetime import datetime
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
# --- Input/output locations -------------------------------------------------
all_data_path = Path("/home/fedex/mt/data/subter")
output_path = Path("/home/fedex/mt/plots/data_anomalies_timeline")

# Each run writes into a timestamped folder; "latest" mirrors the newest run
# and "archive" keeps the history of all runs.
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name
cache_path = output_path

# Ensure every output directory exists before any plotting happens.
for folder in (
    output_path,
    output_datetime_path,
    latest_folder_path,
    archive_folder_path,
):
    folder.mkdir(exist_ok=True, parents=True)

# Points per lidar scan (32 rings x 2048 points); used to convert raw counts
# into percentages later on.
data_resolution = 32 * 2048
# Partition the recordings into baseline and smoke (anomaly) experiments;
# the experiment type is encoded in the .bag filename.
normal_experiment_paths, anomaly_experiment_paths = [], []
for candidate in all_data_path.iterdir():
    if candidate.suffix == ".bag":
        target_list = (
            anomaly_experiment_paths
            if "smoke" in candidate.name
            else normal_experiment_paths
        )
        target_list.append(candidate)
# Load manually labeled anomaly frame borders (begin/end frame per recording).
with open(
    cache_path / "manually_labeled_anomaly_frames.json", "r"
) as frame_borders_file:
    manually_labeled_anomaly_frames_json = json.load(frame_borders_file)
if not manually_labeled_anomaly_frames_json:
    print("No manually labeled anomaly frames found. Exiting...")
    exit(1)
# Maps .npy filename -> (begin_frame, end_frame) of the labeled anomaly span.
manually_labeled_anomaly_frames = {}
# Build the set of known anomaly experiment names once, instead of rebuilding
# a generator for every labeled file (O(1) membership test per entry).
known_anomaly_names = {
    path.with_suffix(".npy").name for path in anomaly_experiment_paths
}
try:
    for file in manually_labeled_anomaly_frames_json["files"]:
        # Every labeled file must correspond to a known anomaly recording;
        # otherwise the labels are stale and the plot would be misleading.
        if file["filename"] not in known_anomaly_names:
            print(
                f"File {file['filename']} from manually labeled frames not found in anomaly experiments. Exiting..."
            )
            exit(1)
        manually_labeled_anomaly_frames[file["filename"]] = (
            file["semi_target_begin_frame"],
            file["semi_target_end_frame"],
        )
except KeyError as e:
    # Any missing key means the JSON schema is broken; abort rather than plot
    # with partial labels.
    print(f"Missing key in manually labeled frames JSON: {e}")
    exit(1)
def plot_combined_timeline(
    normal_experiment_paths, anomaly_experiment_paths, title, num_bins=50
):
    """Plot both missing points and near-sensor measurements over normalized timeline.

    Produces one figure with twin y-axes: the left axis (solid lines) shows
    the percentage of missing lidar points per frame, the right axis (dashed
    lines) shows the percentage of points measured closer than 0.5 m.  Each
    experiment's frame series is averaged into ``num_bins`` bins over a
    normalized 0-100% timeline so runs of different length are comparable.
    Dotted vertical lines mark the manually labeled anomaly begin/end frames.

    Reads module-level globals: ``cache_path`` (locations of the two pickle
    caches), ``data_resolution``, ``manually_labeled_anomaly_frames`` and
    ``output_datetime_path``.

    Args:
        normal_experiment_paths: paths to baseline (no-smoke) .bag files.
        anomaly_experiment_paths: paths to smoke .bag files.
        title: figure title string.
        num_bins: number of bins on the normalized timeline (default 50).

    Returns:
        None.  Saves ``combined_anomalies_timeline.png`` into
        ``output_datetime_path``; returns early without plotting if either
        cache file is missing.
    """
    # Sort experiments by filesize first (to match original processing order)
    normal_experiment_paths = sorted(
        normal_experiment_paths, key=lambda path: path.stat().st_size
    )
    anomaly_experiment_paths = sorted(
        anomaly_experiment_paths, key=lambda path: path.stat().st_size
    )
    # Get largest normal experiment and moving anomaly experiments
    # NOTE(review): [-3] selects the THIRD-largest normal run, not the
    # largest as the inline comment claims -- confirm which is intended.
    baseline_path = normal_experiment_paths[-3] # largest normal experiment
    # Stationary experiments are excluded: keep only the "moving" smoke runs.
    moving_exp_indices = [
        i
        for i, path in enumerate(anomaly_experiment_paths)
        if "stationary" not in path.name
    ]
    moving_anomaly_paths = [anomaly_experiment_paths[i] for i in moving_exp_indices]
    # Load missing points data
    missing_points_cache = Path(cache_path / "missing_points.pkl")
    if not missing_points_cache.exists():
        print("Missing points cache not found!")
        return
    # Load near-sensor data (using 500mm threshold)
    near_sensor_cache = Path(cache_path / "particles_near_sensor_counts_500.pkl")
    if not near_sensor_cache.exists():
        print("Near-sensor measurements cache not found!")
        return
    # Load both cached datasets
    # NOTE(review): pickle.load on these caches assumes they were written by
    # the companion scripts in this repo -- do not point cache_path at
    # untrusted data.
    with open(missing_points_cache, "rb") as file:
        missing_points_normal, missing_points_anomaly = pickle.load(file)
    with open(near_sensor_cache, "rb") as file:
        near_sensor_normal, near_sensor_anomaly = pickle.load(file)
    # Get data for baseline and moving experiments
    # NOTE(review): assumes the cached lists are ordered identically to the
    # filesize-sorted path lists above -- verify against the scripts that
    # produced the pickles.
    missing_data = [missing_points_normal[-3]] + [
        missing_points_anomaly[i] for i in moving_exp_indices
    ]
    near_sensor_data = [near_sensor_normal[-3]] + [
        near_sensor_anomaly[i] for i in moving_exp_indices
    ]
    all_paths = [baseline_path] + moving_anomaly_paths
    # Create figure with two y-axes and dynamic layout
    fig, ax1 = plt.subplots(figsize=(12, 6), constrained_layout=True)
    ax2 = ax1.twinx()
    # Color schemes - gray for baseline, colors for anomaly experiments
    # NOTE(review): only 3 anomaly colors are provided; a 4th moving
    # experiment would raise IndexError below.
    experiment_colors = ["#808080"] + ["#1f77b4", "#ff7f0e", "#2ca02c"] # gray + colors
    # First create custom legend handles for the metrics
    from matplotlib.lines import Line2D
    metric_legend = [
        Line2D([0], [0], color="gray", linestyle="-", label="Missing Points"),
        Line2D([0], [0], color="gray", linestyle="--", label="Near-Sensor (<0.5m)"),
        Line2D([0], [0], color="gray", linestyle=":", label="Manually Labeled Borders"),
    ]
    # Plot each experiment's data
    for i, (missing_exp, near_sensor_exp) in enumerate(
        zip(missing_data, near_sensor_data)
    ):
        # Get experiment name without the full path
        exp_name = all_paths[i].stem
        # Shorten experiment name if needed
        exp_name = exp_name.replace("experiment_smoke_", "exp_")
        # Convert both to percentages
        missing_pct = np.array(missing_exp) / data_resolution * 100
        near_sensor_pct = np.array(near_sensor_exp) / data_resolution * 100
        # Create normalized timeline bins for both
        exp_len = len(missing_pct)
        bins = np.linspace(0, exp_len - 1, num_bins)
        missing_binned = np.zeros(num_bins)
        near_sensor_binned = np.zeros(num_bins)
        # Bin both datasets
        # NOTE(review): if num_bins exceeds exp_len, adjacent bin edges can
        # coincide and np.mean of the empty slice yields NaN (with a runtime
        # warning); fine for the expected frame counts, but worth confirming.
        for bin_idx in range(num_bins):
            # The last bin runs to the end of the series so no frames are lost.
            if bin_idx == num_bins - 1:
                start_idx = int(bins[bin_idx])
                end_idx = exp_len
            else:
                start_idx = int(bins[bin_idx])
                end_idx = int(bins[bin_idx + 1])
            missing_binned[bin_idx] = np.mean(missing_pct[start_idx:end_idx])
            near_sensor_binned[bin_idx] = np.mean(near_sensor_pct[start_idx:end_idx])
        # Plot both metrics with same color but different line styles
        color = experiment_colors[i]
        ax1.plot(
            range(num_bins),
            missing_binned,
            color=color,
            linestyle="-",
            alpha=0.6,
            label=exp_name,
        )
        ax2.plot(
            range(num_bins), near_sensor_binned, color=color, linestyle="--", alpha=0.6
        )
        # Add vertical lines for manually labeled frames if available
        if all_paths[i].with_suffix(".npy").name in manually_labeled_anomaly_frames:
            begin_frame, end_frame = manually_labeled_anomaly_frames[
                all_paths[i].with_suffix(".npy").name
            ]
            # Convert frame numbers to normalized timeline positions
            begin_pos = (begin_frame / exp_len) * (num_bins - 1)
            end_pos = (end_frame / exp_len) * (num_bins - 1)
            # Add vertical lines with matching color and loose dotting
            ax1.axvline(
                x=begin_pos,
                color=color,
                linestyle=":",
                alpha=0.6,
            )
            ax1.axvline(
                x=end_pos,
                color=color,
                linestyle=":",
                alpha=0.6,
            )
    # Customize axes
    ax1.set_xlabel("Normalized Timeline")
    ax1.set_xticks(np.linspace(0, num_bins - 1, 5))
    ax1.set_xticklabels([f"{x:.0f}%" for x in np.linspace(0, 100, 5)])
    ax1.set_ylabel("Missing Points (%)")
    ax2.set_ylabel("Points with <0.5m Range (%)")
    plt.title(title)
    # Create legends without fixed positions
    # First get all lines and labels for experiments
    lines1, labels1 = ax1.get_legend_handles_labels()
    # Combine both legends into one
    # The invisible (alpha=0) handle acts as a spacer between the experiment
    # entries and the metric line-style entries.
    all_handles = (
        lines1
        + [Line2D([0], [0], color="gray", linestyle="-", label="", alpha=0)]
        + metric_legend
    )
    all_labels = (
        labels1
        + [""]
        + ["Missing Points", "Points Near Sensor (<0.5m)", "Manually Labeled Borders"]
    )
    # Create single legend in top right corner with consistent margins
    fig.legend(all_handles, all_labels, loc="upper right", borderaxespad=4.8)
    plt.grid(True, alpha=0.3)
    # Save figure letting matplotlib handle the layout
    plt.savefig(output_datetime_path / "combined_anomalies_timeline.png", dpi=150)
    plt.close()
# Render the combined timeline figure for this run.
plot_combined_timeline(
    normal_experiment_paths,
    anomaly_experiment_paths,
    "Lidar Degradation Indicators Throughout Experiments\n(Baseline and Moving Anomaly Experiments)",
)

# Rebuild the "latest" folder from scratch so it mirrors this run exactly.
shutil.rmtree(latest_folder_path, ignore_errors=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
for produced_file in output_datetime_path.iterdir():
    shutil.copy2(produced_file, latest_folder_path)

# Preserve a copy of this script alongside its outputs for reproducibility.
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)

# Finally, move the timestamped output folder into the archive.
shutil.move(output_datetime_path, archive_folder_path)