import pickle
import shutil
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from pointcloudset import Dataset

# define data path containing the bag files
all_data_path = Path("/home/fedex/mt/data/subter")
output_path = Path("/home/fedex/mt/plots/data_missing_points")

datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name

# if the output folders do not exist, create them
output_path.mkdir(exist_ok=True, parents=True)
output_datetime_path.mkdir(exist_ok=True, parents=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
archive_folder_path.mkdir(exist_ok=True, parents=True)

# full resolution of one lidar scan (32 rings x 2048 measurements per ring)
data_resolution = 32 * 2048

normal_experiment_paths, anomaly_experiment_paths = [], []

# find all bag files and sort them by name (experiments with "smoke" in the name are anomalies)
for bag_file_path in all_data_path.iterdir():
    if bag_file_path.suffix != ".bag":
        continue
    if "smoke" in bag_file_path.name:
        anomaly_experiment_paths.append(bag_file_path)
    else:
        normal_experiment_paths.append(bag_file_path)

# sort anomaly and normal experiments by file size, ascending
anomaly_experiment_paths.sort(key=lambda path: path.stat().st_size)
normal_experiment_paths.sort(key=lambda path: path.stat().st_size)


# plot histograms of how many points are missing per pointcloud for both normal and anomaly experiments
def plot_data_points(normal_experiment_paths, anomaly_experiment_paths, title):
    # find the number of missing points per pointcloud for a list of experiments (used for both normal and anomalous)
    def find_missing_points(experiment_paths):
        missing_points = []
        for dataset in (
            Dataset.from_file(experiment_path, topic="/ouster/points")
            for experiment_path in experiment_paths
        ):
            missing_points_per_pc = []
            for pc in dataset:
                missing_points_per_pc.append(data_resolution - pc.data.shape[0])
            missing_points.append(missing_points_per_pc)
            # FIXME temporary break to test code on only one experiment
            # break
        return missing_points

    # check if the data has already been calculated and saved to a pickle file
    if (output_path / "missing_points.pkl").exists():
        with open(output_path / "missing_points.pkl", "rb") as file:
            missing_points_normal, missing_points_anomaly = pickle.load(file)
    else:
        missing_points_normal = find_missing_points(normal_experiment_paths)
        missing_points_anomaly = find_missing_points(anomaly_experiment_paths)
        # for faster subsequent runs, save the data to a pickle file
        with open(output_path / "missing_points.pkl", "wb") as file:
            pickle.dump(
                (missing_points_normal, missing_points_anomaly),
                file,
                protocol=pickle.HIGHEST_PROTOCOL,
            )

    # combine all missing points into one array for each type of experiment
    missing_points_normal = np.concatenate(missing_points_normal)
    missing_points_anomaly = np.concatenate(missing_points_anomaly)

    # create histogram of missing points for normal and anomaly experiments
    plt.figure(figsize=(10, 5))
    plt.hist(missing_points_normal, bins=100, alpha=0.5, label="Normal Experiments")
    plt.hist(missing_points_anomaly, bins=100, alpha=0.5, label="Anomaly Experiments")
    plt.title(title)
    plt.xlabel("Number of Missing Points")
    plt.ylabel("Number of Pointclouds")
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_datetime_path / "missing_points.png")
    plt.clf()

    # alternative version with missing points on the y axis and number of pointclouds on the x axis
    plt.figure(figsize=(10, 5))
    plt.hist(
        missing_points_normal,
        bins=100,
        alpha=0.5,
        label="Normal Experiments",
        orientation="horizontal",
    )
    plt.hist(
        missing_points_anomaly,
        bins=100,
        alpha=0.5,
        label="Anomaly Experiments",
        orientation="horizontal",
    )
    plt.title(title)
    plt.xlabel("Number of Pointclouds")
    plt.ylabel("Number of Missing Points")
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_datetime_path / "missing_points_alternative.png")

    # find min and max over both categories so both histograms share the same bin limits
    min_value = np.min([np.min(missing_points_normal), np.min(missing_points_anomaly)])
    max_value = np.max([np.max(missing_points_normal), np.max(missing_points_anomaly)])
    # create bins array spanning the shared min and max values
    bins = np.linspace(min_value, max_value, 100)

    # since the two histograms (normal and anomalous) have different scales, plot a density-normalized version as well
    plt.clf()
    plt.figure(figsize=(10, 5))
    plt.hist(
        missing_points_normal,
        bins=bins,
        alpha=0.5,
        label="Normal Experiments",
        color="blue",
        density=True,
    )
    plt.hist(
        missing_points_anomaly,
        bins=bins,
        alpha=0.5,
        color="red",
        label="Anomaly Experiments",
        density=True,
    )
    plt.title(title)
    plt.xlabel("Number of Missing Points")
    plt.ylabel("Density")
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_datetime_path / "missing_points_density.png")

    # create another density version that plots the percentage of missing measurements
    # instead of the absolute number (total number of points is 32 * 2048)
    bins = np.linspace(0, 1, 100)
    plt.clf()
    plt.figure(figsize=(10, 5))
    plt.hist(
        missing_points_normal / data_resolution,
        bins=bins,
        alpha=0.5,
        label="Normal Experiments (No Artificial Smoke)",
        color="blue",
        density=True,
    )
    plt.hist(
        missing_points_anomaly / data_resolution,
        bins=bins,
        alpha=0.5,
        color="red",
        label="Anomaly Experiments (With Artificial Smoke)",
        density=True,
    )
    plt.title(title)
    plt.xlabel("Percentage of Missing Lidar Measurements")
    plt.ylabel("Density")
    # display the x axis as percentages (pin the ticks first so the fixed labels stay aligned)
    plt.gca().set_xticks(plt.gca().get_xticks())
    plt.gca().set_xticklabels(
        ["{:.0f}%".format(x * 100) for x in plt.gca().get_xticks()]
    )
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_datetime_path / "missing_points_density_percentage.png")

    # matplotlib does not support normalizing each histogram to its own peak, so we do it manually using numpy
    num_bins = 100
    bin_lims = np.linspace(0, 40000, num_bins + 1)
    bin_centers = 0.5 * (bin_lims[:-1] + bin_lims[1:])
    bin_widths = bin_lims[1:] - bin_lims[:-1]

    # calculate the histogram for normal and anomaly experiments
    normal_hist, _ = np.histogram(missing_points_normal, bins=bin_lims)
    anomaly_hist, _ = np.histogram(missing_points_anomaly, bins=bin_lims)

    # normalize the histograms to the same scale (peak of each histogram becomes 1)
    normal_hist_normalized = np.array(normal_hist, dtype=float) / np.max(normal_hist)
    anomaly_hist_normalized = np.array(anomaly_hist, dtype=float) / np.max(anomaly_hist)

    # plot the normalized histograms
    plt.clf()
    plt.figure(figsize=(10, 5))
    plt.bar(
        bin_centers,
        normal_hist_normalized,
        width=bin_widths,
        align="center",
        alpha=0.5,
        label="Normal Experiments",
    )
    plt.bar(
        bin_centers,
        anomaly_hist_normalized,
        width=bin_widths,
        align="center",
        alpha=0.5,
        label="Anomaly Experiments",
    )
    plt.title(title)
    plt.xlabel("Number of Missing Points")
    plt.ylabel("Normalized Density")
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_datetime_path / "missing_points_normalized.png")


# plot histograms of missing points for normal and anomaly experiments
plot_data_points(
    normal_experiment_paths,
    anomaly_experiment_paths,
    "Missing Lidar Measurements per Scan",
)

# delete the current latest folder and recreate it empty
shutil.rmtree(latest_folder_path, ignore_errors=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)

# copy contents of the output datetime folder to the latest folder
for file in output_datetime_path.iterdir():
    shutil.copy2(file, latest_folder_path)

# copy this python script to the output folders to preserve the code used to generate the plots
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)

# move the output datetime folder to the archive
shutil.move(output_datetime_path, archive_folder_path)