# 2025-03-14 18:02:23 +01:00  (stray export/VCS timestamp — commented out so the file parses)
import pickle
import shutil
from datetime import datetime
from pathlib import Path
import matplotlib . pyplot as plt
import numpy as np
# define data path containing the bag files
all_data_path = Path("/home/fedex/mt/data/subter")
output_path = Path("/home/fedex/mt/plots/data_particles_near_sensor")
# one dated sub-folder per run, e.g. "2025-03-14_18-02-23"
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name
# create all output folders up front (idempotent thanks to exist_ok)
for folder in (
    output_path,
    output_datetime_path,
    latest_folder_path,
    archive_folder_path,
):
    folder.mkdir(exist_ok=True, parents=True)
normal_experiment_paths, anomaly_experiment_paths = [], []
# find all bag files and classify them by name: experiments with "smoke" in
# the file name are the anomaly (artificial smoke) runs
for bag_file_path in all_data_path.iterdir():
    if bag_file_path.suffix != ".bag":
        continue
    if "smoke" in bag_file_path.name:
        anomaly_experiment_paths.append(bag_file_path)
    else:
        normal_experiment_paths.append(bag_file_path)
# print out the names of the normal and anomaly experiments that we found
print("Normal experiments:")
for path in normal_experiment_paths:
    print(path.name)
print("\nAnomaly experiments:")
for path in anomaly_experiment_paths:
    print(path.name)
# sort anomaly and normal experiments by file size, ascending, so plots and
# cached results are in a deterministic order
anomaly_experiment_paths.sort(key=lambda path: path.stat().st_size)
normal_experiment_paths.sort(key=lambda path: path.stat().st_size)
# plots box plots of how many measurements lie below a set of range
# thresholds, comparing normal and anomaly experiments
def plot_data_points(normal_experiment_paths, anomaly_experiment_paths, title):
    """Plot the fraction of close-range LiDAR returns per point cloud.

    For each range threshold, count the returns below the threshold in every
    point cloud (loading from a pickle cache when available), then save two
    box plots (full range and zoomed) comparing normal vs. smoke experiments
    into ``output_datetime_path``.

    Parameters
    ----------
    normal_experiment_paths : list[Path]
        Bag files of experiments without artificial smoke.
    anomaly_experiment_paths : list[Path]
        Bag files of experiments with artificial smoke.
    title : str
        Kept for backward compatibility; currently unused because the density
        histogram that used it was dropped in favour of the box plots.
    """

    # number of measurements with a range below the threshold: one count per
    # point cloud, one list per experiment (used for both categories)
    def find_particles_near_sensor(experiment_paths, range_threshold):
        # NOTE(review): `Dataset` is not imported anywhere in this file —
        # presumably it comes from an Ouster/rosbag helper; confirm the
        # import exists in the full file.
        counts_per_experiment = []
        for dataset in (
            Dataset.from_file(experiment_path, topic="/ouster/points")
            for experiment_path in experiment_paths
        ):
            counts_per_experiment.append(
                [
                    pc.data[pc.data["range"] < range_threshold].shape[0]
                    for pc in dataset
                ]
            )
        return counts_per_experiment

    # render one box plot (normal vs. anomaly) of the fraction of close-range
    # returns and save it under the given file name
    def make_boxplot(normal_counts, anomaly_counts, file_name, y_limit=None):
        # assumes 32 beams x 2048 columns per scan — TODO confirm against the
        # sensor configuration
        data_resolution = 32 * 2048
        # close (not just clf) so figures don't accumulate across iterations
        plt.close("all")
        plt.figure(figsize=(10, 5))
        plt.boxplot(
            [
                normal_counts / data_resolution,
                anomaly_counts / data_resolution,
            ],
            tick_labels=[
                "Normal Experiments (No Artificial Smoke)",
                "Anomaly Experiments (With Artificial Smoke)",
            ],
        )
        # pin the tick positions first, then relabel them as percentages
        # (set_yticklabels on auto ticks emits a matplotlib warning)
        axis = plt.gca()
        ticks = axis.get_yticks()
        axis.set_yticks(ticks)
        axis.set_yticklabels(["{:.0f}%".format(tick * 100) for tick in ticks])
        # NOTE(review): label hard-codes 0.5m although the thresholds vary
        # (500-1500, presumably millimetres) — confirm intended wording
        plt.ylabel("Percentage of measurements closer than 0.5m")
        if y_limit is not None:
            # zoom after labelling so the percentage labels stay consistent
            plt.ylim(0, y_limit)
        plt.tight_layout()
        plt.savefig(output_datetime_path / file_name)

    range_thresholds = [500, 750, 1000, 1250, 1500]
    for rt in range_thresholds:
        print(f"Processing range threshold {rt}...")
        cache_path = output_path / f"particles_near_sensor_counts_{rt}.pkl"
        # check if the data has already been calculated and cached
        if cache_path.exists():
            with open(cache_path, "rb") as file:
                particles_near_sensor_normal, particles_near_sensor_anomaly = (
                    pickle.load(file)
                )
        else:
            particles_near_sensor_normal = find_particles_near_sensor(
                normal_experiment_paths, rt
            )
            particles_near_sensor_anomaly = find_particles_near_sensor(
                anomaly_experiment_paths, rt
            )
            # for faster subsequent runs, save the counts to a pickle file
            with open(cache_path, "wb") as file:
                pickle.dump(
                    (particles_near_sensor_normal, particles_near_sensor_anomaly),
                    file,
                    protocol=pickle.HIGHEST_PROTOCOL,
                )
        # flatten the per-experiment count lists into one array per category
        particles_near_sensor_normal = np.concatenate(particles_near_sensor_normal)
        particles_near_sensor_anomaly = np.concatenate(particles_near_sensor_anomaly)
        # a density histogram was previously produced here as well; it was
        # removed because the box plots are more informative
        # full-range box plot, plus one zoomed to 5% to better show the
        # distribution (the normal runs sit very close to zero)
        make_boxplot(
            particles_near_sensor_normal,
            particles_near_sensor_anomaly,
            f"particles_near_sensor_boxplot_{rt}.png",
        )
        make_boxplot(
            particles_near_sensor_normal,
            particles_near_sensor_anomaly,
            f"particles_near_sensor_boxplot_zoomed_{rt}.png",
            y_limit=0.05,
        )
# generate box plots of how many measurements fall below each range threshold
# for both normal and anomaly experiments
plot_data_points(
    normal_experiment_paths,
    anomaly_experiment_paths,
    "Density of Number of Particles Near Sensor",
)
# replace the "latest" folder with this run's outputs: delete, recreate, copy
shutil.rmtree(latest_folder_path, ignore_errors=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
for file in output_datetime_path.iterdir():
    shutil.copy2(file, latest_folder_path)
# copy this script next to the plots to preserve the generating code
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)
# finally, move this run's dated output folder into the archive
shutil.move(output_datetime_path, archive_folder_path)