Spaces:
Running
Running
# src/patchcore.py
# PatchCore feature extraction and anomaly scoring
# WideResNet-50 frozen backbone, layer2 + layer3 hooks
# This is the core ML component — built from scratch, no Anomalib
| import numpy as np | |
| import torch | |
| import torch.nn as nn | |
| import torchvision.models as models | |
| import torchvision.transforms as T | |
| from PIL import Image | |
| import joblib | |
| import os | |
| import scipy.ndimage | |
# Directory holding the backbone checkpoint and fitted PCA (env-overridable)
DATA_DIR = os.environ.get("DATA_DIR", "data")
DEVICE = "cpu"  # HF Spaces has no GPU — always CPU at inference
IMG_SIZE = 224  # input resolution fed to the backbone (224x224)
class PatchCoreExtractor:
    """
    WideResNet-50 feature extractor with forward hooks.

    Why two layers:
      - layer2 (28x28): captures fine-grained texture anomalies
      - layer3 (14x14): captures structural/shape anomalies
      Single layer misses one or the other. Multi-scale = better AUROC.

    Why frozen:
      We never update any weights. PatchCore does not train on defects.
      It memorises normal patches, then measures deviation at inference.
    """

    def __init__(self, data_dir=None):
        # None sentinel: resolve DATA_DIR when the instance is created,
        # not when the class is defined (same default behaviour).
        self.data_dir = DATA_DIR if data_dir is None else data_dir
        self.model = None  # set by load()
        self.pca = None    # set by load()
        self._layer2_feat = {}  # filled by the layer2 forward hook
        self._layer3_feat = {}  # filled by the layer3 forward hook
        self.transform = T.Compose([
            T.Resize((IMG_SIZE, IMG_SIZE)),
            T.ToTensor(),
            # ImageNet normalisation — matches the pretrained backbone
            T.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225]),
        ])

    def load(self):
        """Load the frozen WideResNet-50 backbone and the fitted PCA model.

        Prefers a local checkpoint under ``data_dir``; otherwise downloads
        the torchvision pretrained weights.

        Raises:
            FileNotFoundError: if the fitted PCA pickle is missing.
        """
        # ── Load WideResNet-50 ────────────────────────────────
        weights_path = os.path.join(self.data_dir, "wide_resnet50_2.pth")
        if os.path.exists(weights_path):
            # Uninitialised net + local checkpoint (build the model once;
            # the original constructed it twice on the download path).
            self.model = models.wide_resnet50_2(pretrained=False)
            self.model.load_state_dict(torch.load(weights_path,
                                                  map_location="cpu"))
        else:
            # No local checkpoint: download pretrained weights
            self.model = models.wide_resnet50_2(pretrained=True)
        self.model = self.model.to(DEVICE)
        self.model.eval()
        # Freeze all weights — never updated
        for param in self.model.parameters():
            param.requires_grad = False
        # Register hooks (model is freshly built above, so no duplicates)
        self.model.layer2.register_forward_hook(self._hook_layer2)
        self.model.layer3.register_forward_hook(self._hook_layer3)
        # ── Load PCA model ────────────────────────────────────
        pca_path = os.path.join(self.data_dir, "pca_256.pkl")
        if not os.path.exists(pca_path):
            raise FileNotFoundError(f"PCA model not found: {pca_path}")
        self.pca = joblib.load(pca_path)
        print(f"PatchCore extractor loaded | "
              f"PCA: {self.pca.n_components_} components")

    def _hook_layer2(self, module, input, output):
        # Capture layer2 activations during the forward pass.
        self._layer2_feat["feat"] = output

    def _hook_layer3(self, module, input, output):
        # Capture layer3 activations during the forward pass.
        self._layer3_feat["feat"] = output

    def extract_patches(self, pil_img: "Image.Image") -> np.ndarray:
        """
        Extract 784 patch descriptors from one image.

        Pipeline:
          1. Forward pass through WideResNet (hooks capture layer2, layer3)
          2. Upsample layer3 to match layer2 spatial size (14 -> 28)
          3. Concatenate: [1, C2+C3, 28, 28]
          4. 3x3 neighbourhood aggregation (makes each patch context-aware)
          5. Reshape to [784, C2+C3]
          6. PCA reduce to [784, 256]

        Returns: [784, 256] float32 numpy array
        """
        tensor = self.transform(pil_img).unsqueeze(0).to(DEVICE)
        # Inference only — no_grad skips autograd bookkeeping entirely.
        with torch.no_grad():
            _ = self.model(tensor)  # triggers hooks
        l2 = self._layer2_feat["feat"]  # [1, C2, 28, 28]
        l3 = self._layer3_feat["feat"]  # [1, C3, 14, 14]
        # Upsample layer3 to 28x28
        l3_up = nn.functional.interpolate(
            l3, size=(28, 28), mode="bilinear", align_corners=False
        )
        combined = torch.cat([l2, l3_up], dim=1)  # [1, C2+C3, 28, 28]
        # 3x3 neighbourhood aggregation
        combined = nn.functional.avg_pool2d(
            combined, kernel_size=3, stride=1, padding=1
        )
        # Reshape: [1, C, 28, 28] -> [784, C]
        B, C, H, W = combined.shape
        patches = combined.permute(0, 2, 3, 1).reshape(-1, C)
        patches_np = patches.cpu().numpy().astype(np.float32)
        # PCA reduce: [784, C] -> [784, 256]
        patches_reduced = self.pca.transform(patches_np).astype(np.float32)
        return patches_reduced  # [784, 256]

    def build_anomaly_map(self,
                          patch_scores: np.ndarray,
                          smooth: bool = True,
                          sigma: float = 2) -> np.ndarray:
        """
        Convert [28, 28] patch distance grid to [224, 224] anomaly heatmap.

        Steps:
          1. Upsample 28x28 -> 224x224 (bilinear)
          2. Gaussian smoothing — removes patch-boundary artifacts
          3. Normalise to [0, 1]

        Args:
            patch_scores: [H, W] grid of per-patch distances.
            smooth: apply Gaussian smoothing when True.
            sigma: Gaussian kernel width (default 2 — the original docstring
                claimed 4, but the code has always used 2; now a parameter).

        Returns: [224, 224] float32 heatmap
        """
        # Upsample via PIL for bilinear interpolation
        from PIL import Image as PILImage
        heatmap_pil = PILImage.fromarray(patch_scores.astype(np.float32))
        heatmap = np.array(
            heatmap_pil.resize((224, 224), PILImage.BILINEAR),
            dtype=np.float32
        )
        # Gaussian smoothing
        if smooth:
            heatmap = scipy.ndimage.gaussian_filter(heatmap, sigma=sigma)
        # Normalise to [0, 1]; guard keeps a flat map from dividing by ~0
        h_min, h_max = heatmap.min(), heatmap.max()
        if h_max - h_min > 1e-8:
            heatmap = (heatmap - h_min) / (h_max - h_min)
        return heatmap

    def get_anomaly_centroid(self, heatmap: np.ndarray) -> tuple:
        """
        Find the peak (highest activation) location of the anomaly.

        Used to locate the defect crop for Index 2 retrieval.

        Returns: (cx, cy) pixel coordinates of maximum activation;
            the image centre (112, 112) when the heatmap is empty.
        """
        if heatmap.size == 0:
            return (112, 112)  # centre of the 224x224 map, fallback
        # Use peak location, not mean of thresholded region
        max_idx = np.unravel_index(np.argmax(heatmap), heatmap.shape)
        return (int(max_idx[1]), int(max_idx[0]))  # cx, cy

    def calibrate_score(self,
                        raw_score: float,
                        category: str,
                        thresholds: dict) -> float:
        """
        Calibrated score: sigmoid((raw_score - cal_mean) / cal_std).

        Raw k-NN distance is NOT a probability. Calibrating against the
        distribution of normal patch distances for this category makes the
        output interpretable as anomaly confidence in (0, 1).

        Args:
            raw_score: raw k-NN distance for the image.
            category: product category key into ``thresholds``.
            thresholds: {category: {"cal_mean": float, "cal_std": float}}.

        Returns: calibrated anomaly score in (0, 1).
        """
        if category not in thresholds:
            # No calibration stats for this category: plain sigmoid fallback.
            return float(1 / (1 + np.exp(-raw_score)))
        cal_mean = thresholds[category]["cal_mean"]
        cal_std = thresholds[category]["cal_std"]
        z = (raw_score - cal_mean) / (cal_std + 1e-8)  # epsilon avoids /0
        return float(1 / (1 + np.exp(-z)))
# Module-level singleton shared by importers; self.model and self.pca stay
# None until patchcore.load() is called.
patchcore = PatchCoreExtractor()