import pickle import shutil from datetime import datetime from pathlib import Path import matplotlib.pyplot as plt import numpy as np from pointcloudset import Dataset # define data paths all_data_path = Path("/home/fedex/mt/data/subter") output_path = Path("/home/fedex/mt/plots/data_missing_points_anomalies") datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") latest_folder_path = output_path / "latest" archive_folder_path = output_path / "archive" output_datetime_path = output_path / datetime_folder_name # if output does not exist, create it output_path.mkdir(exist_ok=True, parents=True) output_datetime_path.mkdir(exist_ok=True, parents=True) latest_folder_path.mkdir(exist_ok=True, parents=True) archive_folder_path.mkdir(exist_ok=True, parents=True) data_resolution = 32 * 2048 # find all bag files and sort them correctly by name normal_experiment_paths, anomaly_experiment_paths = [], [] for bag_file_path in all_data_path.iterdir(): if bag_file_path.suffix != ".bag": continue if "smoke" in bag_file_path.name: anomaly_experiment_paths.append(bag_file_path) else: normal_experiment_paths.append(bag_file_path) def plot_timeline_comparison(anomaly_experiment_paths, title, num_bins=50): """Plot missing points percentage over normalized timeline for moving anomaly experiments""" # Sort experiments by filesize first (to match original processing order) anomaly_experiment_paths = sorted( anomaly_experiment_paths, key=lambda path: path.stat().st_size ) # Filter out stationary experiments moving_exp_indices = [ i for i, path in enumerate(anomaly_experiment_paths) if "stationary" not in path.name ] moving_anomaly_paths = [anomaly_experiment_paths[i] for i in moving_exp_indices] # Try to load cached data from original script's location cache_path = Path("/home/fedex/mt/plots/data_missing_points/missing_points.pkl") if not cache_path.exists(): print("No cached data found. Please run the original script first.") return with open(cache_path, "rb") as file: _, missing_points_anomaly = pickle.load(file) # Get data for moving experiments only (using original indices) moving_anomaly_data = [missing_points_anomaly[i] for i in moving_exp_indices] # Create figure plt.figure(figsize=(12, 6)) # Plot each experiment's timeline for i, exp_data in enumerate(moving_anomaly_data): # Convert to percentage percentages = np.array(exp_data) / data_resolution * 100 # Create normalized timeline bins exp_len = len(percentages) bins = np.linspace(0, exp_len - 1, num_bins) binned_data = np.zeros(num_bins) # Bin the data for bin_idx in range(num_bins): if bin_idx == num_bins - 1: start_idx = int(bins[bin_idx]) end_idx = exp_len else: start_idx = int(bins[bin_idx]) end_idx = int(bins[bin_idx + 1]) binned_data[bin_idx] = np.mean(percentages[start_idx:end_idx]) # Plot with slight transparency to show overlaps plt.plot( range(num_bins), binned_data, alpha=0.6, label=f"Experiment {moving_anomaly_paths[i].stem}", ) plt.title(title) plt.xlabel("Normalized Timeline") # Add percentage ticks on x-axis plt.xticks( np.linspace(0, num_bins - 1, 5), [f"{x:.0f}%" for x in np.linspace(0, 100, 5)] ) plt.ylabel("Missing Points (%)") plt.grid(True, alpha=0.3) plt.legend() plt.tight_layout() # Save the plot plt.savefig(output_datetime_path / "missing_points_timeline.png", dpi=150) plt.close() # Generate the timeline comparison plot plot_timeline_comparison( anomaly_experiment_paths, "Missing Lidar Measurements Over Time\n(Moving Anomaly Experiments Only)", ) # delete current latest folder shutil.rmtree(latest_folder_path, ignore_errors=True) # create new latest folder latest_folder_path.mkdir(exist_ok=True, parents=True) # copy contents of output folder to the latest folder for file in output_datetime_path.iterdir(): shutil.copy2(file, latest_folder_path) # copy this python script to preserve the code used shutil.copy2(__file__, output_datetime_path) shutil.copy2(__file__, latest_folder_path) # move output date folder to archive shutil.move(output_datetime_path, archive_folder_path)