Source code for propflow.snapshots.analyzer

"""Snapshot analysis utilities for belief propagation convergence dynamics.

This module provides analysis tools for examining convergence properties,
dependency structures, and cycle metrics from simulation snapshots.
"""

from __future__ import annotations

import csv
from pathlib import Path
from dataclasses import dataclass
from typing import Dict, List, Mapping, Sequence, Any

import networkx as nx
import numpy as np
from scipy import sparse

from .types import EngineSnapshot


# Tolerance constants
_ZERO_TOL = 1e-9


@dataclass(slots=True)
class _Coordinate:
    """Internal helper describing a single difference coordinate."""

    kind: str
    sender: str
    recipient: str
    label: int

    def key(self) -> tuple[str, str, str, int]:
        return (self.kind, self.sender, self.recipient, self.label)


[docs] class SnapshotAnalyzer: """Analyze convergence dynamics from propflow snapshots. This class provides methods to derive Jacobian matrices, dependency graphs, cycle metrics, and other structural properties from snapshot sequences. """ def __init__( self, snapshots: Sequence[EngineSnapshot], *, domain: Mapping[str, int] | None = None, max_cycle_len: int = 12, ) -> None: """Initialize the analyzer with a sequence of snapshots. Args: snapshots: A sequence of EngineSnapshot objects from a simulation. domain: Optional domain mapping (var -> domain_size). If not provided, will be inferred from snapshots. max_cycle_len: Maximum cycle length to enumerate in cycle analysis. """ if not snapshots: raise ValueError("SnapshotAnalyzer requires at least one snapshot") self._snapshots: List[EngineSnapshot] = sorted( list(snapshots), key=lambda rec: rec.step ) self._step_index: Dict[int, int] = { rec.step: idx for idx, rec in enumerate(self._snapshots) } self._max_cycle_len = max_cycle_len self._domain = dict(domain or self._infer_domain(self._snapshots[0])) self._nilpotent_cache: Dict[int, int | None] = {}
[docs] def beliefs_per_variable(self) -> Dict[str, List[int | None]]: """Return the argmin trajectory for each variable across snapshots.""" variables = sorted(self._domain.keys()) series: Dict[str, List[int | None]] = {var: [] for var in variables} for rec in self._snapshots: # sum R messages per variable to compute belief grouped: Dict[str, List[np.ndarray]] = {} for (f, v), r_msg in rec.R.items(): grouped.setdefault(v, []).append(np.asarray(r_msg, dtype=float)) for var in variables: if vectors := grouped.get(var): combined = np.sum(vectors, axis=0) series[var].append(int(np.argmin(combined))) else: series[var].append(rec.assignments.get(var)) return series
[docs] def difference_coordinates( self, step_idx: int ) -> tuple[ Dict[tuple[str, str], float | np.ndarray], Dict[tuple[str, str], float | np.ndarray], ]: """Compute ΔQ and ΔR (difference coordinates) for a snapshot.""" rec = self._snapshot_by_index(step_idx) delta_q: Dict[tuple[str, str], float | np.ndarray] = {} delta_r: Dict[tuple[str, str], float | np.ndarray] = {} for (u, f), q_msg in rec.Q.items(): values = np.asarray(q_msg, dtype=float) if values.size > 0: delta_q[(u, f)] = self._recenter(values) for (f, v), r_msg in rec.R.items(): values = np.asarray(r_msg, dtype=float) if values.size > 0: delta_r[(f, v)] = self._recenter(values) return delta_q, delta_r
[docs] def jacobian(self, step_idx: int) -> np.ndarray | sparse.spmatrix: """Construct the Jacobian matrix in difference coordinates.""" q_arrays, r_arrays, q_coords, r_coords = self._coordinate_arrays(step_idx) total_dim = len(q_coords) + len(r_coords) matrix: np.ndarray | sparse.lil_matrix if total_dim < 100: matrix = np.zeros((total_dim, total_dim), dtype=float) else: matrix = sparse.lil_matrix((total_dim, total_dim), dtype=float) q_index = {coord.key(): idx for idx, coord in enumerate(q_coords)} r_index = { coord.key(): len(q_coords) + idx for idx, coord in enumerate(r_coords) } # variable rows for coord in q_coords: row = q_index[coord.key()] for r_coord in r_coords: if r_coord.recipient != coord.sender: continue if r_coord.sender == coord.recipient: continue if r_coord.label != coord.label: continue col = r_index[r_coord.key()] _set_entry(matrix, row, col, 1.0) # factor rows rec = self._snapshot_by_index(step_idx) q_messages = dict(rec.Q.items()) for coord in r_coords: row = r_index[coord.key()] factor, var = coord.sender, coord.recipient # Incoming messages from other variables to this factor for (u, f), q_arr in q_messages.items(): if f != factor or u == var: continue array = q_arrays.get((u, f)) if array is None: continue if array.size == 1: delta = float(array[0]) value = 0.0 if abs(delta) < _ZERO_TOL else -float(np.sign(delta)) col = q_index[("Q", u, f, 0)] _set_entry(matrix, row, col, value) else: winner = int(np.argmin(q_arr)) # Simplified: use identity projection for label in range(array.size): col = q_index[("Q", u, f, label)] _set_entry(matrix, row, col, 1.0 if label == winner else 0.0) return matrix
[docs] def dependency_digraph(self, step_idx: int) -> nx.DiGraph: """Construct the dependency digraph induced by the Jacobian.""" matrix = self.jacobian(step_idx) _, _, q_coords, r_coords = self._coordinate_arrays(step_idx) coord_list = q_coords + r_coords graph = nx.DiGraph() for idx, coord in enumerate(coord_list): graph.add_node( idx, kind=coord.kind, sender=coord.sender, recipient=coord.recipient, label=coord.label, ) if sparse.issparse(matrix): # Convert to COO format for safe iteration coo_matrix = matrix.tocoo() for r_idx, c_idx, value in zip( coo_matrix.row, coo_matrix.col, coo_matrix.data ): graph.add_edge(int(c_idx), int(r_idx), weight=float(value)) else: nz_rows, nz_cols = np.nonzero(matrix) for r_idx, c_idx in zip(nz_rows, nz_cols): graph.add_edge( int(c_idx), int(r_idx), weight=float(matrix[r_idx, c_idx]) ) return graph
[docs] def cycle_metrics(self, step_idx: int) -> Dict[str, Any]: """Compute cycle metrics for convergence analysis.""" graph = self.dependency_digraph(step_idx) cycles: List[List[int]] = [] for cycle in nx.simple_cycles(graph): if self._max_cycle_len and len(cycle) > self._max_cycle_len: continue cycles.append(cycle) return { "num_cycles": len(cycles), "has_cycles": len(cycles) > 0, }
[docs] def nilpotent_index(self, step_idx: int) -> int | None: """Compute the nilpotent index of the Jacobian matrix.""" if step_idx in self._nilpotent_cache: return self._nilpotent_cache[step_idx] matrix = self.jacobian(step_idx) dense = ( matrix.toarray() if sparse.issparse(matrix) else np.asarray(matrix, dtype=float) ) if dense.size == 0: self._nilpotent_cache[step_idx] = 0 return 0 power = dense.copy() for idx in range(1, dense.shape[0] + 1): if np.allclose(power, 0.0, atol=1e-9): self._nilpotent_cache[step_idx] = idx return idx power = power @ dense self._nilpotent_cache[step_idx] = None return None
[docs] def block_norms(self, step_idx: int) -> Dict[str, float]: """Compute infinity norms of Jacobian blocks.""" matrix = self.jacobian(step_idx) dense = ( matrix.toarray() if sparse.issparse(matrix) else np.asarray(matrix, dtype=float) ) q_arrays, r_arrays, _, _ = self._coordinate_arrays(step_idx) q_dim = sum(arr.size for arr in q_arrays.values()) r_dim = sum(arr.size for arr in r_arrays.values()) if q_dim + r_dim == 0: return {"A": 0.0, "B": 0.0, "P": 0.0} def _inf_norm(block: np.ndarray) -> float: if block.size == 0: return 0.0 return float(np.max(np.sum(np.abs(block), axis=1))) A_block = dense[:q_dim, q_dim : q_dim + r_dim] B_block = dense[q_dim : q_dim + r_dim, :q_dim] P_block = dense[q_dim : q_dim + r_dim, q_dim : q_dim + r_dim] return { "A": _inf_norm(A_block), "B": _inf_norm(B_block), "P": _inf_norm(P_block), }
# ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _snapshot_by_index(self, step_idx: int) -> EngineSnapshot: """Retrieve snapshot by index in the sorted list.""" if step_idx < 0 or step_idx >= len(self._snapshots): raise IndexError("step_idx out of range") return self._snapshots[step_idx] @staticmethod def _recenter(values: np.ndarray) -> float | np.ndarray: """Recenter array to minimum or compute scalar difference.""" if values.size == 0: return np.array([], dtype=float) if values.size == 2: return float(values[1] - values[0]) offset = float(np.min(values)) return values - offset @staticmethod def _as_array(value: float | np.ndarray) -> np.ndarray: """Convert scalar or array to numpy array.""" if isinstance(value, np.ndarray): return value.reshape(-1) return np.array([float(value)], dtype=float) @staticmethod def _infer_domain(record: EngineSnapshot) -> Dict[str, int]: """Infer variable domains from snapshot data.""" domain: Dict[str, int] = {} for var, val in record.assignments.items(): if val is not None: domain[var] = max(domain.get(var, 0), val + 1) for (u, f), q_arr in record.Q.items(): values = np.asarray(q_arr, dtype=float) if values.size: domain[u] = max(domain.get(u, 0), values.size) for (f, v), r_arr in record.R.items(): values = np.asarray(r_arr, dtype=float) if values.size: domain[v] = max(domain.get(v, 0), values.size) return domain def _coordinate_arrays( self, step_idx: int ) -> tuple[ Dict[tuple[str, str], np.ndarray], Dict[tuple[str, str], np.ndarray], List[_Coordinate], List[_Coordinate], ]: """Build coordinate arrays and coordinate lists.""" delta_q, delta_r = self.difference_coordinates(step_idx) q_arrays = {key: self._as_array(value) for key, value in delta_q.items()} r_arrays = {key: self._as_array(value) for key, value in delta_r.items()} q_coords: List[_Coordinate] = [] for (var, factor), array in q_arrays.items(): for label in range(array.size): q_coords.append(_Coordinate("Q", var, factor, label)) r_coords: List[_Coordinate] = [] for (factor, var), array in r_arrays.items(): for label in range(array.size): r_coords.append(_Coordinate("R", factor, var, label)) return q_arrays, r_arrays, q_coords, r_coords
[docs] class AnalysisReport: """Generate analysis reports from snapshots.""" def __init__(self, analyzer: SnapshotAnalyzer) -> None: """Initialize with a snapshot analyzer. Args: analyzer: The SnapshotAnalyzer instance to generate reports from. """ self._analyzer = analyzer
[docs] def to_json(self, step_idx: int) -> Dict[str, Any]: """Generate a JSON-serializable analysis report for a snapshot. Args: step_idx: The index of the snapshot to analyze. Returns: A dictionary containing analysis results. """ analyzer = self._analyzer beliefs = analyzer.beliefs_per_variable() nilpotent = analyzer.nilpotent_index(step_idx) block_norms = analyzer.block_norms(step_idx) cycles = analyzer.cycle_metrics(step_idx) # Spectral radius (if dense enough) spectral_radius = None try: matrix = analyzer.jacobian(step_idx) dense = ( matrix.toarray() if sparse.issparse(matrix) else np.asarray(matrix, dtype=float) ) if dense.size: spectral_radius = float(np.max(np.abs(np.linalg.eigvals(dense)))) except Exception: pass return { "step": step_idx, "beliefs": beliefs, "nilpotent_index": nilpotent, "block_norms": block_norms, "cycle_metrics": cycles, "spectral_radius": spectral_radius, }
[docs] def to_csv(self, base_dir: str | Path, *, step_idx: int) -> None: """Export analysis results to CSV files. Args: base_dir: Directory to save CSV files. step_idx: The step index for the analysis. """ base = Path(base_dir) base.mkdir(parents=True, exist_ok=True) beliefs = self._analyzer.beliefs_per_variable() steps = range(len(next(iter(beliefs.values()), []))) with (base / "beliefs.csv").open("w", newline="", encoding="utf-8") as handle: writer = csv.writer(handle) header = ["step"] + list(beliefs.keys()) writer.writerow(header) for step in steps: row = [step] for var in beliefs: seq = beliefs[var] row.append(seq[step] if step < len(seq) else None) writer.writerow(row) summary = self.to_json(step_idx) with (base / "metrics.json").open("w", encoding="utf-8") as handle: import json json.dump(summary, handle, indent=2)
def _set_entry( matrix: np.ndarray | sparse.lil_matrix, row: int, col: int, value: float ) -> None: """Set matrix entry, handling both dense and sparse matrices.""" matrix[row, col] = value __all__ = ["SnapshotAnalyzer", "AnalysisReport"]