179 lines
5.8 KiB
Python
179 lines
5.8 KiB
Python
import pickle
|
|
import shutil
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import matplotlib.pyplot as plt
|
|
import numpy as np
|
|
import open3d as o3d
|
|
from pointcloudset import Dataset
|
|
from pointcloudset.io.pointcloud.open3d import to_open3d
|
|
from rich.progress import track
|
|
|
|
# define data path containing the bag files
|
|
all_data_path = Path("/home/fedex/mt/data/subter")
|
|
|
|
output_path = Path("/home/fedex/mt/plots/data_icp_rmse")
|
|
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
|
|
|
latest_folder_path = output_path / "latest"
|
|
archive_folder_path = output_path / "archive"
|
|
output_datetime_path = output_path / datetime_folder_name
|
|
|
|
# if output does not exist, create it
|
|
output_path.mkdir(exist_ok=True, parents=True)
|
|
output_datetime_path.mkdir(exist_ok=True, parents=True)
|
|
latest_folder_path.mkdir(exist_ok=True, parents=True)
|
|
archive_folder_path.mkdir(exist_ok=True, parents=True)
|
|
|
|
# Add cache folder
|
|
cache_folder = output_path / "cache"
|
|
cache_folder.mkdir(exist_ok=True, parents=True)
|
|
|
|
|
|
def get_cache_path(experiment_path: Path) -> Path:
|
|
"""Convert experiment bag path to cache pkl path"""
|
|
return cache_folder / f"{experiment_path.stem}.pkl"
|
|
|
|
|
|
# find all bag files and sort them correctly by name (experiments with smoke in the name are anomalies)
|
|
normal_experiment_paths, anomaly_experiment_paths = [], []
|
|
for bag_file_path in all_data_path.iterdir():
|
|
if bag_file_path.suffix != ".bag":
|
|
continue
|
|
if "smoke" in bag_file_path.name:
|
|
anomaly_experiment_paths.append(bag_file_path)
|
|
else:
|
|
normal_experiment_paths.append(bag_file_path)
|
|
|
|
# sort anomaly and normal experiments by filesize, ascending
|
|
anomaly_experiment_paths.sort(key=lambda path: path.stat().st_size)
|
|
normal_experiment_paths.sort(key=lambda path: path.stat().st_size)
|
|
|
|
|
|
def calculate_icp_rmse(experiment_path):
|
|
"""Calculate ICP RMSE between consecutive frames in an experiment"""
|
|
cache_path = get_cache_path(experiment_path)
|
|
|
|
# Check cache first
|
|
if cache_path.exists():
|
|
with open(cache_path, "rb") as file:
|
|
return pickle.load(file)
|
|
|
|
dataset = Dataset.from_file(experiment_path, topic="/ouster/points")
|
|
rmse_values = []
|
|
|
|
# Convert iterator to list for progress tracking
|
|
pointclouds = list(dataset)
|
|
|
|
# Get consecutive pairs of point clouds with progress bar
|
|
for i in track(
|
|
range(len(pointclouds) - 1),
|
|
description=f"Processing {experiment_path.name}",
|
|
total=len(pointclouds) - 1,
|
|
):
|
|
# Get consecutive point clouds and convert to Open3D format
|
|
source = to_open3d(pointclouds[i])
|
|
target = to_open3d(pointclouds[i + 1])
|
|
|
|
# Apply point-to-point ICP (much faster than point-to-plane)
|
|
result = o3d.pipelines.registration.registration_icp(
|
|
source,
|
|
target,
|
|
max_correspondence_distance=0.2, # 20cm max correspondence
|
|
init=np.identity(4), # Identity matrix as initial transformation
|
|
estimation_method=o3d.pipelines.registration.TransformationEstimationPointToPoint(),
|
|
criteria=o3d.pipelines.registration.ICPConvergenceCriteria(
|
|
max_iteration=50
|
|
),
|
|
)
|
|
|
|
rmse_values.append(result.inlier_rmse)
|
|
|
|
# Cache the results
|
|
with open(cache_path, "wb") as file:
|
|
pickle.dump(rmse_values, file, protocol=pickle.HIGHEST_PROTOCOL)
|
|
|
|
return rmse_values
|
|
|
|
|
|
def plot_statistical_comparison(normal_paths, anomaly_paths, output_path):
|
|
"""Create statistical comparison plots between normal and anomalous experiments"""
|
|
# Collect all RMSE values
|
|
normal_rmse = []
|
|
anomaly_rmse = []
|
|
|
|
print("Loading cached RMSE values...")
|
|
for path in normal_paths:
|
|
normal_rmse.extend(calculate_icp_rmse(path))
|
|
for path in anomaly_paths:
|
|
anomaly_rmse.extend(calculate_icp_rmse(path))
|
|
|
|
# Convert to numpy arrays
|
|
normal_rmse = np.array(normal_rmse)
|
|
anomaly_rmse = np.array(anomaly_rmse)
|
|
|
|
# Create figure with two subplots side by sidnte
|
|
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
|
|
|
|
# Box plot
|
|
data = [normal_rmse, anomaly_rmse]
|
|
ax1.boxplot(data, labels=["Normal", "Anomalous"])
|
|
ax1.set_ylabel("ICP RMSE (meters)")
|
|
ax1.set_title("Box Plot Comparison")
|
|
ax1.grid(True, alpha=0.3)
|
|
|
|
# Violin plot
|
|
ax2.violinplot(data)
|
|
ax2.set_xticks([1, 2])
|
|
ax2.set_xticklabels(["Normal", "Anomalous"])
|
|
ax2.set_ylabel("ICP RMSE (meters)")
|
|
ax2.set_title("Violin Plot Comparison")
|
|
ax2.grid(True, alpha=0.3)
|
|
|
|
plt.suptitle(
|
|
"ICP RMSE Statistical Comparison: Normal vs Anomalous Experiments", fontsize=14
|
|
)
|
|
plt.tight_layout()
|
|
|
|
# Save both linear and log scale versions
|
|
plt.savefig(output_path / "icp_rmse_distribution_linear.png", dpi=150)
|
|
|
|
# Log scale version
|
|
ax1.set_yscale("log")
|
|
ax2.set_yscale("log")
|
|
plt.savefig(output_path / "icp_rmse_distribution_log.png", dpi=150)
|
|
plt.close()
|
|
|
|
# Print some basic statistics
|
|
print("\nBasic Statistics:")
|
|
print(
|
|
f"Normal experiments - mean: {np.mean(normal_rmse):.4f}, std: {np.std(normal_rmse):.4f}"
|
|
)
|
|
print(
|
|
f"Anomaly experiments - mean: {np.mean(anomaly_rmse):.4f}, std: {np.std(anomaly_rmse):.4f}"
|
|
)
|
|
|
|
|
|
# Replace all plotting code with just this:
|
|
plot_statistical_comparison(
|
|
normal_experiment_paths, anomaly_experiment_paths, output_datetime_path
|
|
)
|
|
|
|
# delete current latest folder
|
|
shutil.rmtree(latest_folder_path, ignore_errors=True)
|
|
|
|
# create new latest folder
|
|
latest_folder_path.mkdir(exist_ok=True, parents=True)
|
|
|
|
# copy contents of output folder to the latest folder
|
|
for file in output_datetime_path.iterdir():
|
|
shutil.copy2(file, latest_folder_path)
|
|
|
|
# copy this python script to preserve the code used
|
|
shutil.copy2(__file__, output_datetime_path)
|
|
shutil.copy2(__file__, latest_folder_path)
|
|
|
|
# move output date folder to archive
|
|
shutil.move(output_datetime_path, archive_folder_path)
|