import pickle import shutil from datetime import datetime from pathlib import Path import matplotlib.pyplot as plt import numpy as np # define data path containing the bag files all_data_path = Path("/home/fedex/mt/data/subter") output_path = Path("/home/fedex/mt/plots/data_particles_near_sensor") datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") latest_folder_path = output_path / "latest" archive_folder_path = output_path / "archive" output_datetime_path = output_path / datetime_folder_name # if output does not exist, create it output_path.mkdir(exist_ok=True, parents=True) output_datetime_path.mkdir(exist_ok=True, parents=True) latest_folder_path.mkdir(exist_ok=True, parents=True) archive_folder_path.mkdir(exist_ok=True, parents=True) normal_experiment_paths, anomaly_experiment_paths = [], [] # find all bag files and sort them correctly by name (experiments with smoke in the name are anomalies) for bag_file_path in all_data_path.iterdir(): if bag_file_path.suffix != ".bag": continue if "smoke" in bag_file_path.name: anomaly_experiment_paths.append(bag_file_path) else: normal_experiment_paths.append(bag_file_path) # print out the names of the normal and anomaly experiments that we found print("Normal experiments:") for path in normal_experiment_paths: print(path.name) print("\nAnomaly experiments:") for path in anomaly_experiment_paths: print(path.name) # sort anomaly and normal experiments by filesize, ascending anomaly_experiment_paths.sort(key=lambda path: path.stat().st_size) normal_experiment_paths.sort(key=lambda path: path.stat().st_size) # function that plots histogram of how many measurements have a range smaller than 0.5m for both normal and anomaly experiments def plot_data_points(normal_experiment_paths, anomaly_experiment_paths, title): # function that finds the number of measurements with a range smaller than 0.5m in list of experiments (used for both normal and anomalous) def find_particles_near_sensor(experiment_paths, range_threshold): particles_near_sensor = [] for dataset in ( Dataset.from_file(experiment_path, topic="/ouster/points") for experiment_path in experiment_paths ): particles_near_sensor_per_pc = [] for pc in dataset: particles_near_sensor_per_pc.append( pc.data[pc.data["range"] < range_threshold].shape[0] ) particles_near_sensor.append(particles_near_sensor_per_pc) return particles_near_sensor range_thresholds = [500, 750, 1000, 1250, 1500] for rt in range_thresholds: print(f"Processing range threshold {rt}...") # check if the data has already been calculated and saved to a pickle file if (output_path / f"particles_near_sensor_counts_{rt}.pkl").exists(): with open( output_path / f"particles_near_sensor_counts_{rt}.pkl", "rb" ) as file: particles_near_sensor_normal, particles_near_sensor_anomaly = ( pickle.load(file) ) else: particles_near_sensor_normal = find_particles_near_sensor( normal_experiment_paths, rt, ) particles_near_sensor_anomaly = find_particles_near_sensor( anomaly_experiment_paths, rt, ) # for faster subsequent runs, save the data to a pickle file with open(output_path / f"particles_near_sensor_counts_{rt}.pkl", "wb") as file: pickle.dump( (particles_near_sensor_normal, particles_near_sensor_anomaly), file, protocol=pickle.HIGHEST_PROTOCOL, ) # combine all counts of how many particles are close to the sensor into one list for each type of experiment particles_near_sensor_normal = np.concatenate(particles_near_sensor_normal) particles_near_sensor_anomaly = np.concatenate(particles_near_sensor_anomaly) # find min and max of both categories so we can set the same limits for both plots min = np.min( [ np.min(particles_near_sensor_normal), np.min(particles_near_sensor_anomaly), ] ) max = np.max( [ np.max(particles_near_sensor_normal), np.max(particles_near_sensor_anomaly), ] ) # create bins array with min and max values bins = np.linspace(min, max, 100) # since the two histograms (normal and anomalous) have different scales, normalize their amplitude and plot a density version as well # commented out since boxplot is more informative # plt.clf() # plt.figure(figsize=(10, 5)) # plt.hist( # particles_near_sensor_normal, # bins=bins, # alpha=0.5, # label="Normal Experiments (No Artifical Smoke)", # color="blue", # density=True, # ) # plt.hist( # particles_near_sensor_anomaly, # bins=bins, # alpha=0.5, # color="red", # label="Anomaly Experiments (With Artifical Smoke)", # density=True, # ) # plt.title(title) # plt.xlabel("Number of Particles Near Sensor") # plt.ylabel("Density") # plt.legend() # plt.tight_layout() # plt.savefig(output_datetime_path / f"particles_near_sensor_density_{rt}.png") # alternatively create a box plot to show the distribution of the data # instead of plotting the frequency of particles near sensor we'll plot the percentage of points (compared to the total number of points in the pointcloud) data_resolution = 32 * 2048 plt.clf() plt.figure(figsize=(10, 5)) plt.boxplot( [ particles_near_sensor_normal / data_resolution, particles_near_sensor_anomaly / data_resolution, ], tick_labels=[ "Normal Experiments (No Artifical Smoke)", "Anomaly Experiments (With Artifical Smoke)", ], ) # format the y axis labels as percentages plt.gca().set_yticklabels( ["{:.0f}%".format(y * 100) for y in plt.gca().get_yticks()] ) # plt.title("Particles Closer than 0.5m to the Sensor") plt.ylabel("Percentage of measurements closer than 0.5m") plt.tight_layout() plt.savefig(output_datetime_path / f"particles_near_sensor_boxplot_{rt}.png") # we create the same boxplot but limit the y-axis to 5% to better see the distribution of the data plt.clf() plt.figure(figsize=(10, 5)) plt.boxplot( [ particles_near_sensor_normal / data_resolution, particles_near_sensor_anomaly / data_resolution, ], tick_labels=[ "Normal Experiments (No Artifical Smoke)", "Anomaly Experiments (With Artifical Smoke)", ], ) # format the y axis labels as percentages plt.gca().set_yticklabels( ["{:.0f}%".format(y * 100) for y in plt.gca().get_yticks()] ) # plt.title("Particles Closer than 0.5m to the Sensor") plt.ylabel("Percentage of measurements closer than 0.5m") plt.ylim(0, 0.05) plt.tight_layout() plt.savefig( output_datetime_path / f"particles_near_sensor_boxplot_zoomed_{rt}.png" ) # plot histogram of how many measurements have a range smaller than 0.5m for both normal and anomaly experiments plot_data_points( normal_experiment_paths, anomaly_experiment_paths, "Density of Number of Particles Near Sensor", ) # delete current latest folder shutil.rmtree(latest_folder_path, ignore_errors=True) # create new latest folder latest_folder_path.mkdir(exist_ok=True, parents=True) # copy contents of output folder to the latest folder for file in output_datetime_path.iterdir(): shutil.copy2(file, latest_folder_path) # copy this python script to the output datetime folder to preserve the code used to generate the plots shutil.copy2(__file__, output_datetime_path) shutil.copy2(__file__, latest_folder_path) # move output date folder to archive shutil.move(output_datetime_path, archive_folder_path)