added deepsad base code

Jan Kowalczyk
2024-06-28 07:42:12 +02:00
parent 2eb1bf2e05
commit 914bb020d0
57 changed files with 4974 additions and 0 deletions


@@ -0,0 +1,10 @@
from .main import build_network, build_autoencoder
from .mnist_LeNet import MNIST_LeNet, MNIST_LeNet_Decoder, MNIST_LeNet_Autoencoder
from .fmnist_LeNet import FashionMNIST_LeNet, FashionMNIST_LeNet_Decoder, FashionMNIST_LeNet_Autoencoder
from .cifar10_LeNet import CIFAR10_LeNet, CIFAR10_LeNet_Decoder, CIFAR10_LeNet_Autoencoder
from .mlp import MLP, MLP_Decoder, MLP_Autoencoder
from .layers.stochastic import GaussianSample
from .layers.standard import Standardize
from .inference.distributions import log_standard_gaussian, log_gaussian, log_standard_categorical
from .vae import VariationalAutoencoder, Encoder, Decoder
from .dgm import DeepGenerativeModel, StackedDeepGenerativeModel


@@ -0,0 +1,82 @@
import torch
import torch.nn as nn
import torch.nn.functional as F

from base.base_net import BaseNet


class CIFAR10_LeNet(BaseNet):

    def __init__(self, rep_dim=128):
        super().__init__()

        self.rep_dim = rep_dim
        self.pool = nn.MaxPool2d(2, 2)

        self.conv1 = nn.Conv2d(3, 32, 5, bias=False, padding=2)
        self.bn2d1 = nn.BatchNorm2d(32, eps=1e-04, affine=False)
        self.conv2 = nn.Conv2d(32, 64, 5, bias=False, padding=2)
        self.bn2d2 = nn.BatchNorm2d(64, eps=1e-04, affine=False)
        self.conv3 = nn.Conv2d(64, 128, 5, bias=False, padding=2)
        self.bn2d3 = nn.BatchNorm2d(128, eps=1e-04, affine=False)
        self.fc1 = nn.Linear(128 * 4 * 4, self.rep_dim, bias=False)

    def forward(self, x):
        x = x.view(-1, 3, 32, 32)
        x = self.conv1(x)
        x = self.pool(F.leaky_relu(self.bn2d1(x)))
        x = self.conv2(x)
        x = self.pool(F.leaky_relu(self.bn2d2(x)))
        x = self.conv3(x)
        x = self.pool(F.leaky_relu(self.bn2d3(x)))
        x = x.view(int(x.size(0)), -1)
        x = self.fc1(x)
        return x


class CIFAR10_LeNet_Decoder(BaseNet):

    def __init__(self, rep_dim=128):
        super().__init__()

        self.rep_dim = rep_dim

        self.deconv1 = nn.ConvTranspose2d(int(self.rep_dim / (4 * 4)), 128, 5, bias=False, padding=2)
        nn.init.xavier_uniform_(self.deconv1.weight, gain=nn.init.calculate_gain('leaky_relu'))
        self.bn2d4 = nn.BatchNorm2d(128, eps=1e-04, affine=False)
        self.deconv2 = nn.ConvTranspose2d(128, 64, 5, bias=False, padding=2)
        nn.init.xavier_uniform_(self.deconv2.weight, gain=nn.init.calculate_gain('leaky_relu'))
        self.bn2d5 = nn.BatchNorm2d(64, eps=1e-04, affine=False)
        self.deconv3 = nn.ConvTranspose2d(64, 32, 5, bias=False, padding=2)
        nn.init.xavier_uniform_(self.deconv3.weight, gain=nn.init.calculate_gain('leaky_relu'))
        self.bn2d6 = nn.BatchNorm2d(32, eps=1e-04, affine=False)
        self.deconv4 = nn.ConvTranspose2d(32, 3, 5, bias=False, padding=2)
        nn.init.xavier_uniform_(self.deconv4.weight, gain=nn.init.calculate_gain('leaky_relu'))

    def forward(self, x):
        x = x.view(int(x.size(0)), int(self.rep_dim / (4 * 4)), 4, 4)
        x = F.leaky_relu(x)
        x = self.deconv1(x)
        x = F.interpolate(F.leaky_relu(self.bn2d4(x)), scale_factor=2)
        x = self.deconv2(x)
        x = F.interpolate(F.leaky_relu(self.bn2d5(x)), scale_factor=2)
        x = self.deconv3(x)
        x = F.interpolate(F.leaky_relu(self.bn2d6(x)), scale_factor=2)
        x = self.deconv4(x)
        x = torch.sigmoid(x)
        return x


class CIFAR10_LeNet_Autoencoder(BaseNet):

    def __init__(self, rep_dim=128):
        super().__init__()

        self.rep_dim = rep_dim
        self.encoder = CIFAR10_LeNet(rep_dim=rep_dim)
        self.decoder = CIFAR10_LeNet_Decoder(rep_dim=rep_dim)

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x
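
Not part of this commit: a minimal smoke test for the CIFAR-10 encoder and autoencoder above, sketching the expected tensor shapes (assumes only torch is available).

import torch

# Hypothetical shape check (not part of this commit).
net = CIFAR10_LeNet(rep_dim=128)
ae = CIFAR10_LeNet_Autoencoder(rep_dim=128)

x = torch.randn(8, 3, 32, 32)            # batch of 8 RGB 32x32 images
z = net(x)                               # latent code
x_rec = ae(x)                            # reconstruction

assert z.shape == (8, 128)               # rep_dim-dimensional embedding
assert x_rec.shape == (8, 3, 32, 32)     # same shape as the input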


@@ -0,0 +1,123 @@
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init

from .vae import VariationalAutoencoder, Encoder, Decoder

# Acknowledgements: https://github.com/wohlert/semi-supervised-pytorch


class Classifier(nn.Module):
    """
    Classifier network, i.e. q(y|x), for two classes (0: normal, 1: outlier)

    :param net: neural network class to use (as parameter to use the same network over different shallow_ssad)
    """

    def __init__(self, net, dims=None):
        super(Classifier, self).__init__()
        self.dims = dims
        if dims is None:
            self.net = net()
            self.logits = nn.Linear(self.net.rep_dim, 2)
        else:
            [x_dim, h_dim, y_dim] = dims
            self.dense = nn.Linear(x_dim, h_dim)
            self.logits = nn.Linear(h_dim, y_dim)

    def forward(self, x):
        if self.dims is None:
            x = self.net(x)
        else:
            x = F.relu(self.dense(x))
        x = F.softmax(self.logits(x), dim=-1)
        return x


class DeepGenerativeModel(VariationalAutoencoder):
    """
    M2 model from the paper 'Semi-Supervised Learning with Deep Generative Models' (Kingma et al., 2014).

    The 'Generative semi-supervised model' (M2) is a probabilistic model that incorporates label information in both
    inference and generation.

    :param dims: dimensions of the model given by [input_dim, label_dim, latent_dim, [hidden_dims]].
    :param classifier_net: classifier network class to use.
    """

    def __init__(self, dims, classifier_net=None):
        [x_dim, self.y_dim, z_dim, h_dim] = dims
        super(DeepGenerativeModel, self).__init__([x_dim, z_dim, h_dim])

        self.encoder = Encoder([x_dim + self.y_dim, h_dim, z_dim])
        self.decoder = Decoder([z_dim + self.y_dim, list(reversed(h_dim)), x_dim])
        if classifier_net is None:
            self.classifier = Classifier(net=None, dims=[x_dim, h_dim[0], self.y_dim])
        else:
            self.classifier = Classifier(classifier_net)

        # Init linear layers
        for m in self.modules():
            if isinstance(m, nn.Linear):
                init.xavier_normal_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()

    def forward(self, x, y):
        z, q_mu, q_log_var = self.encoder(torch.cat((x, y), dim=1))
        self.kl_divergence = self._kld(z, (q_mu, q_log_var))
        rec = self.decoder(torch.cat((z, y), dim=1))
        return rec

    def classify(self, x):
        logits = self.classifier(x)
        return logits

    def sample(self, z, y):
        """
        Samples from the Decoder to generate an x.

        :param z: latent normal variable
        :param y: label (one-hot encoded)
        :return: x
        """
        y = y.float()
        x = self.decoder(torch.cat((z, y), dim=1))
        return x


class StackedDeepGenerativeModel(DeepGenerativeModel):

    def __init__(self, dims, features):
        """
        M1+M2 model as described in (Kingma et al., 2014).

        :param dims: dimensions of the model given by [input_dim, label_dim, latent_dim, [hidden_dims]].
        :param classifier_net: classifier network class to use.
        :param features: a pre-trained M1 model of class 'VariationalAutoencoder' trained on the same dataset.
        """
        [x_dim, y_dim, z_dim, h_dim] = dims
        super(StackedDeepGenerativeModel, self).__init__([features.z_dim, y_dim, z_dim, h_dim])

        # Be sure to reconstruct with the same dimensions
        in_features = self.decoder.reconstruction.in_features
        self.decoder.reconstruction = nn.Linear(in_features, x_dim)

        # Make vae feature model untrainable by freezing parameters
        self.features = features
        self.features.train(False)
        for param in self.features.parameters():
            param.requires_grad = False

    def forward(self, x, y):
        # Sample a new latent x from the M1 model
        x_sample, _, _ = self.features.encoder(x)

        # Use the sample as new input to M2
        return super(StackedDeepGenerativeModel, self).forward(x_sample, y)

    def classify(self, x):
        _, x, _ = self.features.encoder(x)
        logits = self.classifier(x)
        return logits
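
Not part of this commit: a usage sketch for the M2 model, assuming the mnist_DGM_M2 dimensions used in main.py below, showing the forward pass with one-hot labels and the classifier output.

import torch

# Hypothetical example (dims taken from the 'mnist_DGM_M2' entry in main.py).
dgm = DeepGenerativeModel([1 * 28 * 28, 2, 32, [128, 64]])

x = torch.rand(4, 1 * 28 * 28)                    # flattened MNIST batch in [0, 1]
y = torch.eye(2)[torch.tensor([0, 0, 1, 0])]      # one-hot labels (0: normal, 1: outlier)

rec = dgm(x, y)                                   # reconstruction; also sets dgm.kl_divergence
probs = dgm.classify(x)                           # q(y|x), softmax probabilities

assert rec.shape == (4, 784) and probs.shape == (4, 2)
assert torch.allclose(probs.sum(dim=1), torch.ones(4))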


@@ -0,0 +1,76 @@
import torch
import torch.nn as nn
import torch.nn.functional as F

from base.base_net import BaseNet


class FashionMNIST_LeNet(BaseNet):

    def __init__(self, rep_dim=64):
        super().__init__()

        self.rep_dim = rep_dim
        self.pool = nn.MaxPool2d(2, 2)

        self.conv1 = nn.Conv2d(1, 16, 5, bias=False, padding=2)
        self.bn2d1 = nn.BatchNorm2d(16, eps=1e-04, affine=False)
        self.conv2 = nn.Conv2d(16, 32, 5, bias=False, padding=2)
        self.bn2d2 = nn.BatchNorm2d(32, eps=1e-04, affine=False)
        self.fc1 = nn.Linear(32 * 7 * 7, 128, bias=False)
        self.bn1d1 = nn.BatchNorm1d(128, eps=1e-04, affine=False)
        self.fc2 = nn.Linear(128, self.rep_dim, bias=False)

    def forward(self, x):
        x = x.view(-1, 1, 28, 28)
        x = self.conv1(x)
        x = self.pool(F.leaky_relu(self.bn2d1(x)))
        x = self.conv2(x)
        x = self.pool(F.leaky_relu(self.bn2d2(x)))
        x = x.view(int(x.size(0)), -1)
        x = F.leaky_relu(self.bn1d1(self.fc1(x)))
        x = self.fc2(x)
        return x


class FashionMNIST_LeNet_Decoder(BaseNet):

    def __init__(self, rep_dim=64):
        super().__init__()

        self.rep_dim = rep_dim

        self.fc3 = nn.Linear(self.rep_dim, 128, bias=False)
        self.bn1d2 = nn.BatchNorm1d(128, eps=1e-04, affine=False)
        self.deconv1 = nn.ConvTranspose2d(8, 32, 5, bias=False, padding=2)
        self.bn2d3 = nn.BatchNorm2d(32, eps=1e-04, affine=False)
        self.deconv2 = nn.ConvTranspose2d(32, 16, 5, bias=False, padding=3)
        self.bn2d4 = nn.BatchNorm2d(16, eps=1e-04, affine=False)
        self.deconv3 = nn.ConvTranspose2d(16, 1, 5, bias=False, padding=2)

    def forward(self, x):
        x = self.bn1d2(self.fc3(x))
        x = x.view(int(x.size(0)), int(128 / 16), 4, 4)
        x = F.interpolate(F.leaky_relu(x), scale_factor=2)
        x = self.deconv1(x)
        x = F.interpolate(F.leaky_relu(self.bn2d3(x)), scale_factor=2)
        x = self.deconv2(x)
        x = F.interpolate(F.leaky_relu(self.bn2d4(x)), scale_factor=2)
        x = self.deconv3(x)
        x = torch.sigmoid(x)
        return x


class FashionMNIST_LeNet_Autoencoder(BaseNet):

    def __init__(self, rep_dim=64):
        super().__init__()

        self.rep_dim = rep_dim
        self.encoder = FashionMNIST_LeNet(rep_dim=rep_dim)
        self.decoder = FashionMNIST_LeNet_Decoder(rep_dim=rep_dim)

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


@@ -0,0 +1,41 @@
import math

import torch
import torch.nn.functional as F

# Acknowledgements: https://github.com/wohlert/semi-supervised-pytorch


def log_standard_gaussian(x):
    """
    Evaluates the log pdf of a standard normal distribution at x.

    :param x: point to evaluate
    :return: log N(x|0,I)
    """
    return torch.sum(-0.5 * math.log(2 * math.pi) - x ** 2 / 2, dim=-1)


def log_gaussian(x, mu, log_var):
    """
    Evaluates the log pdf of a normal distribution parametrized by mu and log_var at x.

    :param x: point to evaluate
    :param mu: mean
    :param log_var: log variance
    :return: log N(x|µ,σI)
    """
    log_pdf = -0.5 * math.log(2 * math.pi) - log_var / 2 - (x - mu)**2 / (2 * torch.exp(log_var))
    return torch.sum(log_pdf, dim=-1)


def log_standard_categorical(p):
    """
    Computes the cross-entropy between a (one-hot) categorical vector and a standard (uniform) categorical distribution.

    :param p: one-hot categorical distribution
    :return: H(p,u)
    """
    eps = 1e-8
    prior = F.softmax(torch.ones_like(p), dim=1)  # Uniform prior over y
    prior.requires_grad = False
    cross_entropy = -torch.sum(p * torch.log(prior + eps), dim=1)

    return cross_entropy
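
Not part of this commit: a quick numeric sanity check of the three densities above. With µ = 0 and log σ² = 0, log_gaussian must agree with log_standard_gaussian, and the uniform cross-entropy of a one-hot vector over two classes equals log 2.

import math
import torch

x = torch.randn(5, 3)

# N(0, I) evaluated both ways should coincide.
lp_std = log_standard_gaussian(x)
lp_gen = log_gaussian(x, torch.zeros_like(x), torch.zeros_like(x))
assert torch.allclose(lp_std, lp_gen, atol=1e-6)

# Cross-entropy between a one-hot label and a uniform prior over 2 classes is log(2).
p = torch.tensor([[1.0, 0.0], [0.0, 1.0]])
h = log_standard_categorical(p)
assert torch.allclose(h, torch.full((2,), math.log(2.0)), atol=1e-4)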


@@ -0,0 +1,52 @@
import torch

from torch.nn import Module
from torch.nn import init
from torch.nn.parameter import Parameter

# Acknowledgements: https://github.com/wohlert/semi-supervised-pytorch


class Standardize(Module):
    """
    Applies (element-wise) standardization with trainable translation parameter μ and scale parameter σ, i.e. computes
    (x - μ) / σ where '/' is applied element-wise.

    Args:
        in_features: size of each input sample
        out_features: size of each output sample
        bias: If set to False, the layer will not learn a translation parameter μ.
            Default: ``True``

    Attributes:
        mu: the learnable translation parameter μ.
        std: the learnable scale parameter σ.
    """
    __constants__ = ['mu']

    def __init__(self, in_features, bias=True, eps=1e-6):
        super(Standardize, self).__init__()
        self.in_features = in_features
        self.out_features = in_features
        self.eps = eps
        self.std = Parameter(torch.Tensor(in_features))
        if bias:
            self.mu = Parameter(torch.Tensor(in_features))
        else:
            self.register_parameter('mu', None)
        self.reset_parameters()

    def reset_parameters(self):
        init.constant_(self.std, 1)
        if self.mu is not None:
            init.constant_(self.mu, 0)

    def forward(self, x):
        if self.mu is not None:
            x -= self.mu
        x = torch.div(x, self.std + self.eps)
        return x

    def extra_repr(self):
        return 'in_features={}, out_features={}, bias={}'.format(
            self.in_features, self.out_features, self.mu is not None
        )
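
Not part of this commit: a small illustration of the layer's initial behaviour. With the default init (std = 1, mu = 0) the layer divides by (1 + eps), so it is the identity up to eps.

import torch

layer = Standardize(in_features=4)

with torch.no_grad():                 # quick check only, no graph needed
    x = torch.randn(2, 4)
    out = layer(x.clone())            # clone, because forward subtracts mu from x in place

assert torch.allclose(out, x / (1.0 + 1e-6))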


@@ -0,0 +1,53 @@
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.autograd import Variable

# Acknowledgements: https://github.com/wohlert/semi-supervised-pytorch


class Stochastic(nn.Module):
    """
    Base stochastic layer that uses the reparametrization trick (Kingma and Welling, 2013) to draw a sample from a
    distribution parametrized by mu and log_var.
    """

    def __init__(self):
        super(Stochastic, self).__init__()

    def reparametrize(self, mu, log_var):
        epsilon = Variable(torch.randn(mu.size()), requires_grad=False)

        if mu.is_cuda:
            epsilon = epsilon.to(mu.device)

        # log_std = 0.5 * log_var
        # std = exp(log_std)
        std = log_var.mul(0.5).exp_()

        # z = std * epsilon + mu
        z = mu.addcmul(std, epsilon)

        return z

    def forward(self, x):
        raise NotImplementedError


class GaussianSample(Stochastic):
    """
    Layer that represents a sample from a Gaussian distribution.
    """

    def __init__(self, in_features, out_features):
        super(GaussianSample, self).__init__()
        self.in_features = in_features
        self.out_features = out_features

        self.mu = nn.Linear(in_features, out_features)
        self.log_var = nn.Linear(in_features, out_features)

    def forward(self, x):
        mu = self.mu(x)
        log_var = F.softplus(self.log_var(x))

        return self.reparametrize(mu, log_var), mu, log_var
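
Not part of this commit: a short sketch of how the reparametrized sample relates to the returned parameters, i.e. z = mu + exp(0.5 * log_var) * eps with eps ~ N(0, I).

import torch

sampler = GaussianSample(in_features=64, out_features=32)
h = torch.randn(16, 64)

with torch.no_grad():
    z, mu, log_var = sampler(h)
    # Monte Carlo check: averaging many reparametrized draws approaches mu.
    zs = torch.stack([sampler.reparametrize(mu, log_var) for _ in range(2000)])

assert z.shape == mu.shape == log_var.shape == (16, 32)
assert (zs.mean(dim=0) - mu).abs().mean() < 0.1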


@@ -0,0 +1,138 @@
from .mnist_LeNet import MNIST_LeNet, MNIST_LeNet_Autoencoder
from .fmnist_LeNet import FashionMNIST_LeNet, FashionMNIST_LeNet_Autoencoder
from .cifar10_LeNet import CIFAR10_LeNet, CIFAR10_LeNet_Autoencoder
from .mlp import MLP, MLP_Autoencoder
from .vae import VariationalAutoencoder
from .dgm import DeepGenerativeModel, StackedDeepGenerativeModel


def build_network(net_name, ae_net=None):
    """Builds the neural network."""

    implemented_networks = ('mnist_LeNet', 'mnist_DGM_M2', 'mnist_DGM_M1M2',
                            'fmnist_LeNet', 'fmnist_DGM_M2', 'fmnist_DGM_M1M2',
                            'cifar10_LeNet', 'cifar10_DGM_M2', 'cifar10_DGM_M1M2',
                            'arrhythmia_mlp', 'cardio_mlp', 'satellite_mlp', 'satimage-2_mlp', 'shuttle_mlp',
                            'thyroid_mlp',
                            'arrhythmia_DGM_M2', 'cardio_DGM_M2', 'satellite_DGM_M2', 'satimage-2_DGM_M2',
                            'shuttle_DGM_M2', 'thyroid_DGM_M2')
    assert net_name in implemented_networks

    net = None

    if net_name == 'mnist_LeNet':
        net = MNIST_LeNet()
    if net_name == 'mnist_DGM_M2':
        net = DeepGenerativeModel([1*28*28, 2, 32, [128, 64]], classifier_net=MNIST_LeNet)
    if net_name == 'mnist_DGM_M1M2':
        net = StackedDeepGenerativeModel([1*28*28, 2, 32, [128, 64]], features=ae_net)

    if net_name == 'fmnist_LeNet':
        net = FashionMNIST_LeNet()
    if net_name == 'fmnist_DGM_M2':
        net = DeepGenerativeModel([1*28*28, 2, 64, [256, 128]], classifier_net=FashionMNIST_LeNet)
    if net_name == 'fmnist_DGM_M1M2':
        net = StackedDeepGenerativeModel([1*28*28, 2, 64, [256, 128]], features=ae_net)

    if net_name == 'cifar10_LeNet':
        net = CIFAR10_LeNet()
    if net_name == 'cifar10_DGM_M2':
        net = DeepGenerativeModel([3*32*32, 2, 128, [512, 256]], classifier_net=CIFAR10_LeNet)
    if net_name == 'cifar10_DGM_M1M2':
        net = StackedDeepGenerativeModel([3*32*32, 2, 128, [512, 256]], features=ae_net)

    if net_name == 'arrhythmia_mlp':
        net = MLP(x_dim=274, h_dims=[128, 64], rep_dim=32, bias=False)
    if net_name == 'cardio_mlp':
        net = MLP(x_dim=21, h_dims=[32, 16], rep_dim=8, bias=False)
    if net_name == 'satellite_mlp':
        net = MLP(x_dim=36, h_dims=[32, 16], rep_dim=8, bias=False)
    if net_name == 'satimage-2_mlp':
        net = MLP(x_dim=36, h_dims=[32, 16], rep_dim=8, bias=False)
    if net_name == 'shuttle_mlp':
        net = MLP(x_dim=9, h_dims=[32, 16], rep_dim=8, bias=False)
    if net_name == 'thyroid_mlp':
        net = MLP(x_dim=6, h_dims=[32, 16], rep_dim=4, bias=False)

    if net_name == 'arrhythmia_DGM_M2':
        net = DeepGenerativeModel([274, 2, 32, [128, 64]])
    if net_name == 'cardio_DGM_M2':
        net = DeepGenerativeModel([21, 2, 8, [32, 16]])
    if net_name == 'satellite_DGM_M2':
        net = DeepGenerativeModel([36, 2, 8, [32, 16]])
    if net_name == 'satimage-2_DGM_M2':
        net = DeepGenerativeModel([36, 2, 8, [32, 16]])
    if net_name == 'shuttle_DGM_M2':
        net = DeepGenerativeModel([9, 2, 8, [32, 16]])
    if net_name == 'thyroid_DGM_M2':
        net = DeepGenerativeModel([6, 2, 4, [32, 16]])

    return net


def build_autoencoder(net_name):
    """Builds the corresponding autoencoder network."""

    implemented_networks = ('mnist_LeNet', 'mnist_DGM_M1M2',
                            'fmnist_LeNet', 'fmnist_DGM_M1M2',
                            'cifar10_LeNet', 'cifar10_DGM_M1M2',
                            'arrhythmia_mlp', 'cardio_mlp', 'satellite_mlp', 'satimage-2_mlp', 'shuttle_mlp',
                            'thyroid_mlp')
    assert net_name in implemented_networks

    ae_net = None

    if net_name == 'mnist_LeNet':
        ae_net = MNIST_LeNet_Autoencoder()
    if net_name == 'mnist_DGM_M1M2':
        ae_net = VariationalAutoencoder([1*28*28, 32, [128, 64]])

    if net_name == 'fmnist_LeNet':
        ae_net = FashionMNIST_LeNet_Autoencoder()
    if net_name == 'fmnist_DGM_M1M2':
        ae_net = VariationalAutoencoder([1*28*28, 64, [256, 128]])

    if net_name == 'cifar10_LeNet':
        ae_net = CIFAR10_LeNet_Autoencoder()
    if net_name == 'cifar10_DGM_M1M2':
        ae_net = VariationalAutoencoder([3*32*32, 128, [512, 256]])

    if net_name == 'arrhythmia_mlp':
        ae_net = MLP_Autoencoder(x_dim=274, h_dims=[128, 64], rep_dim=32, bias=False)
    if net_name == 'cardio_mlp':
        ae_net = MLP_Autoencoder(x_dim=21, h_dims=[32, 16], rep_dim=8, bias=False)
    if net_name == 'satellite_mlp':
        ae_net = MLP_Autoencoder(x_dim=36, h_dims=[32, 16], rep_dim=8, bias=False)
    if net_name == 'satimage-2_mlp':
        ae_net = MLP_Autoencoder(x_dim=36, h_dims=[32, 16], rep_dim=8, bias=False)
    if net_name == 'shuttle_mlp':
        ae_net = MLP_Autoencoder(x_dim=9, h_dims=[32, 16], rep_dim=8, bias=False)
    if net_name == 'thyroid_mlp':
        ae_net = MLP_Autoencoder(x_dim=6, h_dims=[32, 16], rep_dim=4, bias=False)

    return ae_net
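
Not part of this commit: a usage sketch for the two factory functions. The import path is an assumption based on the package __init__ at the top of this commit; adjust it to wherever the package actually lives.

# Hypothetical usage (assumption: the package is importable as `networks`).
from networks import build_network, build_autoencoder

ae_net = build_autoencoder('mnist_DGM_M1M2')            # M1: VariationalAutoencoder([784, 32, [128, 64]])
net = build_network('mnist_DGM_M1M2', ae_net=ae_net)    # M2 stacked on the (pre)trained M1 features

simple_net = build_network('mnist_LeNet')               # plain LeNet encoder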


@@ -0,0 +1,76 @@
import torch.nn as nn
import torch.nn.functional as F

from base.base_net import BaseNet


class MLP(BaseNet):

    def __init__(self, x_dim, h_dims=[128, 64], rep_dim=32, bias=False):
        super().__init__()

        self.rep_dim = rep_dim

        neurons = [x_dim, *h_dims]
        layers = [Linear_BN_leakyReLU(neurons[i - 1], neurons[i], bias=bias) for i in range(1, len(neurons))]

        self.hidden = nn.ModuleList(layers)
        self.code = nn.Linear(h_dims[-1], rep_dim, bias=bias)

    def forward(self, x):
        x = x.view(int(x.size(0)), -1)
        for layer in self.hidden:
            x = layer(x)
        return self.code(x)


class MLP_Decoder(BaseNet):

    def __init__(self, x_dim, h_dims=[64, 128], rep_dim=32, bias=False):
        super().__init__()

        self.rep_dim = rep_dim

        neurons = [rep_dim, *h_dims]
        layers = [Linear_BN_leakyReLU(neurons[i - 1], neurons[i], bias=bias) for i in range(1, len(neurons))]

        self.hidden = nn.ModuleList(layers)
        self.reconstruction = nn.Linear(h_dims[-1], x_dim, bias=bias)
        self.output_activation = nn.Sigmoid()

    def forward(self, x):
        x = x.view(int(x.size(0)), -1)
        for layer in self.hidden:
            x = layer(x)
        x = self.reconstruction(x)
        return self.output_activation(x)


class MLP_Autoencoder(BaseNet):

    def __init__(self, x_dim, h_dims=[128, 64], rep_dim=32, bias=False):
        super().__init__()

        self.rep_dim = rep_dim
        self.encoder = MLP(x_dim, h_dims, rep_dim, bias)
        self.decoder = MLP_Decoder(x_dim, list(reversed(h_dims)), rep_dim, bias)

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


class Linear_BN_leakyReLU(nn.Module):
    """
    A nn.Module that consists of a Linear layer followed by BatchNorm1d and a leaky ReLU activation.
    """

    def __init__(self, in_features, out_features, bias=False, eps=1e-04):
        super(Linear_BN_leakyReLU, self).__init__()

        self.linear = nn.Linear(in_features, out_features, bias=bias)
        self.bn = nn.BatchNorm1d(out_features, eps=eps, affine=bias)

    def forward(self, x):
        return F.leaky_relu(self.bn(self.linear(x)))
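
Not part of this commit: a usage sketch for the tabular MLP autoencoder, using the arrhythmia configuration from main.py above.

import torch

ae = MLP_Autoencoder(x_dim=274, h_dims=[128, 64], rep_dim=32, bias=False)

x = torch.rand(10, 274)          # batch of 10 tabular samples, features scaled to [0, 1]
x_rec = ae(x)                    # sigmoid output, so reconstructions also lie in [0, 1]
z = ae.encoder(x)                # rep_dim-dimensional code

assert x_rec.shape == (10, 274) and z.shape == (10, 32)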


@@ -0,0 +1,71 @@
import torch
import torch.nn as nn
import torch.nn.functional as F

from base.base_net import BaseNet


class MNIST_LeNet(BaseNet):

    def __init__(self, rep_dim=32):
        super().__init__()

        self.rep_dim = rep_dim
        self.pool = nn.MaxPool2d(2, 2)

        self.conv1 = nn.Conv2d(1, 8, 5, bias=False, padding=2)
        self.bn1 = nn.BatchNorm2d(8, eps=1e-04, affine=False)
        self.conv2 = nn.Conv2d(8, 4, 5, bias=False, padding=2)
        self.bn2 = nn.BatchNorm2d(4, eps=1e-04, affine=False)
        self.fc1 = nn.Linear(4 * 7 * 7, self.rep_dim, bias=False)

    def forward(self, x):
        x = x.view(-1, 1, 28, 28)
        x = self.conv1(x)
        x = self.pool(F.leaky_relu(self.bn1(x)))
        x = self.conv2(x)
        x = self.pool(F.leaky_relu(self.bn2(x)))
        x = x.view(int(x.size(0)), -1)
        x = self.fc1(x)
        return x


class MNIST_LeNet_Decoder(BaseNet):

    def __init__(self, rep_dim=32):
        super().__init__()

        self.rep_dim = rep_dim

        # Decoder network
        self.deconv1 = nn.ConvTranspose2d(2, 4, 5, bias=False, padding=2)
        self.bn3 = nn.BatchNorm2d(4, eps=1e-04, affine=False)
        self.deconv2 = nn.ConvTranspose2d(4, 8, 5, bias=False, padding=3)
        self.bn4 = nn.BatchNorm2d(8, eps=1e-04, affine=False)
        self.deconv3 = nn.ConvTranspose2d(8, 1, 5, bias=False, padding=2)

    def forward(self, x):
        x = x.view(int(x.size(0)), int(self.rep_dim / 16), 4, 4)
        x = F.interpolate(F.leaky_relu(x), scale_factor=2)
        x = self.deconv1(x)
        x = F.interpolate(F.leaky_relu(self.bn3(x)), scale_factor=2)
        x = self.deconv2(x)
        x = F.interpolate(F.leaky_relu(self.bn4(x)), scale_factor=2)
        x = self.deconv3(x)
        x = torch.sigmoid(x)
        return x


class MNIST_LeNet_Autoencoder(BaseNet):

    def __init__(self, rep_dim=32):
        super().__init__()

        self.rep_dim = rep_dim
        self.encoder = MNIST_LeNet(rep_dim=rep_dim)
        self.decoder = MNIST_LeNet_Decoder(rep_dim=rep_dim)

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x


@@ -0,0 +1,145 @@
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import init

from .layers.stochastic import GaussianSample
from .inference.distributions import log_standard_gaussian, log_gaussian

# Acknowledgements: https://github.com/wohlert/semi-supervised-pytorch


class Encoder(nn.Module):
    """
    Encoder, i.e. the inference network.

    Attempts to infer the latent probability distribution p(z|x) from the data x by fitting a
    variational distribution q_φ(z|x). Returns a sample z together with the two parameters of
    the distribution (µ, log σ²).

    :param dims: dimensions of the network given by [input_dim, [hidden_dims], latent_dim].
    """

    def __init__(self, dims, sample_layer=GaussianSample):
        super(Encoder, self).__init__()

        [x_dim, h_dim, z_dim] = dims
        neurons = [x_dim, *h_dim]
        linear_layers = [nn.Linear(neurons[i-1], neurons[i]) for i in range(1, len(neurons))]

        self.hidden = nn.ModuleList(linear_layers)
        self.sample = sample_layer(h_dim[-1], z_dim)

    def forward(self, x):
        for layer in self.hidden:
            x = F.relu(layer(x))
        return self.sample(x)


class Decoder(nn.Module):
    """
    Decoder, i.e. the generative network.

    Generates samples from an approximation p_θ(x|z) of the original distribution p(x)
    by transforming a latent representation z.

    :param dims: dimensions of the network given by [latent_dim, [hidden_dims], input_dim].
    """

    def __init__(self, dims):
        super(Decoder, self).__init__()

        [z_dim, h_dim, x_dim] = dims
        neurons = [z_dim, *h_dim]
        linear_layers = [nn.Linear(neurons[i-1], neurons[i]) for i in range(1, len(neurons))]

        self.hidden = nn.ModuleList(linear_layers)
        self.reconstruction = nn.Linear(h_dim[-1], x_dim)
        self.output_activation = nn.Sigmoid()

    def forward(self, x):
        for layer in self.hidden:
            x = F.relu(layer(x))
        return self.output_activation(self.reconstruction(x))


class VariationalAutoencoder(nn.Module):
    """
    Variational Autoencoder (VAE) (Kingma and Welling, 2013) model consisting of an encoder-decoder pair for which
    a variational distribution is fitted to the encoder.
    Also known as the M1 model in (Kingma et al., 2014).

    :param dims: dimensions of the networks given by [input_dim, latent_dim, [hidden_dims]]. Encoder and decoder
        are built symmetrically.
    """

    def __init__(self, dims):
        super(VariationalAutoencoder, self).__init__()

        [x_dim, z_dim, h_dim] = dims
        self.z_dim = z_dim
        self.flow = None

        self.encoder = Encoder([x_dim, h_dim, z_dim])
        self.decoder = Decoder([z_dim, list(reversed(h_dim)), x_dim])
        self.kl_divergence = 0

        # Init linear layers
        for m in self.modules():
            if isinstance(m, nn.Linear):
                init.xavier_normal_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()

    def _kld(self, z, q_param, p_param=None):
        """
        Computes the KL-divergence of some latent variable z.

        KL(q||p) = - ∫ q(z) log [ p(z) / q(z) ] dz = - E_q[ log p(z) - log q(z) ]

        :param z: sample from q-distribution
        :param q_param: (mu, log_var) of the q-distribution
        :param p_param: (mu, log_var) of the p-distribution
        :return: KL(q||p)
        """
        (mu, log_var) = q_param

        if self.flow is not None:
            f_z, log_det_z = self.flow(z)
            qz = log_gaussian(z, mu, log_var) - sum(log_det_z)
            z = f_z
        else:
            qz = log_gaussian(z, mu, log_var)

        if p_param is None:
            pz = log_standard_gaussian(z)
        else:
            (mu, log_var) = p_param
            pz = log_gaussian(z, mu, log_var)

        kl = qz - pz

        return kl

    def add_flow(self, flow):
        self.flow = flow

    def forward(self, x, y=None):
        """
        Runs a forward pass on a data point through the VAE model to provide its reconstruction and the parameters of
        the variational approximate distribution q.

        :param x: input data
        :return: reconstructed input
        """
        z, q_mu, q_log_var = self.encoder(x)
        self.kl_divergence = self._kld(z, (q_mu, q_log_var))
        rec = self.decoder(z)

        return rec

    def sample(self, z):
        """
        Given z ~ N(0, I), generates a sample from the learned distribution based on p_θ(x|z).

        :param z: (torch.autograd.Variable) latent normal variable
        :return: (torch.autograd.Variable) generated sample
        """
        return self.decoder(z)
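
Not part of this commit: a usage sketch for the VAE, with dims as used for 'mnist_DGM_M1M2' in main.py. One forward pass yields the reconstruction, and the per-sample KL term is stored on the module, so a negative-ELBO style loss can be assembled from the two pieces.

import torch
import torch.nn.functional as F

vae = VariationalAutoencoder([1 * 28 * 28, 32, [128, 64]])

x = torch.rand(4, 784)                       # flattened inputs in [0, 1] to match the sigmoid decoder
rec = vae(x)                                 # reconstruction; also updates vae.kl_divergence

recon_loss = F.binary_cross_entropy(rec, x, reduction='none').sum(dim=1)
elbo = -(recon_loss + vae.kl_divergence)     # per-sample evidence lower bound

assert rec.shape == (4, 784)
assert vae.kl_divergence.shape == (4,) and elbo.shape == (4,)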