# File: mt/tools/plot_scripts/background_ml_semisupervised.py
# (header recovered from a repository web view; timestamp shown there: 2025-08-13 14:17:12 +02:00)
"""
Downloads the cats_vs_dogs dataset, extracts deep features using MobileNetV2,
then generates two clustering comparison illustrations using two different embedding pipelines:
- PCA followed by t-SNE (saved as: semi_supervised_clustering_tsne.png)
- PCA followed by UMAP (saved as: semi_supervised_clustering_umap.png)
Each illustration compares:
- Unsupervised clustering using the deep embedding + KMeans
- Semi-supervised clustering using Label Spreading with a few labeled seeds
This script saves outputs in a datetime folder and also copies the latest outputs
to a "latest" folder. All versions of the outputs and scripts are archived.
"""
import random
import shutil
from datetime import datetime
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import umap
from scipy.optimize import linear_sum_assignment
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.semi_supervised import LabelSpreading
# -----------------------------------------------------------------------------
# CONFIGURATION
# -----------------------------------------------------------------------------
# Sample budget: how many images are embedded, and how many of them are
# revealed as labeled seeds to the semi-supervised step.
UNSUP_SAMPLES = 200
N_LABELED_CLASS = 20

# Output layout: every run writes into its own timestamped folder below
# ``output_path``; "latest" and "archive" are fixed sibling folders.
output_path = Path("/home/fedex/mt/plots/background_ml_semisupervised")
datetime_folder_name = f"{datetime.now():%Y-%m-%d_%H-%M-%S}"
output_datetime_path = output_path.joinpath(datetime_folder_name)
latest_folder_path = output_path.joinpath("latest")
archive_folder_path = output_path.joinpath("archive")
# -----------------------------------------------------------------------------
# UTILITIES
# -----------------------------------------------------------------------------
def ensure_dir(directory: Path) -> None:
    """Create ``directory`` together with any missing parents.

    An already existing directory is left untouched and is not an error.
    """
    directory.mkdir(parents=True, exist_ok=True)
def cluster_accuracy(y_true, y_pred):
    """Return the clustering accuracy under the best cluster-to-label mapping.

    Cluster ids are arbitrary, so each predicted cluster is matched to at most
    one true label (Hungarian algorithm) such that the number of agreeing
    samples is maximal; the accuracy is that count divided by the sample count.

    Args:
        y_true: Array-like of integer ground-truth labels.
        y_pred: Array-like of integer cluster ids, same number of elements.

    Returns:
        Accuracy in [0, 1] as a float, or NaN when the inputs are empty.

    Raises:
        ValueError: If ``y_true`` and ``y_pred`` differ in length.
    """
    # asarray lets callers pass plain lists; ravel keeps the index math 1-D.
    y_true = np.asarray(y_true).astype(np.int64).ravel()
    y_pred = np.asarray(y_pred).astype(np.int64).ravel()
    if y_true.shape != y_pred.shape:
        raise ValueError(
            f"y_true and y_pred must have the same length, "
            f"got {y_true.size} and {y_pred.size}"
        )
    if y_true.size == 0:
        # Accuracy is undefined without samples; say so explicitly instead of
        # relying on a 0/0 division that only emits a RuntimeWarning.
        return float("nan")

    # Map labels/clusters to dense 0..k-1 indices, then count co-occurrences
    # in one pass instead of rescanning the data for every (label, cluster).
    labels, label_idx = np.unique(y_true, return_inverse=True)
    clusters, cluster_idx = np.unique(y_pred, return_inverse=True)
    contingency = np.zeros((labels.size, clusters.size), dtype=np.int64)
    np.add.at(contingency, (label_idx, cluster_idx), 1)

    # One-to-one assignment that maximizes the number of matched samples.
    row_ind, col_ind = linear_sum_assignment(contingency, maximize=True)
    return float(contingency[row_ind, col_ind].sum() / y_true.size)
def _class_legend_handles(classes, cmap, norm):
    """Build one legend marker per class, colored the same way the scatter colors it."""
    return [
        plt.Line2D(
            [],
            [],
            marker="o",
            linestyle="",
            color=cmap(float(norm(cl))),
            label=f"Class {cl}",
            markersize=6,
        )
        for cl in classes
    ]


def plot_clustering_comparison_embedding(embedding, y_true, outpath, method_name=""):
    """Plot KMeans vs. Label Spreading on a 2D embedding and save the figure.

    Steps:
    - Fit KMeans (2 clusters) on the whole embedding.
    - Fit Label Spreading where only a few randomly chosen points keep their
      true label (all others are marked -1, i.e. unlabeled).
    - Score both predictions with ``cluster_accuracy`` (Hungarian matching).
    - Draw both decision regions over a grid, overlaid with the points colored
      by true label, and put the accuracy into each subplot title.

    Args:
        embedding: Array of shape (n, 2); columns 0 and 1 are the plot axes.
        y_true: NumPy integer array with one ground-truth label per row.
        outpath: Destination path of the saved figure.
        method_name: Text prepended to both subplot titles.
    """
    y_true = np.asarray(y_true)
    n = embedding.shape[0]

    # Seeds come from a locally seeded generator so the figure is reproducible,
    # matching the fixed random_state used by every other step. The count is
    # clamped because random.sample raises when asked for more than n items.
    labeled_idx = random.Random(0).sample(range(n), min(N_LABELED_CLASS, n))

    # Unsupervised clustering using KMeans on all embedded data
    km = KMeans(n_clusters=2, random_state=0).fit(embedding)
    unsup_accuracy = cluster_accuracy(y_true, km.predict(embedding))

    # Grid over the embedding's bounding box (1 unit margin) for the regions
    x_min, x_max = embedding[:, 0].min() - 1, embedding[:, 0].max() + 1
    y_min, y_max = embedding[:, 1].min() - 1, embedding[:, 1].max() + 1
    xx, yy = np.meshgrid(
        np.linspace(x_min, x_max, 200), np.linspace(y_min, y_max, 200)
    )
    grid = np.c_[xx.ravel(), yy.ravel()]
    pred_unsup = km.predict(grid).reshape(xx.shape)

    # Semi-supervised: -1 marks unlabeled samples for LabelSpreading
    y_train = np.full(n, -1, dtype=int)
    y_train[labeled_idx] = y_true[labeled_idx]
    ls = LabelSpreading().fit(embedding, y_train)
    # NOTE(review): Label Spreading predicts real class labels, so the
    # permutation-invariant score below can only be >= plain accuracy.
    semi_accuracy = cluster_accuracy(y_true, ls.predict(embedding))
    pred_semi = ls.predict(grid).reshape(xx.shape)

    cmap = plt.cm.coolwarm
    classes = np.unique(y_true)
    # A single shared normalization keeps point, seed and legend colors in
    # agreement; scatter's default autoscaling is per-call, which made the
    # legend swatches (and single-class seed subsets) use different colors.
    norm = plt.Normalize(vmin=y_true.min(), vmax=y_true.max())

    fig, axes = plt.subplots(1, 2, figsize=(12, 6))

    # Unsupervised plot:
    ax = axes[0]
    ax.contourf(xx, yy, pred_unsup, alpha=0.2, cmap=cmap)
    ax.scatter(
        embedding[:, 0],
        embedding[:, 1],
        c=y_true,
        cmap=cmap,
        norm=norm,
        s=30,
        alpha=0.8,
        edgecolor="k",
    )
    ax.set_title(
        f"{method_name}\nUnsupervised (KMeans) - Acc: {unsup_accuracy:.2f}", fontsize=10
    )
    ax.set_xlabel("Dim 1")
    ax.set_ylabel("Dim 2")
    ax.legend(handles=_class_legend_handles(classes, cmap, norm), loc="upper right")

    # Semi-supervised plot:
    ax = axes[1]
    ax.contourf(xx, yy, pred_semi, alpha=0.2, cmap=cmap)
    ax.scatter(
        embedding[:, 0],
        embedding[:, 1],
        c=y_true,
        cmap=cmap,
        norm=norm,
        s=30,
        alpha=0.8,
        edgecolor="none",
    )
    # Seeds are redrawn larger with a black outline so they stand out.
    ax.scatter(
        embedding[labeled_idx, 0],
        embedding[labeled_idx, 1],
        c=y_true[labeled_idx],
        cmap=cmap,
        norm=norm,
        s=80,
        edgecolor="k",
        marker="o",
        label="Labeled Seeds",
    )
    ax.set_title(
        f"{method_name}\nSemi-Supervised (Label Spreading) - Acc: {semi_accuracy:.2f}",
        fontsize=10,
    )
    ax.set_xlabel("Dim 1")
    ax.set_ylabel("Dim 2")
    handles = _class_legend_handles(classes, cmap, norm)
    handles.append(
        plt.Line2D(
            [],
            [],
            marker="o",
            linestyle="",
            color="black",
            label="Labeled Seed",
            markersize=8,
        )
    )
    ax.legend(handles=handles, loc="upper right")

    # Operate on this figure explicitly rather than pyplot's "current" one.
    fig.tight_layout()
    fig.savefig(outpath, dpi=150)
    plt.close(fig)
    print(f"✔ Saved clustering comparison illustration → {outpath}")
    print(f"Unsupervised Accuracy: {unsup_accuracy:.2f}")
    print(f"Semi-Supervised Accuracy: {semi_accuracy:.2f}")
# -----------------------------------------------------------------------------
# MAIN WITH PRE-TRAINED CNN FEATURE EXTRACTION
# -----------------------------------------------------------------------------
def main() -> None:
    """Run the demo: extract CNN features, embed them, plot, and file the outputs.

    Writes both figures into the run's timestamped folder, mirrors that folder
    (plus a copy of this script) into "latest", then moves the timestamped
    folder into "archive".
    """
    # Create output directories ("latest" is rebuilt from scratch further down)
    for directory in (output_path, output_datetime_path, archive_folder_path):
        ensure_dir(directory)

    print("▶ Loading cats_vs_dogs dataset...")
    ds = tfds.load("cats_vs_dogs", split="train", as_supervised=True)
    # A seeded shuffle makes the selected images repeatable across runs. No
    # .cache(): only the first UNSUP_SAMPLES items are read, once, and a
    # partially read cache is thrown away by tf.data anyway.
    ds = ds.shuffle(1000, seed=0, reshuffle_each_iteration=False)

    # Load a pre-trained CNN (MobileNetV2) for feature extraction.
    cnn_model = tf.keras.applications.MobileNetV2(
        include_top=False, weights="imagenet", pooling="avg", input_shape=(224, 224, 3)
    )

    # Extract deep features, one image at a time.
    features_list = []
    labels = []
    for img, lbl in ds.take(UNSUP_SAMPLES):
        # Resize to the 224x224 input the network was built with.
        img_resized = tf.image.resize(img, (224, 224))
        # Apply MobileNetV2's own input preprocessing.
        img_preprocessed = tf.keras.applications.mobilenet_v2.preprocess_input(
            img_resized
        )
        # Add a batch axis, run through the CNN, then drop the batch axis again.
        features = cnn_model(tf.expand_dims(img_preprocessed, axis=0))
        features_list.append(features.numpy().squeeze())
        labels.append(lbl.numpy())
    X = np.stack(features_list)
    y_true = np.array(labels)

    # First, apply PCA to reduce dimensionality to (at most) 50. PCA cannot
    # produce more components than min(n_samples, n_features), so clamp.
    n_components = min(50, X.shape[0], X.shape[1])
    pca_50 = PCA(n_components=n_components, random_state=0).fit_transform(X)

    # Then compute embedding with t-SNE
    X_tsne = TSNE(n_components=2, random_state=0, init="pca").fit_transform(pca_50)
    outfile_tsne = output_datetime_path / "semi_supervised_clustering_tsne.png"
    plot_clustering_comparison_embedding(
        X_tsne, y_true, outfile_tsne, "CNN + PCA + t-SNE"
    )

    # Then compute embedding with UMAP
    X_umap = umap.UMAP(n_components=2, random_state=0).fit_transform(pca_50)
    outfile_umap = output_datetime_path / "semi_supervised_clustering_umap.png"
    plot_clustering_comparison_embedding(
        X_umap, y_true, outfile_umap, "CNN + PCA + UMAP"
    )

    # -------------------------------------------------------------------------
    # Update the 'latest' results folder: remove previous and copy current outputs
    # -------------------------------------------------------------------------
    shutil.rmtree(latest_folder_path, ignore_errors=True)
    ensure_dir(latest_folder_path)
    for file in output_datetime_path.iterdir():
        shutil.copy2(file, latest_folder_path)

    # Copy this script to preserve the code used for the outputs
    script_path = Path(__file__)
    shutil.copy2(script_path, output_datetime_path)
    shutil.copy2(script_path, latest_folder_path)

    # Archive the outputs: the timestamped folder is moved into "archive"
    shutil.move(output_datetime_path, archive_folder_path)


if __name__ == "__main__":
    main()