Source code for paftacular.comps.modifiers

"""Modifier components for mzPAF annotations"""

import re
from collections import Counter
from dataclasses import dataclass
from typing import ClassVar, Literal

from tacular import ELEMENT_LOOKUP, REFMOL_LOOKUP, ElementInfo, RefMolInfo

from paftacular.constants import ADDUCT_REGEX_PATTERN, ISOTOPE_REGEX_PATTERN

from ..constants import MAX_CACHE_SIZE
from .base import CompositionProvider, MassProvider, ScalableComposition, Serializable
from .util import composition_to_proforma_formula_string, formula_to_composition


[docs] @dataclass(frozen=True, slots=True) class MassError(Serializable): """Represents mass error with value and unit""" value: float unit: Literal["da", "ppm"] = "da"
[docs] def serialize(self) -> str: if self.unit == "ppm": return f"{self.value:g}ppm" elif self.unit == "da": return f"{self.value:g}" else: raise ValueError(f"Unknown mass error unit: {self.unit}")
[docs] @staticmethod def parse(s: str) -> "MassError": """Parse mass error string like '0.55ppm' or '0.06'""" s = s.strip() if s.endswith("ppm"): return MassError(value=float(s[:-3]), unit="ppm") else: return MassError(value=float(s), unit="da")
[docs] @dataclass(frozen=True, slots=True) class IsotopeSpecification(Serializable, CompositionProvider, MassProvider): """Represents isotope information""" count: int = 0 # number of isotopes above/below monoisotope element: str | None = None # e.g., "13C", "15N" is_average: bool = False # True for averaged isotopomers _cache: ClassVar[dict[tuple, "IsotopeSpecification"]] = {} def __new__(cls, count: int = 0, element: str | None = None, is_average: bool = False): """Create or retrieve cached instance""" key = (count, element, is_average) if key not in cls._cache: # Evict oldest entry if cache is full if len(cls._cache) >= MAX_CACHE_SIZE: cls._cache.pop(next(iter(cls._cache))) instance = object.__new__(cls) cls._cache[key] = instance return cls._cache[key] @property def _prefix(self) -> str: """Get prefix for serialization""" sign = "+" if self.count > 0 else "-" count_str = "" if abs(self.count) == 1 else str(abs(self.count)) return f"{sign}{count_str}"
[docs] def serialize(self) -> str: if self.count == 0: return "" if self.is_average is True: return f"{self._prefix}iA" elif self.element is not None: return f"{self._prefix}i{self.element}" else: return f"{self._prefix}i"
[docs] @staticmethod def parse(s: str) -> "IsotopeSpecification": """Parse isotope string like '+i', '-2i13C', '+iA'""" s = s.strip() match = re.match(ISOTOPE_REGEX_PATTERN, s) if not match: raise ValueError(f"Invalid isotope specification: '{s}'") sign_str, count_str, element_or_avg = match.groups() sign = -1 if sign_str == "-" else 1 count = (int(count_str) if count_str else 1) * sign if element_or_avg == "A": return IsotopeSpecification(count=count, is_average=True) elif element_or_avg: return IsotopeSpecification(count=count, element=element_or_avg) else: return IsotopeSpecification(count=count)
[docs] def mass(self, monoisotopic: bool = True) -> float: """Calculate mass contribution of isotope specification""" if monoisotopic is False: raise ValueError("Cannot calculate mass shift for average isotopomer specification") comp = self.composition m = 0.0 for elem, count in comp.items(): m += elem.get_mass(monoisotopic=True) * count return m
@property def composition(self) -> Counter[ElementInfo]: # lose mono and gain isotope if self.count == 0: return Counter() if self.is_average: raise ValueError("Cannot calculate composition for average isotopomer specification") if self.element is None: raise ValueError("Cannot calculate composition for generic isotope specification without element") if self.element not in ELEMENT_LOOKUP: raise ValueError(f"Unknown element for isotope specification: {self.element}") elem_info: ElementInfo = ELEMENT_LOOKUP[self.element] # Get monoisotopic using the base element symbol (e.g., "C" from "13C") base_symbol = elem_info.symbol mono_info: ElementInfo = ELEMENT_LOOKUP.get_monoisotopic(base_symbol) comp: Counter[ElementInfo] = Counter() comp[elem_info] = self.count comp[mono_info] = -self.count return comp
[docs] def as_dict(self) -> dict: """Convert isotope specification to dictionary representation""" try: _monoisotopic_mass = round(self.monoisotopic_mass, 5) except ValueError: _monoisotopic_mass = None try: _average_mass = round(self.average_mass, 5) except ValueError: _average_mass = None try: _dict_composition = self.dict_composition except ValueError: _dict_composition = None return { "count": self.count, "element": self.element, "is_average": self.is_average, "monoisotopic_mass": _monoisotopic_mass, "average_mass": _average_mass, "composition": _dict_composition, }
[docs] @dataclass(frozen=True, slots=True) class NeutralLoss( Serializable, ScalableComposition, MassProvider, ): """Represents a neutral loss or gain""" count: int base_formula: str | None = None # e.g., "H2O", "NH3" base_mass: float | None = None # e.g., 17.03 for direct mass specification base_reference: str | None = None # e.g., "Phospho", "iTRAQ115" (without brackets) _cache: ClassVar[dict[tuple, "NeutralLoss"]] = {} def __new__(cls, count: int, base_formula: str | None = None, base_mass: float | None = None, base_reference: str | None = None): """Create or retrieve cached instance""" key = (count, base_formula, base_mass, base_reference) if key not in cls._cache: # Evict oldest entry if cache is full if len(cls._cache) >= MAX_CACHE_SIZE: cls._cache.pop(next(iter(cls._cache))) instance = object.__new__(cls) cls._cache[key] = instance return cls._cache[key] def __post_init__(self): """Validate that exactly one of formula/mass/reference is set""" set_count = sum([self.base_formula is not None, self.base_mass is not None, self.base_reference is not None]) if set_count != 1: raise ValueError("Exactly one of formula, mass, or reference must be set") @property def reference(self) -> RefMolInfo | str | None: if self.base_reference is None: return None try: val = REFMOL_LOOKUP[self.base_reference] return val except KeyError: return self.base_reference @property def loss_type(self) -> Literal["mass", "formula", "reference"]: if self.base_mass is not None: return "mass" elif self.base_formula is not None: return "formula" elif self.base_reference is not None: return "reference" else: raise ValueError("Invalid NeutralLoss state") @property def _single_composition(self) -> Counter[ElementInfo]: match self.loss_type: case "formula": if self.base_formula is None: # This shouldn't happen given __post_init__ raise RuntimeError("Invalid state: formula is None") return formula_to_composition(self.base_formula) case "reference": refmol = self.reference if not isinstance(refmol, RefMolInfo): raise ValueError(f"Unknown reference molecule '{self.base_reference}'. Check that it exists in REFMOL_LOOKUP.") return refmol.composition case "mass": raise ValueError(f"Cannot calculate composition for mass-based loss ({self.base_mass} Da). Use a formula or reference instead.") @property def proforma_formula(self) -> str: return composition_to_proforma_formula_string(self.composition) @property def _single_formula(self) -> str: """Get formula for a single instance of the loss (without count/sign)""" match self.loss_type: case "formula": if self.base_formula is None: raise RuntimeError("Formula is None for formula-based loss") return self.base_formula case "reference": refmol: RefMolInfo | str | None = self.reference if isinstance(refmol, RefMolInfo): return refmol.chemical_formula else: raise ValueError(f"Cannot get formula for unknown reference molecule '{refmol}' of type: {type(refmol)}") case "mass": raise ValueError(f"Cannot get formula for mass-based loss: {self.base_mass}") case _: raise ValueError(f"Invalid loss_type: {self.loss_type}") @property def formula(self) -> str: single_formula = self._single_formula return f"{self._sign_prefix}{single_formula}" def _mass_single(self, monoisotopic: bool = True) -> float: match self.loss_type: case "mass": if self.base_mass is None: raise RuntimeError("Mass is None for mass-based loss") return self.base_mass case "formula": comp: Counter[ElementInfo] = self._single_composition if comp is None: raise RuntimeError("Composition is None for formula-based loss") m = 0 for elem, count in comp.items(): m += elem.get_mass(monoisotopic) * count return m case "reference": refmol: RefMolInfo | str | None = self.reference if isinstance(refmol, str) or refmol is None: raise ValueError(f"Cannot get mass for unknown reference molecule '{refmol}'") return refmol.get_mass(monoisotopic)
[docs] def mass(self, monoisotopic: bool = True) -> float: single_mass: float = self._mass_single(monoisotopic) return single_mass * self.count
[docs] def serialize(self, loss_type: Literal["mass", "formula", "reference"] | None = None, monoisotopic: bool = True) -> str: if loss_type is None: loss_type = self.loss_type match loss_type: case "mass": mass = self.mass(monoisotopic=monoisotopic) return f"{mass:+.5f}" case "formula": formula = self.formula return f"{formula}" case "reference": if self.base_reference is not None: ref_name = self.base_reference return f"{self._sign_prefix}[{ref_name}]" else: raise ValueError("Cannot serialize reference: reference name is undefined") raise ValueError("Invalid loss_type for serialization")
[docs] def as_dict(self) -> dict: """Convert the neutral loss to a dictionary representation""" try: _monoisotopic_mass = round(self.monoisotopic_mass, 5) except ValueError: _monoisotopic_mass = None try: _average_mass = round(self.average_mass, 5) except ValueError: _average_mass = None try: _formula = self.formula except ValueError: _formula = None try: _dict_composition = self.dict_composition except ValueError: _dict_composition = None return { "count": self.count, "base_formula": self.base_formula, "base_mass": self.base_mass, "base_reference": self.base_reference, "monoisotopic_mass": _monoisotopic_mass, "average_mass": _average_mass, "composition": _dict_composition, "formula": _formula, }
[docs] @staticmethod def parse(loss_str: str) -> "NeutralLoss": """Parse a neutral loss string into a NeutralLoss object""" sign = loss_str[0] sign_mult: int if sign == "+": sign_mult = 1 elif sign == "-": sign_mult = -1 else: raise ValueError(f"Invalid sign in neutral loss: '{loss_str}'") content = loss_str[1:] # Remove sign # Try to parse as mass (decimal number) if re.match(r"^\d+(?:\.\d+)?$", content): count = 1 * sign_mult return NeutralLoss(count=count, base_mass=float(content)) # Parse as reference group [Name] or COUNT[Name] elif "[" in content: # Changed from content.startswith('[') # Extract count and reference name match = re.match(r"^(\d*)\[([^\]]+)\]$", content) if match: count_str, ref_name = match.groups() count = int(count_str) if count_str else 1 return NeutralLoss(count=count * sign_mult, base_reference=ref_name) # Parse as formula (with optional count prefix) else: # Extract count and formula match = re.match(r"^(\d*)([A-Z].*)$", content) if match: count_str, formula = match.groups() count = int(count_str) if count_str else 1 return NeutralLoss(count=count * sign_mult, base_formula=formula) raise ValueError(f"Could not parse neutral loss: '{loss_str}'")
[docs] @dataclass(frozen=True, slots=True) class Adduct(Serializable, ScalableComposition, MassProvider): count: int base_formula: str _cache: ClassVar[dict[tuple, "Adduct"]] = {} def __new__(cls, count: int, base_formula: str): """Create or retrieve cached instance""" key = (count, base_formula) if key not in cls._cache: # Evict oldest entry if cache is full if len(cls._cache) >= MAX_CACHE_SIZE: cls._cache.pop(next(iter(cls._cache))) instance = object.__new__(cls) cls._cache[key] = instance return cls._cache[key] def __post_init__(self): """Validate adduct""" if self.count == 0: raise ValueError(f"Count must be non-zero, got {self.count}") if not self.base_formula: raise ValueError("Formula cannot be empty") @property def _single_composition(self) -> Counter[ElementInfo]: return formula_to_composition(self.base_formula) # Use helper! @property def formula(self) -> str: return f"{self._sign_prefix}{self.base_formula}" @property def proforma_formula(self) -> str: return composition_to_proforma_formula_string(self._single_composition) # Use helper!
[docs] def serialize(self) -> str: return f"{self._sign_prefix}{self.base_formula}"
[docs] @staticmethod def parse(s: str) -> "Adduct": """Parse a single adduct string like '+H', '+2Na', '-NH4'""" s = s.strip() match = re.match(ADDUCT_REGEX_PATTERN, s) if not match: raise ValueError(f"Invalid adduct: '{s}'") sign_str, count_str, formula = match.groups() sign = 1 if sign_str == "+" else -1 count = (int(count_str) if count_str else 1) * sign return Adduct(count=count, base_formula=formula)
[docs] def as_dict(self) -> dict: """Convert the adduct to a dictionary representation""" try: _monoisotopic_mass = round(self.monoisotopic_mass, 5) except ValueError: _monoisotopic_mass = None try: _average_mass = round(self.average_mass, 5) except ValueError: _average_mass = None try: _formula = self.formula except ValueError: _formula = None try: _dict_composition = self.dict_composition except ValueError: _dict_composition = None return { "count": self.count, "base_formula": self.base_formula, "monoisotopic_mass": _monoisotopic_mass, "average_mass": _average_mass, "composition": _dict_composition, "formula": _formula, }