Files
mt/tools/plot_scripts/data_count_lidar_frames.py
2025-03-14 18:02:23 +01:00

180 lines
7.0 KiB
Python

# this script loads the numpy array files containing the lidar frames and counts the number of frames in each file
# the number of frames is then printed to the console per file as well as a congregated sum of all frames in files
# containing the word smoke and ones that do not contain that word, as well as an overall sum of all frames
# We also plot a pie chart of the distribution of data points in normal and anomalous experiments
import shutil
from datetime import datetime
from pathlib import Path
import numpy as np
from tabulate import tabulate
# define data path containing the numpy array files and output path for the plots
data_path = Path("/home/fedex/mt/data/subter")
output_path = Path("/home/fedex/mt/plots/data_count_lidar_frames")
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name
# if output does not exist, create it
output_path.mkdir(exist_ok=True, parents=True)
output_datetime_path.mkdir(exist_ok=True, parents=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
archive_folder_path.mkdir(exist_ok=True, parents=True)
# find all numpy array files and sort them correctly by name
normal_experiment_paths, anomaly_experiment_paths = [], []
for npy_file_path in data_path.iterdir():
if npy_file_path.suffix != ".npy":
continue
if "smoke" in npy_file_path.name:
anomaly_experiment_paths.append(npy_file_path)
else:
normal_experiment_paths.append(npy_file_path)
# function that counts the number of frames in one experiment
def count_frames(npy_file_path):
frames = np.load(npy_file_path).shape[0]
return frames
# we want to print the numbers of frames in a table so we first gather all the data in two maps
normal_experiment_frames = {
npy_file_path.stem: count_frames(npy_file_path)
for npy_file_path in normal_experiment_paths
}
anomaly_experiment_frames = {
npy_file_path.stem: count_frames(npy_file_path)
for npy_file_path in anomaly_experiment_paths
}
# prepare data for tabulate
normal_experiment_table = [
(experiment, frames) for experiment, frames in normal_experiment_frames.items()
]
anomaly_experiment_table = [
(experiment, frames) for experiment, frames in anomaly_experiment_frames.items()
]
# sort the tables by experiment name
normal_experiment_table.sort(key=lambda x: x[0])
anomaly_experiment_table.sort(key=lambda x: x[0])
# add the sum of all frames to the tables
normal_experiment_table.append(("Sum", sum(normal_experiment_frames.values())))
anomaly_experiment_table.append(("Sum", sum(anomaly_experiment_frames.values())))
# print the number of frames in each file using tabulate
print("Normal experiments:")
print(
tabulate(normal_experiment_table, headers=["Experiment", "Frames"], tablefmt="grid")
)
# print the smallest, largest, mean and median time of the normal experiments assuming 10 frames per second
normal_experiment_frames_values = list(normal_experiment_frames.values())
print(
f"Smallest time: {min(normal_experiment_frames_values) / 10} seconds, Largest time: {max(normal_experiment_frames_values) / 10} seconds, Mean time: {np.mean(normal_experiment_frames_values) / 10} seconds, Median time: {np.median(normal_experiment_frames_values) / 10} seconds"
)
print("Anomaly experiments:")
print(
tabulate(
anomaly_experiment_table, headers=["Experiment", "Frames"], tablefmt="grid"
)
)
# print the smallest, largest, mean and median time of the anomalous experiments assuming 10 frames per second
anomaly_experiment_frames_values = list(anomaly_experiment_frames.values())
print(
f"Smallest time: {min(anomaly_experiment_frames_values) / 10} seconds, Largest time: {max(anomaly_experiment_frames_values) / 10} seconds, Mean time: {np.mean(anomaly_experiment_frames_values) / 10} seconds, Median time: {np.median(anomaly_experiment_frames_values) / 10} seconds"
)
# print the sum of all frames in all experiments
total_frames = sum(normal_experiment_frames.values()) + sum(
anomaly_experiment_frames.values()
)
print(f"Total frames in all (normal and anmoaly) experiments: {total_frames} frames")
# print the sum of normal and anomalous experiments as percentage of the total frames
print(
f"Percentage of normal experiments: {sum(normal_experiment_frames.values()) / total_frames * 100}%"
)
print(
f"Percentage of anomaly experiments: {sum(anomaly_experiment_frames.values()) / total_frames * 100}%"
)
sum(normal_experiment_frames.values()) + sum(anomaly_experiment_frames.values())
# define function to plot pie chart of the distribution of data points in normal and anomalous experiments
def plot_data_points_pie(normal_experiment_frames, anomaly_experiment_frames):
import matplotlib.pyplot as plt
# we want to plot the sum of all frames in normal and anomaly experiments as total values and also percentages
total_normal_frames = sum(normal_experiment_frames.values())
total_anomaly_frames = sum(anomaly_experiment_frames.values())
total_frames = total_normal_frames + total_anomaly_frames
# prepare data for pie chart
labels = [
"Normal Lidar Frames\nNon-Degraded Pointclouds",
"Anomalous Lidar Frames\nDegraded Pointclouds",
]
sizes = [total_normal_frames, total_anomaly_frames]
explode = (0.1, 0) # explode the normal slice
# define an autopct function that shows percentage and total number of frames per slice
def make_autopct(pct):
return f"{pct:.1f}%\n({int(pct * total_frames / 100)} frames)"
# plot pie chart without shadow and with custom autopct
fig1, ax1 = plt.subplots()
# set a figure size of 10x5
fig1.set_size_inches(10, 5)
ax1.pie(sizes, explode=explode, labels=labels, autopct=make_autopct, shadow=False)
# for labels use center alignment
ax1.axis("equal") # Equal aspect ratio ensures that pie is drawn as a circle.
# display the total number of frames in the center of the pie chart (adjusted vertically)
plt.text(
0,
0.2,
f"Total:\n{total_frames} frames",
fontsize=12,
ha="center",
va="center",
color="black",
)
plt.title(
"Distribution of Normal and Anomalous\nPointclouds in all Experiments (Lidar Frames)"
)
plt.tight_layout()
# save the plot
plt.savefig(output_datetime_path / "data_points_pie.png")
plot_data_points_pie(normal_experiment_frames, anomaly_experiment_frames)
# delete current latest folder
shutil.rmtree(latest_folder_path, ignore_errors=True)
# create new latest folder
latest_folder_path.mkdir(exist_ok=True, parents=True)
# copy contents of output folder to the latest folder
for file in output_datetime_path.iterdir():
shutil.copy2(file, latest_folder_path)
# copy this python script to the output datetime folder to preserve the code used to generate the plots
shutil.copy2(__file__, output_datetime_path)
shutil.copy2(__file__, latest_folder_path)
# move output date folder to archive
shutil.move(output_datetime_path, archive_folder_path)