import shutil
from datetime import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

# define data path containing the npy output files from your method
all_data_path = Path(
    "/home/fedex/mt/projects/thesis-kowalczyk-jan/Deep-SAD-PyTorch/infer/DeepSAD/all_infer/inference"
)

output_path = Path("/home/fedex/mt/plots/deepsad_reduced_latent_space")
datetime_folder_name = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

latest_folder_path = output_path / "latest"
archive_folder_path = output_path / "archive"
output_datetime_path = output_path / datetime_folder_name

# create required output directories if they do not exist
output_path.mkdir(exist_ok=True, parents=True)
output_datetime_path.mkdir(exist_ok=True, parents=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)
archive_folder_path.mkdir(exist_ok=True, parents=True)

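# Note: all_data_path and output_path above are absolute, machine-specific paths;
# adapt them (or expose them as command line arguments) before reusing this script
# on another setup.
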
normal_experiment_paths = []
anomaly_experiment_paths = []

# locate the npy experiment output files and split them into normal and anomaly (smoke) runs
for file_path in all_data_path.iterdir():
    if file_path.suffix != ".npy":
        continue
    # check if the file name contains "output" to ensure it's an experiment output file
    if "output" not in file_path.stem:
        continue
    if "smoke" in file_path.name:
        anomaly_experiment_paths.append(file_path)
    else:
        normal_experiment_paths.append(file_path)

print("Normal experiments:")
for path in normal_experiment_paths:
    print(path.name)

print("\nAnomaly experiments:")
for path in anomaly_experiment_paths:
    print(path.name)

# sort the experiment outputs by file size
normal_experiment_paths.sort(key=lambda path: path.stat().st_size)
anomaly_experiment_paths.sort(key=lambda path: path.stat().st_size)

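# Optional sanity check (assumes both normal and smoke/anomaly output files are
# expected to be present; drop this block if empty classes are acceptable):
if not normal_experiment_paths or not anomaly_experiment_paths:
    raise RuntimeError(
        "No matching .npy output files found - check all_data_path and the "
        "'output'/'smoke' file naming convention used above."
    )
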
def load_latent_space_data(experiment_paths):
    """
    Load latent space data from npy files and return a single numpy array.
    Modify this function if your file structure is different.
    """
    data_list = []
    for path in experiment_paths:
        latent_data = np.load(path)
        data_list.append(latent_data)
    return np.vstack(data_list)

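# Note on the helper below: sklearn's PCA requires n_components to be at most
# min(n_samples, n_features). If your latent space is smaller than the value
# requested by the caller, a clamp such as the following (illustrative) line
# avoids a ValueError:
#     n_components = min(n_components, *data.shape)
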
def reduce_dimensionality(data, n_components=50):
    """
    Reduce the dimensionality of the data using PCA.
    This function can be re-used by TSNE or other methods for an initial reduction.
    """
    pca = PCA(n_components=n_components, random_state=42)
    return pca.fit_transform(data)

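# Note on the plot below: the PCA step uses n_components=100, which assumes the
# combined latent data has at least 100 samples and 100 feature dimensions, and
# scikit-learn's TSNE needs more samples than its perplexity (default 30) after the
# ::n / ::m subsampling. Lower these values for smaller datasets, e.g. (illustrative):
#     tsne = TSNE(n_components=2, perplexity=15, random_state=42)
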
def plot_tsne_latent_space(normal_data, anomaly_data, title="TSNE of Latent Space"):
    """
    Plot the TSNE representation of the latent space.
    This function first applies a PCA-based dimensionality reduction for efficiency.
    """
    # Hardcoded variables to choose every nth normal sample and mth anomaly sample
    n = 10  # Change this value to select every nth normal sample
    m = 2  # Change this value to select every mth anomaly sample

    # Select every nth normal sample and mth anomaly sample
    normal_data = normal_data[::n]
    anomaly_data = anomaly_data[::m]

    # Combine normal and anomaly data
    combined_data = np.vstack((normal_data, anomaly_data))

    # Initial dimensionality reduction with PCA
    reduced_data = reduce_dimensionality(combined_data, n_components=100)

    # Apply TSNE transformation on the PCA-reduced data
    tsne = TSNE(n_components=2, random_state=42)
    tsne_results = tsne.fit_transform(reduced_data)

    # Split the TSNE results back into normal and anomaly arrays
    tsne_normal = tsne_results[: len(normal_data)]
    tsne_anomaly = tsne_results[len(normal_data) :]

    # Plotting TSNE results
    plt.clf()
    plt.figure(figsize=(10, 5))
    plt.scatter(
        tsne_anomaly[:, 0], tsne_anomaly[:, 1], label="Anomaly", alpha=0.6, marker="x"
    )
    plt.scatter(
        tsne_normal[:, 0], tsne_normal[:, 1], label="Normal", alpha=0.6, marker="o"
    )
    plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_datetime_path / "tsne_latent_space_plot.png")

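# Note on the plot below: unlike the t-SNE plot above, the PCA scatter is fit on all
# samples without subsampling; if that becomes slow or cluttered for large runs, the
# same ::n / ::m slicing used in plot_tsne_latent_space can be applied here as well.
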
def plot_pca_scatter(normal_data, anomaly_data, title="PCA Scatter Plot"):
    """
    Plot a 2-dimensional scatterplot of the latent space using PCA.
    This is useful for visualization and can be easily extended.
    """
    # Combine normal and anomaly data
    combined_data = np.vstack((normal_data, anomaly_data))
    pca = PCA(n_components=2, random_state=42)
    pca_results = pca.fit_transform(combined_data)

    # Split the PCA results back into normal and anomaly arrays
    pca_normal = pca_results[: len(normal_data)]
    pca_anomaly = pca_results[len(normal_data) :]

    # Plotting PCA scatter results
    plt.clf()
    plt.figure(figsize=(10, 5))
    plt.scatter(
        pca_anomaly[:, 0], pca_anomaly[:, 1], label="Anomaly", alpha=0.6, marker="x"
    )
    plt.scatter(
        pca_normal[:, 0], pca_normal[:, 1], label="Normal", alpha=0.6, marker="o"
    )
    plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_datetime_path / "pca_latent_space_plot.png")

# load latent space data for both normal and anomalous experiments
normal_data = load_latent_space_data(normal_experiment_paths)
anomaly_data = load_latent_space_data(anomaly_experiment_paths)

# call your plotting functions
plot_tsne_latent_space(normal_data, anomaly_data)
plot_pca_scatter(normal_data, anomaly_data)

# update the 'latest' results folder: delete previous and copy current outputs
shutil.rmtree(latest_folder_path, ignore_errors=True)
latest_folder_path.mkdir(exist_ok=True, parents=True)

for file in output_datetime_path.iterdir():
    shutil.copy2(file, latest_folder_path)

# copy this script to the output folder and to the latest folder to preserve the code that was used
script_path = Path(__file__)
shutil.copy2(script_path, output_datetime_path)
shutil.copy2(script_path, latest_folder_path)

# move the output date folder to the archive folder for record keeping
shutil.move(output_datetime_path, archive_folder_path)
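# Note: shutil.move raises an error if a folder with the same datetime name already
# exists inside archive_folder_path; since the name has second resolution, this only
# happens when the script is re-run within the same second.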