# This script loads the numpy array files containing the lidar frames and counts the number of frames in each file.
# The per-file counts are printed to the console, together with aggregated sums for files whose names contain the
# word "smoke" (anomalous experiments), for files that do not (normal experiments), and for all files combined.
# We also plot a pie chart of the distribution of data points in normal and anomalous experiments.
import shutil
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from tabulate import tabulate

# define the data path containing the numpy array files and the output path for the plots
data_path = Path("/home/fedex/mt/data/subter")
output_path = Path("/home/fedex/mt/plots/data_count_lidar_frames")

datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name

# if the output folders do not exist, create them
output_path.mkdir(exist_ok=True, parents=True)
output_datetime_path.mkdir(exist_ok=True, parents=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
archive_folder_path.mkdir(exist_ok=True, parents=True)

# find all numpy array files and split them into normal and anomalous experiments
# (files with "smoke" in their name are anomalous)
normal_experiment_paths, anomaly_experiment_paths = [], []
for npy_file_path in data_path.iterdir():
    if npy_file_path.suffix != ".npy":
        continue
    if "smoke" in npy_file_path.name:
        anomaly_experiment_paths.append(npy_file_path)
    else:
        normal_experiment_paths.append(npy_file_path)


# count the number of lidar frames in one experiment (first dimension of the array)
def count_frames(npy_file_path):
    frames = np.load(npy_file_path).shape[0]
    return frames


# we want to print the number of frames in a table, so we first gather all the data in two dicts
normal_experiment_frames = {
    npy_file_path.stem: count_frames(npy_file_path)
    for npy_file_path in normal_experiment_paths
}
anomaly_experiment_frames = {
    npy_file_path.stem: count_frames(npy_file_path)
    for npy_file_path in anomaly_experiment_paths
}

# prepare the data for tabulate
normal_experiment_table = list(normal_experiment_frames.items())
anomaly_experiment_table = list(anomaly_experiment_frames.items())

# sort the tables by experiment name
normal_experiment_table.sort(key=lambda x: x[0])
anomaly_experiment_table.sort(key=lambda x: x[0])

# append the sum of all frames to each table
normal_experiment_table.append(("Sum", sum(normal_experiment_frames.values())))
anomaly_experiment_table.append(("Sum", sum(anomaly_experiment_frames.values())))

# print the number of frames in each file using tabulate
print("Normal experiments:")
print(
    tabulate(normal_experiment_table, headers=["Experiment", "Frames"], tablefmt="grid")
)

# print the shortest, longest, mean and median duration of the normal experiments, assuming 10 frames per second
normal_experiment_frames_values = list(normal_experiment_frames.values())
print(
    f"Smallest time: {min(normal_experiment_frames_values) / 10} seconds, "
    f"Largest time: {max(normal_experiment_frames_values) / 10} seconds, "
    f"Mean time: {np.mean(normal_experiment_frames_values) / 10} seconds, "
    f"Median time: {np.median(normal_experiment_frames_values) / 10} seconds"
)

print("Anomaly experiments:")
print(
    tabulate(
        anomaly_experiment_table, headers=["Experiment", "Frames"], tablefmt="grid"
    )
)

# print the shortest, longest, mean and median duration of the anomalous experiments, assuming 10 frames per second
anomaly_experiment_frames_values = list(anomaly_experiment_frames.values())
print(
    f"Smallest time: {min(anomaly_experiment_frames_values) / 10} seconds, "
    f"Largest time: {max(anomaly_experiment_frames_values) / 10} seconds, "
    f"Mean time: {np.mean(anomaly_experiment_frames_values) / 10} seconds, "
    f"Median time: {np.median(anomaly_experiment_frames_values) / 10} seconds"
)

# print the total number of frames across all experiments
total_frames = sum(normal_experiment_frames.values()) + sum(
    anomaly_experiment_frames.values()
)
print(f"Total frames in all (normal and anomaly) experiments: {total_frames} frames")

# print the normal and anomalous frame counts as percentages of the total
print(
    f"Percentage of normal experiments: {sum(normal_experiment_frames.values()) / total_frames * 100}%"
)
print(
    f"Percentage of anomaly experiments: {sum(anomaly_experiment_frames.values()) / total_frames * 100}%"
)


# plot a pie chart of the distribution of data points in normal and anomalous experiments
def plot_data_points_pie(normal_experiment_frames, anomaly_experiment_frames):
    # we want to plot the sum of all frames in normal and anomalous experiments as totals as well as percentages
    total_normal_frames = sum(normal_experiment_frames.values())
    total_anomaly_frames = sum(anomaly_experiment_frames.values())
    total_frames = total_normal_frames + total_anomaly_frames

    # prepare the data for the pie chart
    labels = [
        "Normal Lidar Frames\nNon-Degraded Pointclouds",
        "Anomalous Lidar Frames\nDegraded Pointclouds",
    ]
    sizes = [total_normal_frames, total_anomaly_frames]
    explode = (0.1, 0)  # explode the normal slice

    # autopct callback that shows the percentage and the number of frames per slice
    def autopct_with_counts(pct):
        return f"{pct:.1f}%\n({int(pct * total_frames / 100)} frames)"

    # plot the pie chart without a shadow and with the custom autopct, on a 10x5 inch figure
    fig1, ax1 = plt.subplots()
    fig1.set_size_inches(10, 5)
    ax1.pie(
        sizes,
        explode=explode,
        labels=labels,
        autopct=autopct_with_counts,
        shadow=False,
    )
    ax1.axis("equal")  # equal aspect ratio ensures that the pie is drawn as a circle

    # display the total number of frames in the center of the pie chart (adjusted vertically)
    plt.text(
        0,
        0.2,
        f"Total:\n{total_frames} frames",
        fontsize=12,
        ha="center",
        va="center",
        color="black",
    )
    plt.title(
        "Distribution of Normal and Anomalous\nPointclouds in all Experiments (Lidar Frames)"
    )
    plt.tight_layout()

    # save the plot
    plt.savefig(output_datetime_path / "data_points_pie.png")


plot_data_points_pie(normal_experiment_frames, anomaly_experiment_frames)

# delete the current latest folder and recreate it empty
shutil.rmtree(latest_folder_path, ignore_errors=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)

# copy the contents of the output datetime folder to the latest folder
for file in output_datetime_path.iterdir():
    shutil.copy2(file, latest_folder_path)

# copy this python script to the output datetime folder and the latest folder
# to preserve the code used to generate the plots
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)

# move the output datetime folder to the archive
shutil.move(output_datetime_path, archive_folder_path)
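

# Optional sketch, not used in the workflow above: count_frames loads each full array
# into memory just to read its shape. If the .npy files are large, a memory-mapped load
# returns the same frame count without reading the point data. The helper name
# count_frames_mmap is ours; mmap_mode="r" is standard numpy.
def count_frames_mmap(npy_file_path):
    # mmap_mode="r" maps the file read-only; only the header is parsed to obtain .shape
    return np.load(npy_file_path, mmap_mode="r").shape[0]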