black formatted files before changes

Author: Jan Kowalczyk
Date: 2024-06-28 11:36:46 +02:00
parent d33c6b1e16
commit 71f9662022
40 changed files with 2938 additions and 1260 deletions
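Most hunks in this commit are one of two Black rewrites: string literals switch from single to double quotes, and signatures or calls that exceed the line length are exploded onto one argument per line with a trailing comma. A handful of smaller normalizations also appear (assert statements losing their parentheses, `100.` becoming `100.0`, spacing around operators). A condensed before/after sketch of the first hunk below (SemiDeepGenerativeModel.train), trimmed to a few parameters:

```python
# Before (original style):
# def train(self, dataset, optimizer_name: str = 'adam', lr: float = 0.001,
#           n_epochs: int = 50, device: str = 'cuda'):
#     ...

# After Black:
def train(
    self,
    dataset,
    optimizer_name: str = "adam",
    lr: float = 0.001,
    n_epochs: int = 50,
    device: str = "cuda",
):
    ...
```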

View File

@@ -36,17 +36,13 @@ class SemiDeepGenerativeModel(object):
self.vae_optimizer_name = None
self.results = {
'train_time': None,
'test_auc': None,
'test_time': None,
'test_scores': None,
"train_time": None,
"test_auc": None,
"test_time": None,
"test_scores": None,
}
self.vae_results = {
'train_time': None,
'test_auc': None,
'test_time': None
}
self.vae_results = {"train_time": None, "test_auc": None, "test_time": None}
def set_vae(self, net_name):
"""Builds the variational autoencoder network for pretraining."""
@@ -58,71 +54,106 @@ class SemiDeepGenerativeModel(object):
self.net_name = net_name
self.net = build_network(net_name, ae_net=self.vae_net) # full M1+M2 model
def train(self, dataset: BaseADDataset, optimizer_name: str = 'adam', lr: float = 0.001, n_epochs: int = 50,
lr_milestones: tuple = (), batch_size: int = 128, weight_decay: float = 1e-6, device: str = 'cuda',
n_jobs_dataloader: int = 0):
def train(
self,
dataset: BaseADDataset,
optimizer_name: str = "adam",
lr: float = 0.001,
n_epochs: int = 50,
lr_milestones: tuple = (),
batch_size: int = 128,
weight_decay: float = 1e-6,
device: str = "cuda",
n_jobs_dataloader: int = 0,
):
"""Trains the Semi-Supervised Deep Generative model on the training data."""
self.optimizer_name = optimizer_name
self.trainer = SemiDeepGenerativeTrainer(alpha=self.alpha, optimizer_name=optimizer_name, lr=lr,
n_epochs=n_epochs, lr_milestones=lr_milestones, batch_size=batch_size,
weight_decay=weight_decay, device=device,
n_jobs_dataloader=n_jobs_dataloader)
self.trainer = SemiDeepGenerativeTrainer(
alpha=self.alpha,
optimizer_name=optimizer_name,
lr=lr,
n_epochs=n_epochs,
lr_milestones=lr_milestones,
batch_size=batch_size,
weight_decay=weight_decay,
device=device,
n_jobs_dataloader=n_jobs_dataloader,
)
self.net = self.trainer.train(dataset, self.net)
self.results['train_time'] = self.trainer.train_time
self.results["train_time"] = self.trainer.train_time
def test(self, dataset: BaseADDataset, device: str = 'cuda', n_jobs_dataloader: int = 0):
def test(
self, dataset: BaseADDataset, device: str = "cuda", n_jobs_dataloader: int = 0
):
"""Tests the Semi-Supervised Deep Generative model on the test data."""
if self.trainer is None:
self.trainer = SemiDeepGenerativeTrainer(alpha=self.alpha, device=device,
n_jobs_dataloader=n_jobs_dataloader)
self.trainer = SemiDeepGenerativeTrainer(
alpha=self.alpha, device=device, n_jobs_dataloader=n_jobs_dataloader
)
self.trainer.test(dataset, self.net)
# Get results
self.results['test_auc'] = self.trainer.test_auc
self.results['test_time'] = self.trainer.test_time
self.results['test_scores'] = self.trainer.test_scores
self.results["test_auc"] = self.trainer.test_auc
self.results["test_time"] = self.trainer.test_time
self.results["test_scores"] = self.trainer.test_scores
def pretrain(self, dataset: BaseADDataset, optimizer_name: str = 'adam', lr: float = 0.001, n_epochs: int = 100,
lr_milestones: tuple = (), batch_size: int = 128, weight_decay: float = 1e-6, device: str = 'cuda',
n_jobs_dataloader: int = 0):
def pretrain(
self,
dataset: BaseADDataset,
optimizer_name: str = "adam",
lr: float = 0.001,
n_epochs: int = 100,
lr_milestones: tuple = (),
batch_size: int = 128,
weight_decay: float = 1e-6,
device: str = "cuda",
n_jobs_dataloader: int = 0,
):
"""Pretrains a variational autoencoder (M1) for the Semi-Supervised Deep Generative model."""
# Train
self.vae_optimizer_name = optimizer_name
self.vae_trainer = VAETrainer(optimizer_name=optimizer_name, lr=lr, n_epochs=n_epochs,
lr_milestones=lr_milestones, batch_size=batch_size, weight_decay=weight_decay,
device=device, n_jobs_dataloader=n_jobs_dataloader)
self.vae_trainer = VAETrainer(
optimizer_name=optimizer_name,
lr=lr,
n_epochs=n_epochs,
lr_milestones=lr_milestones,
batch_size=batch_size,
weight_decay=weight_decay,
device=device,
n_jobs_dataloader=n_jobs_dataloader,
)
self.vae_net = self.vae_trainer.train(dataset, self.vae_net)
# Get train results
self.vae_results['train_time'] = self.vae_trainer.train_time
self.vae_results["train_time"] = self.vae_trainer.train_time
# Test
self.vae_trainer.test(dataset, self.vae_net)
# Get test results
self.vae_results['test_auc'] = self.vae_trainer.test_auc
self.vae_results['test_time'] = self.vae_trainer.test_time
self.vae_results["test_auc"] = self.vae_trainer.test_auc
self.vae_results["test_time"] = self.vae_trainer.test_time
def save_model(self, export_model):
"""Save a Semi-Supervised Deep Generative model to export_model."""
net_dict = self.net.state_dict()
torch.save({'net_dict': net_dict}, export_model)
torch.save({"net_dict": net_dict}, export_model)
def load_model(self, model_path):
"""Load a Semi-Supervised Deep Generative model from model_path."""
model_dict = torch.load(model_path)
self.net.load_state_dict(model_dict['net_dict'])
self.net.load_state_dict(model_dict["net_dict"])
def save_results(self, export_json):
"""Save results dict to a JSON-file."""
with open(export_json, 'w') as fp:
with open(export_json, "w") as fp:
json.dump(self.results, fp)
def save_vae_results(self, export_json):
"""Save variational autoencoder results dict to a JSON-file."""
with open(export_json, 'w') as fp:
with open(export_json, "w") as fp:
json.dump(self.vae_results, fp)
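The save_model()/load_model() pair above is a plain state_dict round trip through a one-key dict. A minimal, self-contained sketch of that pattern on a stand-in module (torch.nn.Linear is only a placeholder for the full M1+M2 network):

```python
import torch
import torch.nn as nn

net = nn.Linear(4, 2)  # placeholder for the full M1+M2 network
torch.save({"net_dict": net.state_dict()}, "model.tar")   # as in save_model()

restored = nn.Linear(4, 2)
model_dict = torch.load("model.tar")                      # as in load_model()
restored.load_state_dict(model_dict["net_dict"])
```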

View File

@@ -14,8 +14,16 @@ from networks.main import build_autoencoder
class IsoForest(object):
"""A class for Isolation Forest models."""
def __init__(self, hybrid=False, n_estimators=100, max_samples='auto', contamination=0.1, n_jobs=-1, seed=None,
**kwargs):
def __init__(
self,
hybrid=False,
n_estimators=100,
max_samples="auto",
contamination=0.1,
n_jobs=-1,
seed=None,
**kwargs
):
"""Init Isolation Forest instance."""
self.n_estimators = n_estimators
self.max_samples = max_samples
@@ -23,26 +31,39 @@ class IsoForest(object):
self.n_jobs = n_jobs
self.seed = seed
self.model = IsolationForest(n_estimators=n_estimators, max_samples=max_samples, contamination=contamination,
n_jobs=n_jobs, random_state=seed, **kwargs)
self.model = IsolationForest(
n_estimators=n_estimators,
max_samples=max_samples,
contamination=contamination,
n_jobs=n_jobs,
random_state=seed,
**kwargs
)
self.hybrid = hybrid
self.ae_net = None # autoencoder network for the case of a hybrid model
self.results = {
'train_time': None,
'test_time': None,
'test_auc': None,
'test_scores': None
"train_time": None,
"test_time": None,
"test_auc": None,
"test_scores": None,
}
def train(self, dataset: BaseADDataset, device: str = 'cpu', n_jobs_dataloader: int = 0):
def train(
self, dataset: BaseADDataset, device: str = "cpu", n_jobs_dataloader: int = 0
):
"""Trains the Isolation Forest model on the training data."""
logger = logging.getLogger()
# do not drop last batch for non-SGD optimization shallow_ssad
train_loader = DataLoader(dataset=dataset.train_set, batch_size=128, shuffle=True,
num_workers=n_jobs_dataloader, drop_last=False)
train_loader = DataLoader(
dataset=dataset.train_set,
batch_size=128,
shuffle=True,
num_workers=n_jobs_dataloader,
drop_last=False,
)
# Get data from loader
X = ()
@@ -50,22 +71,28 @@ class IsoForest(object):
inputs, _, _, _ = data
inputs = inputs.to(device)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X += (X_batch.cpu().data.numpy(),)
X = np.concatenate(X)
# Training
logger.info('Starting training...')
logger.info("Starting training...")
start_time = time.time()
self.model.fit(X)
train_time = time.time() - start_time
self.results['train_time'] = train_time
self.results["train_time"] = train_time
logger.info('Training Time: {:.3f}s'.format(self.results['train_time']))
logger.info('Finished training.')
logger.info("Training Time: {:.3f}s".format(self.results["train_time"]))
logger.info("Finished training.")
def test(self, dataset: BaseADDataset, device: str = 'cpu', n_jobs_dataloader: int = 0):
def test(
self, dataset: BaseADDataset, device: str = "cpu", n_jobs_dataloader: int = 0
):
"""Tests the Isolation Forest model on the test data."""
logger = logging.getLogger()
@@ -78,46 +105,54 @@ class IsoForest(object):
labels = []
for data in test_loader:
inputs, label_batch, _, idx = data
inputs, label_batch, idx = inputs.to(device), label_batch.to(device), idx.to(device)
inputs, label_batch, idx = (
inputs.to(device),
label_batch.to(device),
idx.to(device),
)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X += (X_batch.cpu().data.numpy(),)
idxs += idx.cpu().data.numpy().astype(np.int64).tolist()
labels += label_batch.cpu().data.numpy().astype(np.int64).tolist()
X = np.concatenate(X)
# Testing
logger.info('Starting testing...')
logger.info("Starting testing...")
start_time = time.time()
scores = (-1.0) * self.model.decision_function(X)
self.results['test_time'] = time.time() - start_time
self.results["test_time"] = time.time() - start_time
scores = scores.flatten()
# Save triples of (idx, label, score) in a list
idx_label_score += list(zip(idxs, labels, scores.tolist()))
self.results['test_scores'] = idx_label_score
self.results["test_scores"] = idx_label_score
# Compute AUC
_, labels, scores = zip(*idx_label_score)
labels = np.array(labels)
scores = np.array(scores)
self.results['test_auc'] = roc_auc_score(labels, scores)
self.results["test_auc"] = roc_auc_score(labels, scores)
# Log results
logger.info('Test AUC: {:.2f}%'.format(100. * self.results['test_auc']))
logger.info('Test Time: {:.3f}s'.format(self.results['test_time']))
logger.info('Finished testing.')
logger.info("Test AUC: {:.2f}%".format(100.0 * self.results["test_auc"]))
logger.info("Test Time: {:.3f}s".format(self.results["test_time"]))
logger.info("Finished testing.")
def load_ae(self, dataset_name, model_path):
"""Load pretrained autoencoder from model_path for feature extraction in a hybrid Isolation Forest model."""
model_dict = torch.load(model_path, map_location='cpu')
ae_net_dict = model_dict['ae_net_dict']
if dataset_name in ['mnist', 'fmnist', 'cifar10']:
net_name = dataset_name + '_LeNet'
model_dict = torch.load(model_path, map_location="cpu")
ae_net_dict = model_dict["ae_net_dict"]
if dataset_name in ["mnist", "fmnist", "cifar10"]:
net_name = dataset_name + "_LeNet"
else:
net_name = dataset_name + '_mlp'
net_name = dataset_name + "_mlp"
if self.ae_net is None:
self.ae_net = build_autoencoder(net_name)
@@ -137,11 +172,11 @@ class IsoForest(object):
"""Save Isolation Forest model to export_path."""
pass
def load_model(self, import_path, device: str = 'cpu'):
def load_model(self, import_path, device: str = "cpu"):
"""Load Isolation Forest model from import_path."""
pass
def save_results(self, export_json):
"""Save results dict to a JSON-file."""
with open(export_json, 'w') as fp:
with open(export_json, "w") as fp:
json.dump(self.results, fp)
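The scoring convention in IsoForest.test() above is worth noting: sklearn's decision_function assigns higher values to inliers, so the sign is flipped to turn it into an anomaly score before the AUC is computed against labels where 1 marks an outlier. A runnable sketch of that convention on synthetic data (in the repo, X comes from the DataLoader, or from the AE encoder in the hybrid case):

```python
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.metrics import roc_auc_score

rng = np.random.RandomState(0)
X_train = rng.randn(256, 8)
X_test = np.vstack([rng.randn(64, 8), rng.randn(16, 8) + 4.0])  # last 16 shifted
labels = np.array([0] * 64 + [1] * 16)                          # 1 = outlier

model = IsolationForest(n_estimators=100, max_samples="auto",
                        contamination=0.1, random_state=0)
model.fit(X_train)
scores = (-1.0) * model.decision_function(X_test)  # flip: higher = more anomalous
print("Test AUC: {:.2f}%".format(100.0 * roc_auc_score(labels, scores)))
```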

View File

@@ -16,7 +16,7 @@ from networks.main import build_autoencoder
class KDE(object):
"""A class for Kernel Density Estimation models."""
def __init__(self, hybrid=False, kernel='gaussian', n_jobs=-1, seed=None, **kwargs):
def __init__(self, hybrid=False, kernel="gaussian", n_jobs=-1, seed=None, **kwargs):
"""Init Kernel Density Estimation instance."""
self.kernel = kernel
self.n_jobs = n_jobs
@@ -29,20 +29,30 @@ class KDE(object):
self.ae_net = None # autoencoder network for the case of a hybrid model
self.results = {
'train_time': None,
'test_time': None,
'test_auc': None,
'test_scores': None
"train_time": None,
"test_time": None,
"test_auc": None,
"test_scores": None,
}
def train(self, dataset: BaseADDataset, device: str = 'cpu', n_jobs_dataloader: int = 0,
bandwidth_GridSearchCV: bool = True):
def train(
self,
dataset: BaseADDataset,
device: str = "cpu",
n_jobs_dataloader: int = 0,
bandwidth_GridSearchCV: bool = True,
):
"""Trains the Kernel Density Estimation model on the training data."""
logger = logging.getLogger()
# do not drop last batch for non-SGD optimization shallow_ssad
train_loader = DataLoader(dataset=dataset.train_set, batch_size=128, shuffle=True,
num_workers=n_jobs_dataloader, drop_last=False)
train_loader = DataLoader(
dataset=dataset.train_set,
batch_size=128,
shuffle=True,
num_workers=n_jobs_dataloader,
drop_last=False,
)
# Get data from loader
X = ()
@@ -50,39 +60,51 @@ class KDE(object):
inputs, _, _, _ = data
inputs = inputs.to(device)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X += (X_batch.cpu().data.numpy(),)
X = np.concatenate(X)
# Training
logger.info('Starting training...')
logger.info("Starting training...")
start_time = time.time()
if bandwidth_GridSearchCV:
# use grid search cross-validation to select bandwidth
logger.info('Using GridSearchCV for bandwidth selection...')
params = {'bandwidth': np.logspace(0.5, 5, num=10, base=2)}
hyper_kde = GridSearchCV(KernelDensity(kernel=self.kernel), params, n_jobs=self.n_jobs, cv=5, verbose=0)
logger.info("Using GridSearchCV for bandwidth selection...")
params = {"bandwidth": np.logspace(0.5, 5, num=10, base=2)}
hyper_kde = GridSearchCV(
KernelDensity(kernel=self.kernel),
params,
n_jobs=self.n_jobs,
cv=5,
verbose=0,
)
hyper_kde.fit(X)
self.bandwidth = hyper_kde.best_estimator_.bandwidth
logger.info('Best bandwidth: {:.8f}'.format(self.bandwidth))
logger.info("Best bandwidth: {:.8f}".format(self.bandwidth))
self.model = hyper_kde.best_estimator_
else:
# if exponential kernel, re-initialize kde with bandwidth minimizing the numerical error
if self.kernel == 'exponential':
if self.kernel == "exponential":
self.bandwidth = np.max(pairwise_distances(X)) ** 2
self.model = KernelDensity(kernel=self.kernel, bandwidth=self.bandwidth)
self.model.fit(X)
train_time = time.time() - start_time
self.results['train_time'] = train_time
self.results["train_time"] = train_time
logger.info('Training Time: {:.3f}s'.format(self.results['train_time']))
logger.info('Finished training.')
logger.info("Training Time: {:.3f}s".format(self.results["train_time"]))
logger.info("Finished training.")
def test(self, dataset: BaseADDataset, device: str = 'cpu', n_jobs_dataloader: int = 0):
def test(
self, dataset: BaseADDataset, device: str = "cpu", n_jobs_dataloader: int = 0
):
"""Tests the Kernel Density Estimation model on the test data."""
logger = logging.getLogger()
@@ -95,46 +117,54 @@ class KDE(object):
labels = []
for data in test_loader:
inputs, label_batch, _, idx = data
inputs, label_batch, idx = inputs.to(device), label_batch.to(device), idx.to(device)
inputs, label_batch, idx = (
inputs.to(device),
label_batch.to(device),
idx.to(device),
)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X += (X_batch.cpu().data.numpy(),)
idxs += idx.cpu().data.numpy().astype(np.int64).tolist()
labels += label_batch.cpu().data.numpy().astype(np.int64).tolist()
X = np.concatenate(X)
# Testing
logger.info('Starting testing...')
logger.info("Starting testing...")
start_time = time.time()
scores = (-1.0) * self.model.score_samples(X)
self.results['test_time'] = time.time() - start_time
self.results["test_time"] = time.time() - start_time
scores = scores.flatten()
# Save triples of (idx, label, score) in a list
idx_label_score += list(zip(idxs, labels, scores.tolist()))
self.results['test_scores'] = idx_label_score
self.results["test_scores"] = idx_label_score
# Compute AUC
_, labels, scores = zip(*idx_label_score)
labels = np.array(labels)
scores = np.array(scores)
self.results['test_auc'] = roc_auc_score(labels, scores)
self.results["test_auc"] = roc_auc_score(labels, scores)
# Log results
logger.info('Test AUC: {:.2f}%'.format(100. * self.results['test_auc']))
logger.info('Test Time: {:.3f}s'.format(self.results['test_time']))
logger.info('Finished testing.')
logger.info("Test AUC: {:.2f}%".format(100.0 * self.results["test_auc"]))
logger.info("Test Time: {:.3f}s".format(self.results["test_time"]))
logger.info("Finished testing.")
def load_ae(self, dataset_name, model_path):
"""Load pretrained autoencoder from model_path for feature extraction in a hybrid KDE model."""
model_dict = torch.load(model_path, map_location='cpu')
ae_net_dict = model_dict['ae_net_dict']
if dataset_name in ['mnist', 'fmnist', 'cifar10']:
net_name = dataset_name + '_LeNet'
model_dict = torch.load(model_path, map_location="cpu")
ae_net_dict = model_dict["ae_net_dict"]
if dataset_name in ["mnist", "fmnist", "cifar10"]:
net_name = dataset_name + "_LeNet"
else:
net_name = dataset_name + '_mlp'
net_name = dataset_name + "_mlp"
if self.ae_net is None:
self.ae_net = build_autoencoder(net_name)
@@ -154,11 +184,11 @@ class KDE(object):
"""Save KDE model to export_path."""
pass
def load_model(self, import_path, device: str = 'cpu'):
def load_model(self, import_path, device: str = "cpu"):
"""Load KDE model from import_path."""
pass
def save_results(self, export_json):
"""Save results dict to a JSON-file."""
with open(export_json, 'w') as fp:
with open(export_json, "w") as fp:
json.dump(self.results, fp)
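KDE.train() above selects the bandwidth either by 5-fold GridSearchCV over a log-spaced grid or, for the exponential kernel, from the maximum pairwise distance. A minimal sketch of the grid-search branch on synthetic data, with the same negative-log-density scoring used in test():

```python
import numpy as np
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KernelDensity

rng = np.random.RandomState(0)
X = rng.randn(200, 4)  # stand-in for the (possibly AE-encoded) features

params = {"bandwidth": np.logspace(0.5, 5, num=10, base=2)}
hyper_kde = GridSearchCV(KernelDensity(kernel="gaussian"), params, cv=5)
hyper_kde.fit(X)

model = hyper_kde.best_estimator_
print("Best bandwidth: {:.8f}".format(model.bandwidth))
scores = (-1.0) * model.score_samples(X)  # higher = more anomalous
```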

View File

@@ -14,7 +14,7 @@ from networks.main import build_autoencoder
class OCSVM(object):
"""A class for One-Class SVM models."""
def __init__(self, kernel='rbf', nu=0.1, hybrid=False):
def __init__(self, kernel="rbf", nu=0.1, hybrid=False):
"""Init OCSVM instance."""
self.kernel = kernel
self.nu = nu
@@ -25,25 +25,34 @@ class OCSVM(object):
self.hybrid = hybrid
self.ae_net = None # autoencoder network for the case of a hybrid model
self.linear_model = None # also init a model with linear kernel if hybrid approach
self.linear_model = (
None # also init a model with linear kernel if hybrid approach
)
self.results = {
'train_time': None,
'test_time': None,
'test_auc': None,
'test_scores': None,
'train_time_linear': None,
'test_time_linear': None,
'test_auc_linear': None
"train_time": None,
"test_time": None,
"test_auc": None,
"test_scores": None,
"train_time_linear": None,
"test_time_linear": None,
"test_auc_linear": None,
}
def train(self, dataset: BaseADDataset, device: str = 'cpu', n_jobs_dataloader: int = 0):
def train(
self, dataset: BaseADDataset, device: str = "cpu", n_jobs_dataloader: int = 0
):
"""Trains the OC-SVM model on the training data."""
logger = logging.getLogger()
# do not drop last batch for non-SGD optimization shallow_ssad
train_loader = DataLoader(dataset=dataset.train_set, batch_size=128, shuffle=True,
num_workers=n_jobs_dataloader, drop_last=False)
train_loader = DataLoader(
dataset=dataset.train_set,
batch_size=128,
shuffle=True,
num_workers=n_jobs_dataloader,
drop_last=False,
)
# Get data from loader
X = ()
@@ -51,13 +60,17 @@ class OCSVM(object):
inputs, _, _, _ = data
inputs = inputs.to(device)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X += (X_batch.cpu().data.numpy(),)
X = np.concatenate(X)
# Training
logger.info('Starting training...')
logger.info("Starting training...")
# Select model via hold-out test set of 1000 samples
gammas = np.logspace(-7, 2, num=10, base=2)
@@ -72,17 +85,31 @@ class OCSVM(object):
inputs, label_batch, _, _ = data
inputs, label_batch = inputs.to(device), label_batch.to(device)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X_test += (X_batch.cpu().data.numpy(),)
labels += label_batch.cpu().data.numpy().astype(np.int64).tolist()
X_test, labels = np.concatenate(X_test), np.array(labels)
n_test, n_normal, n_outlier = len(X_test), np.sum(labels == 0), np.sum(labels == 1)
n_test, n_normal, n_outlier = (
len(X_test),
np.sum(labels == 0),
np.sum(labels == 1),
)
n_val = int(0.1 * n_test)
n_val_normal, n_val_outlier = int(n_val * (n_normal/n_test)), int(n_val * (n_outlier/n_test))
n_val_normal, n_val_outlier = int(n_val * (n_normal / n_test)), int(
n_val * (n_outlier / n_test)
)
perm = np.random.permutation(n_test)
X_val = np.concatenate((X_test[perm][labels[perm] == 0][:n_val_normal],
X_test[perm][labels[perm] == 1][:n_val_outlier]))
X_val = np.concatenate(
(
X_test[perm][labels[perm] == 0][:n_val_normal],
X_test[perm][labels[perm] == 1][:n_val_outlier],
)
)
labels = np.array([0] * n_val_normal + [1] * n_val_outlier)
i = 1
@@ -103,30 +130,36 @@ class OCSVM(object):
# Compute AUC
auc = roc_auc_score(labels, scores)
logger.info(f' | Model {i:02}/{len(gammas):02} | Gamma: {gamma:.8f} | Train Time: {train_time:.3f}s '
f'| Val AUC: {100. * auc:.2f} |')
logger.info(
f" | Model {i:02}/{len(gammas):02} | Gamma: {gamma:.8f} | Train Time: {train_time:.3f}s "
f"| Val AUC: {100. * auc:.2f} |"
)
if auc > best_auc:
best_auc = auc
self.model = model
self.gamma = gamma
self.results['train_time'] = train_time
self.results["train_time"] = train_time
i += 1
# If hybrid, also train a model with linear kernel
if self.hybrid:
self.linear_model = OneClassSVM(kernel='linear', nu=self.nu)
self.linear_model = OneClassSVM(kernel="linear", nu=self.nu)
start_time = time.time()
self.linear_model.fit(X)
train_time = time.time() - start_time
self.results['train_time_linear'] = train_time
self.results["train_time_linear"] = train_time
logger.info(f'Best Model: | Gamma: {self.gamma:.8f} | AUC: {100. * best_auc:.2f}')
logger.info('Training Time: {:.3f}s'.format(self.results['train_time']))
logger.info('Finished training.')
logger.info(
f"Best Model: | Gamma: {self.gamma:.8f} | AUC: {100. * best_auc:.2f}"
)
logger.info("Training Time: {:.3f}s".format(self.results["train_time"]))
logger.info("Finished training.")
def test(self, dataset: BaseADDataset, device: str = 'cpu', n_jobs_dataloader: int = 0):
def test(
self, dataset: BaseADDataset, device: str = "cpu", n_jobs_dataloader: int = 0
):
"""Tests the OC-SVM model on the test data."""
logger = logging.getLogger()
@@ -139,59 +172,75 @@ class OCSVM(object):
labels = []
for data in test_loader:
inputs, label_batch, _, idx = data
inputs, label_batch, idx = inputs.to(device), label_batch.to(device), idx.to(device)
inputs, label_batch, idx = (
inputs.to(device),
label_batch.to(device),
idx.to(device),
)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X += (X_batch.cpu().data.numpy(),)
idxs += idx.cpu().data.numpy().astype(np.int64).tolist()
labels += label_batch.cpu().data.numpy().astype(np.int64).tolist()
X = np.concatenate(X)
# Testing
logger.info('Starting testing...')
logger.info("Starting testing...")
start_time = time.time()
scores = (-1.0) * self.model.decision_function(X)
self.results['test_time'] = time.time() - start_time
self.results["test_time"] = time.time() - start_time
scores = scores.flatten()
self.rho = -self.model.intercept_[0]
# Save triples of (idx, label, score) in a list
idx_label_score += list(zip(idxs, labels, scores.tolist()))
self.results['test_scores'] = idx_label_score
self.results["test_scores"] = idx_label_score
# Compute AUC
_, labels, scores = zip(*idx_label_score)
labels = np.array(labels)
scores = np.array(scores)
self.results['test_auc'] = roc_auc_score(labels, scores)
self.results["test_auc"] = roc_auc_score(labels, scores)
# If hybrid, also test model with linear kernel
if self.hybrid:
start_time = time.time()
scores_linear = (-1.0) * self.linear_model.decision_function(X)
self.results['test_time_linear'] = time.time() - start_time
self.results["test_time_linear"] = time.time() - start_time
scores_linear = scores_linear.flatten()
self.results['test_auc_linear'] = roc_auc_score(labels, scores_linear)
logger.info('Test AUC linear model: {:.2f}%'.format(100. * self.results['test_auc_linear']))
logger.info('Test Time linear model: {:.3f}s'.format(self.results['test_time_linear']))
self.results["test_auc_linear"] = roc_auc_score(labels, scores_linear)
logger.info(
"Test AUC linear model: {:.2f}%".format(
100.0 * self.results["test_auc_linear"]
)
)
logger.info(
"Test Time linear model: {:.3f}s".format(
self.results["test_time_linear"]
)
)
# Log results
logger.info('Test AUC: {:.2f}%'.format(100. * self.results['test_auc']))
logger.info('Test Time: {:.3f}s'.format(self.results['test_time']))
logger.info('Finished testing.')
logger.info("Test AUC: {:.2f}%".format(100.0 * self.results["test_auc"]))
logger.info("Test Time: {:.3f}s".format(self.results["test_time"]))
logger.info("Finished testing.")
def load_ae(self, dataset_name, model_path):
"""Load pretrained autoencoder from model_path for feature extraction in a hybrid OC-SVM model."""
model_dict = torch.load(model_path, map_location='cpu')
ae_net_dict = model_dict['ae_net_dict']
if dataset_name in ['mnist', 'fmnist', 'cifar10']:
net_name = dataset_name + '_LeNet'
model_dict = torch.load(model_path, map_location="cpu")
ae_net_dict = model_dict["ae_net_dict"]
if dataset_name in ["mnist", "fmnist", "cifar10"]:
net_name = dataset_name + "_LeNet"
else:
net_name = dataset_name + '_mlp'
net_name = dataset_name + "_mlp"
if self.ae_net is None:
self.ae_net = build_autoencoder(net_name)
@@ -211,11 +260,11 @@ class OCSVM(object):
"""Save OC-SVM model to export_path."""
pass
def load_model(self, import_path, device: str = 'cpu'):
def load_model(self, import_path, device: str = "cpu"):
"""Load OC-SVM model from import_path."""
pass
def save_results(self, export_json):
"""Save results dict to a JSON-file."""
with open(export_json, 'w') as fp:
with open(export_json, "w") as fp:
json.dump(self.results, fp)
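OCSVM.train() above selects gamma by fitting one model per value on a log grid and keeping the one whose scores give the best AUC on a small labeled hold-out split carved from the test set. A condensed sketch of that loop on synthetic data (the hold-out construction and the hybrid linear-kernel model are omitted):

```python
import numpy as np
from sklearn.svm import OneClassSVM
from sklearn.metrics import roc_auc_score

rng = np.random.RandomState(0)
X = rng.randn(256, 8)                                   # unlabeled training data
X_val = np.vstack([rng.randn(64, 8), rng.randn(16, 8) + 4.0])
labels = np.array([0] * 64 + [1] * 16)                  # 1 = outlier

best_auc, best_model, best_gamma = 0.0, None, None
for gamma in np.logspace(-7, 2, num=10, base=2):
    model = OneClassSVM(kernel="rbf", nu=0.1, gamma=gamma)
    model.fit(X)
    scores = (-1.0) * model.decision_function(X_val)    # higher = more anomalous
    auc = roc_auc_score(labels, scores.flatten())
    if auc > best_auc:
        best_auc, best_model, best_gamma = auc, model, gamma

print("Best Model: | Gamma: {:.8f} | AUC: {:.2f}".format(best_gamma, 100.0 * best_auc))
```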

View File

@@ -8,31 +8,32 @@ from cvxopt.solvers import qp
class ConvexSSAD:
""" Convex semi-supervised anomaly detection with hinge-loss and L2 regularizer
as described in Goernitz et al., Towards Supervised Anomaly Detection, JAIR, 2013
"""Convex semi-supervised anomaly detection with hinge-loss and L2 regularizer
as described in Goernitz et al., Towards Supervised Anomaly Detection, JAIR, 2013
minimize 0.5 ||w||^2_2 - rho - kappa*gamma + eta_u sum_i xi_i + eta_l sum_j xi_j
{w,rho,gamma>=0,xi>=0}
subject to <w,phi(x_i)> >= rho - xi_i
y_j<w,phi(x_j)> >= y_j*rho + gamma - xi_j
minimize 0.5 ||w||^2_2 - rho - kappa*gamma + eta_u sum_i xi_i + eta_l sum_j xi_j
{w,rho,gamma>=0,xi>=0}
subject to <w,phi(x_i)> >= rho - xi_i
y_j<w,phi(x_j)> >= y_j*rho + gamma - xi_j
And the corresponding dual optimization problem:
And the corresponding dual optimization problem:
maximize -0.5 sum_(i,j) alpha_i alpha_j y_i y_j k(x_i,x_j)
{0<=alpha_i<=eta_i}
subject to kappa <= sum_j alpha_j (for all labeled examples)
1 = sum_j y_i alpha_j (for all examples)
maximize -0.5 sum_(i,j) alpha_i alpha_j y_i y_j k(x_i,x_j)
{0<=alpha_i<=eta_i}
subject to kappa <= sum_j alpha_j (for all labeled examples)
1 = sum_j y_i alpha_j (for all examples)
We introduce labels y_i = +1 for all unlabeled examples which enables us to combine sums.
We introduce labels y_i = +1 for all unlabeled examples which enables us to combine sums.
Note: Only dual solution is supported.
Note: Only dual solution is supported.
Written by: Nico Goernitz, TU Berlin, 2013/14
Written by: Nico Goernitz, TU Berlin, 2013/14
"""
PRECISION = 1e-9 # important: effects the threshold, support vectors and speed!
def __init__(self, kernel, y, kappa=1.0, Cp=1.0, Cu=1.0, Cn=1.0):
assert(len(y.shape) == 1)
assert len(y.shape) == 1
self.kernel = kernel
self.y = y # (vector) corresponding labels (+1,-1 and 0 for unlabeled)
self.kappa = kappa # (scalar) regularizer for importance of the margin
@@ -53,7 +54,7 @@ class ConvexSSAD:
self.cC = np.zeros(y.size) # cC=Cu (unlabeled) cC=Cp (pos) cC=Cn (neg)
self.cC[y == 0] = Cu
self.cC[y == 1] = Cp
self.cC[y ==-1] = Cn
self.cC[y == -1] = Cn
self.alphas = None
self.svs = None # (vector) list of support vector (contains indices)
@@ -63,14 +64,18 @@ class ConvexSSAD:
# the dual constraint kappa <= sum_{i \in labeled} alpha_i = 0.0 will
# prohibit a solution
if self.labeled == 0:
print('There are no labeled examples hence, setting kappa=0.0')
print("There are no labeled examples hence, setting kappa=0.0")
self.kappa = 0.0
print('Convex semi-supervised anomaly detection with {0} samples ({1} labeled).'.format(self.samples, self.labeled))
print(
"Convex semi-supervised anomaly detection with {0} samples ({1} labeled).".format(
self.samples, self.labeled
)
)
def set_train_kernel(self, kernel):
dim1, dim2 = kernel.shape
print([dim1, dim2])
assert(dim1 == dim2 and dim1 == self.samples)
assert dim1 == dim2 and dim1 == self.samples
self.kernel = kernel
def fit(self, check_psd_eigs=False):
@@ -81,20 +86,20 @@ class ConvexSSAD:
Y = self.cy.dot(self.cy.T)
# generate the final PDS kernel
P = matrix(self.kernel*Y)
P = matrix(self.kernel * Y)
# check for PSD
if check_psd_eigs:
eigs = np.linalg.eigvalsh(np.array(P))
if eigs[0] < 0.0:
print('Smallest eigenvalue is {0}'.format(eigs[0]))
print("Smallest eigenvalue is {0}".format(eigs[0]))
P += spdiag([-eigs[0] for i in range(N)])
# there is no linear part of the objective
q = matrix(0.0, (N, 1))
# sum_i y_i alpha_i = A alpha = b = 1.0
A = matrix(self.cy, (1, self.samples), 'd')
A = matrix(self.cy, (1, self.samples), "d")
b = matrix(1.0, (1, 1))
# inequality constraints: G alpha <= h
@@ -107,8 +112,8 @@ class ConvexSSAD:
h = matrix([h1, h2])
if self.labeled > 0:
# 3) kappa <= \sum_i labeled_i alpha_i -> -cl' alpha <= -kappa
print('Labeled data found.')
G3 = -matrix(self.cl, (1, self.cl.size), 'd')
print("Labeled data found.")
G3 = -matrix(self.cl, (1, self.cl.size), "d")
h3 = -matrix(self.kappa, (1, 1))
G = sparse([G12, -G12, G3])
h = matrix([h1, h2, h3])
@@ -117,27 +122,49 @@ class ConvexSSAD:
sol = qp(P, -q, G, h, A, b)
# store solution
self.alphas = np.array(sol['x'])
self.alphas = np.array(sol["x"])
# 1. find all support vectors, i.e. 0 < alpha_i <= C
# 2. store all support vector with alpha_i < C in 'margins'
self.svs = np.where(self.alphas >= ConvexSSAD.PRECISION)[0]
# these should sum to one
print('Validate solution:')
print('- found {0} support vectors'.format(len(self.svs)))
print('0 <= alpha_i : {0} of {1}'.format(np.sum(0. <= self.alphas), N))
print('- sum_(i) alpha_i cy_i = {0} = 1.0'.format(np.sum(self.alphas*self.cy)))
print('- sum_(i in sv) alpha_i cy_i = {0} ~ 1.0 (approx error)'.format(np.sum(self.alphas[self.svs]*self.cy[self.svs])))
print('- sum_(i in labeled) alpha_i = {0} >= {1} = kappa'.format(np.sum(self.alphas[self.cl == 1]), self.kappa))
print('- sum_(i in unlabeled) alpha_i = {0}'.format(np.sum(self.alphas[self.y == 0])))
print('- sum_(i in positives) alpha_i = {0}'.format(np.sum(self.alphas[self.y == 1])))
print('- sum_(i in negatives) alpha_i = {0}'.format(np.sum(self.alphas[self.y ==-1])))
print("Validate solution:")
print("- found {0} support vectors".format(len(self.svs)))
print("0 <= alpha_i : {0} of {1}".format(np.sum(0.0 <= self.alphas), N))
print(
"- sum_(i) alpha_i cy_i = {0} = 1.0".format(np.sum(self.alphas * self.cy))
)
print(
"- sum_(i in sv) alpha_i cy_i = {0} ~ 1.0 (approx error)".format(
np.sum(self.alphas[self.svs] * self.cy[self.svs])
)
)
print(
"- sum_(i in labeled) alpha_i = {0} >= {1} = kappa".format(
np.sum(self.alphas[self.cl == 1]), self.kappa
)
)
print(
"- sum_(i in unlabeled) alpha_i = {0}".format(
np.sum(self.alphas[self.y == 0])
)
)
print(
"- sum_(i in positives) alpha_i = {0}".format(
np.sum(self.alphas[self.y == 1])
)
)
print(
"- sum_(i in negatives) alpha_i = {0}".format(
np.sum(self.alphas[self.y == -1])
)
)
# infer threshold (rho)
psvs = np.where(self.y[self.svs] == 0)[0]
# case 1: unlabeled support vectors available
self.threshold = 0.
self.threshold = 0.0
unl_threshold = -1e12
lbl_threshold = -1e12
if psvs.size > 0:
@@ -146,7 +173,7 @@ class ConvexSSAD:
unl_threshold = np.max(self.apply(k))
if np.sum(self.cl) > 1e-12:
# case 2: only labeled examples available
# case 2: only labeled examples available
k = self.kernel[:, self.svs]
k = k[self.svs, :]
thres = self.apply(k)
@@ -154,7 +181,7 @@ class ConvexSSAD:
ninds = np.where(self.y[self.svs] == -1)[0]
# only negatives is not possible
if ninds.size > 0 and pinds.size == 0:
print('ERROR: Check pre-defined PRECISION.')
print("ERROR: Check pre-defined PRECISION.")
lbl_threshold = np.max(thres[ninds])
elif ninds.size == 0:
lbl_threshold = np.max(thres[pinds])
@@ -162,7 +189,7 @@ class ConvexSSAD:
# smallest negative + largest positive
p = np.max(thres[pinds])
n = np.min(thres[ninds])
lbl_threshold = (n+p)/2.
lbl_threshold = (n + p) / 2.0
self.threshold = np.max((unl_threshold, lbl_threshold))
def get_threshold(self):
@@ -175,8 +202,8 @@ class ConvexSSAD:
return self.alphas
def apply(self, kernel):
""" Application of dual trained ssad.
kernel = get_kernel(Y, X[:, cssad.svs], kernel_type, kernel_param)
"""Application of dual trained ssad.
kernel = get_kernel(Y, X[:, cssad.svs], kernel_type, kernel_param)
"""
if kernel.shape[1] == self.samples:
# if kernel is not restricted to support vectors
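For reference, the primal and dual problems spelled out in the ConvexSSAD docstring above, transcribed into standard notation (unlabeled examples are assigned y_i = +1, as the docstring states):

$$
\begin{aligned}
\min_{w,\ \rho,\ \gamma \ge 0,\ \xi \ge 0}\quad
  & \tfrac{1}{2}\lVert w \rVert_2^2 - \rho - \kappa\gamma
    + \eta_u \sum_i \xi_i + \eta_l \sum_j \xi_j \\
\text{s.t.}\quad
  & \langle w, \phi(x_i) \rangle \ge \rho - \xi_i \quad \text{(unlabeled } i\text{)} \\
  & y_j \langle w, \phi(x_j) \rangle \ge y_j \rho + \gamma - \xi_j \quad \text{(labeled } j\text{)}
\end{aligned}
$$

and its dual, the only form the class solves (via the cvxopt QP in fit()):

$$
\begin{aligned}
\max_{0 \le \alpha_i \le \eta_i}\quad
  & -\tfrac{1}{2} \sum_{i,j} \alpha_i \alpha_j y_i y_j k(x_i, x_j) \\
\text{s.t.}\quad
  & \kappa \le \sum_{j \in \text{labeled}} \alpha_j,
  \qquad \sum_i y_i \alpha_i = 1 .
\end{aligned}
$$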

View File

@@ -17,7 +17,7 @@ class SSAD(object):
A class for kernel SSAD models as described in Goernitz et al., Towards Supervised Anomaly Detection, JAIR, 2013.
"""
def __init__(self, kernel='rbf', kappa=1.0, Cp=1.0, Cu=1.0, Cn=1.0, hybrid=False):
def __init__(self, kernel="rbf", kappa=1.0, Cp=1.0, Cu=1.0, Cn=1.0, hybrid=False):
"""Init SSAD instance."""
self.kernel = kernel
self.kappa = kappa
@@ -32,42 +32,59 @@ class SSAD(object):
self.hybrid = hybrid
self.ae_net = None # autoencoder network for the case of a hybrid model
self.linear_model = None # also init a model with linear kernel if hybrid approach
self.linear_model = (
None # also init a model with linear kernel if hybrid approach
)
self.linear_X_svs = None
self.results = {
'train_time': None,
'test_time': None,
'test_auc': None,
'test_scores': None,
'train_time_linear': None,
'test_time_linear': None,
'test_auc_linear': None
"train_time": None,
"test_time": None,
"test_auc": None,
"test_scores": None,
"train_time_linear": None,
"test_time_linear": None,
"test_auc_linear": None,
}
def train(self, dataset: BaseADDataset, device: str = 'cpu', n_jobs_dataloader: int = 0):
def train(
self, dataset: BaseADDataset, device: str = "cpu", n_jobs_dataloader: int = 0
):
"""Trains the SSAD model on the training data."""
logger = logging.getLogger()
# do not drop last batch for non-SGD optimization shallow_ssad
train_loader = DataLoader(dataset=dataset.train_set, batch_size=128, shuffle=True,
num_workers=n_jobs_dataloader, drop_last=False)
train_loader = DataLoader(
dataset=dataset.train_set,
batch_size=128,
shuffle=True,
num_workers=n_jobs_dataloader,
drop_last=False,
)
# Get data from loader
X = ()
semi_targets = []
for data in train_loader:
inputs, _, semi_targets_batch, _ = data
inputs, semi_targets_batch = inputs.to(device), semi_targets_batch.to(device)
inputs, semi_targets_batch = inputs.to(device), semi_targets_batch.to(
device
)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X += (X_batch.cpu().data.numpy(),)
semi_targets += semi_targets_batch.cpu().data.numpy().astype(np.int).tolist()
semi_targets += (
semi_targets_batch.cpu().data.numpy().astype(np.int).tolist()
)
X, semi_targets = np.concatenate(X), np.array(semi_targets)
# Training
logger.info('Starting training...')
logger.info("Starting training...")
# Select model via hold-out test set of 1000 samples
gammas = np.logspace(-7, 2, num=10, base=2)
@@ -82,17 +99,31 @@ class SSAD(object):
inputs, label_batch, _, _ = data
inputs, label_batch = inputs.to(device), label_batch.to(device)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X_test += (X_batch.cpu().data.numpy(),)
labels += label_batch.cpu().data.numpy().astype(np.int64).tolist()
X_test, labels = np.concatenate(X_test), np.array(labels)
n_test, n_normal, n_outlier = len(X_test), np.sum(labels == 0), np.sum(labels == 1)
n_test, n_normal, n_outlier = (
len(X_test),
np.sum(labels == 0),
np.sum(labels == 1),
)
n_val = int(0.1 * n_test)
n_val_normal, n_val_outlier = int(n_val * (n_normal/n_test)), int(n_val * (n_outlier/n_test))
n_val_normal, n_val_outlier = int(n_val * (n_normal / n_test)), int(
n_val * (n_outlier / n_test)
)
perm = np.random.permutation(n_test)
X_val = np.concatenate((X_test[perm][labels[perm] == 0][:n_val_normal],
X_test[perm][labels[perm] == 1][:n_val_outlier]))
X_val = np.concatenate(
(
X_test[perm][labels[perm] == 0][:n_val_normal],
X_test[perm][labels[perm] == 1][:n_val_outlier],
)
)
labels = np.array([0] * n_val_normal + [1] * n_val_outlier)
i = 1
@@ -110,21 +141,25 @@ class SSAD(object):
train_time = time.time() - start_time
# Test on small hold-out set from test set
kernel_val = pairwise_kernels(X_val, X[model.svs, :], metric=self.kernel, gamma=gamma)
kernel_val = pairwise_kernels(
X_val, X[model.svs, :], metric=self.kernel, gamma=gamma
)
scores = (-1.0) * model.apply(kernel_val)
scores = scores.flatten()
# Compute AUC
auc = roc_auc_score(labels, scores)
logger.info(f' | Model {i:02}/{len(gammas):02} | Gamma: {gamma:.8f} | Train Time: {train_time:.3f}s '
f'| Val AUC: {100. * auc:.2f} |')
logger.info(
f" | Model {i:02}/{len(gammas):02} | Gamma: {gamma:.8f} | Train Time: {train_time:.3f}s "
f"| Val AUC: {100. * auc:.2f} |"
)
if auc > best_auc:
best_auc = auc
self.model = model
self.gamma = gamma
self.results['train_time'] = train_time
self.results["train_time"] = train_time
i += 1
@@ -133,19 +168,25 @@ class SSAD(object):
# If hybrid, also train a model with linear kernel
if self.hybrid:
linear_kernel = pairwise_kernels(X, X, metric='linear')
self.linear_model = ConvexSSAD(linear_kernel, semi_targets, Cp=self.Cp, Cu=self.Cu, Cn=self.Cn)
linear_kernel = pairwise_kernels(X, X, metric="linear")
self.linear_model = ConvexSSAD(
linear_kernel, semi_targets, Cp=self.Cp, Cu=self.Cu, Cn=self.Cn
)
start_time = time.time()
self.linear_model.fit()
train_time = time.time() - start_time
self.results['train_time_linear'] = train_time
self.results["train_time_linear"] = train_time
self.linear_X_svs = X[self.linear_model.svs, :]
logger.info(f'Best Model: | Gamma: {self.gamma:.8f} | AUC: {100. * best_auc:.2f}')
logger.info('Training Time: {:.3f}s'.format(self.results['train_time']))
logger.info('Finished training.')
logger.info(
f"Best Model: | Gamma: {self.gamma:.8f} | AUC: {100. * best_auc:.2f}"
)
logger.info("Training Time: {:.3f}s".format(self.results["train_time"]))
logger.info("Finished training.")
def test(self, dataset: BaseADDataset, device: str = 'cpu', n_jobs_dataloader: int = 0):
def test(
self, dataset: BaseADDataset, device: str = "cpu", n_jobs_dataloader: int = 0
):
"""Tests the SSAD model on the test data."""
logger = logging.getLogger()
@@ -158,17 +199,25 @@ class SSAD(object):
labels = []
for data in test_loader:
inputs, label_batch, _, idx = data
inputs, label_batch, idx = inputs.to(device), label_batch.to(device), idx.to(device)
inputs, label_batch, idx = (
inputs.to(device),
label_batch.to(device),
idx.to(device),
)
if self.hybrid:
inputs = self.ae_net.encoder(inputs) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(inputs.size(0), -1) # X_batch.shape = (batch_size, n_channels * height * width)
inputs = self.ae_net.encoder(
inputs
) # in hybrid approach, take code representation of AE as features
X_batch = inputs.view(
inputs.size(0), -1
) # X_batch.shape = (batch_size, n_channels * height * width)
X += (X_batch.cpu().data.numpy(),)
idxs += idx.cpu().data.numpy().astype(np.int64).tolist()
labels += label_batch.cpu().data.numpy().astype(np.int64).tolist()
X = np.concatenate(X)
# Testing
logger.info('Starting testing...')
logger.info("Starting testing...")
start_time = time.time()
# Build kernel
@@ -176,45 +225,53 @@ class SSAD(object):
scores = (-1.0) * self.model.apply(kernel)
self.results['test_time'] = time.time() - start_time
self.results["test_time"] = time.time() - start_time
scores = scores.flatten()
self.rho = -self.model.threshold
# Save triples of (idx, label, score) in a list
idx_label_score += list(zip(idxs, labels, scores.tolist()))
self.results['test_scores'] = idx_label_score
self.results["test_scores"] = idx_label_score
# Compute AUC
_, labels, scores = zip(*idx_label_score)
labels = np.array(labels)
scores = np.array(scores)
self.results['test_auc'] = roc_auc_score(labels, scores)
self.results["test_auc"] = roc_auc_score(labels, scores)
# If hybrid, also test model with linear kernel
if self.hybrid:
start_time = time.time()
linear_kernel = pairwise_kernels(X, self.linear_X_svs, metric='linear')
linear_kernel = pairwise_kernels(X, self.linear_X_svs, metric="linear")
scores_linear = (-1.0) * self.linear_model.apply(linear_kernel)
self.results['test_time_linear'] = time.time() - start_time
self.results["test_time_linear"] = time.time() - start_time
scores_linear = scores_linear.flatten()
self.results['test_auc_linear'] = roc_auc_score(labels, scores_linear)
logger.info('Test AUC linear model: {:.2f}%'.format(100. * self.results['test_auc_linear']))
logger.info('Test Time linear model: {:.3f}s'.format(self.results['test_time_linear']))
self.results["test_auc_linear"] = roc_auc_score(labels, scores_linear)
logger.info(
"Test AUC linear model: {:.2f}%".format(
100.0 * self.results["test_auc_linear"]
)
)
logger.info(
"Test Time linear model: {:.3f}s".format(
self.results["test_time_linear"]
)
)
# Log results
logger.info('Test AUC: {:.2f}%'.format(100. * self.results['test_auc']))
logger.info('Test Time: {:.3f}s'.format(self.results['test_time']))
logger.info('Finished testing.')
logger.info("Test AUC: {:.2f}%".format(100.0 * self.results["test_auc"]))
logger.info("Test Time: {:.3f}s".format(self.results["test_time"]))
logger.info("Finished testing.")
def load_ae(self, dataset_name, model_path):
"""Load pretrained autoencoder from model_path for feature extraction in a hybrid SSAD model."""
model_dict = torch.load(model_path, map_location='cpu')
ae_net_dict = model_dict['ae_net_dict']
if dataset_name in ['mnist', 'fmnist', 'cifar10']:
net_name = dataset_name + '_LeNet'
model_dict = torch.load(model_path, map_location="cpu")
ae_net_dict = model_dict["ae_net_dict"]
if dataset_name in ["mnist", "fmnist", "cifar10"]:
net_name = dataset_name + "_LeNet"
else:
net_name = dataset_name + '_mlp'
net_name = dataset_name + "_mlp"
if self.ae_net is None:
self.ae_net = build_autoencoder(net_name)
@@ -234,11 +291,11 @@ class SSAD(object):
"""Save SSAD model to export_path."""
pass
def load_model(self, import_path, device: str = 'cpu'):
def load_model(self, import_path, device: str = "cpu"):
"""Load SSAD model from import_path."""
pass
def save_results(self, export_json):
"""Save results dict to a JSON-file."""
with open(export_json, 'w') as fp:
with open(export_json, "w") as fp:
json.dump(self.results, fp)
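The kernel plumbing in SSAD above reduces to two pairwise_kernels calls: a full train-by-train kernel for fitting ConvexSSAD, and a cross-kernel against the support vectors for scoring new points. A sketch of that flow on synthetic, fully unlabeled data; the import path for ConvexSSAD is assumed, since the module name is not visible in this diff:

```python
import numpy as np
from sklearn.metrics.pairwise import pairwise_kernels
from ssad_convex import ConvexSSAD  # hypothetical import path for the class above

rng = np.random.RandomState(0)
X = rng.randn(100, 4)
semi_targets = np.zeros(100, dtype=int)  # 0 = unlabeled, +1 / -1 = labeled
gamma = 0.1

kernel = pairwise_kernels(X, X, metric="rbf", gamma=gamma)
model = ConvexSSAD(kernel, semi_targets, Cp=1.0, Cu=1.0, Cn=1.0)
model.fit()

X_new = rng.randn(20, 4)
kernel_val = pairwise_kernels(X_new, X[model.svs, :], metric="rbf", gamma=gamma)
scores = (-1.0) * model.apply(kernel_val)  # higher = more anomalous
```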