initial work for elpv and subter datasets

elpv serves as the example dataset/implementation;
subter is the final dataset.
Jan Kowalczyk
2024-06-28 11:40:19 +02:00
parent 71f9662022
commit d6a019a8bb
13 changed files with 1585 additions and 4 deletions

View File

@@ -0,0 +1,163 @@
from torch.utils.data import Subset
from PIL import Image
from torch.utils.data.dataset import ConcatDataset
from torchvision.datasets import VisionDataset
from base.torchvision_dataset import TorchvisionDataset
from .preprocessing import create_semisupervised_setting
from typing import Callable, Optional
import torch
import torchvision.transforms as transforms
import random
import numpy as np
import importlib.util
import sys
from pathlib import Path


def load_function_from_path(root_path, subfolder, module_name, function_name):
    """Dynamically load `function_name` from `<root_path>/<subfolder>/<module_name>.py`."""
    root_path = Path(root_path)
    module_path = root_path / subfolder / f"{module_name}.py"
    if not module_path.exists():
        raise FileNotFoundError(f"The module {module_path} does not exist.")
    spec = importlib.util.spec_from_file_location(module_name, str(module_path))
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    if not hasattr(module, function_name):
        raise AttributeError(
            f"The function {function_name} does not exist in the module {module_name}."
        )
    return getattr(module, function_name)


class ELPV_Dataset(TorchvisionDataset):
    def __init__(
        self,
        root: str,
        ratio_known_normal: float = 0.0,
        ratio_known_outlier: float = 0.0,
        ratio_pollution: float = 0.0,
    ):
        super().__init__(root)

        # Define normal and outlier classes
        self.n_classes = 2  # 0: normal, 1: outlier
        self.normal_classes = tuple([0])
        self.outlier_classes = tuple([1])

        # MNIST preprocessing: feature scaling to [0, 1]
        # FIXME understand the MNIST feature scaling and check whether it or other preprocessing is necessary for ELPV
        transform = transforms.ToTensor()
        target_transform = transforms.Lambda(lambda x: int(x in self.outlier_classes))

        # Get train set
        train_set = MyELPV(
            root=self.root,
            transform=transform,
            target_transform=target_transform,
            train=True,
        )

        # Create semi-supervised setting
        # (semi_targets follow the Deep SAD convention: +1 labeled normal, -1 labeled outlier, 0 unlabeled)
        idx, _, semi_targets = create_semisupervised_setting(
            train_set.targets.cpu().data.numpy(),
            self.normal_classes,
            self.outlier_classes,
            self.outlier_classes,
            ratio_known_normal,
            ratio_known_outlier,
            ratio_pollution,
        )
        train_set.semi_targets[idx] = torch.tensor(
            semi_targets
        )  # set respective semi-supervised labels

        # Subset train_set to semi-supervised setup
        self.train_set = Subset(train_set, idx)

        # Get test set
        self.test_set = MyELPV(
            root=self.root,
            train=False,
            transform=transform,
            target_transform=target_transform,
        )


class MyELPV(VisionDataset):
    def __init__(
        self,
        root: str,
        transforms: Optional[Callable] = None,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
        train=False,
        split=0.7,
        seed=0,
    ):
        super(MyELPV, self).__init__(root, transforms, transform, target_transform)

        # Load the ELPV images and defect probabilities via the dataset's own reader module
        load_dataset = load_function_from_path(
            root, "utils", "elpv_reader", "load_dataset"
        )
        images, proba, _ = load_dataset()

        # Deterministic shuffle and train/test split
        np.random.seed(seed)
        shuffled_indices = np.random.permutation(images.shape[0])
        shuffled_data = images[shuffled_indices]
        shuffled_proba = proba[shuffled_indices]
        split_idx = int(split * shuffled_data.shape[0])
        if train:
            self.data = shuffled_data[:split_idx]
            self.targets = shuffled_proba[:split_idx]
        else:
            self.data = shuffled_data[split_idx:]
            self.targets = shuffled_proba[split_idx:]

        self.data = torch.tensor(self.data)
        # Binarize the defect probabilities: any probability > 0 is treated as an outlier
        self.targets[self.targets > 0] = 1
        self.targets = torch.tensor(self.targets, dtype=torch.int64)
        self.semi_targets = torch.zeros_like(self.targets)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Override VisionDataset.__getitem__ to also return the semi-supervised target and the sample index.

        Args:
            index (int): Index

        Returns:
            tuple: (image, target, semi_target, index)
        """
        img, target, semi_target = (
            self.data[index],
            int(self.targets[index]),
            int(self.semi_targets[index]),
        )

        # doing this so that it is consistent with all other datasets
        # to return a PIL Image
        img = Image.fromarray(img.numpy(), mode="L")

        if self.transform is not None:
            img = self.transform(img)
        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target, semi_target, index
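
For reference, a minimal usage sketch of the dataset above. It assumes the ELPV reader lives at <root>/utils/elpv_reader.py (the layout load_function_from_path expects) and that the TorchvisionDataset base class provides a loaders() helper returning train and test DataLoaders, as in the Deep SAD codebase; neither is shown in this diff.

# Sketch only: the data root and the loaders() signature are assumptions, not part of this commit.
dataset = ELPV_Dataset(
    root="data/elpv",  # hypothetical root containing utils/elpv_reader.py and the ELPV images
    ratio_known_normal=0.01,
    ratio_known_outlier=0.01,
    ratio_pollution=0.1,
)
train_loader, test_loader = dataset.loaders(batch_size=128, num_workers=0)
for img, target, semi_target, idx in train_loader:
    # img: (B, 1, H, W) float tensor in [0, 1]; target: 0 = normal, 1 = outlier
    break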

View File

@@ -1,4 +1,6 @@
from .mnist import MNIST_Dataset
from .elpv import ELPV_Dataset
from .subter import SubTer_Dataset
from .fmnist import FashionMNIST_Dataset
from .cifar10 import CIFAR10_Dataset
from .odds import ODDSADDataset
@@ -19,6 +21,8 @@ def load_dataset(
    implemented_datasets = (
        "mnist",
        "elpv",
        "subter",
        "fmnist",
        "cifar10",
        "arrhythmia",
@@ -32,6 +36,22 @@ def load_dataset(
    dataset = None
    if dataset_name == "subter":
        dataset = SubTer_Dataset(
            root=data_path,
            ratio_known_normal=ratio_known_normal,
            ratio_known_outlier=ratio_known_outlier,
            ratio_pollution=ratio_pollution,
        )
    if dataset_name == "elpv":
        dataset = ELPV_Dataset(
            root=data_path,
            ratio_known_normal=ratio_known_normal,
            ratio_known_outlier=ratio_known_outlier,
            ratio_pollution=ratio_pollution,
        )
    if dataset_name == "mnist":
        dataset = MNIST_Dataset(
            root=data_path,
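
A hedged example of exercising the new dispatch branches. The full load_dataset signature is truncated in this hunk, so the call below only shows the arguments visible in the diff and may omit additional required parameters.

# Sketch only: argument names inferred from the dispatch code above; other
# required parameters of load_dataset (not visible in this hunk) may be needed.
dataset = load_dataset(
    dataset_name="subter",
    data_path="data/subter",  # hypothetical directory of per-experiment .npy files
    ratio_known_normal=0.0,
    ratio_known_outlier=0.01,
    ratio_pollution=0.1,
)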

View File

@@ -0,0 +1,155 @@
from torch.utils.data import Subset
from PIL import Image
from torch.utils.data.dataset import ConcatDataset
from torchvision.datasets import VisionDataset
from base.torchvision_dataset import TorchvisionDataset
from .preprocessing import create_semisupervised_setting
from typing import Callable, Optional
import torch
import torchvision.transforms as transforms
import random
import numpy as np
from pathlib import Path


class SubTer_Dataset(TorchvisionDataset):
    def __init__(
        self,
        root: str,
        ratio_known_normal: float = 0.0,
        ratio_known_outlier: float = 0.0,
        ratio_pollution: float = 0.0,
    ):
        super().__init__(root)

        # Define normal and outlier classes
        self.n_classes = 2  # 0: normal, 1: outlier
        self.normal_classes = tuple([0])
        self.outlier_classes = tuple([1])

        # MNIST preprocessing: feature scaling to [0, 1]
        # FIXME understand the MNIST feature scaling and check whether it or other preprocessing is necessary for SubTer
        transform = transforms.ToTensor()
        target_transform = transforms.Lambda(lambda x: int(x in self.outlier_classes))

        # Get train set
        train_set = MySubTer(
            root=self.root,
            transform=transform,
            target_transform=target_transform,
            train=True,
        )

        # Create semi-supervised setting
        idx, _, semi_targets = create_semisupervised_setting(
            train_set.targets.cpu().data.numpy(),
            self.normal_classes,
            self.outlier_classes,
            self.outlier_classes,
            ratio_known_normal,
            ratio_known_outlier,
            ratio_pollution,
        )
        train_set.semi_targets[idx] = torch.tensor(
            np.array(semi_targets, dtype=np.int8)
        )  # set respective semi-supervised labels

        # Subset train_set to semi-supervised setup
        self.train_set = Subset(train_set, idx)

        # Get test set
        self.test_set = MySubTer(
            root=self.root,
            train=False,
            transform=transform,
            target_transform=target_transform,
        )


class MySubTer(VisionDataset):
    def __init__(
        self,
        root: str,
        transforms: Optional[Callable] = None,
        transform: Optional[Callable] = None,
        target_transform: Optional[Callable] = None,
        train=False,
        split=0.7,
        seed=0,
    ):
        super(MySubTer, self).__init__(root, transforms, transform, target_transform)

        # Each experiment is stored as a .npy file in root; files with "smoke" in their
        # name are labeled as outliers (1), all others as normal (0)
        experiments_data = []
        experiments_targets = []
        for experiment_file in Path(root).iterdir():
            if experiment_file.suffix != ".npy":
                continue
            experiment_data = np.load(experiment_file)
            # experiment_data = np.lib.format.open_memmap(experiment_file, mode='r+')
            experiment_targets = (
                np.ones(experiment_data.shape[0], dtype=np.int8)
                if "smoke" in experiment_file.name
                else np.zeros(experiment_data.shape[0], dtype=np.int8)
            )
            experiments_data.append(experiment_data)
            experiments_targets.append(experiment_targets)
        lidar_projections = np.concatenate(experiments_data)
        smoke_presence = np.concatenate(experiments_targets)

        # Deterministic shuffle and train/test split
        np.random.seed(seed)
        shuffled_indices = np.random.permutation(lidar_projections.shape[0])
        shuffled_lidar_projections = lidar_projections[shuffled_indices]
        shuffled_smoke_presence = smoke_presence[shuffled_indices]
        split_idx = int(split * shuffled_lidar_projections.shape[0])
        if train:
            self.data = shuffled_lidar_projections[:split_idx]
            self.targets = shuffled_smoke_presence[:split_idx]
        else:
            self.data = shuffled_lidar_projections[split_idx:]
            self.targets = shuffled_smoke_presence[split_idx:]

        # Replace NaNs in the lidar projections before converting to tensors
        self.data = np.nan_to_num(self.data)
        self.data = torch.tensor(self.data)
        self.targets = torch.tensor(self.targets, dtype=torch.int8)
        self.semi_targets = torch.zeros_like(self.targets, dtype=torch.int8)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Override VisionDataset.__getitem__ to also return the semi-supervised target and the sample index.

        Args:
            index (int): Index

        Returns:
            tuple: (image, target, semi_target, index)
        """
        img, target, semi_target = (
            self.data[index],
            int(self.targets[index]),
            int(self.semi_targets[index]),
        )

        # doing this so that it is consistent with all other datasets
        # to return a PIL Image
        img = Image.fromarray(img.numpy(), mode="F")

        if self.transform is not None:
            img = self.transform(img)
        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target, semi_target, index
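
Since the SubTer loader infers labels purely from filenames, the on-disk layout matters. Below is a minimal sketch of that layout using synthetic data; the demo root, file names, and array shape are illustrative assumptions, only the .npy suffix and the "smoke" substring convention come from the code above.

# Sketch only: builds two tiny synthetic experiment files matching the naming
# convention used by MySubTer ("smoke" in the file name => outlier label 1).
import numpy as np
from pathlib import Path

root = Path("data/subter_demo")  # hypothetical data root
root.mkdir(parents=True, exist_ok=True)
np.save(root / "run01_clear.npy", np.random.rand(10, 64, 64).astype(np.float32))
np.save(root / "run02_smoke.npy", np.random.rand(10, 64, 64).astype(np.float32))

ds = MySubTer(root=str(root), train=True)
print(len(ds))              # 14 of the 20 frames land in the training split (split=0.7)
print(ds.targets.unique())  # labels present in this split (0 and/or 1)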