Source code for soursop.ssbme

##     _____  ____  _    _ _____   _____  ____  _____
##   / ____|/ __ \| |  | |  __ \ / ____|/ __ \|  __ \
##  | (___ | |  | | |  | | |__) | (___ | |  | | |__) |
##   \___ \| |  | | |  | |  _  / \___ \| |  | |  ___/
##   ____) | |__| | |__| | | \ \ ____) | |__| | |
##  |_____/ \____/ \____/|_|  \_\_____/ \____/|_|

## Alex Holehouse (Pappu Lab and Holehouse Lab) and Jared Lalmansing (Pappu lab)
## Simulation analysis package
## Copyright 2014 - 2026
##

"""
ssbme - Bayesian Maximum Entropy (BME) and iterative BME (iBME) reweighting.

This module reweights molecular ensembles against experimental observables
using the Bayesian Maximum Entropy framework. BME finds the minimally biased
set of frame weights (smallest relative entropy from the prior) that brings
the ensemble averages into agreement with experimental data.

Two reweighters are provided:

``BME``
    Standard maximum-entropy reweighting. Use when the calculated observables
    are already on the same scale as the experimental values.

``iBME``
    Iterative BME. At every iteration a (weighted) linear regression of the
    ensemble-averaged calculated values against the experimental values is
    used to refit a global scale and offset before the maxent step. This is
    the method of choice when there is an unknown global scale/offset between
    the calculated and experimental data (e.g. SAXS intensities). Ported from
    the reference implementation of Bottaro et al.

``ExperimentalObservable``
    Container for a single experimental data point (value, uncertainty and
    constraint type).

The optimal regularisation strength ``theta`` can be selected automatically
with an L-curve scan via :func:`theta_scan` (also exposed as
``BME.scan_theta`` / ``iBME.scan_theta``).

Example
-------
>>> import numpy as np
>>> from soursop.ssbme import BME, iBME, ExperimentalObservable
>>> rng = np.random.default_rng(0)
>>> calc = rng.normal([24, 65], [3, 6], size=(2000, 2))
>>> obs = [ExperimentalObservable(23.0, 1.0, name="Rg"),
...        ExperimentalObservable(60.0, 2.0, name="Ree")]
>>> result = BME(obs, calc).fit(theta=2.0, auto_theta=False)
>>> reweighted_means = result.predict(calc)

References
----------
Primary reference for the BME procedure and software this module is
adapted from:

* Bottaro, S., Bengtsen, T. & Lindorff-Larsen, K. *Integrating Molecular
  Simulation and Experimental Data: A Bayesian/Maximum Entropy Reweighting
  Approach.* Methods Mol. Biol. 2112, 219-240 (2020).
  https://doi.org/10.1007/978-1-0716-0270-6_15
  (preprint: bioRxiv 2018, https://doi.org/10.1101/457952). Reference
  implementation: https://github.com/KULL-Centre/BME.

Foundational and closely related work:

* Hummer, G. & Koefinger, J. *Bayesian ensemble refinement by replica
  simulations and reweighting.* J. Chem. Phys. 143, 243150 (2015) - the
  BioEn reweighting functional that BME is mathematically equivalent to.
* Cesari, A., Gil-Ley, A. & Bussi, G. *Combining simulations and solution
  experiments as a paradigm for RNA force field refinement.* J. Chem.
  Theory Comput. 12, 6192-6200 (2016) - the maximum-entropy-with-error
  objective Gamma(lambda) minimised here.
* Cesari, A., Reisser, S. & Bussi, G. *Using the maximum entropy
  principle to combine simulations and solution experiments.*
  Computation 6, 15 (2018) - review of the method and its pitfalls.
* Rozycki, B., Kim, Y. C. & Hummer, G. *SAXS ensemble refinement of
  ESCRT-III CHMP3 conformational transitions.* Structure 19, 109-116
  (2011) - EROS; origin of the global scaling parameter theta and the
  SAXS scale/offset problem addressed by iBME.
* Bottaro, S. & Lindorff-Larsen, K. *Biophysical experiments and
  biomolecular simulations: A perfect match?* Science 361, 355-360
  (2018) - perspective on integrating simulation and experiment.

**Author(s):** Alex Holehouse
"""

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional, Tuple, Union

import numpy as np
from scipy.optimize import minimize
from scipy.special import logsumexp, rel_entr

from .ssexceptions import SSException
from .ssutils import (
    MIN_WEIGHT_THRESHOLD,
    VALID_CONSTRAINTS,
    ExperimentalObservable,
    constraint_chi_squared,
    find_optimal_theta,
    relative_entropy,
    validate_reweighting_inputs,
    weighted_linear_regression,
)

# These reweighting primitives were factored out to soursop.ssutils so
# they can be shared with soursop.sscoper (COPER / iCOPER). The aliases
# below preserve this module's public API
# (``ExperimentalObservable``, ``find_optimal_theta``,
# ``MIN_WEIGHT_THRESHOLD``, ``VALID_CONSTRAINTS``) and its internal call
# sites (``_srel``, ``_weighted_linear_regression``, ``_validate_inputs``)
# unchanged.
_srel = relative_entropy
_weighted_linear_regression = weighted_linear_regression
_validate_inputs = validate_reweighting_inputs

__all__ = [
    "BME",
    "iBME",
    "BMECustom",
    "BMEResult",
    "BMECustomResult",
    "ThetaScanResult",
    "theta_scan",
    "ExperimentalObservable",
    "find_optimal_theta",
    "MIN_WEIGHT_THRESHOLD",
    "VALID_CONSTRAINTS",
]

# ----------------------------------------------------------------------------------------------------------------------------------------------------------------
# Module constants
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------
DEFAULT_THETA = 0.5
DEFAULT_MAX_ITERATIONS = 50000
DEFAULT_OPTIMIZER = "L-BFGS-B"
LAMBDA_INIT_SCALE = 1e-3


# ----------------------------------------------------------------------------------------------------------------------------------------------------------------
[docs] @dataclass class BMEResult: """Container for a BME / iBME reweighting result. Attributes ---------- weights : numpy.ndarray Optimized (posterior) frame weights, summing to 1. initial_weights : numpy.ndarray Prior frame weights. lambdas : numpy.ndarray Optimized Lagrange multipliers (one per observable). chi_squared_initial : float Reduced chi-squared before reweighting. chi_squared_final : float Reduced chi-squared after reweighting. phi : float Fraction of effective frames, ``exp(-D_KL(w || w0))``. n_iterations : int Number of optimizer iterations (maxent) or iBME iterations. success : bool Whether the optimization succeeded. message : str Optimizer status message. theta : float Regularisation strength used. observables : list of ExperimentalObservable Observables used in the fit. calculated_values : numpy.ndarray Calculated values used in the fit (for iBME, the final scaled array). metadata : dict Free-form metadata (optimizer, timestamp, ...). scale : float, optional Final iBME scale factor (alpha) relative to the original input. ``None`` for plain BME. offset : float, optional Final iBME offset (beta) relative to the original input. ``None`` for plain BME. ibme_iterations : list of dict Per-iteration iBME log (empty for plain BME). Each entry has ``iteration``, ``scale``, ``offset``, ``chi_squared`` and ``diff``. """ weights: np.ndarray initial_weights: np.ndarray lambdas: np.ndarray chi_squared_initial: float chi_squared_final: float phi: float n_iterations: int success: bool message: str theta: float observables: List[ExperimentalObservable] calculated_values: np.ndarray metadata: dict scale: Optional[float] = None offset: Optional[float] = None ibme_iterations: List[dict] = field(default_factory=list) def __str__(self): status = "SUCCESS" if self.success else "FAILED" lines = [ f"BME Result [{status}]", f" Chi-squared initial: {self.chi_squared_initial:.4f}", f" Chi-squared final: {self.chi_squared_final:.4f}", f" phi (effective fraction): {self.phi:.4f}", f" Iterations: {self.n_iterations}", f" Theta: {self.theta}", ] if self.scale is not None: lines.append(f" iBME scale/offset: {self.scale:.4g} / {self.offset:.4g}") return "\n".join(lines) def __repr__(self): return self.__str__() @property def kl_divergence(self) -> float: """Relative entropy (KL divergence) of ``weights`` from prior.""" if self.phi > 0: return -np.log(self.phi) return np.inf
[docs] def predict(self, calculated_values: np.ndarray) -> np.ndarray: """Weighted average of arbitrary observables using these weights. Parameters ---------- calculated_values : numpy.ndarray Per-frame observable values, shape ``(n_frames, n_observables)``. ``n_frames`` must match the fitted ensemble. Returns ------- numpy.ndarray Weighted means, shape ``(n_observables,)``. Raises ------ SSException If the number of frames does not match the fitted ensemble. """ calculated_values = np.asarray(calculated_values) if calculated_values.shape[0] != self.weights.shape[0]: raise SSException( f"Number of frames in calculated_values " f"({calculated_values.shape[0]}) must match the fitted " f"ensemble ({self.weights.shape[0]})" ) return np.sum(self.weights[:, np.newaxis] * calculated_values, axis=0)
[docs] def diagnostics(self, warn_threshold: float = 0.5) -> dict: """Diagnose the reweighting result and flag potential issues. Parameters ---------- warn_threshold : float, optional Minimum acceptable ``phi`` before a low-diversity warning is raised. Default 0.5. Returns ------- dict Diagnostic metrics and a list of human-readable ``warnings``. Notes ----- Two effective sample sizes are reported: the entropy-based ``neff_entropy`` (``N * phi``, the usual BME measure) and the Renyi-2 / participation ratio ``neff_renyi2`` (``1 / sum w^2``), which is more sensitive to a few dominant weights. """ diag: dict = {} warnings: List[str] = [] n = len(self.weights) neff_entropy = n * float(self.phi) diag["neff_entropy"] = neff_entropy diag["neff_entropy_fraction"] = float(self.phi) neff_renyi2 = 1.0 / np.sum(self.weights**2) diag["neff_renyi2"] = float(neff_renyi2) diag["neff_renyi2_fraction"] = float(neff_renyi2 / n) diag["weight_min"] = float(self.weights.min()) diag["weight_max"] = float(self.weights.max()) diag["weight_std"] = float(self.weights.std()) if diag["weight_min"] > 0: diag["weight_range_orders"] = float( np.log10(diag["weight_max"] / diag["weight_min"]) ) else: diag["weight_range_orders"] = np.inf diag["chi2_improvement"] = self.chi_squared_initial - self.chi_squared_final diag["chi2_improvement_pct"] = ( (diag["chi2_improvement"] / self.chi_squared_initial) * 100.0 if self.chi_squared_initial > 0 else np.nan ) if self.phi < warn_threshold: warnings.append( f"Low Phi ({self.phi:.3f} < {warn_threshold}): significant " "loss of ensemble diversity. Consider increasing theta or " "loosening observable uncertainties." ) if neff_renyi2 < 0.1 * n: warnings.append( f"Low effective sample size (1/sum w^2) ({neff_renyi2:.1f} / " f"{n}): only ~{diag['neff_renyi2_fraction'] * 100:.1f}% of " "frames are effectively used." ) if diag["weight_range_orders"] > 3: warnings.append( f"Large weight range ({diag['weight_range_orders']:.1f} " "orders of magnitude): a few frames dominate the ensemble." ) if self.chi_squared_final > 2 * len(self.observables): warnings.append( f"High final Chi-squared ({self.chi_squared_final:.2f}): " "poor fit; observables may be incompatible with the ensemble." ) diag["warnings"] = warnings diag["status"] = "OK" if len(warnings) == 0 else "WARNING" return diag
[docs] def print_diagnostics(self, warn_threshold: float = 0.5): """Print a formatted diagnostic report for this result. Parameters ---------- warn_threshold : float, optional Passed through to :meth:`diagnostics`. Default 0.5. """ diag = self.diagnostics(warn_threshold) n = len(self.weights) key_w, val_w = 32, 14 print("\n" + "=" * 60) print("BME DIAGNOSTIC REPORT") print("=" * 60) print(f"\nOptimization Status: {self.message}") print(f"Success: {self.success}") print(f"Iterations: {self.n_iterations}") print("\nChi-squared:") print(f" {'Initial':<{key_w}} {self.chi_squared_initial:>{val_w}.4f}") print(f" {'Final':<{key_w}} {self.chi_squared_final:>{val_w}.4f}") print( f" {'Improvement':<{key_w}} {diag['chi2_improvement']:>{val_w}.4f}" f" ({diag['chi2_improvement_pct']:.1f}%)" ) print("\nEnsemble Diversity:") print(f" {'Phi (entropy fraction)':<{key_w}} {self.phi:>{val_w}.4f}") print( f" {'N_eff (entropy-based)':<{key_w}} " f"{diag['neff_entropy']:>{val_w}.1f} / {n}" ) print( f" {'N_eff (1/sum w^2, Renyi-2)':<{key_w}} " f"{diag['neff_renyi2']:>{val_w}.1f} / {n}" ) print(f" {'Theta':<{key_w}} {self.theta:>{val_w}.4f}") if self.scale is not None: print(f" {'iBME scale':<{key_w}} {self.scale:>{val_w}.4g}") print(f" {'iBME offset':<{key_w}} {self.offset:>{val_w}.4g}") print("\nWeight Distribution:") print(f" {'Min':<{key_w}} {diag['weight_min']:>{val_w}.2e}") print(f" {'Max':<{key_w}} {diag['weight_max']:>{val_w}.2e}") print(f" {'Std Dev':<{key_w}} {diag['weight_std']:>{val_w}.2e}") print( f" {'Range (orders of magnitude)':<{key_w}} " f"{diag['weight_range_orders']:>{val_w}.1f}" ) if len(diag["warnings"]) > 0: print(f"\nWARNINGS ({len(diag['warnings'])}):") for i, warning in enumerate(diag["warnings"], 1): print(f" {i}. {warning}") else: print(f"\nStatus: {diag['status']} - No issues detected") print("=" * 60)
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------
[docs] @dataclass class ThetaScanResult: """Container for an L-curve theta scan. Attributes ---------- theta_values : numpy.ndarray Scanned theta grid. chi_squared_values : numpy.ndarray Final chi-squared at each theta. phi_values : numpy.ndarray Effective-fraction (phi) at each theta. kl_divergence_values : numpy.ndarray Relative entropy at each theta. results : list of BMEResult Per-theta fit results. optimal_theta : float Selected theta (L-curve knee). optimal_idx : int Index of ``optimal_theta`` in ``theta_values``. method : str Human-readable knee-selection method used. """ theta_values: np.ndarray chi_squared_values: np.ndarray phi_values: np.ndarray kl_divergence_values: np.ndarray results: List[BMEResult] optimal_theta: float optimal_idx: int method: str
[docs] def print_summary(self): """Print a formatted summary of the theta scan.""" print("\n" + "=" * 60) print("THETA SCAN SUMMARY") print("=" * 60) print( f"\nScan range: {self.theta_values[0]:.4f} to {self.theta_values[-1]:.4f}" ) print(f"Number of points: {len(self.theta_values)}") print(f"Method: {self.method}\n") opt = self.optimal_idx print(f"RECOMMENDED THETA: {self.optimal_theta:.4f}") print(f"Chi squared: {self.chi_squared_values[opt]:.4f}") print(f"Phi (N_eff): {self.phi_values[opt]:.4f}") print(f"Relative entropy: {self.kl_divergence_values[opt]:.4f}") print("=" * 60)
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------
[docs] def theta_scan( observables: List[ExperimentalObservable], calculated_values: np.ndarray, reweighter: str = "bme", theta_range: Union[Tuple[float, float], np.ndarray] = (0.01, 10.0), n_points: int = 15, log_scale: bool = True, initial_weights: Optional[np.ndarray] = None, method: str = "perpendicular", verbose: bool = False, fit_kwargs: Optional[dict] = None, ) -> ThetaScanResult: """Scan a grid of theta values for BME or iBME and pick the L-curve knee. Parameters ---------- observables : list of ExperimentalObservable Experimental observables. calculated_values : numpy.ndarray Per-frame calculated values, shape ``(n_frames, n_observables)``. reweighter : str, optional ``"bme"`` (default) or ``"ibme"``. theta_range : tuple or numpy.ndarray, optional ``(min, max)`` for a generated grid of ``n_points``, or an explicit 1D array of theta values. Default ``(0.01, 10.0)``. n_points : int, optional Number of grid points when ``theta_range`` is a tuple. Default 15. log_scale : bool, optional Sample logarithmically when ``theta_range`` is a tuple. Default True. initial_weights : numpy.ndarray, optional Prior weights. Uniform if None. method : str, optional Knee-selection method (see :func:`find_optimal_theta`). verbose : bool, optional Print per-theta progress. Default False. fit_kwargs : dict, optional Extra keyword arguments forwarded to the reweighter's ``fit`` (e.g. ``ftol``/``max_ibme_iterations`` for iBME). Returns ------- ThetaScanResult Raises ------ SSException If ``reweighter`` is unknown, ``n_points`` < 1, or a log-scale grid has non-positive endpoints. """ if reweighter not in ("bme", "ibme"): raise SSException(f"Unknown reweighter: {reweighter}, must be 'bme' or 'ibme'") if isinstance(theta_range, (tuple, list)): if n_points < 1: raise SSException( f"n_points must be >= 1 for a tuple theta_range, got {n_points}" ) if log_scale and (theta_range[0] <= 0 or theta_range[1] <= 0): raise SSException( "theta_range endpoints must be positive when log_scale=True, " f"got {theta_range}" ) if log_scale: theta_values = np.logspace( np.log10(theta_range[0]), np.log10(theta_range[1]), n_points ) else: theta_values = np.linspace(theta_range[0], theta_range[1], n_points) else: theta_values = np.asarray(theta_range, dtype=np.float64) fit_kwargs = dict(fit_kwargs) if fit_kwargs else {} chi2_vals: List[float] = [] phi_vals: List[float] = [] kl_vals: List[float] = [] results: List[BMEResult] = [] for i, theta in enumerate(theta_values): if verbose: print(f"Processing theta {i + 1}/{len(theta_values)}: {theta:.4f}") if reweighter == "bme": rw = BME(observables, calculated_values, initial_weights) res = rw.fit( theta=float(theta), auto_theta=False, verbose=False, **fit_kwargs, ) else: rw = iBME(observables, calculated_values, initial_weights) res = rw.fit(theta=float(theta), verbose=False, **fit_kwargs) results.append(res) chi2_vals.append(res.chi_squared_final) phi_vals.append(res.phi) kl_vals.append(res.kl_divergence) chi2_arr = np.array(chi2_vals) phi_arr = np.array(phi_vals) kl_arr = np.array(kl_vals) optimal_idx, method_name = find_optimal_theta(chi2_arr, kl_arr, method=method) return ThetaScanResult( theta_values=theta_values, chi_squared_values=chi2_arr, phi_values=phi_arr, kl_divergence_values=kl_arr, results=results, optimal_theta=float(theta_values[optimal_idx]), optimal_idx=optimal_idx, method=method_name, )
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------
[docs] class BME: """Bayesian Maximum Entropy reweighting. Finds the minimally biased frame weights that bring ensemble averages into agreement with experimental observables, by maximising entropy relative to the prior subject to a Gaussian-error data restraint. Parameters ---------- observables : list of ExperimentalObservable Experimental observables to fit against. calculated_values : numpy.ndarray Per-frame calculated observable values, shape ``(n_frames, n_observables)``. initial_weights : numpy.ndarray, optional Prior frame weights. If None, uniform weights are used. Internally normalized to sum to 1. Raises ------ SSException If inputs are malformed (see :func:`_validate_inputs`). """
[docs] def __init__( self, observables: List[ExperimentalObservable], calculated_values: np.ndarray, initial_weights: Optional[np.ndarray] = None, ): _validate_inputs(observables, calculated_values, initial_weights) self.observables = list(observables) self.calculated_values = calculated_values self.n_frames = calculated_values.shape[0] self.n_observables = len(observables) if initial_weights is None: self.initial_weights = np.ones(self.n_frames) / self.n_frames else: self.initial_weights = initial_weights / np.sum(initial_weights) self._lambdas = np.random.normal( loc=0.0, scale=LAMBDA_INIT_SCALE, size=self.n_observables ).astype(np.float64) self._bounds = [obs.get_bounds() for obs in self.observables] self._exp_values = np.array( [obs.value for obs in self.observables], dtype=np.float64 ) self._exp_sigma2 = np.array( [obs.uncertainty**2 for obs in self.observables], dtype=np.float64, ) self._result: Optional[BMEResult] = None self._theta_scan_result: Optional[ThetaScanResult] = None self._theta: Optional[float] = None
# ------------------------------------------------------------------ def _compute_chi_squared(self, weights: np.ndarray) -> float: """Constraint-aware reduced chi-squared for a weight vector. ``equality`` observables always penalize deviations; ``upper`` / ``lower`` only penalize the disallowed side. Parameters ---------- weights : numpy.ndarray Frame weights, shape ``(n_frames,)``. Returns ------- float Mean of ``(diff / sigma)^2`` over observables. """ return constraint_chi_squared(weights, self.calculated_values, self.observables) # ------------------------------------------------------------------ def _objective_and_gradient(self, lambdas: np.ndarray) -> Tuple[float, np.ndarray]: """Maxent objective and gradient with a Gaussian-error prior. Implements ``L = lambda^T O_exp + (theta/2) lambda^T Sigma^2 lambda + log Z`` and its gradient, scaled by ``1/theta`` for numerical stability. Parameters ---------- lambdas : numpy.ndarray Current Lagrange multipliers. Returns ------- tuple ``(objective/theta, gradient/theta)``. """ log_w = -np.sum(lambdas * self.calculated_values, axis=1) + np.log( self.initial_weights ) log_z = logsumexp(log_w) probs = np.exp(log_w - log_z) avg_calc = np.sum(probs[:, np.newaxis] * self.calculated_values, axis=0) regularization = self._theta / 2 * np.sum(lambdas**2 * self._exp_sigma2) constraint = np.dot(lambdas, self._exp_values) objective = log_z + constraint + regularization gradient = ( self._exp_values + self._theta * lambdas * self._exp_sigma2 - avg_calc ) return objective / self._theta, gradient / self._theta # ------------------------------------------------------------------ def _run_single_theta_optimization( self, max_iterations: int, optimizer: str, verbose: bool ) -> BMEResult: """Run the maxent optimization for the currently set ``self._theta``. Parameters ---------- max_iterations : int Maximum optimizer iterations. optimizer : str scipy.optimize.minimize method (must support ``jac`` + bounds). verbose : bool Print progress. Returns ------- BMEResult """ chi2_initial = self._compute_chi_squared(self.initial_weights) if verbose: print("BME Optimization") print(f" Theta: {self._theta}") print(f" Observables: {self.n_observables}") print(f" Frames: {self.n_frames}") print(f" Chi-squared initial: {chi2_initial:.4f}") opt = minimize( self._objective_and_gradient, self._lambdas, options={"maxiter": max_iterations}, method=optimizer, jac=True, bounds=self._bounds, ) metadata = { "optimizer": optimizer, "max_iterations": max_iterations, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } if opt.success: log_w = -np.sum( opt.x[np.newaxis, :] * self.calculated_values, axis=1 ) + np.log(self.initial_weights) weights = np.exp(log_w - logsumexp(log_w)) chi2_final = self._compute_chi_squared(weights) rel = float(np.sum(rel_entr(weights, self.initial_weights))) phi = float(np.exp(-rel)) if verbose: print(f" Optimization successful (iters: {opt.nit})") print(f" Chi-squared final: {chi2_final:.4f}") print(f" Effective fraction (phi): {phi:.4f}") return BMEResult( weights=weights, initial_weights=self.initial_weights.copy(), lambdas=opt.x.copy(), chi_squared_initial=chi2_initial, chi_squared_final=chi2_final, phi=phi, n_iterations=opt.nit, success=True, message=str(opt.message), theta=self._theta, observables=self.observables, calculated_values=self.calculated_values, metadata=metadata, ) if verbose: print(" Optimization failed") print(f" Message: {opt.message}") return BMEResult( weights=self.initial_weights.copy(), initial_weights=self.initial_weights.copy(), lambdas=opt.x.copy(), chi_squared_initial=chi2_initial, chi_squared_final=np.nan, phi=np.nan, n_iterations=getattr(opt, "nit", -1), success=False, message=str(opt.message), theta=self._theta, observables=self.observables, calculated_values=self.calculated_values, metadata=metadata, ) # ------------------------------------------------------------------
[docs] def fit( self, max_iterations: int = DEFAULT_MAX_ITERATIONS, optimizer: str = DEFAULT_OPTIMIZER, verbose: bool = True, *, theta: Optional[float] = None, auto_theta: bool = True, theta_scan_kwargs: Optional[dict] = None, ) -> BMEResult: """Fit BME weights at a fixed or automatically selected theta. Parameters ---------- max_iterations : int, optional Maximum optimizer iterations. Default ``DEFAULT_MAX_ITERATIONS``. optimizer : str, optional scipy.optimize.minimize method. Default ``"L-BFGS-B"``. verbose : bool, optional Print progress. Default True. theta : float, optional If given, use this theta (no scan). Must be positive. auto_theta : bool, optional If True and ``theta`` is None, run an L-curve scan to pick theta. Default True. theta_scan_kwargs : dict, optional Extra keyword arguments forwarded to :meth:`scan_theta`. Returns ------- BMEResult Raises ------ SSException If ``theta`` is not positive. """ if theta is not None: if theta <= 0: raise SSException(f"theta must be positive, got {theta}") self._theta = float(theta) self._theta_scan_result = None if verbose: print(f"[BME] Using manual theta = {self._theta:.4g}.") elif auto_theta: if verbose: print("[BME] Auto theta: running L-curve scan...") scan_kwargs = dict(theta_range=(0.01, 10.0), n_points=15) if theta_scan_kwargs: scan_kwargs.update(theta_scan_kwargs) scan = self.scan_theta(**scan_kwargs) opt_idx = scan.optimal_idx self._theta = float(scan.optimal_theta) self._result = scan.results[opt_idx] self._theta_scan_result = scan self._lambdas = self._result.lambdas.copy() if verbose: print( f"[BME] Selected theta={self._theta:.4g} via " f"{scan.method} (chi2_final=" f"{self._result.chi_squared_final:.3f}, " f"phi={self._result.phi:.3f})." ) return self._result else: if self._theta is None: self._theta = DEFAULT_THETA if verbose: print( f"[BME] auto_theta=False, no manual theta; using " f"theta={self._theta:.4g}." ) self._result = self._run_single_theta_optimization( max_iterations=max_iterations, optimizer=optimizer, verbose=verbose, ) return self._result
# ------------------------------------------------------------------
[docs] def scan_theta( self, theta_range: Union[Tuple[float, float], np.ndarray] = (0.01, 10.0), n_points: int = 15, log_scale: bool = True, method: str = "perpendicular", verbose: bool = False, ) -> ThetaScanResult: """Run an L-curve theta scan using this instance's data. Parameters ---------- theta_range, n_points, log_scale, method, verbose See :func:`theta_scan`. Returns ------- ThetaScanResult """ scan = theta_scan( observables=self.observables, calculated_values=self.calculated_values, reweighter="bme", theta_range=theta_range, n_points=n_points, log_scale=log_scale, initial_weights=self.initial_weights, method=method, verbose=verbose, ) self._theta_scan_result = scan return scan
# ------------------------------------------------------------------
[docs] def predict(self, calculated_values: np.ndarray) -> np.ndarray: """Weighted means of arbitrary observables using the fitted weights. Parameters ---------- calculated_values : numpy.ndarray Per-frame values, shape ``(n_frames, n_observables)``. Returns ------- numpy.ndarray Weighted means, shape ``(n_observables,)``. Raises ------ SSException If :meth:`fit` has not succeeded, or frame count mismatches. """ if self._result is None or not self._result.success: raise SSException( "Model has not been successfully fitted. Call fit() first." ) return self._result.predict(calculated_values)
@property def result(self) -> Optional[BMEResult]: """The most recent :class:`BMEResult`, or None.""" return self._result @property def theta(self) -> Optional[float]: """Theta used in the most recent fit, or None.""" return self._theta @property def theta_scan_result(self) -> Optional[ThetaScanResult]: """The most recent :class:`ThetaScanResult`, or None.""" return self._theta_scan_result
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------
[docs] class iBME: """Iterative Bayesian Maximum Entropy reweighting. At each iteration a (optionally error-weighted) linear regression of the ensemble-averaged calculated values against the experimental values is used to refit a global scale ``alpha`` and offset ``beta``; the calculated values are rescaled (``calc -> alpha*calc + beta``) and a standard BME maxent step is run. The procedure repeats until the change in chi-squared drops below ``ftol``. iBME is appropriate when there is an unknown global scale/offset between calculated and experimental data (e.g. SAXS). Ported from the reference implementation of Bottaro et al. Parameters ---------- observables : list of ExperimentalObservable Experimental observables to fit against. calculated_values : numpy.ndarray Per-frame calculated values, shape ``(n_frames, n_observables)``. initial_weights : numpy.ndarray, optional Prior frame weights. Uniform if None. Raises ------ SSException If inputs are malformed (see :func:`_validate_inputs`). """
[docs] def __init__( self, observables: List[ExperimentalObservable], calculated_values: np.ndarray, initial_weights: Optional[np.ndarray] = None, ): _validate_inputs(observables, calculated_values, initial_weights) self.observables = list(observables) self.calculated_values = calculated_values self.n_frames = calculated_values.shape[0] self.n_observables = len(observables) if initial_weights is None: self.initial_weights = np.ones(self.n_frames) / self.n_frames else: self.initial_weights = initial_weights / np.sum(initial_weights) self._exp_values = np.array( [obs.value for obs in self.observables], dtype=np.float64 ) self._exp_sigma = np.array( [obs.uncertainty for obs in self.observables], dtype=np.float64 ) self._result: Optional[BMEResult] = None self._theta: Optional[float] = None self._theta_scan_result: Optional[ThetaScanResult] = None
# ------------------------------------------------------------------
[docs] def fit( self, theta: float, ftol: float = 0.01, max_ibme_iterations: int = 50, fit_offset: bool = True, lr_weights: bool = True, max_iterations: int = DEFAULT_MAX_ITERATIONS, optimizer: str = DEFAULT_OPTIMIZER, verbose: bool = True, ) -> BMEResult: """Run iterative BME at a fixed theta. Parameters ---------- theta : float Regularisation strength. Must be positive. ftol : float, optional Convergence tolerance on ``|delta chi-squared|`` between iterations. Default 0.01. max_ibme_iterations : int, optional Maximum number of scale/offset+BME iterations. Default 50. fit_offset : bool, optional If True fit a scale and offset; if False fit scale only. Default True. lr_weights : bool, optional If True weight the linear regression by ``1/sigma^2``; otherwise use uniform weights. Default True. max_iterations : int, optional Maximum optimizer iterations for each inner BME step. optimizer : str, optional scipy.optimize.minimize method for each inner BME step. verbose : bool, optional Print per-iteration progress. Default True. Returns ------- BMEResult Result with the final weights, ``scale``/``offset`` (net, relative to the original input), the ``ibme_iterations`` log, ``chi_squared_initial`` from the first iteration's pre-fit chi-squared, and ``phi`` computed against the original prior. Raises ------ SSException If ``theta`` is not positive. """ if theta <= 0: raise SSException(f"theta must be positive, got {theta}") self._theta = float(theta) w0 = self.initial_weights.copy() current_weights = w0.copy() # Working copy of the calculated values; rescaled in place each # iteration so the scale/offset accumulates (matches reference iBME). calc = self.calculated_values.astype(np.float64).copy() if lr_weights: lr_w = 1.0 / self._exp_sigma**2 else: lr_w = np.ones(self.n_observables) # Accumulated (net) scale/offset relative to the original input. net_scale = 1.0 net_offset = 0.0 iterations: List[dict] = [] chi2_initial = np.nan chi2_old = np.nan last_result: Optional[BMEResult] = None it = 0 for it in range(max_ibme_iterations): calc_avg = np.sum(calc * current_weights[:, np.newaxis], axis=0) alpha, beta = _weighted_linear_regression( calc_avg, self._exp_values, lr_w, fit_intercept=fit_offset ) calc = alpha * calc + beta net_scale *= alpha net_offset = alpha * net_offset + beta bme = BME(self.observables, calc, w0) res = bme.fit( theta=self._theta, auto_theta=False, verbose=False, max_iterations=max_iterations, optimizer=optimizer, ) if it == 0: chi2_initial = res.chi_squared_initial current_weights = res.weights.copy() chi2_new = res.chi_squared_final diff = abs(chi2_old - chi2_new) chi2_old = chi2_new last_result = res iterations.append( { "iteration": it, "scale": alpha, "offset": beta, "chi_squared": chi2_new, "diff": diff, } ) if verbose: print( f"Iteration {it:3d} scale={alpha:8.4f} " f"offset={beta:8.4f} chi2={chi2_new:8.4f} " f"diff={diff:8.2e}" ) if np.isfinite(diff) and diff < ftol: if verbose: print( f"iBME converged below tolerance {ftol:.2e} after " f"{it + 1} iterations" ) break phi = float(np.exp(-_srel(w0, current_weights))) success = last_result is not None and last_result.success metadata = { "optimizer": optimizer, "max_iterations": max_iterations, "max_ibme_iterations": max_ibme_iterations, "ftol": ftol, "fit_offset": fit_offset, "lr_weights": lr_weights, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } self._result = BMEResult( weights=current_weights, initial_weights=w0, lambdas=( last_result.lambdas.copy() if last_result is not None else np.zeros(self.n_observables) ), chi_squared_initial=chi2_initial, chi_squared_final=chi2_old, phi=phi, n_iterations=it + 1, success=success, message=( last_result.message if last_result is not None else "no iterations run" ), theta=self._theta, observables=self.observables, calculated_values=calc, metadata=metadata, scale=net_scale, offset=net_offset, ibme_iterations=iterations, ) return self._result
# ------------------------------------------------------------------
[docs] def scan_theta( self, theta_range: Union[Tuple[float, float], np.ndarray] = (0.01, 10.0), n_points: int = 15, log_scale: bool = True, method: str = "perpendicular", verbose: bool = False, fit_kwargs: Optional[dict] = None, ) -> ThetaScanResult: """Run an L-curve theta scan using iterative BME. Parameters ---------- theta_range, n_points, log_scale, method, verbose See :func:`theta_scan`. fit_kwargs : dict, optional Extra keyword arguments forwarded to :meth:`fit` for each theta (e.g. ``ftol``, ``max_ibme_iterations``, ``fit_offset``). Returns ------- ThetaScanResult """ scan = theta_scan( observables=self.observables, calculated_values=self.calculated_values, reweighter="ibme", theta_range=theta_range, n_points=n_points, log_scale=log_scale, initial_weights=self.initial_weights, method=method, verbose=verbose, fit_kwargs=fit_kwargs, ) self._theta_scan_result = scan return scan
# ------------------------------------------------------------------
[docs] def predict(self, calculated_values: np.ndarray) -> np.ndarray: """Weighted means of arbitrary observables using the fitted weights. Parameters ---------- calculated_values : numpy.ndarray Per-frame values, shape ``(n_frames, n_observables)``. Returns ------- numpy.ndarray Weighted means, shape ``(n_observables,)``. Raises ------ SSException If :meth:`fit` has not succeeded, or frame count mismatches. """ if self._result is None or not self._result.success: raise SSException( "Model has not been successfully fitted. Call fit() first." ) return self._result.predict(calculated_values)
[docs] def get_ibme_weights(self) -> np.ndarray: """Final iBME weights. Returns ------- numpy.ndarray Raises ------ SSException If :meth:`fit` has not been called. """ if self._result is None: raise SSException("iBME weights not available. Call fit() first.") return self._result.weights
[docs] def get_ibme_stats(self) -> List[dict]: """Per-iteration iBME log (scale, offset, chi-squared, diff). Returns ------- list of dict Raises ------ SSException If :meth:`fit` has not been called. """ if self._result is None: raise SSException("iBME stats not available. Call fit() first.") return self._result.ibme_iterations
@property def result(self) -> Optional[BMEResult]: """The most recent :class:`BMEResult`, or None.""" return self._result @property def theta(self) -> Optional[float]: """Theta used in the most recent fit, or None.""" return self._theta @property def theta_scan_result(self) -> Optional[ThetaScanResult]: """The most recent :class:`ThetaScanResult`, or None.""" return self._theta_scan_result
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------
[docs] @dataclass class BMECustomResult: """Container for a :class:`BMECustom` reweighting result. Mirrors :class:`BMEResult` but for the raw vector/matrix + custom-cost variant: the scalar fit metric is a general ``cost`` (the reduced chi-squared by default, or whatever the user's ``cost_function`` returns) rather than a constraint-aware chi-squared, and the experimental data are held as plain arrays rather than a list of :class:`~soursop.ssutils.ExperimentalObservable`. Attributes ---------- weights : numpy.ndarray Optimized (posterior) frame weights, summing to 1. initial_weights : numpy.ndarray Prior frame weights. cost_initial : float Cost (goodness-of-fit) at the prior weights. cost_final : float Cost at the optimized weights. phi : float Fraction of effective frames, ``exp(-D_KL(w || w0)) in (0, 1]``. n_iterations : int Optimizer iterations. success : bool Whether the optimization succeeded. message : str Optimizer status message. theta : float Entropy-penalty strength used. experiment : numpy.ndarray Experimental vector, shape ``(m,)``. calculated_values : numpy.ndarray Per-conformer calculated matrix used in the fit, shape ``(n, m)``. metadata : dict Free-form metadata (optimizer, whether a custom cost was used, ...). """ weights: np.ndarray initial_weights: np.ndarray cost_initial: float cost_final: float phi: float n_iterations: int success: bool message: str theta: float experiment: np.ndarray calculated_values: np.ndarray metadata: dict def __str__(self): status = "SUCCESS" if self.success else "FAILED" return "\n".join( [ f"BMECustom Result [{status}]", f" Cost initial: {self.cost_initial:.4f}", f" Cost final: {self.cost_final:.4f}", f" phi (effective fraction): {self.phi:.4f}", f" Iterations: {self.n_iterations}", f" Theta: {self.theta}", ] ) def __repr__(self): return self.__str__() @property def kl_divergence(self) -> float: """Relative entropy (KL divergence) of ``weights`` from prior.""" if self.phi > 0: return -float(np.log(self.phi)) return float(np.inf) @property def reweighting_factors(self) -> np.ndarray: """Per-frame reweighting factors ``r_i = w_i / w0_i``.""" return self.weights / self.initial_weights
[docs] def predict(self, calculated_values: np.ndarray) -> np.ndarray: """Weighted average of arbitrary observables using these weights. Parameters ---------- calculated_values : numpy.ndarray Per-frame values, shape ``(n_frames, ...)``; ``n_frames`` must match the fitted ensemble. Returns ------- numpy.ndarray Weighted average over frames (axis 0). Raises ------ SSException If the number of frames does not match the fitted ensemble. """ calculated_values = np.asarray(calculated_values) if calculated_values.shape[0] != self.weights.shape[0]: raise SSException( f"Number of frames in calculated_values " f"({calculated_values.shape[0]}) must match the fitted " f"ensemble ({self.weights.shape[0]})" ) weights = self.weights.reshape((-1,) + (1,) * (calculated_values.ndim - 1)) return np.sum(weights * calculated_values, axis=0)
[docs] def diagnostics(self, warn_threshold: float = 0.5) -> dict: """Diagnose the reweighting result and flag potential issues. Parameters ---------- warn_threshold : float, optional Minimum acceptable ``phi`` before a low-diversity warning is raised. Default 0.5. Returns ------- dict Diagnostic metrics and a list of human-readable ``warnings``. """ diag: dict = {} warnings: List[str] = [] n = len(self.weights) diag["neff_entropy"] = n * float(self.phi) diag["neff_entropy_fraction"] = float(self.phi) diag["neff_renyi2"] = float(1.0 / np.sum(self.weights**2)) diag["neff_renyi2_fraction"] = float(diag["neff_renyi2"] / n) diag["cost_improvement"] = self.cost_initial - self.cost_final if self.phi < warn_threshold: warnings.append( f"Low Phi ({self.phi:.3f} < {warn_threshold}): significant " "loss of ensemble diversity. Consider increasing theta or " "loosening the experimental uncertainties." ) if diag["neff_renyi2"] < 0.1 * n: warnings.append( f"Low effective sample size (1/sum w^2) " f"({diag['neff_renyi2']:.1f} / {n}): only " f"~{diag['neff_renyi2_fraction'] * 100:.1f}% of frames " "are effectively used." ) diag["warnings"] = warnings diag["status"] = "OK" if len(warnings) == 0 else "WARNING" return diag
[docs] def print_diagnostics(self, warn_threshold: float = 0.5): """Print a formatted diagnostic report for this result.""" diag = self.diagnostics(warn_threshold) n = len(self.weights) key_w, val_w = 32, 14 print("\n" + "=" * 60) print("BME (custom) DIAGNOSTIC REPORT") print("=" * 60) print(f"\nOptimization Status: {self.message}") print(f"Success: {self.success}") print(f"Iterations: {self.n_iterations}") print("\nCost:") print(f" {'Initial':<{key_w}} {self.cost_initial:>{val_w}.4f}") print(f" {'Final':<{key_w}} {self.cost_final:>{val_w}.4f}") print(f" {'Improvement':<{key_w}} {diag['cost_improvement']:>{val_w}.4f}") print("\nEnsemble diversity:") print(f" {'Phi (entropy fraction)':<{key_w}} {self.phi:>{val_w}.4f}") print( f" {'N_eff (entropy-based)':<{key_w}} " f"{diag['neff_entropy']:>{val_w}.1f} / {n}" ) print( f" {'N_eff (1/sum w^2, Renyi-2)':<{key_w}} " f"{diag['neff_renyi2']:>{val_w}.1f} / {n}" ) print(f" {'Theta':<{key_w}} {self.theta:>{val_w}.4f}") if len(diag["warnings"]) > 0: print(f"\nWARNINGS ({len(diag['warnings'])}):") for i, warning in enumerate(diag["warnings"], 1): print(f" {i}. {warning}") else: print(f"\nStatus: {diag['status']} - No issues detected") print("=" * 60)
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------
[docs] class BMECustom: """BME reweighting against an experimental vector with a custom cost. A "BME variant" that reweights an ensemble against a whole experimental *profile* (e.g. a SAXS or PRE curve), with an **optional user-supplied goodness-of-fit function**. It is the penalty form of Bayesian Ensemble Refinement: it minimises .. math:: \\mathcal{L}(w) = \\text{cost}(w) + \\theta\\, D_{\\mathrm{KL}}(w\\,\\|\\,w^0) directly over the ``n`` frame weights (subject to the simplex ``w_i \\ge 0``, ``\\sum_i w_i = 1``) using SciPy's ``trust-constr``. The entropy penalty ``theta`` plays the same role as in :class:`BME`: large ``theta`` keeps the ensemble close to the prior, small ``theta`` fits the data harder. Unlike :class:`BME` (which exploits a closed-form exponential solution special to the Gaussian chi-squared), an arbitrary cost has no such closed form, so the weights are optimised directly. The default cost reproduces :class:`BME`'s reduced chi-squared exactly. Parameters ---------- experiment : numpy.ndarray Experimental vector, shape ``(m,)`` (e.g. one value per SAXS ``q``-point or per PRE residue). calculated_values : numpy.ndarray Per-conformer calculated values for the same observable, shape ``(n_frames, m)``. uncertainty : float or numpy.ndarray, optional Experimental uncertainty used by the **default** chi-squared cost: a scalar (broadcast to all ``m`` points) or a length-``m`` vector. Ignored when ``cost_function`` is supplied. Defaults to ``1.0``. cost_function : callable, optional Custom goodness-of-fit, called as ``cost_function(experiment, calculated_values, weights) -> float``, where ``experiment`` is ``(m,)``, ``calculated_values`` is ``(n_frames, m)`` and ``weights`` is the current ``(n_frames,)`` weight vector. Lower is better. If ``None`` (default), the reduced chi-squared ``mean_k ((<calc>_k - V_k)/sigma_k)^2`` is used, where ``<calc>_k = sum_i w_i calc[i, k]``. initial_weights : numpy.ndarray, optional Prior frame weights. If None, uniform weights are used. Internally normalised to sum to 1. Raises ------ SSException If inputs are malformed. Notes ----- With a custom ``cost_function`` the optimiser uses a finite-difference gradient, so very large ensembles or expensive cost functions can be slow; the default chi-squared path uses an analytic gradient. The problem is convex (unique optimum) when the cost is convex in ``w`` (the default chi-squared is); an arbitrary cost is optimised locally. Examples -------- >>> import numpy as np >>> from soursop.ssbme import BMECustom >>> # SAXS-like profile: m points, n conformers >>> bme = BMECustom(I_exp, calc_I, uncertainty=sigma_exp) >>> result = bme.fit(theta=1.0) >>> weights = result.weights >>> >>> # custom cost: chi-squared on a log scale >>> def log_chi2(exp, calc, w): ... avg = w @ calc ... return float(np.mean((np.log(avg) - np.log(exp)) ** 2)) >>> result = BMECustom(I_exp, calc_I, cost_function=log_chi2).fit(theta=1.0) """
[docs] def __init__( self, experiment: np.ndarray, calculated_values: np.ndarray, uncertainty=None, cost_function=None, initial_weights: Optional[np.ndarray] = None, ): experiment = np.asarray(experiment, dtype=np.float64) calculated_values = np.asarray(calculated_values, dtype=np.float64) if experiment.ndim != 1: raise SSException( f"experiment must be a 1D vector, got shape {experiment.shape}" ) if calculated_values.ndim != 2: raise SSException( "calculated_values must be 2D (n_frames, m), got shape " f"{calculated_values.shape}" ) self.m = experiment.shape[0] self.n_frames = calculated_values.shape[0] if calculated_values.shape[1] != self.m: raise SSException( f"calculated_values columns ({calculated_values.shape[1]}) " f"must match the experiment length ({self.m})" ) if cost_function is not None and not callable(cost_function): raise SSException("cost_function must be callable or None") # Uncertainty (used only by the default chi-squared cost). if uncertainty is None: sigma = np.ones(self.m, dtype=np.float64) else: sigma = np.asarray(uncertainty, dtype=np.float64) if sigma.ndim == 0: sigma = np.full(self.m, float(sigma)) elif sigma.shape != (self.m,): raise SSException( "uncertainty must be a scalar or a length-m vector " f"(m={self.m}), got shape {sigma.shape}" ) if np.any(sigma <= 0): raise SSException("uncertainty values must be positive") self.experiment = experiment self.calculated_values = calculated_values self.uncertainty = sigma self.cost_function = cost_function if initial_weights is None: self.initial_weights = np.ones(self.n_frames) / self.n_frames else: initial_weights = np.asarray(initial_weights, dtype=np.float64) if len(initial_weights) != self.n_frames: raise SSException("initial_weights length must match number of frames") self.initial_weights = initial_weights / np.sum(initial_weights) self._result: Optional[BMECustomResult] = None self._theta: Optional[float] = None self._theta_scan_result: Optional[ThetaScanResult] = None
# ------------------------------------------------------------------ def _cost(self, weights: np.ndarray) -> float: """Evaluate the (user or default) cost at ``weights``.""" if self.cost_function is None: avg = weights @ self.calculated_values diff = (avg - self.experiment) / self.uncertainty return float(np.mean(diff**2)) return float( self.cost_function(self.experiment, self.calculated_values, weights) ) def _default_cost_grad(self, weights: np.ndarray) -> np.ndarray: """Analytic gradient of the default chi-squared cost.""" avg = weights @ self.calculated_values coef = (2.0 / self.m) * (avg - self.experiment) / self.uncertainty**2 return self.calculated_values @ coef # ------------------------------------------------------------------
[docs] def fit( self, theta: float = 1.0, max_iterations: int = 2000, optimizer: str = "L-BFGS-B", verbose: bool = True, ) -> BMECustomResult: """Reweight by minimising ``cost(w) + theta * D_KL(w || w0)``. Internally the simplex ``w_i >= 0``, ``sum_i w_i = 1`` is enforced via a softmax reparameterisation (``w = softmax(z)``), reducing the problem to an unconstrained minimisation over ``z`` solved by L-BFGS-B. This avoids boundary kinks in ``log w`` and gives clean convergence on both the analytic-cost and custom-cost paths. Parameters ---------- theta : float, optional Entropy-penalty strength (must be positive). Default 1.0. max_iterations : int, optional Maximum optimizer iterations. Default 2000. optimizer : str, optional scipy.optimize.minimize method for the unconstrained problem in the softmax-reparameterised variable ``z``. Default ``"L-BFGS-B"``. verbose : bool, optional Print progress. Default True. Returns ------- BMECustomResult Raises ------ SSException If ``theta`` is not positive. """ if theta <= 0: raise SSException(f"theta must be positive, got {theta}") self._theta = float(theta) n = self.n_frames w0 = self.initial_weights.copy() log_w0 = np.log(np.maximum(w0, MIN_WEIGHT_THRESHOLD)) def softmax(z): # Numerically stable softmax. z = z - np.max(z) ez = np.exp(z) return ez / np.sum(ez) def objective(z): w = softmax(z) # KL(w || w0) = sum_i w_i (log w_i - log w0_i); with softmax # log w_i = z_i - logsumexp(z). log_w = z - logsumexp(z) kl = float(np.sum(w * (log_w - log_w0))) return self._cost(w) + theta * kl cost_initial = self._cost(w0) if verbose: print("BMECustom Optimization") print(f" Frames: {n}, Observables (vector length): {self.m}") custom = self.cost_function is not None print(f" Cost function: {'custom' if custom else 'default chi^2'}") print(f" Theta: {theta}") print(f" Cost initial: {cost_initial:.4f}") # Initial parameter: z = log w0 -> softmax(z) = w0. z0 = log_w0.copy() opt = minimize( objective, z0, method=optimizer, jac="2-point", options={"maxiter": max_iterations, "ftol": 1e-9, "gtol": 1e-7}, ) w_opt = softmax(opt.x) cost_final = self._cost(w_opt) phi = float(np.exp(-_srel(w0, w_opt))) # L-BFGS-B reports success when it converges by ``ftol`` / ``gtol``. # Fall back to "did we end up no worse than the prior?" so a stalled # but improving fit isn't reported as a failure. success = bool(opt.success) or cost_final <= cost_initial + 1e-9 if verbose: print(f" Cost final: {cost_final:.4f}") print(f" Effective fraction (phi): {phi:.4f}") print(f" Optimization successful: {success}") print(f" Optimizer message: {opt.message}") self._result = BMECustomResult( weights=w_opt, initial_weights=w0, cost_initial=float(cost_initial), cost_final=float(cost_final), phi=phi, n_iterations=int(getattr(opt, "nit", -1)), success=success, message=str(opt.message), theta=self._theta, experiment=self.experiment, calculated_values=self.calculated_values, metadata={ "optimizer": optimizer, "max_iterations": max_iterations, "custom_cost": self.cost_function is not None, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), }, ) return self._result
# ------------------------------------------------------------------
[docs] def scan_theta( self, theta_range: Union[Tuple[float, float], np.ndarray] = (0.01, 100.0), n_points: int = 12, log_scale: bool = True, method: str = "perpendicular", verbose: bool = False, ) -> ThetaScanResult: """Scan ``theta`` and pick the cost vs. relative-entropy knee. The L-curve analogue of :meth:`BME.scan_theta`. The returned :class:`ThetaScanResult` stores the cost in its ``chi_squared_values`` field (the generic fit metric for this variant). Parameters ---------- theta_range, n_points, log_scale, method, verbose See :func:`theta_scan`. Default range ``(0.01, 100.0)``. Returns ------- ThetaScanResult """ if isinstance(theta_range, (tuple, list)): if n_points < 1: raise SSException( f"n_points must be >= 1 for a tuple theta_range, got {n_points}" ) if log_scale and (theta_range[0] <= 0 or theta_range[1] <= 0): raise SSException( "theta_range endpoints must be positive when " f"log_scale=True, got {theta_range}" ) if log_scale: thetas = np.logspace( np.log10(theta_range[0]), np.log10(theta_range[1]), n_points ) else: thetas = np.linspace(theta_range[0], theta_range[1], n_points) else: thetas = np.asarray(theta_range, dtype=np.float64) costs: List[float] = [] phis: List[float] = [] kls: List[float] = [] results: List[BMECustomResult] = [] for i, t in enumerate(thetas): if verbose: print(f"Processing theta {i + 1}/{len(thetas)}: {t:.4f}") res = self.fit(theta=float(t), verbose=False) results.append(res) costs.append(res.cost_final) phis.append(res.phi) kls.append(res.kl_divergence) cost_arr = np.array(costs) kl_arr = np.array(kls) optimal_idx, method_name = find_optimal_theta(cost_arr, kl_arr, method=method) scan = ThetaScanResult( theta_values=thetas, chi_squared_values=cost_arr, phi_values=np.array(phis), kl_divergence_values=kl_arr, results=results, optimal_theta=float(thetas[optimal_idx]), optimal_idx=optimal_idx, method=method_name, ) self._theta_scan_result = scan return scan
[docs] def predict(self, calculated_values: np.ndarray) -> np.ndarray: """Weighted means of arbitrary observables using the fitted weights. Raises ------ SSException If :meth:`fit` has not succeeded, or frame count mismatches. """ if self._result is None or not self._result.success: raise SSException( "Model has not been successfully fitted. Call fit() first." ) return self._result.predict(calculated_values)
@property def result(self) -> Optional[BMECustomResult]: """The most recent :class:`BMECustomResult`, or None.""" return self._result @property def theta(self) -> Optional[float]: """Theta used in the most recent fit, or None.""" return self._theta @property def theta_scan_result(self) -> Optional[ThetaScanResult]: """The most recent :class:`ThetaScanResult`, or None.""" return self._theta_scan_result