Source code for glasscut.stain_normalizers.macenko

import warnings
from warnings import warn
from typing import Any, List
import logging

import numpy as np
from PIL import Image

from glasscut.tile import Tile
from glasscut.tissue_detectors import OtsuTissueDetector

from .base import StainNormalizer, TransformerStainMatrixMixin


[docs] class MacenkoStainNormalizer(TransformerStainMatrixMixin, StainNormalizer): """Stain normalizer using M. Macenko et al.'s method. This method performs unsupervised color deconvolution to identify stain vectors, then normalizes stain concentrations to a reference image. It works well for standard H&E stained histopathology images. The algorithm: 1. Converts image to optical density (OD) space 2. Performs principal component analysis on OD values 3. Identifies stain vectors using angular decomposition 4. Normalizes stain concentrations to match reference Attributes ---------- stain_color_map : dict Mapping of stain names to their normalized OD vectors. Examples -------- >>> from PIL import Image >>> from glasscut.stain_normalizers import MacenkoStainNormalizer >>> normalizer = MacenkoStainNormalizer() >>> ref_image = Image.open("reference.png") >>> normalizer.fit(ref_image) >>> test_image = Image.open("test.png") >>> normalized_image = normalizer.transform(test_image) """ # Normalized OD vectors for standard stains stain_color_map = { "hematoxylin": np.array([0.65, 0.70, 0.29]), "eosin": np.array([0.07, 0.99, 0.11]), "dab": np.array([0.27, 0.57, 0.78]), "null": np.array([0.0, 0.0, 0.0]), } _logger = logging.getLogger(__name__)
[docs] def __init__(self): """Initialize MacenkoStainNormalizer.""" warnings.warn( "MacenkoStainNormalizer is experimental and may produce errors or " "unexpected results on certain images. Use with caution.", UserWarning, stacklevel=2, ) super().__init__() self.stain_matrix_target = None self.max_concentrations_target = None
[docs] def stain_matrix( self, img_rgb: Image.Image, background_intensity: int = 240, **kwargs: Any, ) -> np.ndarray: """Estimate stain matrix using Macenko's method. Parameters ---------- img_rgb : PIL.Image.Image Input image in RGB or RGBA format. background_intensity : int, optional Background transmitted light intensity. Default is 240. **kwargs Additional keyword arguments. Other Parameters ---------------- alpha : int, optional Minimum angular percentile. Default is 1. beta : float, optional Threshold for OD magnitude filtering. Default is 0.15. stains : list of str, optional List of stain names in order. Default is ``["hematoxylin", "eosin"]``. Returns ------- np.ndarray Calculated 3×3 stain matrix with stain vectors as columns. Raises ------ ValueError If stains is not a 2-element list. ValueError If image is not RGB or RGBA. """ alpha = kwargs.get("alpha", 1) beta = kwargs.get("beta", 0.15) stains = kwargs.get("stains", None) if stains is not None and len(stains) != 2: raise ValueError("Only 2-stain lists are currently supported.") stains = ["hematoxylin", "eosin"] if stains is None else stains if img_rgb.mode not in ["RGB", "RGBA"]: raise ValueError("Input image must be RGB or RGBA") # Convert to OD and apply tissue masking tile = Tile(img_rgb, None, None, tissue_detector=OtsuTissueDetector()) tissue_mask = tile.tissue_mask.astype(bool) if img_rgb.mode == "RGBA": red, green, blue, _ = img_rgb.split() img_rgb = Image.merge("RGB", (red, green, blue)) warn( "Input image is RGBA. Converting to RGB before OD conversion. " "Alpha channel will be discarded.", stacklevel=2, ) img_arr = np.array(img_rgb) od = -np.log((img_arr.astype(np.float64) + 1) / background_intensity) od = od[tissue_mask].reshape(-1, 3) if od.size == 0: self._logger.warning( "Macenko fallback: no tissue pixels available for stain matrix estimation." ) return self._default_stain_matrix(stains) # Remove data with OD intensity less than β od_hat = od[~np.any(od < beta, axis=1)] if od_hat.shape[0] < 3: self._logger.warning( "Macenko fallback: insufficient OD samples after beta filtering (%d).", od_hat.shape[0], ) return self._default_stain_matrix(stains) # Calculate principal components and project input try: V = np.linalg.eigh(np.cov(od_hat, rowvar=False))[1][:, -2:] except np.linalg.LinAlgError: self._logger.warning( "Macenko fallback: covariance eigendecomposition failed.", exc_info=True, ) return self._default_stain_matrix(stains) if not np.isfinite(V).all(): self._logger.warning( "Macenko fallback: non-finite principal components encountered." ) return self._default_stain_matrix(stains) proj = np.dot(od_hat, V) # Angular coordinates with respect to principal orthogonal eigenvectors phi = np.arctan2(proj[:, 1], proj[:, 0]) # Min and max angles min_phi = np.percentile(phi, alpha) max_phi = np.percentile(phi, 100 - alpha) # The two principal stains min_v = V.dot(np.array([(np.cos(min_phi), np.sin(min_phi))]).T) max_v = V.dot(np.array([(np.cos(max_phi), np.sin(max_phi))]).T) # Fill out empty columns in stain matrix and reorder _arr = np.hstack([min_v, max_v]) if not np.isfinite(_arr).all(): self._logger.warning( "Macenko fallback: non-finite stain vectors encountered." ) return self._default_stain_matrix(stains) unordered_stain_matrix = self._complement_stain_matrix( _arr / np.linalg.norm(_arr, axis=0) ) ordered_stain_matrix = self._reorder_stains( unordered_stain_matrix, stains=stains ) return ordered_stain_matrix
@classmethod def _default_stain_matrix(cls, stains: List[str]) -> np.ndarray: """Return a robust fallback 3x3 stain matrix for H&E-like normalization.""" first = np.array(cls.stain_color_map[stains[0]], dtype=np.float64) second = np.array(cls.stain_color_map[stains[1]], dtype=np.float64) first = first / np.linalg.norm(first) second = second / np.linalg.norm(second) third = np.cross(first, second) third_norm = np.linalg.norm(third) if third_norm == 0: third = np.array(cls.stain_color_map["null"], dtype=np.float64) else: third = third / third_norm return np.stack([first, second, third], axis=1) @staticmethod def _complement_stain_matrix(stain_matrix: np.ndarray) -> np.ndarray: """Complete a 3×2 stain matrix to 3×3 using cross product. Parameters ---------- stain_matrix : np.ndarray A 3×2 stain matrix with normalized stain vectors as columns. Returns ------- np.ndarray A 3×3 stain matrix with a third orthogonal column computed as the normalized cross product of the first two columns. """ stain0 = stain_matrix[:, 0] stain1 = stain_matrix[:, 1] stain2 = np.cross(stain0, stain1) # Normalize new vector to unit norm return np.array([stain0, stain1, stain2 / np.linalg.norm(stain2)]).T @staticmethod def _find_stain_index(reference: np.ndarray, stain_matrix: np.ndarray) -> int: """Find column index in stain matrix closest to reference vector. Parameters ---------- reference : np.ndarray Reference stain vector (1D array). stain_matrix : np.ndarray Stain matrix with normalized column vectors. Returns ------- int Index of the column with smallest angular distance to reference. Notes ----- Uses absolute dot product as similarity metric (angular distance). """ dot_products = np.dot(reference, stain_matrix) return int(np.argmax(np.abs(dot_products))) @staticmethod def _reorder_stains(stain_matrix: np.ndarray, stains: List[str]) -> np.ndarray: """Reorder stain matrix columns to match expected stain order. Parameters ---------- stain_matrix : np.ndarray A 3×3 matrix of stain column vectors. stains : list of str Ordered list of stain names. Must be length 2. Returns ------- np.ndarray Reordered 3×3 stain matrix with columns in specified order. Notes ----- The third column (null stain) is always placed last. """ def _get_channel_order(stain_matrix: np.ndarray) -> tuple[int, int, int]: first = MacenkoStainNormalizer._find_stain_index( MacenkoStainNormalizer.stain_color_map[stains[0]], stain_matrix ) second = 1 - first # If 2 stains, third "stain" is cross product of 1st 2 channels third = 2 return first, second, third def _ordered_stack( stain_matrix: np.ndarray, order: tuple[int, int, int] ) -> np.ndarray: return np.stack([stain_matrix[..., j] for j in order], -1) return _ordered_stack(stain_matrix, _get_channel_order(stain_matrix))