Jan Kowalczyk
2025-06-10 09:31:28 +02:00
parent 3538b15073
commit 156b6d2ac1
8 changed files with 794 additions and 580 deletions

View File

@@ -18,7 +18,7 @@ def load_dataset(
ratio_pollution: float = 0.0,
random_state=None,
inference: bool = False,
k_fold: bool = False,
k_fold_num: int = None,
num_known_normal: int = 0,
num_known_outlier: int = 0,
):
@@ -45,11 +45,8 @@ def load_dataset(
if dataset_name == "subter":
dataset = SubTer_Dataset(
root=data_path,
ratio_known_normal=ratio_known_normal,
ratio_known_outlier=ratio_known_outlier,
ratio_pollution=ratio_pollution,
inference=inference,
k_fold=k_fold,
k_fold_num=k_fold_num,
num_known_normal=num_known_normal,
num_known_outlier=num_known_outlier,
)
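A minimal usage sketch of the updated load_dataset call (not taken from this commit; the import path, data path, and label counts are assumptions, the parameter names follow the hunk above):

from datasets.main import load_dataset  # assumed import path

# Fixed train/test split with a capped label budget
dataset = load_dataset(
    dataset_name="subter",
    data_path="/data/subter",      # hypothetical data directory
    num_known_normal=50,           # labeled normal frames available for training
    num_known_outlier=10,          # labeled anomalous frames available for training
)

# k-fold variant: the full data set is loaded and folds are handled downstream
dataset_cv = load_dataset(
    dataset_name="subter",
    data_path="/data/subter",
    num_known_normal=50,
    num_known_outlier=10,
    k_fold_num=5,
)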

View File

@@ -1,6 +1,5 @@
import json
import logging
import random
from pathlib import Path
from typing import Callable, Optional
@@ -8,596 +7,350 @@ import numpy as np
import torch
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import Subset
from torch.utils.data.dataset import ConcatDataset
from torchvision.datasets import VisionDataset
from base.torchvision_dataset import TorchvisionDataset
from .preprocessing import create_semisupervised_setting
class SubTer_Dataset(TorchvisionDataset):
"""
Wrapper around SubTerTraining and SubTerInference; sets up train/test/inference/data_set as needed.
"""
def __init__(
self,
root: str,
ratio_known_normal: float = 0.0,
ratio_known_outlier: float = 0.0,
ratio_pollution: float = 0.0,
inference: bool = False,
k_fold: bool = False,
num_known_normal: int = 0,
num_known_outlier: int = 0,
only_use_given_semi_targets_for_evaluation: bool = True,
k_fold_num: int = None,
transform: Optional[Callable] = None,
target_transform: Optional[Callable] = None,
seed: int = 0,
split: float = 0.7,
):
super().__init__(root)
if Path(root).is_dir():
with open(Path(root) / "semi_targets.json", "r") as f:
data = json.load(f)
semi_targets_given = {
item["filename"]: (
item["semi_target_begin_frame"],
item["semi_target_end_frame"],
)
for item in data["files"]
}
# Define normal and outlier classes
self.n_classes = 2 # 0: normal, 1: outlier
self.normal_classes = tuple([0])
self.outlier_classes = tuple([1])
super().__init__(root, k_fold_number=k_fold_num)
self.inference_set = None
# MNIST preprocessing: feature scaling to [0, 1]
# FIXME understand MNIST feature scaling and check if it or other preprocessing is necessary for this dataset
transform = transforms.ToTensor()
target_transform = transforms.Lambda(lambda x: int(x in self.outlier_classes))
self.train_set = None
self.test_set = None
self.data_set = None
if inference:
self.inference_set = SubTerInference(
root=root,
transform=transform,
)
return
# Always require the manual label file
manual_json_path = Path(root) / "manually_labeled_anomaly_frames.json"
if not manual_json_path.exists():
raise FileNotFoundError(f"Required file not found: {manual_json_path}")
# For k_fold, data_set is the full dataset, train/test are None
if k_fold_num is not None:
self.data_set = SubTerTraining(
root=root,
num_known_normal=num_known_normal,
num_known_outlier=num_known_outlier,
transform=transform,
target_transform=target_transform,
seed=seed,
split=1.0, # use all data for k-fold
)
self.train_set = None
self.test_set = None
else:
if k_fold:
# Get train set
data_set = SubTerTraining(
root=self.root,
transform=transform,
target_transform=target_transform,
train=True,
split=1,
semi_targets_given=semi_targets_given,
)
# Standard split
self.train_set = SubTerTraining(
root=root,
num_known_normal=num_known_normal,
num_known_outlier=num_known_outlier,
transform=transform,
target_transform=target_transform,
seed=seed,
split=split,
train=True,
)
self.test_set = SubTerTraining(
root=root,
num_known_normal=num_known_normal,
num_known_outlier=num_known_outlier,
transform=transform,
target_transform=target_transform,
seed=seed,
split=split,
train=False,
)
self.data_set = None # not used unless k_fold
np.random.seed(0)
semi_targets = data_set.semi_targets.numpy()
def get_file_name_from_idx(self, idx: int) -> Optional[str]:
"""
Get filename for a file_id by delegating to the appropriate dataset.
Args:
idx: The file index to look up
Returns:
str: The filename corresponding to the index, or None if not found
"""
# For non-inference, use any available dataset (they all have the same files)
if self.data_set is not None:
return self.data_set.get_file_name_from_idx(idx)
if self.train_set is not None:
return self.train_set.get_file_name_from_idx(idx)
if self.test_set is not None:
return self.test_set.get_file_name_from_idx(idx)
if self.inference_set is not None:
return self.inference_set.get_file_name_from_idx(idx)
# Find indices where semi_targets is -1 (abnormal) or 1 (normal)
normal_indices = np.where(semi_targets == 1)[0]
abnormal_indices = np.where(semi_targets == -1)[0]
# Randomly select the specified number of indices to keep for each category
if len(normal_indices) > num_known_normal:
keep_normal_indices = np.random.choice(
normal_indices, size=num_known_normal, replace=False
)
else:
keep_normal_indices = (
normal_indices  # Keep all if there are fewer than required
)
if len(abnormal_indices) > num_known_outlier:
keep_abnormal_indices = np.random.choice(
abnormal_indices, size=num_known_outlier, replace=False
)
else:
keep_abnormal_indices = (
abnormal_indices  # Keep all if there are fewer than required
)
# Set all values to 0, then restore only the selected -1 and 1 values
semi_targets[(semi_targets == 1) | (semi_targets == -1)] = 0
semi_targets[keep_normal_indices] = 1
semi_targets[keep_abnormal_indices] = -1
data_set.semi_targets = torch.tensor(semi_targets, dtype=torch.int8)
self.data_set = data_set
# # Create semi-supervised setting
# idx, _, semi_targets = create_semisupervised_setting(
# data_set.targets.cpu().data.numpy(),
# self.normal_classes,
# self.outlier_classes,
# self.outlier_classes,
# ratio_known_normal,
# ratio_known_outlier,
# ratio_pollution,
# )
# data_set.semi_targets[idx] = torch.tensor(
# np.array(semi_targets, dtype=np.int8)
# ) # set respective semi-supervised labels
# # Subset data_set to semi-supervised setup
# self.data_set = Subset(data_set, idx)
else:
# Get train set
if only_use_given_semi_targets_for_evaluation:
pass
train_set = SubTerTrainingSelective(
root=self.root,
transform=transform,
target_transform=target_transform,
train=True,
num_known_outlier=num_known_outlier,
semi_targets_given=semi_targets_given,
)
np.random.seed(0)
semi_targets = train_set.semi_targets.numpy()
# Find indices where semi_targets is -1 (abnormal) or 1 (normal)
normal_indices = np.where(semi_targets == 1)[0]
# Randomly select the specified number of indices to keep for each category
if len(normal_indices) > num_known_normal:
keep_normal_indices = np.random.choice(
normal_indices, size=num_known_normal, replace=False
)
else:
keep_normal_indices = (
normal_indices # Keep all if there are fewer than required
)
# Set all values to 0, then restore only the selected -1 and 1 values
semi_targets[semi_targets == 1] = 0
semi_targets[keep_normal_indices] = 1
train_set.semi_targets = torch.tensor(
semi_targets, dtype=torch.int8
)
self.train_set = train_set
self.test_set = SubTerTrainingSelective(
root=self.root,
transform=transform,
target_transform=target_transform,
num_known_outlier=num_known_outlier,
train=False,
semi_targets_given=semi_targets_given,
)
else:
train_set = SubTerTraining(
root=self.root,
transform=transform,
target_transform=target_transform,
train=True,
semi_targets_given=semi_targets_given,
)
# Create semi-supervised setting
idx, _, semi_targets = create_semisupervised_setting(
train_set.targets.cpu().data.numpy(),
self.normal_classes,
self.outlier_classes,
self.outlier_classes,
ratio_known_normal,
ratio_known_outlier,
ratio_pollution,
)
train_set.semi_targets[idx] = torch.tensor(
np.array(semi_targets, dtype=np.int8)
) # set respective semi-supervised labels
# Subset train_set to semi-supervised setup
self.train_set = Subset(train_set, idx)
# Get test set
self.test_set = SubTerTraining(
root=self.root,
train=False,
transform=transform,
target_transform=target_transform,
semi_targets_given=semi_targets_given,
)
return None
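A short sketch (assumption, not part of this diff) of driving the wrapper above directly; the root directory is hypothetical and is expected to contain the .npy experiments plus manually_labeled_anomaly_frames.json:

# Standard split: train_set and test_set are populated, data_set stays None
subter = SubTer_Dataset(
    root="/data/subter",
    num_known_normal=50,
    num_known_outlier=10,
    seed=0,
    split=0.7,
)
print(len(subter.train_set), len(subter.test_set))

# k-fold mode: only data_set is populated; folds are drawn from it downstream
subter_cv = SubTer_Dataset(root="/data/subter", k_fold_num=5,
                           num_known_normal=50, num_known_outlier=10)
print(subter_cv.get_file_name_from_idx(0))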
class SubTerTraining(VisionDataset):
"""
Loads all data, builds targets, and supports train/test split.
"""
def __init__(
self,
root: str,
transforms: Optional[Callable] = None,
num_known_normal: int = 0,
num_known_outlier: int = 0,
transform: Optional[Callable] = None,
target_transform: Optional[Callable] = None,
semi_targets_given=None,
only_use_given_semi_targets_for_evaluation=False,
seed: int = 0,
split: float = 0.7,
train: bool = True,
):
super(SubTerTraining, self).__init__(
root, transforms, transform, target_transform
)
experiments_data = []
experiments_targets = []
experiments_semi_targets = []
# validation_files = []
experiment_files = []
experiment_frame_ids = []
experiment_file_ids = []
file_names = {}
for file_idx, experiment_file in enumerate(sorted(Path(root).iterdir())):
# if experiment_file.is_dir() and experiment_file.name == "validation":
# for validation_file in experiment_file.iterdir():
# if validation_file.suffix != ".npy":
# continue
# validation_files.append(experiment_file)
if experiment_file.suffix != ".npy":
continue
file_names[file_idx] = experiment_file.name
experiment_files.append(experiment_file)
experiment_data = np.load(experiment_file)
experiment_targets = (
np.ones(experiment_data.shape[0], dtype=np.int8)
if "smoke" in experiment_file.name
else np.zeros(experiment_data.shape[0], dtype=np.int8)
)
# experiment_data = np.lib.format.open_memmap(experiment_file, mode='r+')
experiment_semi_targets = np.zeros(experiment_data.shape[0], dtype=np.int8)
if "smoke" not in experiment_file.name:
experiment_semi_targets = np.ones(
experiment_data.shape[0], dtype=np.int8
)
else:
if semi_targets_given:
if experiment_file.name in semi_targets_given:
semi_target_begin_frame, semi_target_end_frame = (
semi_targets_given[experiment_file.name]
)
experiment_semi_targets[
semi_target_begin_frame:semi_target_end_frame
] = -1
else:
experiment_semi_targets = (
np.ones(experiment_data.shape[0], dtype=np.int8) * -1
)
experiment_file_ids.append(
np.full(experiment_data.shape[0], file_idx, dtype=np.int8)
)
experiment_frame_ids.append(
np.arange(experiment_data.shape[0], dtype=np.int32)
)
experiments_data.append(experiment_data)
experiments_targets.append(experiment_targets)
experiments_semi_targets.append(experiment_semi_targets)
# filtered_validation_files = []
# for validation_file in validation_files:
# validation_file_name = validation_file.name
# file_exists_in_experiments = any(
# experiment_file.name == validation_file_name
# for experiment_file in experiment_files
# )
# if not file_exists_in_experiments:
# filtered_validation_files.append(validation_file)
# validation_files = filtered_validation_files
super().__init__(root, transform=transform, target_transform=target_transform)
logger = logging.getLogger()
logger.info(
f"Train/Test experiments: {[experiment_file.name for experiment_file in experiment_files]}"
)
# logger.info(
# f"Validation experiments: {[validation_file.name for validation_file in validation_files]}"
# )
manual_json_path = Path(root) / "manually_labeled_anomaly_frames.json"
with open(manual_json_path, "r") as f:
manual_data = json.load(f)
manual_anomaly_ranges = {
item["filename"]: (
item["semi_target_begin_frame"],
item["semi_target_end_frame"],
)
for item in manual_data["files"]
}
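# Expected shape of manually_labeled_anomaly_frames.json (field names taken from
# the code above; the concrete values are illustrative only):
# {
#   "files": [
#     {"filename": "experiment_smoke_01.npy",
#      "semi_target_begin_frame": 120,
#      "semi_target_end_frame": 480}
#   ]
# }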
lidar_projections = np.concatenate(experiments_data)
smoke_presence = np.concatenate(experiments_targets)
semi_targets = np.concatenate(experiments_semi_targets)
file_ids = np.concatenate(experiment_file_ids)
frame_ids = np.concatenate(experiment_frame_ids)
all_data = []
all_file_ids = []
all_frame_ids = []
all_filenames = []
test_target_experiment_based = []
test_target_manually_set = []
train_semi_targets = []
file_names = {}
file_idx = 0
for experiment_file in sorted(Path(root).iterdir()):
if experiment_file.suffix != ".npy":
continue
file_names[file_idx] = experiment_file.name
experiment_data = np.load(experiment_file)
n_frames = experiment_data.shape[0]
is_smoke = "smoke" in experiment_file.name
if is_smoke:
if experiment_file.name not in manual_anomaly_ranges:
raise ValueError(
f"Experiment file {experiment_file.name} is marked as smoke but has no manual anomaly ranges."
)
manual_anomaly_start_frame, manual_anomaly_end_frame = (
manual_anomaly_ranges[experiment_file.name]
)
# Experiment-based: 1 (normal), -1 (anomaly)
exp_based_targets = (
np.full(n_frames, -1, dtype=np.int8) # anomaly
if is_smoke
else np.full(n_frames, 1, dtype=np.int8) # normal
)
# Manually set: 1 (normal), -1 (anomaly), 0 (unknown/NaN)
if not is_smoke:
manual_targets = np.full(n_frames, 1, dtype=np.int8) # normal
else:
manual_targets = np.zeros(n_frames, dtype=np.int8) # unknown
manual_targets[
manual_anomaly_start_frame:manual_anomaly_end_frame
] = -1 # anomaly
# log how many manual anomaly frames were set to each value
logger.info(
f"Experiment {experiment_file.name}: "
f"Manual targets - normal(1): {np.sum(manual_targets == 1)}, "
f"anomaly(-1): {np.sum(manual_targets == -1)}, "
f"unknown(0): {np.sum(manual_targets == 0)}"
)
# Semi-supervised targets: 1 (known normal), -1 (known anomaly), 0 (unknown)
if not is_smoke:
semi_targets = np.ones(n_frames, dtype=np.int8) # normal
else:
semi_targets = np.zeros(n_frames, dtype=np.int8) # unknown
semi_targets[
manual_anomaly_start_frame:manual_anomaly_end_frame
] = -1 # anomaly
all_data.append(experiment_data)
all_file_ids.append(np.full(n_frames, file_idx, dtype=np.int32))
all_frame_ids.append(np.arange(n_frames, dtype=np.int32))
all_filenames.extend([experiment_file.name] * n_frames)
test_target_experiment_based.append(exp_based_targets)
test_target_manually_set.append(manual_targets)
train_semi_targets.append(semi_targets)
file_idx += 1
# Flatten everything
data = np.nan_to_num(np.concatenate(all_data))
file_ids = np.concatenate(all_file_ids)
frame_ids = np.concatenate(all_frame_ids)
filenames = all_filenames
self.file_names = file_names
test_target_experiment_based = np.concatenate(test_target_experiment_based)
test_target_manually_set = np.concatenate(test_target_manually_set)
semi_targets_np = np.concatenate(train_semi_targets)
# Limit the number of known normal/anomaly samples for training
np.random.seed(seed)
normal_indices = np.where(semi_targets_np == 1)[0]
anomaly_indices = np.where(semi_targets_np == -1)[0]
shuffled_indices = np.random.permutation(lidar_projections.shape[0])
shuffled_lidar_projections = lidar_projections[shuffled_indices]
shuffled_smoke_presence = smoke_presence[shuffled_indices]
shuffled_file_ids = file_ids[shuffled_indices]
shuffled_frame_ids = frame_ids[shuffled_indices]
shuffled_semis = semi_targets[shuffled_indices]
if num_known_normal > 0 and len(normal_indices) > num_known_normal:
keep_normal = np.random.choice(
normal_indices, size=num_known_normal, replace=False
)
else:
keep_normal = normal_indices
split_idx = int(split * shuffled_lidar_projections.shape[0])
if num_known_outlier > 0 and len(anomaly_indices) > num_known_outlier:
keep_anomaly = np.random.choice(
anomaly_indices, size=num_known_outlier, replace=False
)
else:
keep_anomaly = anomaly_indices
semi_targets_np[(semi_targets_np == 1) | (semi_targets_np == -1)] = 0
semi_targets_np[keep_normal] = 1
semi_targets_np[keep_anomaly] = -1
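# Example: with num_known_normal=50 and num_known_outlier=10, at most 50 frames
# keep semi_target == 1 and at most 10 keep semi_target == -1; every other
# previously labeled frame is reset to 0 (unknown), so supervision is capped at
# the requested label budget.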
# Shuffle and split
indices = np.arange(len(data))
np.random.seed(seed)
np.random.shuffle(indices)
split_idx = int(split * len(data))
if train:
self.data = shuffled_lidar_projections[:split_idx]
self.targets = shuffled_smoke_presence[:split_idx]
semi_targets = shuffled_semis[:split_idx]
self.shuffled_file_ids = shuffled_file_ids[:split_idx]
self.shuffled_frame_ids = shuffled_frame_ids[:split_idx]
use_idx = indices[:split_idx]
else:
self.data = shuffled_lidar_projections[split_idx:]
self.targets = shuffled_smoke_presence[split_idx:]
semi_targets = shuffled_semis[split_idx:]
self.shuffled_file_ids = shuffled_file_ids[split_idx:]
self.shuffled_frame_ids = shuffled_frame_ids[split_idx:]
use_idx = indices[split_idx:]
self.data = np.nan_to_num(self.data)
self.data = torch.tensor(data[use_idx])
self.file_ids = file_ids[use_idx]
self.frame_ids = frame_ids[use_idx]
self.filenames = [filenames[i] for i in use_idx]
self.test_target_experiment_based = torch.tensor(
test_target_experiment_based[use_idx], dtype=torch.int8
)
self.test_target_manually_set = torch.tensor(
test_target_manually_set[use_idx], dtype=torch.int8
)
self.data = torch.tensor(self.data)
self.targets = torch.tensor(self.targets, dtype=torch.int8)
# log how many of the test_target_manually_set are in each category
logger.info(
f"Test targets - normal(1): {np.sum(self.test_target_manually_set.numpy() == 1)}, "
f"anomaly(-1): {np.sum(self.test_target_manually_set.numpy() == -1)}, "
f"unknown(0): {np.sum(self.test_target_manually_set.numpy() == 0)}"
)
if semi_targets_given is not None:
self.semi_targets = torch.tensor(semi_targets, dtype=torch.int8)
else:
self.semi_targets = torch.zeros_like(self.targets, dtype=torch.int8)
self.train_semi_targets = torch.tensor(
semi_targets_np[use_idx], dtype=torch.int8
)
self.transform = transform if transform else transforms.ToTensor()
self.target_transform = target_transform
def __len__(self):
return len(self.data)
def __getitem__(self, index):
"""Override the original method of the MNIST class.
Args:
index (int): Index
img = self.data[index]
target_experiment_based = int(self.test_target_experiment_based[index])
target_manually_set = int(self.test_target_manually_set[index])
semi_target = int(self.train_semi_targets[index])
file_id = int(self.file_ids[index])
frame_id = int(self.frame_ids[index])
Returns:
tuple: (image, target, semi_target, index)
"""
img, target, semi_target, file_id, frame_id = (
self.data[index],
int(self.targets[index]),
int(self.semi_targets[index]),
int(self.shuffled_file_ids[index]),
int(self.shuffled_frame_ids[index]),
)
# doing this so that it is consistent with all other datasets
# to return a PIL Image
img = Image.fromarray(img.numpy(), mode="F")
if self.transform is not None:
img = self.transform(img)
if self.target_transform is not None:
target = self.target_transform(target)
target_experiment_based = self.target_transform(target_experiment_based)
target_manually_set = self.target_transform(target_manually_set)
semi_target = self.target_transform(semi_target)
return img, target, semi_target, index, (file_id, frame_id)
return (
img,
target_experiment_based,
target_manually_set,
semi_target,
index,
(file_id, frame_id),
)
def get_file_name_from_idx(self, idx: int):
return self.file_names.get(idx, None)
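A usage sketch (assumption, not part of this diff) for iterating the training split; it presumes the six-element __getitem__ return shown above and a hypothetical data directory:

from torch.utils.data import DataLoader

train_set = SubTerTraining(root="/data/subter", num_known_normal=50,
                           num_known_outlier=10, seed=0, split=0.7, train=True)
loader = DataLoader(train_set, batch_size=32, shuffle=True)
for img, exp_target, manual_target, semi_target, idx, (file_id, frame_id) in loader:
    # exp_target / manual_target are the experiment-based and manually labeled
    # evaluation targets (1 normal, -1 anomaly, 0 unknown); semi_target is the
    # capped semi-supervised label used during training.
    break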
class SubTerInference(VisionDataset):
"""
Loads a single experiment file for inference.
"""
def __init__(
self,
root: str,
transforms: Optional[Callable] = None,
transform: Optional[Callable] = None,
):
super().__init__(root, transform=transform)
logger = logging.getLogger()
experiment_file = Path(root)
if not experiment_file.is_file():
logger.error(
"For inference the data path has to be a single experiment file!"
)
raise Exception("Inference data is not a loadable file!")
self.data = np.load(experiment_file)
self.data = np.nan_to_num(self.data)
self.data = torch.tensor(self.data)
self.filenames = [experiment_file.name] * self.data.shape[0]
self.file_ids = np.zeros(self.data.shape[0], dtype=np.int32)
self.frame_ids = np.arange(self.data.shape[0], dtype=np.int32)
self.file_names = {0: experiment_file.name}
def __len__(self):
return len(self.data)
def __getitem__(self, index):
"""Override the original method of the MNIST class.
Args:
index (int): Index
Returns:
tuple: (image, index)
"""
img = self.data[index]
file_id = int(self.file_ids[index])
frame_id = int(self.frame_ids[index])
# doing this so that it is consistent with all other datasets
# to return a PIL Image
img = Image.fromarray(img.numpy(), mode="F")
if self.transform is not None:
img = self.transform(img)
return img, index
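A sketch (assumption, not part of this diff) of the single-file inference path; the .npy path is hypothetical:

from torch.utils.data import DataLoader

inference_set = SubTerInference(root="/data/subter/run_smoke_01.npy",
                                transform=transforms.ToTensor())
loader = DataLoader(inference_set, batch_size=64, shuffle=False)
for img, index in loader:
    # frames come back in order; index maps back to frame_ids / file_names
    pass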
class SubTerTrainingSelective(VisionDataset):
def __init__(
self,
root: str,
transforms: Optional[Callable] = None,
transform: Optional[Callable] = None,
target_transform: Optional[Callable] = None,
train=False,
num_known_outlier=0,
seed=0,
semi_targets_given=None,
ratio_test_normal_to_anomalous=3,
):
super(SubTerTrainingSelective, self).__init__(
root, transforms, transform, target_transform
)
logger = logging.getLogger()
if semi_targets_given is None:
raise ValueError(
"semi_targets_given must be provided for selective training"
)
experiments_data = []
experiments_targets = []
experiments_semi_targets = []
# validation_files = []
experiment_files = []
experiment_frame_ids = []
experiment_file_ids = []
file_names = {}
for file_idx, experiment_file in enumerate(sorted(Path(root).iterdir())):
if experiment_file.suffix != ".npy":
continue
file_names[file_idx] = experiment_file.name
experiment_files.append(experiment_file)
experiment_data = np.load(experiment_file)
experiment_targets = (
np.ones(experiment_data.shape[0], dtype=np.int8)
if "smoke" in experiment_file.name
else np.zeros(experiment_data.shape[0], dtype=np.int8)
)
experiment_semi_targets = np.zeros(experiment_data.shape[0], dtype=np.int8)
if "smoke" not in experiment_file.name:
experiment_semi_targets = np.ones(
experiment_data.shape[0], dtype=np.int8
)
elif experiment_file.name in semi_targets_given:
semi_target_begin_frame, semi_target_end_frame = semi_targets_given[
experiment_file.name
]
experiment_semi_targets[
semi_target_begin_frame:semi_target_end_frame
] = -1
else:
raise ValueError(
"smoke experiment not in given semi_targets. required for selective training"
)
experiment_file_ids.append(
np.full(experiment_data.shape[0], file_idx, dtype=np.int8)
)
experiment_frame_ids.append(
np.arange(experiment_data.shape[0], dtype=np.int32)
)
experiments_data.append(experiment_data)
experiments_targets.append(experiment_targets)
experiments_semi_targets.append(experiment_semi_targets)
logger.info(
f"Train/Test experiments: {[experiment_file.name for experiment_file in experiment_files]}"
)
lidar_projections = np.concatenate(experiments_data)
smoke_presence = np.concatenate(experiments_targets)
semi_targets = np.concatenate(experiments_semi_targets)
file_ids = np.concatenate(experiment_file_ids)
frame_ids = np.concatenate(experiment_frame_ids)
self.file_names = file_names
np.random.seed(seed)
shuffled_indices = np.random.permutation(lidar_projections.shape[0])
shuffled_lidar_projections = lidar_projections[shuffled_indices]
shuffled_smoke_presence = smoke_presence[shuffled_indices]
shuffled_file_ids = file_ids[shuffled_indices]
shuffled_frame_ids = frame_ids[shuffled_indices]
shuffled_semis = semi_targets[shuffled_indices]
# check if there are enough known normal and known outlier samples
outlier_indices = np.where(shuffled_semis == -1)[0]
normal_indices = np.where(shuffled_semis == 1)[0]
if len(outlier_indices) < num_known_outlier:
raise ValueError(
f"Not enough known outliers in dataset. Required: {num_known_outlier}, Found: {len(outlier_indices)}"
)
# randomly select known normal and outlier samples
keep_outlier_indices = np.random.choice(
outlier_indices, size=num_known_outlier, replace=False
)
# put outliers that are not kept into the test set, together with ratio_test_normal_to_anomalous times as many normal samples for testing
test_outlier_indices = np.setdiff1d(outlier_indices, keep_outlier_indices)
num_test_outliers = len(test_outlier_indices)
test_normal_indices = np.random.choice(
normal_indices,
size=num_test_outliers * ratio_test_normal_to_anomalous,
replace=False,
)
# combine test indices
test_indices = np.concatenate([test_outlier_indices, test_normal_indices])
# training indices are the rest
train_indices = np.setdiff1d(np.arange(len(shuffled_semis)), test_indices)
if train:
self.data = shuffled_lidar_projections[train_indices]
self.targets = shuffled_smoke_presence[train_indices]
semi_targets = shuffled_semis[train_indices]
self.shuffled_file_ids = shuffled_file_ids[train_indices]
self.shuffled_frame_ids = shuffled_frame_ids[train_indices]
else:
self.data = shuffled_lidar_projections[test_indices]
self.targets = shuffled_smoke_presence[test_indices]
semi_targets = shuffled_semis[test_indices]
self.shuffled_file_ids = shuffled_file_ids[test_indices]
self.shuffled_frame_ids = shuffled_frame_ids[test_indices]
self.data = np.nan_to_num(self.data)
self.data = torch.tensor(self.data)
self.targets = torch.tensor(self.targets, dtype=torch.int8)
self.semi_targets = torch.tensor(semi_targets, dtype=torch.int8)
# log some stats to ensure the data is loaded correctly
if train:
logger.info(
f"Training set: {len(self.data)} samples, {sum(self.semi_targets == -1)} semi-supervised samples"
)
else:
logger.info(
f"Test set: {len(self.data)} samples, {sum(self.semi_targets == -1)} semi-supervised samples"
)
def __len__(self):
return len(self.data)
def __getitem__(self, index):
"""Override the original method of the MNIST class.
Args:
index (int): Index
Returns:
tuple: (image, target, semi_target, index)
"""
img, target, semi_target, file_id, frame_id = (
self.data[index],
int(self.targets[index]),
int(self.semi_targets[index]),
int(self.shuffled_file_ids[index]),
int(self.shuffled_frame_ids[index]),
)
# doing this so that it is consistent with all other datasets
# to return a PIL Image
img = Image.fromarray(img.numpy(), mode="F")
if self.transform is not None:
img = self.transform(img)
if self.target_transform is not None:
target = self.target_transform(target)
return img, target, semi_target, index, (file_id, frame_id)
return img, index, (file_id, frame_id)
def get_file_name_from_idx(self, idx: int):
return self.file_names.get(idx, None)
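A worked illustration (assumption, not part of this commit) of the selective split arithmetic above, with made-up frame counts:

# Suppose the concatenated experiments hold 1000 frames: 700 with semi_target == 1,
# 40 with semi_target == -1, 260 with semi_target == 0, and the constructor is
# called with num_known_outlier=10, ratio_test_normal_to_anomalous=3.
total_frames, num_outliers = 1000, 40
num_known_outlier, ratio_test_normal_to_anomalous = 10, 3

num_test_outliers = num_outliers - num_known_outlier                    # 30 anomalies held out for the test set
num_test_normals = num_test_outliers * ratio_test_normal_to_anomalous   # 90 known-normal frames held out with them
num_test = num_test_outliers + num_test_normals                         # 120 test frames
num_train = total_frames - num_test                                     # 880 frames (including unknowns) for training
print(num_test, num_train)  # 120 880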