API Reference

This section contains the complete API documentation for PropFlow.

Core Modules

Engines

class propflow.bp.engine_base.BPEngine(factor_graph, computator=<propflow.bp.computators.MinSumComputator object>, init_normalization=<function dummy_func>, name='BPEngine', convergence_config=None, monitor_performance=None, normalize_messages=None, anytime=None, snapshot_manager=None)[source]

Bases: object

The core engine for running belief propagation simulations.

This class orchestrates the belief propagation process on a factor graph. It manages the simulation loop, message passing schedule, history tracking, convergence checking, and performance monitoring. It is designed to be extended by other engine classes that implement specific BP variants or policies.

computator

The computator instance for message calculation.

Type:

Computator

anytime

If True, the engine tracks the best cost found so far.

Type:

bool

normalize_messages

If True, messages are normalized each cycle.

Type:

bool

graph

The factor graph on which the simulation runs.

Type:

FactorGraph

var_nodes

A list of variable agent nodes in the graph.

Type:

list

factor_nodes

A list of factor agent nodes in the graph.

Type:

list

history

An object for tracking the simulation’s history.

Type:

History

graph_diameter

The diameter of the factor graph.

Type:

int

convergence_monitor

The monitor for checking convergence.

Type:

ConvergenceMonitor

performance_monitor

An optional monitor for performance.

Type:

PerformanceMonitor

step(i=0)[source]

Runs one full step of the synchronous belief propagation algorithm.

A step consists of two main phases:

  1. Variable nodes compute and send messages to factor nodes.

  2. Factor nodes compute and send messages to variable nodes.

Parameters:

i (int) – The current step number.

Return type:

Step

Returns:

A Step object containing information about the messages exchanged.

run(max_iter=None, save_json=False, save_csv=False, filename=None, config_name=None)[source]

Runs the simulation until it converges or the maximum number of iterations is reached.

Parameters:
  • max_iter (int | None) – The maximum number of iterations to run.

  • save_json (bool) – Whether to save the full history as a JSON file.

  • save_csv (bool) – Whether to save the cost history as a CSV file.

  • filename (str) – The base name for the output files.

  • config_name (str | None) – The name of the configuration to use for saving.

Return type:

Optional[str]

Returns:

An optional string, typically for results or status.

property name: str

The name of the engine instance.

Type:

str

property iteration_count: int

The number of iterations completed so far.

Type:

int

get_beliefs()[source]

Retrieves the current beliefs of all variable nodes.

Return type:

Dict[str, ndarray]

Returns:

A dictionary mapping variable names to their belief vectors.

property assignments: Dict[str, int | float]

The current assignments of all variable nodes.

Type:

dict

calculate_global_cost()[source]

Calculates the global cost using the original, unmodified factor cost tables.

Return type:

float

post_init()[source]

Hook for logic to be executed after engine initialization.

Return type:

None

post_factor_cycle()[source]

Hook for logic after a full message passing cycle.

Return type:

None

post_two_cycles()[source]

Hook for logic after the first two message passing cycles.

Return type:

None

pre_factor_compute(factor, iteration=0)[source]

Hook for logic before a factor computes its messages.

Parameters:
  • factor (FactorAgent) – The factor agent about to compute messages.

  • iteration (int) – The current simulation iteration.

Return type:

None

post_factor_compute(factor, iteration)[source]

Hook for logic after a factor computes its messages.

Parameters:
  • factor (FactorAgent) – The factor agent that just computed messages.

  • iteration (int) – The current simulation iteration.

Return type:

None

pre_var_compute(var)[source]

Hook for logic before a variable computes its messages.

Parameters:

var (VariableAgent) – The variable agent about to compute messages.

Return type:

None

post_var_compute(var)[source]

Hook for logic after a variable computes its messages.

Parameters:

var (VariableAgent) – The variable agent that just computed messages.

Return type:

None

init_normalize()[source]

Hook for initial normalization logic.

Return type:

None

update_global_cost()[source]

Calculates and records the global cost for the current step.

If in “anytime” mode, it ensures the cost recorded does not increase.

Return type:

float
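The "anytime" behaviour can be pictured as a running minimum over the per-step costs. The sketch below is not PropFlow's implementation: AnytimeTracker and record_cost are hypothetical names, and it assumes that a lower cost is better.

```python
from typing import List, Optional


class AnytimeTracker:
    """Hypothetical helper: records one cost per step.

    In anytime mode the recorded value never increases.
    """

    def __init__(self, anytime: bool) -> None:
        self.anytime = anytime
        self.best: Optional[float] = None
        self.costs: List[float] = []

    def record_cost(self, current: float) -> float:
        # Track the best (lowest) cost seen so far.
        if self.best is None or current < self.best:
            self.best = current
        # Anytime mode records the best cost; otherwise the raw cost.
        recorded = self.best if self.anytime else current
        self.costs.append(recorded)
        return recorded


raw_costs = [12.0, 9.0, 11.0, 7.0, 8.0]
tracker = AnytimeTracker(anytime=True)
for cost in raw_costs:
    tracker.record_cost(cost)
anytime_costs = tracker.costs
```

With anytime disabled, the same tracker would simply echo the raw sequence.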

latest_snapshot()[source]

Return type:

Optional[EngineSnapshot]

get_snapshot(step_index)[source]

Return type:

Optional[EngineSnapshot]

property snapshots: List[EngineSnapshot]

property snapshot_map: Dict[int, EngineSnapshot]

print_snapshots(include_cost_tables=True, show='text')[source]

Prints a step-by-step formatted history of the simulation.

Parameters:
  • include_cost_tables (bool) – Whether to include cost tables in the output.

  • show (Literal['text', 'table']) – “text” (default) or “table” (tabular layout for messages).

Return type:

None

normalize_inbox()[source]

Manually normalizes messages in the system.

This normalizes the R-messages currently waiting in Variable inboxes. Since Q-messages are computed from these R-messages in the next step, this effectively stabilizes both message types against drift.

Return type:

None

class propflow.bp.engines.Engine(factor_graph, computator=<propflow.bp.computators.MinSumComputator object>, init_normalization=<function dummy_func>, name='BPEngine', convergence_config=None, monitor_performance=None, normalize_messages=None, anytime=None, snapshot_manager=None)[source]

Bases: BPEngine

A basic belief propagation engine.

This is a direct alias for BPEngine and provides the standard, unmodified belief propagation behavior.

class propflow.bp.engines.SplitEngine(*args, split_factor=0.5, **kwargs)[source]

Bases: BPEngine

A BP engine that applies the factor splitting policy.

This engine modifies the factor graph by splitting each factor into two, distributing the original cost between them. This can sometimes help with convergence.
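As a rough illustration of factor splitting, the sketch below divides one cost table into two tables whose sum equals the original. It assumes the split is proportional (split_factor and 1 - split_factor), which the text above does not state explicitly; split_cost_table is a hypothetical helper, not a PropFlow function.

```python
from typing import List, Tuple

Table = List[List[float]]


def split_cost_table(table: Table, split_factor: float = 0.5) -> Tuple[Table, Table]:
    """Split a cost table into two shares that add up to the original.

    Assumption: the first clone gets split_factor * cost and the second
    gets (1 - split_factor) * cost.
    """
    first = [[split_factor * c for c in row] for row in table]
    second = [[(1.0 - split_factor) * c for c in row] for row in table]
    return first, second


original = [[4.0, 8.0], [2.0, 6.0]]
clone_a, clone_b = split_cost_table(original, split_factor=0.25)

# Adding the two clones cell by cell recovers the original table.
recombined = [
    [a + b for a, b in zip(row_a, row_b)]
    for row_a, row_b in zip(clone_a, clone_b)
]
```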

post_init()[source]

Applies the factor splitting policy after initialization.

Return type:

None

class propflow.bp.engines.MidRunSplitEngine(*args, split_at_iter, split_factor=0.5, split_targets=None, split_fraction=None, split_seed=None, transfer_mode='reset', **kwargs)[source]

Bases: BPEngine

A BP engine that applies factor splitting at a chosen iteration.

The engine runs standard BP until split_at_iter. Immediately before that step is computed, it splits the requested factors and either resets messages to zero or transfers current factor-to-variable messages into the split graph.

The transfer mode is an empirical heuristic, not a canonical mid-run split. It preserves the aggregate factor-to-variable contribution at the variable inbox boundary (and therefore the variable belief, and the Q messages to unaffected neighbors). It does not preserve the Q messages going into the split clones themselves: each clone’s Q picks up the sibling clone’s transferred share, so each clone’s first outgoing R is not a canonical continuation of the un-split dynamics. Treat the split iteration as a heuristic injection.

step(i=0)[source]

Runs one full step of the synchronous belief propagation algorithm.

A step consists of two main phases:

  1. Variable nodes compute and send messages to factor nodes.

  2. Factor nodes compute and send messages to variable nodes.

Parameters:

i (int) – The current step number.

Returns:

A Step object containing information about the messages exchanged.

class propflow.bp.engines.CostReductionOnceEngine(*args, reduction_factor=0.5, **kwargs)[source]

Bases: BPEngine

A BP engine that applies a one-time cost reduction policy.

This engine reduces the costs in the factor tables at the beginning of the simulation and then applies a discount to outgoing messages from factors.

post_init()[source]

Applies the one-time cost reduction after initialization.

post_factor_compute(factor, iteration)[source]

Applies a discount to outgoing messages from factors.

class propflow.bp.engines.DampingEngine(*args, damping_factor=0.9, **kwargs)[source]

Bases: BPEngine

A BP engine that applies message damping.

Damping replaces each newly computed message with a weighted average of that message and the message from the previous iteration. This can help prevent oscillations and improve convergence.

post_var_compute(var)[source]

Applies damping after a variable node computes its messages.

class propflow.bp.engines.QRDampingEngine(*args, q_damping_factor=0.0, r_damping_factor=0.0, **kwargs)[source]

Bases: BPEngine

A BP engine that applies message damping to both Q and R messages.

Q damping applies to variable → factor messages. R damping applies to factor → variable messages.

post_init()[source]

Prime factor history with zero messages so R-damping applies from iter 0.

Return type:

None

post_var_compute(var)[source]

Hook for logic after a variable computes its messages.

Parameters:

var (VariableAgent) – The variable agent that just computed messages.

Return type:

None

post_factor_compute(factor, iteration)[source]

Hook for logic after a factor computes its messages.

Parameters:
  • factor (FactorAgent) – The factor agent that just computed messages.

  • iteration (int) – The current simulation iteration.

Return type:

None

class propflow.bp.engines.RDampingEngine(*args, damping_factor=0.9, **kwargs)[source]

Bases: BPEngine

A BP engine that applies message damping to R messages.

Damping replaces each newly computed R message with a weighted average of that message and the R message from the previous iteration. This can help prevent oscillations and improve convergence. Unlike DampingEngine, which damps Q messages (Variable -> Factor), this engine damps R messages (Factor -> Variable).

post_init()[source]

Initialize history with zero messages to ensure damping works from iter 0.

Return type:

None

post_factor_compute(factor, iteration)[source]

Applies damping after a factor node computes its messages.

class propflow.bp.engines.DiffusionEngine(*args, alpha=0.3, **kwargs)[source]

Bases: BPEngine

A BP engine that applies spatial message diffusion.

Unlike damping, which blends messages across time (current vs. previous), diffusion blends messages across space (local vs. neighbors) at each iteration. This can help smooth the optimization landscape and improve convergence on densely connected graphs.
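To make the "across space" idea concrete, here is a minimal sketch of one possible diffusion rule. The exact blend is an assumption, since the reference does not give the formula: each message becomes (1 - alpha) times itself plus alpha times the mean of the other messages in the same group. diffuse is a hypothetical helper, and plain lists stand in for NumPy arrays.

```python
from typing import List

Vector = List[float]


def diffuse(messages: List[Vector], alpha: float = 0.3) -> List[Vector]:
    """Blend each message with the mean of the *other* messages (assumed rule)."""
    if len(messages) < 2:
        # Nothing to blend with.
        return [list(m) for m in messages]
    size = len(messages[0])
    totals = [sum(m[k] for m in messages) for k in range(size)]
    others = len(messages) - 1
    blended = []
    for m in messages:
        # Mean of all messages except m, per domain value.
        neighbor_mean = [(totals[k] - m[k]) / others for k in range(size)]
        blended.append(
            [(1.0 - alpha) * m[k] + alpha * neighbor_mean[k] for k in range(size)]
        )
    return blended


msgs = [[0.0, 4.0], [2.0, 0.0], [4.0, 2.0]]
smoothed = diffuse(msgs, alpha=0.5)
```

Setting alpha to 0 leaves every message unchanged, which matches the intuition that alpha controls how much neighboring information leaks in.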

post_var_compute(var)[source]

Apply spatial diffusion to Q-messages (variable → factor).

For each message this variable sends to a factor, blend it with messages from other variables connected to the same factor.

Return type:

None

post_factor_compute(factor, iteration)[source]

Apply spatial diffusion to R-messages (factor → variable).

For each message this factor sends to a variable, blend it with messages from other factors connected to the same variable.

Return type:

None

class propflow.bp.engines.DampingSCFGEngine(*args, **kwargs)[source]

Bases: DampingEngine, SplitEngine

A BP engine that combines message damping and factor splitting.

class propflow.bp.engines.DampingCROnceEngine(*args, **kwargs)[source]

Bases: DampingEngine, CostReductionOnceEngine

A BP engine that combines message damping and one-time cost reduction.

class propflow.bp.engines.TRWEngine(*args, factor_rhos=None, tree_sample_count=250, tree_sampler_seed=None, min_rho=1e-06, **kwargs)[source]

Bases: BPEngine

Tree-Reweighted Belief Propagation engine (Min-Sum variant).

The engine keeps the standard Min-Sum computator but automatically:

  1. Samples spanning trees over the variable-only (primal) graph to estimate per-factor appearance probabilities rho_f.

  2. Scales each factor’s energy table by 1 / rho_f before message computation so local costs match the TRW objective.

  3. Re-weights outgoing R-messages from factors by rho_f so that variable updates/beliefs operate on appropriately weighted costs.

Rho sampling and scaling can be overridden by providing explicit factor_rhos (all > 0). Otherwise the engine performs end-to-end TRW reweighting using the current factor graph structure.
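Steps 2 and 3 above can be sketched for a single pairwise factor as follows. This is an illustration rather than the engine's code: scale_table and reweight_message are hypothetical names, and clamping rho to min_rho before dividing is an assumption about how that parameter is used.

```python
from typing import List

Table = List[List[float]]


def scale_table(table: Table, rho: float, min_rho: float = 1e-6) -> Table:
    """Step 2: divide every cost by rho_f."""
    rho = max(rho, min_rho)  # assumed guard against division by a near-zero rho
    return [[c / rho for c in row] for row in table]


def reweight_message(message: List[float], rho: float) -> List[float]:
    """Step 3: multiply an outgoing R-message by rho_f."""
    return [rho * v for v in message]


table = [[0.0, 2.0], [3.0, 1.0]]
rho_f = 0.5

scaled = scale_table(table, rho_f)
# Min-Sum R-message to the row variable, assuming the incoming Q-message is all zeros.
raw_r = [min(row) for row in scaled]
weighted_r = reweight_message(raw_r, rho_f)
```

With zero incoming messages the two scalings cancel, so weighted_r equals the row minima of the unscaled table. Once the Q-messages are non-zero they are not divided by rho_f, and the result differs from plain Min-Sum.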

DEFAULT_MIN_RHO = 1e-06

post_init()[source]

Validate rho configuration, sample if needed, and scale costs.

Called from BPEngine.__init__ after self.graph is set but before messages are initialized.

Return type:

None

post_factor_compute(factor, iteration)[source]

Scale outgoing R-messages by rho_f before they are sent.

Return type:

None

class propflow.bp.engines.DampingTRWEngine(*args, **kwargs)[source]

Bases: DampingEngine, TRWEngine

A BP engine that combines TRW reweighting with message damping.

class propflow.bp.engines.MessagePruningEngine(*args, prune_threshold=0.0001, min_iterations=5, adaptive_threshold=True, **kwargs)[source]

Bases: BPEngine

A BP engine that applies a message pruning policy to reduce memory usage.

post_init()[source]

Initializes and sets the message pruning policy on agent mailers.

Return type:

None

Factor Graphs

class propflow.bp.factor_graph.FactorGraph(variable_li, factor_li, edges)[source]

Bases: object

Represents a bipartite factor graph for belief propagation.

This class encapsulates the structure of a factor graph, which consists of variable nodes and factor nodes. It enforces a bipartite structure where variables are only connected to factors and vice versa. It uses a networkx.Graph to manage the underlying connections.

variables

A list of all variable agents in the graph.

Type:

List[VariableAgent]

factors

A list of all factor agents in the graph.

Type:

List[FactorAgent]

G

The underlying networkx graph structure.

Type:

nx.Graph

property lb: int | float

The lower bound of the problem, can be set externally.

compute_cost(factors=None)[source]

Compute total cost over the given factors using current variable assignments.

Parameters:

factors (Optional[List[FactorAgent]]) – The factors to sum over. Defaults to self.factors (the current, possibly modified factors). Pass self.original_factors to get the cost against the original, unmodified cost tables.

Return type:

float
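Conceptually, the total cost is one table lookup per factor at the current assignment. The sketch below illustrates this for pairwise factors only; the tuple-based Factor structure and total_cost are hypothetical and do not mirror PropFlow's FactorAgent.

```python
from typing import Dict, List, Sequence, Tuple

# Hypothetical stand-in for a pairwise factor:
# (names of the two connected variables, 2-D cost table).
Factor = Tuple[Tuple[str, str], Sequence[Sequence[float]]]


def total_cost(factors: List[Factor], assignment: Dict[str, int]) -> float:
    """Sum each factor's cost at the given variable assignment."""
    cost = 0.0
    for (first, second), table in factors:
        cost += table[assignment[first]][assignment[second]]
    return cost


factors: List[Factor] = [
    (("x1", "x2"), [[1.0, 5.0], [3.0, 0.0]]),
    (("x2", "x3"), [[2.0, 4.0], [6.0, 1.0]]),
]
assignment = {"x1": 1, "x2": 1, "x3": 0}
cost = total_cost(factors, assignment)
```

Passing a different factor list (for example, tables saved before a cost-modifying policy ran) changes which tables are consulted but not the lookup logic.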

property global_cost: int | float

Total cost over current (possibly modified) factors.

property curr_assignment: Dict[VariableAgent, int]

The current assignment for all variables in the graph.

Type:

dict

property edges: Dict[FactorAgent, List[VariableAgent]]

Reconstructs the edge dictionary mapping factors to variables.

Type:

dict

set_computator(computator, **kwargs)[source]

Assigns a computator to all nodes in the graph.

Parameters:
  • computator (BPComputator) – The computator instance to assign.

  • **kwargs – Additional arguments (not currently used).

Return type:

None

normalize_messages()[source]

Normalizes all incoming messages for all variable nodes.

This is a common technique to prevent numerical instability in belief propagation algorithms by shifting message values.

Return type:

None
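One common way to shift messages is to subtract each message's minimum, so that its smallest entry becomes zero. Whether PropFlow shifts by the minimum, the mean, or something else is not stated here, so treat the rule below as an assumption; normalize is a hypothetical helper.

```python
from typing import Dict, List


def normalize(message: List[float]) -> List[float]:
    """Shift a message so that its minimum entry is zero (assumed rule)."""
    low = min(message)
    return [v - low for v in message]


# Hypothetical inbox of one variable: sender name -> message values.
inbox: Dict[str, List[float]] = {
    "f1": [105.0, 103.0, 110.0],
    "f2": [7.0, 7.5, 9.0],
}
normalized = {sender: normalize(msg) for sender, msg in inbox.items()}
```

Because every entry of a message moves by the same constant, the position of the minimum (and therefore the Min-Sum assignment) is unchanged.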

visualize(layout='bipartite', layout_kwargs=None, plot=True, *, pretty=False)[source]

Visualizes the factor graph using matplotlib.

Variable nodes are drawn as circles, and factor nodes are drawn as squares. If plot is True the figure is shown and the function returns None. If plot is False the Figure object is returned (and not shown), allowing further programmatic use.

Parameters:
  • layout (str) – Layout algorithm to use. Supported values are "bipartite" (default), "spring", "circular", and "kamada_kawai". Ignored when pretty=True.

  • layout_kwargs (Optional[Dict[str, Any]]) – Optional keyword arguments forwarded to the selected NetworkX layout function.

  • plot (bool) – If True, call plt.show() and return None. If False, return the matplotlib.figure.Figure instance without showing it.

  • pretty (bool) – When True, render a fixed spring-layout visualization with styled nodes/legend useful for presentations.

Return type:

Figure | None

get_variable_agents()[source]

Returns a list of all variable agents in the graph.

Return type:

List[VariableAgent]

get_factor_agents()[source]

Returns a list of all factor agents in the graph.

Return type:

List[FactorAgent]

property diameter: int

The diameter of the factor graph.

If the graph is not connected, it returns the diameter of the largest connected component.

Type:

int
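The fallback for disconnected graphs can be illustrated with a small breadth-first search. The class itself is described as using networkx; this stdlib-only sketch is an independent illustration, and bfs_distances and diameter_of_largest_component are hypothetical names.

```python
from collections import deque
from typing import Dict, List

Graph = Dict[str, List[str]]  # adjacency list: node -> neighbors


def bfs_distances(graph: Graph, start: str) -> Dict[str, int]:
    """Hop distance from start to every node reachable from it."""
    dist = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for neighbor in graph[node]:
            if neighbor not in dist:
                dist[neighbor] = dist[node] + 1
                queue.append(neighbor)
    return dist


def diameter_of_largest_component(graph: Graph) -> int:
    """Longest shortest path inside the connected component with the most nodes."""
    remaining = set(graph)
    largest: List[str] = []
    while remaining:
        component = list(bfs_distances(graph, next(iter(remaining))))
        remaining.difference_update(component)
        if len(component) > len(largest):
            largest = component
    return max((max(bfs_distances(graph, n).values()) for n in largest), default=0)


# Two components: a chain x1 - f12 - x2 - f23 - x3 and a smaller x4 - f45 - x5.
graph: Graph = {
    "x1": ["f12"], "f12": ["x1", "x2"], "x2": ["f12", "f23"],
    "f23": ["x2", "x3"], "x3": ["f23"],
    "x4": ["f45"], "f45": ["x4", "x5"], "x5": ["f45"],
}
diameter = diameter_of_largest_component(graph)
```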

property original_factors: List[FactorAgent]

A deep copy of the original factor agents.

Type:

list[FactorAgent]

Computators

class propflow.bp.computators.BPComputator(reduce_func=<function min>, combine_func=<ufunc 'add'>)[source]

Bases: Computator

A vectorized, cache-friendly Belief Propagation computator.

This class provides a highly optimized implementation for the core computations in belief propagation algorithms. It uses function dispatch tables for common operations (min, max, sum, add, multiply) to minimize overhead and leverages vectorized numpy operations for performance.

The behavior of the computator (Min-Sum, Max-Product) is determined by the reduce_func and combine_func arguments passed during initialization.

reduce_func

The function used for message reduction (e.g., np.min).

Type:

Callable

combine_func

The function used for message combination (e.g., np.add).

Type:

Callable

compute_Q(messages)[source]

Computes outgoing messages from a variable node to factor nodes (Q messages).

This is an optimized, vectorized implementation.

Parameters:

messages (List[Message]) – A list of incoming messages from factor nodes.

Return type:

List[Message]

Returns:

A list of computed messages to be sent to factor nodes.
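In the additive (Min-Sum or Max-Sum) setting, the Q message sent to a factor is the sum of the R messages received from all other factors. A compact way to obtain every Q message is to sum the inbox once and subtract each factor's own contribution. The sketch below shows that idea with plain lists; compute_q is a hypothetical helper, not the library's compute_Q.

```python
from typing import Dict, List


def compute_q(inbox: Dict[str, List[float]]) -> Dict[str, List[float]]:
    """Return one outgoing Q message per factor (additive combine assumed).

    inbox maps a factor name to the R message received from that factor.
    """
    if not inbox:
        return {}
    size = len(next(iter(inbox.values())))
    # Sum all incoming messages once.
    total = [sum(msg[k] for msg in inbox.values()) for k in range(size)]
    # The message to a factor excludes what that factor itself sent.
    return {
        factor: [total[k] - msg[k] for k in range(size)]
        for factor, msg in inbox.items()
    }


inbox = {"f1": [1.0, 4.0], "f2": [3.0, 0.0], "f3": [2.0, 2.0]}
q_messages = compute_q(inbox)
```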

compute_R(cost_table, incoming_messages)[source]

Computes outgoing messages from a factor node to variable nodes (R messages).

This is an optimized, vectorized implementation that involves three main steps:

  1. Broadcast each incoming Q message to the dimensionality of the cost table.

  2. Combine the cost table with all broadcasted Q messages once.

  3. For each recipient, efficiently “remove” its Q message from the aggregate and reduce to produce the outgoing R message.

Parameters:
  • cost_table (ndarray) – The factor’s cost table.

  • incoming_messages (List[Message]) – A list of incoming messages from variable nodes.

Return type:

List[Message]

Returns:

A list of computed messages to be sent to variable nodes.
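For a factor connecting two variables under Min-Sum, the aggregate-then-remove idea can be written out with nested lists. This is a sketch, not the vectorized implementation: compute_r is a hypothetical helper, and "remove" is taken to mean subtraction because the combine operation here is addition.

```python
from typing import List, Tuple

Table = List[List[float]]


def compute_r(
    cost_table: Table, q1: List[float], q2: List[float]
) -> Tuple[List[float], List[float]]:
    """Min-Sum R messages from a two-variable factor to x1 (rows) and x2 (columns)."""
    rows, cols = len(q1), len(q2)
    # Combine the table with both Q messages once.
    aggregate = [
        [cost_table[i][j] + q1[i] + q2[j] for j in range(cols)]
        for i in range(rows)
    ]
    # For each recipient, remove its own Q message and reduce over the other axis.
    r_to_x1 = [min(aggregate[i][j] - q1[i] for j in range(cols)) for i in range(rows)]
    r_to_x2 = [min(aggregate[i][j] - q2[j] for i in range(rows)) for j in range(cols)]
    return r_to_x1, r_to_x2


cost_table = [[1.0, 5.0], [3.0, 0.0]]
q_from_x1 = [0.0, 2.0]
q_from_x2 = [4.0, 1.0]
r_to_x1, r_to_x2 = compute_r(cost_table, q_from_x1, q_from_x2)
```

Each R message depends only on the cost table and the Q message from the other variable, which is why the recipient's own contribution is removed before reducing.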

get_assignment(belief)[source]

Gets the optimal assignment from a belief vector.

Uses the pre-selected _argreduce_func (e.g., argmin, argmax) for zero-overhead execution.

Parameters:

belief (ndarray) – The belief vector.

Return type:

int

Returns:

The index of the optimal assignment.

compute_belief(messages, domain)[source]

Computes the belief of a variable node from incoming messages.

Parameters:
  • messages (List[Message]) – A list of incoming messages.

  • domain (int) – The domain size of the variable.

Return type:

ndarray

Returns:

A numpy array representing the belief distribution.

propflow.bp.computators.Computator

alias of BPComputator

class propflow.bp.computators.MinSumComputator[source]

Bases: BPComputator

A computator for the Min-Sum belief propagation algorithm.

This is equivalent to finding the Most Probable Explanation (MPE) in a graphical model whose costs are negative log-probabilities. It combines messages using addition and reduces them using the min operator.

class propflow.bp.computators.MaxSumComputator[source]

Bases: BPComputator

A computator for the Max-Sum belief propagation algorithm.

This is used for problems where the goal is to maximize a sum of utilities. It combines messages using addition and reduces them using the max operator.

class propflow.bp.computators.MaxProductComputator[source]

Bases: BPComputator

A computator for the Max-Product belief propagation algorithm.

This is equivalent to finding the Most Probable Explanation (MPE) in a graphical model. It combines messages using multiplication and reduces them using the max operator.

class propflow.bp.computators.SumProductComputator[source]

Bases: BPComputator

A computator for the Sum-Product belief propagation algorithm.

This is used to compute marginal probabilities in a graphical model. It combines messages using multiplication and reduces them (marginalizes) using summation.
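The four computators differ only in their (reduce, combine) pair. The sketch below applies each pair to the same pairwise cost table to produce the message sent to the row variable. It uses Python built-ins in place of NumPy functions, and r_message and VARIANTS are hypothetical names.

```python
import operator
from typing import Callable, Dict, List, Tuple

Table = List[List[float]]


def r_message(
    table: Table,
    q_other: List[float],
    reduce_func: Callable,
    combine_func: Callable[[float, float], float],
) -> List[float]:
    """Message to the row variable: combine each cell with the column
    variable's Q message, then reduce over the column axis."""
    return [
        reduce_func(combine_func(cell, q) for cell, q in zip(row, q_other))
        for row in table
    ]


VARIANTS: Dict[str, Tuple[Callable, Callable[[float, float], float]]] = {
    "min-sum": (min, operator.add),
    "max-sum": (max, operator.add),
    "max-product": (max, operator.mul),
    "sum-product": (sum, operator.mul),
}

table = [[1.0, 2.0], [4.0, 0.5]]
q_other = [2.0, 4.0]
results = {
    name: r_message(table, q_other, reduce_func, combine_func)
    for name, (reduce_func, combine_func) in VARIANTS.items()
}
```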

Agents

class propflow.core.agents.FGAgent(name, node_type, domain)[source]

Bases: Agent, ABC

Abstract base class for belief propagation (BP) nodes.

Extends the Agent class with methods relevant to message passing, updating local belief, and retrieving that belief. It serves as a foundation for both VariableAgent and FactorAgent classes.

domain

The size of the variable domain.

Type:

int

mailer

Handles incoming and outgoing messages.

Type:

MailHandler

receive_message(message)[source]

Receives a message and adds it to the mailer’s inbox.

Parameters:

message (Message) – The message to be received.

Return type:

None

empty_mailbox()[source]

Clears all messages from the mailer’s inbox.

Return type:

None

empty_outgoing()[source]

Clears all messages from the mailer’s outbox.

property inbox: List[Message]

A list of incoming messages.

Type:

list[Message]

property outbox: List[Message]

A list of outgoing messages.

Type:

list[Message]

abstractmethod compute_messages()[source]

Abstract method to compute outgoing messages.

This method must be implemented by subclasses to define how messages are calculated based on the agent’s current state and incoming messages.

Return type:

List[Message]

Returns:

A list of messages to be sent.

property last_iteration: List[Message]

The last list of messages sent.

Type:

list[Message]

last_cycle(diameter=1)[source]

Retrieves messages from a previous cycle.

Parameters:

diameter (int) – The number of iterations in a cycle. Defaults to 1.

Return type:

List[Message]

Returns:

A list of messages from the specified previous cycle.

append_last_iteration()[source]

Appends the current outbox to the history.

Maintains a history of sent messages, limited by _max_history.

class propflow.core.agents.VariableAgent(name, domain)[source]

Bases: FGAgent

Represents a variable node in a factor graph.

This agent is responsible for aggregating messages from neighboring factor nodes to compute its belief over its domain.

computator

An object that handles the computation of messages and beliefs.

compute_messages()[source]

Computes outgoing messages to factor nodes.

Uses the assigned computator to calculate messages based on the contents of the inbox.

Return type:

None

property belief: ndarray

The current belief distribution over the variable’s domain.

Type:

np.ndarray

property curr_assignment: int | float

The current assignment for the variable.

Type:

int | float

class propflow.core.agents.FactorAgent(name, domain, ct_creation_func, param=None, cost_table=None)[source]

Bases: FGAgent

Represents a factor node in a factor graph.

This agent stores a cost function (or utility function) that defines the relationship between a set of connected variable nodes.

cost_table

The table of costs for each combination of assignments.

Type:

CostTable

connection_number

A mapping from variable names to their dimension index.

Type:

dict

ct_creation_func

A function to create the cost table.

Type:

Callable

ct_creation_params

Parameters for the cost table creation function.

Type:

dict

classmethod create_from_cost_table(name, cost_table)[source]

Creates a FactorAgent from an existing cost table.

Parameters:
  • name (str) – The name of the factor.

  • cost_table (ndarray) – The cost table to use.

Return type:

FactorAgent

Returns:

A new FactorAgent instance.

compute_messages()[source]

Computes messages to be sent to variable nodes.

Uses the assigned computator to calculate messages based on the cost table and incoming messages from variable nodes.

Return type:

None

initiate_cost_table()[source]

Creates the cost table using the provided creation function.

Raises:

ValueError – If the cost table already exists or if no connections are set.

Return type:

None

set_dim_for_variable(variable, dim)[source]

Maps a variable to a dimension in the cost table.

Parameters:
  • variable (VariableAgent) – The variable agent to map.

  • dim (int) – The dimension index in the cost table.

Return type:

None

set_name_for_factor()[source]

Sets the factor’s name based on its connected variables.

Raises:

ValueError – If no connections are set.

Return type:

None

save_original(ct=None)[source]

Saves a copy of the original cost table.

Parameters:

ct (ndarray | None) – An external cost table to save. Defaults to None.

Return type:

None

property mean_cost: float

The mean value of the costs in the cost table.

Type:

float

property total_cost: float

The sum of all costs in the cost table.

Type:

float

property original_cost_table: ndarray | None

The original, unmodified cost table, if saved.

Type:

np.ndarray | None

Components

class propflow.core.components.Message(data, sender, recipient)[source]

Bases: object

Represents a message passed between agents in the belief propagation algorithm.

data

The content of the message, typically a numpy array representing costs or beliefs.

Type:

np.ndarray

sender

The agent sending the message.

Type:

Agent

recipient

The agent receiving the message.

Type:

Agent

copy()[source]

Creates a deep copy of the message.

Returns:

A new Message instance with copied data.

Return type:

Message

class propflow.core.components.MailHandler(_domain_size)[source]

Bases: object

Handles message passing with deduplication and synchronization.

This class manages an agent’s incoming and outgoing messages, ensuring that only the latest message from each sender is stored.

pruning_policy

An optional policy for selectively discarding messages.

set_pruning_policy(policy)[source]

Sets a message pruning policy.

Parameters:

policy – An object with a should_accept_message method.

Return type:

None

set_first_message(owner, neighbor)[source]

Initializes the inbox with a zero-message from a neighbor.

This is used to ensure that an agent has a message from each neighbor before computation begins.

Parameters:
  • owner (FGAgent) – The agent who owns this mail handler.

  • neighbor (FGAgent) – The neighboring agent to initialize a message from.

Return type:

None

receive_messages(messages)[source]

Receives and handles one or more messages.

Applies a pruning policy if one is set and stores the message, overwriting any previous message from the same sender.

Parameters:

messages (Message | list[Message]) – A single Message or a list of Message objects.

Return type:

None
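The "latest message per sender" behaviour amounts to keying the inbox by sender. Here is a minimal sketch under that assumption. Inbox is a hypothetical class, and the should_accept callback only loosely mirrors a pruning policy's should_accept_message method.

```python
from typing import Callable, Dict, List, Optional


class Inbox:
    """Hypothetical inbox that keeps only the newest message from each sender."""

    def __init__(
        self, should_accept: Optional[Callable[[str, List[float]], bool]] = None
    ) -> None:
        self._latest: Dict[str, List[float]] = {}
        self._should_accept = should_accept

    def receive(self, sender: str, data: List[float]) -> None:
        # An optional policy may reject the message before it is stored.
        if self._should_accept is not None and not self._should_accept(sender, data):
            return
        # Overwrite any earlier message from the same sender.
        self._latest[sender] = list(data)

    @property
    def messages(self) -> Dict[str, List[float]]:
        return dict(self._latest)


box = Inbox()
box.receive("f1", [0.0, 1.0])
box.receive("f2", [2.0, 2.0])
box.receive("f1", [5.0, 3.0])  # replaces the first message from f1
```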

send()[source]

Sends all staged outgoing messages to their recipients.

Return type:

None

stage_sending(messages)[source]

Stages a list of messages to be sent.

Parameters:

messages (List[Message]) – The messages to be sent.

Return type:

None

prepare()[source]

Clears the outbox, typically after messages have been sent.

Return type:

None

clear_inbox()[source]

Clears all messages from the inbox.

Return type:

None

clear_outgoing()[source]

Clears all messages from the outbox.

Return type:

None

property inbox: List[Message]

A list of incoming messages.

Type:

list[Message]

property outbox: List[Message]

A list of outgoing messages.

Type:

list[Message]

Policies

Policies used by the belief propagation (BP) engines.

class propflow.policies.ConvergenceConfig(belief_threshold=1e-06, assignment_threshold=0, min_iterations=0, patience=5, use_relative_change=True)[source]

Bases: object

Configuration settings for the convergence detection logic.

belief_threshold

The maximum change in the norm of a belief vector for it to be considered stable.

assignment_threshold

The maximum number of assignment changes allowed for the assignments to be considered stable.

min_iterations

The minimum number of iterations to run before checking for convergence.

patience

The number of consecutive iterations for which the beliefs and assignments must remain stable before declaring convergence.

use_relative_change

If True, uses the relative change in belief norm for the threshold check; otherwise, uses the absolute change.

belief_threshold: float = 1e-06

assignment_threshold: int = 0

min_iterations: int = 0

patience: int = 5

use_relative_change: bool = True

class propflow.policies.ConvergenceMonitor(config=None)[source]

Bases: object

Monitors and detects convergence in a belief propagation simulation.

This class tracks the history of beliefs and assignments to determine if the algorithm has reached a stable state according to the provided configuration.

prev_beliefs: Dict[str, ndarray] | None

prev_assignments: Dict[str, int] | None

check_convergence(beliefs, assignments)[source]

Checks if the algorithm has converged.

This method compares the current beliefs and assignments with the previous state to determine if they have stabilized.

Parameters:
  • beliefs (Dict[str, ndarray]) – A dictionary mapping variable names to their current belief vectors.

  • assignments (Dict[str, int]) – A dictionary mapping variable names to their current assignments.

Return type:

bool

Returns:

True if the algorithm is considered to have converged, False otherwise.
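One way the configuration fields could interact is sketched below. The details are assumptions rather than the library's logic: the belief change is measured as the Euclidean norm of the difference between consecutive belief vectors (divided by the previous norm when use_relative_change is set), and convergence requires patience consecutive stable checks. Config and Monitor are hypothetical stand-ins.

```python
import math
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Config:
    belief_threshold: float = 1e-6
    assignment_threshold: int = 0
    min_iterations: int = 0
    patience: int = 5
    use_relative_change: bool = True


class Monitor:
    def __init__(self, config: Config) -> None:
        self.config = config
        self.prev_beliefs: Optional[Dict[str, List[float]]] = None
        self.prev_assignments: Optional[Dict[str, int]] = None
        self.iteration = 0
        self.stable_count = 0

    def check(self, beliefs: Dict[str, List[float]], assignments: Dict[str, int]) -> bool:
        self.iteration += 1
        stable = False
        if self.prev_beliefs is not None and self.prev_assignments is not None:
            max_change = 0.0
            for name, new in beliefs.items():
                old = self.prev_beliefs[name]
                change = math.sqrt(sum((a - b) ** 2 for a, b in zip(new, old)))
                if self.config.use_relative_change:
                    change /= max(math.sqrt(sum(b * b for b in old)), 1e-12)
                max_change = max(max_change, change)
            flips = sum(
                1 for name, value in assignments.items()
                if self.prev_assignments[name] != value
            )
            stable = (
                max_change <= self.config.belief_threshold
                and flips <= self.config.assignment_threshold
            )
        # Any unstable check resets the streak.
        self.stable_count = self.stable_count + 1 if stable else 0
        self.prev_beliefs = {n: list(b) for n, b in beliefs.items()}
        self.prev_assignments = dict(assignments)
        return (
            self.iteration >= self.config.min_iterations
            and self.stable_count >= self.config.patience
        )


monitor = Monitor(Config(patience=2))
trace = [
    ({"x": [1.0, 3.0]}, {"x": 0}),
    ({"x": [1.0, 2.0]}, {"x": 0}),
    ({"x": [1.0, 2.0]}, {"x": 0}),
    ({"x": [1.0, 2.0]}, {"x": 0}),
]
flags = [monitor.check(beliefs, assignments) for beliefs, assignments in trace]
```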

reset()[source]

Resets the monitor to its initial state for a new simulation run.

Return type:

None

get_convergence_summary()[source]

Returns a summary of the convergence history.

Return type:

Dict

Returns:

A dictionary containing the total iterations, convergence status, and the history of belief changes.

class propflow.policies.MessagePruningPolicy(prune_threshold=None, min_iterations=5, adaptive_threshold=True)[source]

Bases: object

A policy that prunes messages that have not changed significantly.

This policy helps to optimize belief propagation by avoiding the processing of messages that are redundant. It compares the norm of the difference between a new message and the previous message from the same sender.

policy_type

The type of the policy (PolicyType.MESSAGE).

prune_threshold

The threshold for pruning.

min_iterations

The number of initial iterations during which no pruning will occur.

adaptive_threshold

If True, the threshold is scaled by the magnitude of the message.

iteration_count

The current iteration number.

pruned_count

The total number of pruned messages.

total_count

The total number of messages considered for pruning.

should_accept_message(agent, new_message)[source]

Determines whether to accept or prune an incoming message.

Parameters:
  • agent (FGAgent) – The agent receiving the message.

  • new_message (Message) – The new Message object.

Return type:

bool

Returns:

True if the message should be accepted, False if it should be pruned.
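The accept-or-prune decision can be sketched as a comparison between the new message and the last accepted message from the same sender. This is a simplified illustration: PruningSketch is hypothetical, senders are identified by name, and the adaptive rule (scaling the threshold by the previous message's norm, with a floor of 1) is an assumption.

```python
import math
from typing import Dict, List


class PruningSketch:
    def __init__(
        self,
        prune_threshold: float = 1e-4,
        min_iterations: int = 5,
        adaptive_threshold: bool = True,
    ) -> None:
        self.prune_threshold = prune_threshold
        self.min_iterations = min_iterations
        self.adaptive_threshold = adaptive_threshold
        self.iteration_count = 0
        self.pruned_count = 0
        self.total_count = 0
        self._previous: Dict[str, List[float]] = {}

    def should_accept(self, sender: str, data: List[float]) -> bool:
        self.total_count += 1
        previous = self._previous.get(sender)
        # Always accept first messages and anything during the warm-up phase.
        if previous is None or self.iteration_count < self.min_iterations:
            self._previous[sender] = list(data)
            return True
        diff = math.sqrt(sum((a - b) ** 2 for a, b in zip(data, previous)))
        threshold = self.prune_threshold
        if self.adaptive_threshold:
            # Assumed scaling: larger messages tolerate larger changes.
            threshold *= max(1.0, math.sqrt(sum(b * b for b in previous)))
        if diff < threshold:
            self.pruned_count += 1
            return False
        self._previous[sender] = list(data)
        return True

    def step_completed(self) -> None:
        self.iteration_count += 1


policy = PruningSketch(prune_threshold=0.1, min_iterations=1, adaptive_threshold=False)
decisions = [policy.should_accept("f1", [1.0, 2.0])]          # first message
policy.step_completed()
decisions.append(policy.should_accept("f1", [1.0, 2.05]))     # change below threshold
decisions.append(policy.should_accept("f1", [1.0, 3.0]))      # change above threshold
```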

step_completed()[source]

Signals that a simulation step has completed, incrementing the iteration count.

Return type:

None

get_stats()[source]

Returns statistics about the pruning process.

Return type:

Dict[str, float]

Returns:

A dictionary containing the pruning rate, total messages considered, number of pruned messages, and total iterations.

reset()[source]

Resets the policy’s internal state for a new simulation run.

Return type:

None

propflow.policies.TD(variables, x=None, diameter=None)[source]

Applies temporal damping to the outgoing messages of a list of variables.

This function applies damping using messages from a previous cycle, determined by the diameter. The new message is a weighted average of the message from diameter iterations ago and the current message.

The update rule is: new_message = x * previous_cycle_message + (1 - x) * current_message

Parameters:
  • variables (List[VariableAgent]) – A list of VariableAgent objects to apply damping to.

  • x (float) – The damping factor, representing the weight of the previous message. If None, the default from POLICY_DEFAULTS is used.

  • diameter (int) – The number of iterations in a cycle, used to retrieve the message from the previous cycle. If None, the default from POLICY_DEFAULTS is used.

Return type:

None
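
The update rule above can be sketched with plain NumPy. This is an illustration only, not PropFlow's implementation: the helper name temporal_damp and the explicit history list are assumptions, and the real function reads earlier messages from the agents themselves. The defaults mirror damping_factor=0.9 and damping_diameter=1 from PolicyDefaults.

```python
import numpy as np


def temporal_damp(history, current, x=0.9, diameter=1):
    """Blend `current` with the message sent `diameter` iterations ago.

    `history` is a hypothetical list of earlier messages, oldest first.
    """
    if len(history) < diameter:
        # Not enough history yet: leave the message unchanged.
        return current
    previous_cycle = history[-diameter]
    return x * previous_cycle + (1 - x) * current


history = [np.array([0.0, 4.0]), np.array([2.0, 2.0])]
current = np.array([10.0, 0.0])

# diameter=2 selects the message from two iterations ago, [0.0, 4.0].
damped = temporal_damp(history, current, x=0.9, diameter=2)
```

With diameter=1 the same rule reduces to ordinary damping against the immediately preceding message.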

propflow.policies.cost_reduction_all_factors_once(fg, x)[source]

Applies a one-time cost reduction to all factors in the graph.

This function iterates through all factor agents in the provided factor graph and multiplies their cost tables by a given factor. It also saves the original cost table before modification.

Parameters:
  • fg (FactorGraph) – The FactorGraph to modify.

  • x (float) – The multiplicative factor to apply to the cost tables.

Return type:

None

propflow.policies.damp(agent, x=None)[source]

Applies damping to outgoing messages of a variable or factor agent.

Blends each outgoing message with the corresponding message from the previous iteration: new = x * prev + (1 - x) * current.

Parameters:
  • agent – The agent (variable or factor) whose outbox messages will be damped.

  • x (float) – The damping factor (weight of previous message). If None, the damping_factor default from PolicyDefaults is used.

Return type:

None

propflow.policies.damp_factor(agent, x=None)

Applies damping to outgoing messages of a variable or factor agent.

Blends each outgoing message with the corresponding message from the previous iteration: new = x * prev + (1 - x) * current.

Parameters:
  • agent – The agent (variable or factor) whose outbox messages will be damped.

  • x (float) – The damping factor (weight of previous message). If None, the damping_factor default from PolicyDefaults is used.

Return type:

None

propflow.policies.discount(fac_a, x)[source]

Applies a discount factor to the cost tables of specified factor agents.

Parameters:
  • fac_a (Iterable[FactorAgent]) – An iterable of FactorAgent objects whose cost tables will be discounted.

  • x (float) – The multiplicative discount factor to apply.

Return type:

None

propflow.policies.discount_attentive(fg)[source]

Applies an attentive discount to incoming messages for all variable nodes.

The discount factor for each variable’s messages is inversely proportional to the variable’s degree in the factor graph. This means variables with more connections (higher degree) will have their incoming messages discounted more heavily.

Parameters:

fg (FactorGraph) – The FactorGraph containing the variables to be updated.

Return type:

None

propflow.policies.init_normalization(li)[source]

Performs an initial normalization of cost tables for a list of factors.

This function divides each factor’s cost table by the total number of factors in the list.

Parameters:

li (List[FactorAgent]) – A list of FactorAgent objects to be normalized.

Return type:

None

propflow.policies.normalize_cost_table_sum(cost_table)[source]

Normalizes a cost table so that the sum across all dimensions is equal.

Note

The implementation of this function appears to be incorrect for its stated purpose and may not produce the intended normalization.

Parameters:

cost_table (ndarray) – An n-dimensional numpy array representing the cost table.

Return type:

ndarray

Returns:

The modified cost table.

propflow.policies.normalize_inbox(variables)[source]

Normalizes all messages in the inboxes of a list of variable agents.

This function iterates through each variable’s inbox and last iteration’s messages, normalizing them by subtracting the minimum value. This shifts each message so that its smallest entry is zero without changing the relative differences between entries.

Parameters:

variables (List[VariableAgent]) – A list of VariableAgent objects whose messages will be normalized.

Return type:

None
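
As a small illustration of min-subtraction on a single message (a sketch on a bare NumPy array, not on PropFlow's Message objects):

```python
import numpy as np

message = np.array([7.0, 3.0, 5.0])

# Subtract the smallest entry so the minimum becomes zero.
normalized = message - message.min()

# Gaps between consecutive entries are the same before and after.
same_gaps = np.allclose(np.diff(normalized), np.diff(message))
```

Because min-sum decisions depend only on differences between entries, this shift keeps message magnitudes bounded without affecting which value attains the minimum.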

propflow.policies.normalize_soft_max(cost_table)[source]

Normalizes a cost table using the softmax function.

This ensures all values are positive and sum to 1, effectively converting costs into a probability distribution. A stability trick (subtracting the max value) is used to prevent overflow with large cost values.

Parameters:

cost_table (ndarray) – An n-dimensional numpy array representing the cost table.

Return type:

ndarray

Returns:

The normalized cost table.
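
A minimal sketch of the max-subtraction trick described above. The function name stable_softmax is hypothetical, and the source does not say whether PropFlow negates costs before exponentiating, so this version applies softmax to the values as given.

```python
import numpy as np


def stable_softmax(cost_table):
    """Softmax over all entries of an n-dimensional array."""
    # Subtracting the maximum makes the largest exponent exp(0) = 1,
    # so np.exp cannot overflow even for very large inputs.
    shifted = cost_table - np.max(cost_table)
    exponentials = np.exp(shifted)
    return exponentials / exponentials.sum()


table = np.array([[1000.0, 1001.0], [1002.0, 1003.0]])
probs = stable_softmax(table)
```

Without the shift, np.exp(1000.0) overflows to inf and the division produces nan; with it, the result is finite and sums to 1.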

propflow.policies.reduce_R(fac_a, x)[source]

Reduces the outgoing R-messages from a factor agent by a factor x.

This function iterates through all staged outgoing messages (R-messages) of a given factor agent and multiplies their data by a reduction factor.

Parameters:
  • fac_a (FactorAgent) – The FactorAgent whose outgoing messages will be reduced.

  • x (float) – The factor by which to reduce the message data.

Return type:

None

propflow.policies.split_all_factors(fg, p=None)[source]

Performs an in-place replacement of every factor with two cloned factors.

Each factor f with cost table C is replaced by two new factors, f’ and f’’, with cost tables p*C and (1-p)*C, respectively. The new factors inherit all the connections of the original factor.

This function directly modifies the provided FactorGraph object, including its underlying networkx.Graph and its list of factors.

Parameters:
  • fg (FactorGraph) – The FactorGraph to modify.

  • p (float) – The splitting proportion, which must be between 0 and 1. This determines how the original cost is distributed between the two new factors. If None, the default from POLICY_DEFAULTS is used.

Raises:

AssertionError – If p is not in the range (0, 1).

Return type:

Dict[str, List[FactorAgent]]
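
The arithmetic behind the split can be checked on a bare cost table. This sketch uses plain NumPy arrays rather than FactorAgent objects, and the value p = 0.3 is chosen only for illustration.

```python
import numpy as np

p = 0.3
cost_table = np.array([[4.0, 2.0], [0.0, 6.0]])

# The two clones carry complementary shares of the original costs.
first_clone = p * cost_table
second_clone = (1 - p) * cost_table

# Their sum reproduces the original table, so the total cost of any
# assignment is unchanged by the split.
recombined = first_clone + second_clone
```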

propflow.policies.split_factors(fg, p=None, *, factor_names=None, split_fraction=None, seed=None)[source]

Split selected factors in-place and return original-to-clone mapping.

Parameters:
  • fg (FactorGraph) – The factor graph to modify.

  • p (float | None) – Split proportion allocated to the first clone.

  • factor_names (Optional[Sequence[str]]) – Optional factor names to restrict the split target set. When omitted, all factors are candidates.

  • split_fraction (float | None) – Optional fraction of the candidate factors to split. Selection is deterministic for a given seed.

  • seed (int | None) – Seed used when split_fraction is provided.

Return type:

Dict[str, List[FactorAgent]]

Returns:

A mapping from original factor name to the two replacement clone factors.

propflow.policies.split_specific_factors(fg, factors, p=None)[source]

Split specific factors in the factor graph.

Return type:

Dict[str, List[FactorAgent]]

Utilities

propflow.utils.dummy_func(*args, **kwargs)[source]

A dummy function that does nothing and returns nothing.

This function serves as a placeholder for callbacks or functions that are optional or not yet implemented, allowing the main logic to proceed without raising errors.

Parameters:
  • *args (Any) – Accepts any positional arguments.

  • **kwargs (Any) – Accepts any keyword arguments.

Return type:

None

class propflow.utils.FGBuilder[source]

Bases: object

A builder class providing static methods to construct factor graphs.

static build_from_edges(variables, factors, edges)[source]

Builds a factor graph from the provided variables, factors, and edges.

Returns:

The constructed factor graph.

Return type:

FactorGraph

static build_random_graph(num_vars, domain_size, ct_factory, ct_params, density, *, seed=None)[source]

Builds a factor graph with random binary constraints.

Parameters:
  • num_vars (int) – The number of variables in the graph.

  • domain_size (int) – The size of the domain for each variable.

  • ct_factory (Union[Callable, str]) – The factory for creating cost tables.

  • ct_params (Dict[str, Any]) – Parameters for the cost table factory.

  • density (float) – The density of the graph (probability of an edge).

  • seed (int | None) – Optional seed controlling the random topology. When omitted, randomness is derived from the globally-configured numpy and random RNGs so user-level seeding still produces deterministic graphs.

Return type:

FactorGraph

Returns:

A FactorGraph instance with a random topology.
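
One way to read "density as the probability of an edge" is sketched below. The helper random_binary_edges and the x1..xn naming are assumptions; the source does not describe PropFlow's actual sampling procedure.

```python
import itertools
import random


def random_binary_edges(num_vars, density, seed=None):
    """Pick variable pairs, keeping each pair with probability `density`."""
    rng = random.Random(seed)  # a private RNG makes the result repeatable
    names = [f"x{i}" for i in range(1, num_vars + 1)]
    return [
        pair
        for pair in itertools.combinations(names, 2)
        if rng.random() < density
    ]


edges_a = random_binary_edges(5, 0.5, seed=42)
edges_b = random_binary_edges(5, 0.5, seed=42)
```

Each selected pair would then receive one binary factor whose cost table comes from ct_factory called with ct_params.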

static build_cycle_graph(num_vars, domain_size, ct_factory, ct_params, **kwargs)[source]

Builds a factor graph with a simple cycle topology.

The graph structure is x1 – f12 – x2 – … – xn – fn1 – x1.

Parameters:
  • num_vars (int) – The number of variables in the cycle.

  • domain_size (int) – The size of the domain for each variable.

  • ct_factory (Union[Callable, str]) – The factory for creating cost tables.

  • ct_params (Dict[str, Any]) – Parameters for the cost table factory.

  • **kwargs – Catches unused arguments like density for API consistency.

Return type:

FactorGraph

Returns:

A FactorGraph instance with a cycle topology.
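
The x1 – f12 – x2 – … pattern can be enumerated as follows. This is a sketch of the naming scheme only; cycle_edges is a hypothetical helper and does not build agents or cost tables.

```python
def cycle_edges(num_vars):
    """Return (factor, var_a, var_b) triples for a simple cycle."""
    triples = []
    for i in range(1, num_vars + 1):
        j = i % num_vars + 1  # successor index, wrapping n back to 1
        triples.append((f"f{i}{j}", f"x{i}", f"x{j}"))
    return triples


triples = cycle_edges(3)
```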

static build_lemniscate_graph(num_vars, domain_size, ct_factory, ct_params, **kwargs)[source]

Builds a factor graph with a lemniscate (∞) topology.

The structure consists of two cycles that share a single central variable, producing a figure-eight shape. Each loop is guaranteed to contain at least two distinct variables in addition to the central node.

Parameters:
  • num_vars (int) – Total number of variables in the graph. Must be >= 5.

  • domain_size (int) – The size of the domain for each variable.

  • ct_factory (Union[Callable, str]) – Factory used to create cost tables for the factors.

  • ct_params (Dict[str, Any]) – Parameters forwarded to the cost table factory.

  • **kwargs – Captures unused parameters (e.g., density) for API parity.

Return type:

FactorGraph

Returns:

A FactorGraph instance shaped like a lemniscate.

Raises:

ValueError – If fewer than five variables are provided.
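
A rough sketch of how two loops can share one central variable. Which variable is central and how the remaining variables are divided between the loops are assumptions here; lemniscate_loops is a hypothetical helper, not part of FGBuilder.

```python
def lemniscate_loops(num_vars):
    """Return two variable loops that both start and end at the center."""
    if num_vars < 5:
        # Center plus at least two other variables per loop.
        raise ValueError("a lemniscate needs at least 5 variables")
    center = "x1"
    others = [f"x{i}" for i in range(2, num_vars + 1)]
    half = len(others) // 2
    left, right = others[:half], others[half:]
    return [center, *left, center], [center, *right, center]


loop_a, loop_b = lemniscate_loops(5)
```

The lower bound of five follows directly: one central variable plus two distinct variables in each loop.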

static create_lemniscate_graph(num_vars, domain_size, ct_factory, ct_params, **kwargs)

Builds a factor graph with a lemniscate (∞) topology.

The structure consists of two cycles that share a single central variable, producing a figure-eight shape. Each loop is guaranteed to contain at least two distinct variables in addition to the central node.

Parameters:
  • num_vars (int) – Total number of variables in the graph. Must be >= 5.

  • domain_size (int) – The size of the domain for each variable.

  • ct_factory (Union[Callable, str]) – Factory used to create cost tables for the factors.

  • ct_params (Dict[str, Any]) – Parameters forwarded to the cost table factory.

  • **kwargs – Captures unused parameters (e.g., density) for API parity.

Return type:

FactorGraph

Returns:

A FactorGraph instance shaped like a lemniscate.

Raises:

ValueError – If fewer than five variables are provided.

static create_leminscate_graph(num_vars, domain_size, ct_factory, ct_params, **kwargs)

Builds a factor graph with a lemniscate (∞) topology.

The structure consists of two cycles that share a single central variable, producing a figure-eight shape. Each loop is guaranteed to contain at least two distinct variables in addition to the central node.

Parameters:
  • num_vars (int) – Total number of variables in the graph. Must be >= 5.

  • domain_size (int) – The size of the domain for each variable.

  • ct_factory (Union[Callable, str]) – Factory used to create cost tables for the factors.

  • ct_params (Dict[str, Any]) – Parameters forwarded to the cost table factory.

  • **kwargs – Captures unused parameters (e.g., density) for API parity.

Return type:

FactorGraph

Returns:

A FactorGraph instance shaped like a lemniscate.

Raises:

ValueError – If fewer than five variables are provided.

static build_with_unary_costs(base_graph, unary_costs)[source]

Extends a factor graph with unary constraints (per-variable cost biases).

Unary factors are single-variable factors that act as priors or biases for individual variables. They’re useful for adding soft constraints or preferences on variable assignments.

Parameters:
  • base_graph (FactorGraph) – An existing factor graph to extend.

  • unary_costs (Dict[str, ndarray]) – A dictionary mapping variable names to 1D numpy arrays of costs. The array length must match the variable’s domain size.

Return type:

FactorGraph

Returns:

A new FactorGraph with the unary factors added.

Example

>>> import numpy as np
>>> from propflow.configs import create_random_int_table
>>> from propflow.utils import FGBuilder
>>> fg = FGBuilder.build_cycle_graph(3, 2, create_random_int_table, {"low": 0, "high": 10})
>>> unary = {"x1": np.array([0, 5]), "x2": np.array([3, 0])}
>>> fg_with_unary = FGBuilder.build_with_unary_costs(fg, unary)

propflow.utils.find_project_root()[source]

Finds the project root directory by searching for common markers.

This function starts from the current working directory and traverses up the directory tree, looking for files or directories like .git, pyproject.toml, or setup.py that typically indicate the root of a project.

Return type:

Path

Returns:

A Path object representing the project root directory, or the current working directory if no project root markers are found (e.g., when installed as a package).
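
The upward search can be sketched with pathlib. This is an approximation of the described behaviour, not the library's code: find_root and its start parameter are hypothetical (the documented function takes no arguments and always starts from the current working directory).

```python
from pathlib import Path

MARKERS = (".git", "pyproject.toml", "setup.py")


def find_root(start=None):
    """Walk upward from `start` until a directory contains a marker."""
    start = (Path(start) if start is not None else Path.cwd()).resolve()
    for candidate in (start, *start.parents):
        if any((candidate / marker).exists() for marker in MARKERS):
            return candidate
    # No marker anywhere above: fall back to the starting directory.
    return start
```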

class propflow.utils.CTFactories[source]

Bases: object

Cost table factory namespace. Use: CTFactories.RANDOM_INT

UNIFORM(domain, low=0.0, high=1.0)

Generate cost table with uniform float values.

When using FGBuilder, pass low and high via ct_params.

RANDOM_INT(domain, low=0, high=10)

Generate cost table with random integer values.

When using FGBuilder, pass low and high via ct_params.

POISSON(domain, rate=1.0, strength=None)

Generate cost table with Poisson-distributed values.

When using FGBuilder, pass rate or strength via ct_params.

Configuration

Configuration helpers for the belief propagation simulator.

class propflow.configs.Logger(name, level=None, file=None)[source]

Bases: Logger

A custom logger that provides standardized console and file logging.

This class extends the standard logging.Logger to automatically configure a colored console handler and an optional file handler based on the global logging configuration.

file_handler

The handler for logging to a file, which is only created if file logging is enabled.

Type:

logging.FileHandler

console

The handler for logging to the console with colored output.

Type:

colorlog.StreamHandler

class propflow.configs.CTFactories[source]

Bases: object

Cost table factory namespace. Use: CTFactories.RANDOM_INT

UNIFORM(domain, low=0.0, high=1.0)

Generate cost table with uniform float values.

When using FGBuilder, pass low and high via ct_params.

RANDOM_INT(domain, low=0, high=10)

Generate cost table with random integer values.

When using FGBuilder, pass low and high via ct_params.

POISSON(domain, rate=1.0, strength=None)

Generate cost table with Poisson-distributed values.

When using FGBuilder, pass rate or strength via ct_params.

propflow.configs.get_ct_factory(factory)[source]

Resolve factory identifier to callable.

Accepts CTFactories.RANDOM_INT, “random_int”, or raw function.

Return type:

Callable

propflow.configs.create_poisson_table(n, domain, rate=1.0, strength=None)[source]

Generate cost table with Poisson-distributed values.

When using FGBuilder, pass rate or strength via ct_params.

propflow.configs.create_random_int_table(n, domain, low=0, high=10)[source]

Generate cost table with random integer values.

When using FGBuilder, pass low and high via ct_params.

propflow.configs.create_uniform_float_table(n, domain, low=0.0, high=1.0)[source]

Generate cost table with uniform float values.

When using FGBuilder, pass low and high via ct_params.

class propflow.configs.EngineDefaults(max_iterations=2000, normalize_messages=True, monitor_performance=False, anytime=False, timeout=600)[source]

Bases: object

Default parameters for the belief propagation engine.

max_iterations: int = 2000
normalize_messages: bool = True
monitor_performance: bool = False
anytime: bool = False
timeout: int = 600

class propflow.configs.LoggingDefaults(default_level=20, verbose_logging=False, file_logging=True, log_dir='configs/logs', log_format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', console_format='%(log_color)s%(asctime)s - %(name)s - %(message)s', file_format='%(asctime)s - %(name)s  - %(message)s', console_colors=<factory>)[source]

Bases: object

Default configuration for the logging system.

default_level: int = 20
verbose_logging: bool = False
file_logging: bool = True
log_dir: str = 'configs/logs'
log_format: str = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
console_format: str = '%(log_color)s%(asctime)s - %(name)s - %(message)s'
file_format: str = '%(asctime)s - %(name)s  - %(message)s'
console_colors: Dict[str, str]

class propflow.configs.PolicyDefaults(damping_factor=0.9, damping_diameter=1, split_factor=0.5, pruning_threshold=0.1, pruning_magnitude_factor=0.1, cost_reduction_enabled=True, tree_reweight_factor=0.5)[source]

Bases: object

Default parameters for various belief propagation policies.

damping_factor: float = 0.9
damping_diameter: int = 1
split_factor: float = 0.5
pruning_threshold: float = 0.1
pruning_magnitude_factor: float = 0.1
cost_reduction_enabled: bool = True
tree_reweight_factor: float = 0.5

class propflow.configs.ConvergenceDefaults(belief_threshold=1e-06, assignment_threshold=0, min_iterations=0, patience=5, use_relative_change=True, timeout=600)[source]

Bases: object

Default parameters for the convergence monitor.

belief_threshold: float = 1e-06
assignment_threshold: int = 0
min_iterations: int = 0
patience: int = 5
use_relative_change: bool = True
timeout: int = 600

class propflow.configs.SimulatorDefaults(default_max_iter=5000, default_log_level='INFORMATIVE', timeout=3600, cpu_count_multiplier=1.0)[source]

Bases: object

Default parameters for the multi-simulation runner.

default_max_iter: int = 5000
default_log_level: str = 'INFORMATIVE'
timeout: int = 3600
cpu_count_multiplier: float = 1.0

propflow.configs.validate_engine_config(config)[source]

Validate engine configuration. Raises ValueError if invalid.

Return type:

bool

propflow.configs.validate_policy_config(config)[source]

Validate policy configuration. Raises ValueError if invalid.

Return type:

bool

propflow.configs.validate_convergence_config(config)[source]

Validates convergence configuration parameters.

Parameters:

config (Dict[str, Any]) – A dictionary containing convergence configuration.

Return type:

bool

Returns:

True if the configuration is valid.

Raises:

ValueError – If a value is invalid or outside its expected range.

propflow.configs.get_validated_config(config_type, user_config=None)[source]

Merges a user configuration with defaults and validates it.

Parameters:
  • config_type (str) – The type of configuration to retrieve (e.g., ‘engine’, ‘policy’).

  • user_config (Optional[Dict[str, Any]]) – An optional dictionary of user-provided overrides.

Return type:

Dict[str, Any]

Returns:

A dictionary containing the final, validated configuration.

Raises:

ValueError – If the config_type is unknown or validation fails.
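
The merge-then-validate flow might look like the sketch below. The dataclass copies three fields from EngineDefaults for illustration; the function name merged_engine_config and the specific validation rule are assumptions, since the source does not list which checks PropFlow performs.

```python
from dataclasses import asdict, dataclass


@dataclass
class EngineDefaultsSketch:
    """Stand-in holding a subset of the documented engine defaults."""

    max_iterations: int = 2000
    normalize_messages: bool = True
    timeout: int = 600


def merged_engine_config(user_config=None):
    """Overlay user overrides on the defaults, then validate."""
    config = asdict(EngineDefaultsSketch())
    config.update(user_config or {})
    # Hypothetical rule, shown only to illustrate the ValueError path.
    if config["max_iterations"] <= 0:
        raise ValueError("max_iterations must be positive")
    return config


config = merged_engine_config({"max_iterations": 500})
```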

Simulator

A parallelized simulator for running and comparing multiple engine configurations.

This module provides a Simulator class that can run multiple belief propagation engine configurations across a set of factor graphs in parallel. It uses Python’s multiprocessing module to distribute the simulation runs, collects the results, and provides a simple plotting utility to visualize and compare the performance of different engines.

class propflow.simulator.Simulator(engine_configs, log_level=None, *, seed=None)[source]

Bases: object

Orchestrates parallel execution of multiple simulation configurations.

This class takes a set of engine configurations and a list of factor graphs, runs each engine on each graph in parallel, collects the cost history from each run, and provides methods to visualize the aggregated results.

engine_configs

A dictionary mapping engine names to their configurations.

Type:

dict

logger

A configured logger instance.

Type:

Logger

results

A dictionary to store the results of the simulations.

Type:

dict

timeout

The timeout in seconds for multiprocessing tasks.

Type:

int

run_simulations(graphs, max_iter=None)[source]

Runs all engine configurations on all provided graphs in parallel.

Parameters:
  • graphs (List[Any]) – A list of factor graph objects to run simulations on.

  • max_iter (Optional[int]) – The maximum number of iterations for each simulation run.

Return type:

Dict[str, List[List[float]]]

Returns:

A dictionary containing the collected results, where keys are engine names and values are lists of cost histories for each run.

plot_results(max_iter=None, verbose=False)[source]

Plots the average cost convergence for each engine configuration.

Parameters:
  • max_iter (Optional[int]) – The maximum number of iterations to display on the plot.

  • verbose (bool) – If True, plots individual simulation runs with transparency and standard deviation bands around the average.

Return type:

None

set_log_level(level)[source]

Sets the logging level for the simulator’s logger.

Parameters:

level (str) – The desired logging level (e.g., ‘INFO’, ‘DEBUG’).

Return type:

None

Snapshots

Per-step snapshot capture and analysis for BP engines.

This module provides a small, modular API to record a lightweight snapshot of the engine state at each step (when enabled), and to compute Jacobian blocks and cycle metrics for focused, iteration-level analysis.

Top-level exports:

  • EngineSnapshot: container of snapshot data + Jacobians + metrics

  • SnapshotManager: attaches to engine and records per-step snapshots

  • SnapshotAnalyzer: derive Jacobian matrices and convergence metrics from snapshots

  • AnalysisReport: generate analysis reports and export results

  • SnapshotVisualizer: plot belief argmin trajectories from snapshots

class propflow.snapshots.EngineSnapshot(step, lambda_, dom, N_var, N_fac, Q, R, unary=<factory>, beliefs=<factory>, assignments=<factory>, global_cost=None, metadata=<factory>, cost_tables=<factory>, cost_labels=<factory>, jacobians=None, cycles=None, winners=None, min_idx=None, bct_metadata=<factory>, captured_at=<factory>)[source]

Bases: object

Self-contained snapshot captured at a single BP step.

step

Current iteration step. (int)

lambda_

Damping factor used in this step. (float)

dom

Variable domains.

N_var

Neighbouring factors for each variable.

N_fac

Neighbouring variables for each factor. (2 for binary factors)

Q

Outgoing Q messages.

R

Outgoing R messages.

unary

Unary beliefs for each variable.

beliefs

Marginal beliefs for each variable.

assignments

Current MAP assignments for each variable.

global_cost

Current global cost (if computed).

metadata

Additional metadata captured at this step.

cost_tables

Cost tables for each factor.

cost_labels

Labels for cost tables.

jacobians

Jacobian matrices and metadata (if computed).

cycles

Cycle metrics (if computed).

winners

Winning assignments per factor (if applicable).

min_idx

Indices of minimum costs per message (if applicable).

bct_metadata

Additional BCT-related metadata.

captured_at

Timestamp when the snapshot was captured.

step: int
lambda_: float
dom: Dict[str, List[str]]
N_var: Dict[str, List[str]]
N_fac: Dict[str, List[str]]
Q: Dict[Tuple[str, str], ndarray]
R: Dict[Tuple[str, str], ndarray]
unary: Dict[str, ndarray]
beliefs: Dict[str, ndarray]
assignments: Dict[str, int]
global_cost: float | None = None
metadata: Dict[str, Any]
cost_tables: Dict[str, ndarray]
cost_labels: Dict[str, List[str]]
jacobians: Jacobians | None = None
cycles: CycleMetrics | None = None
winners: Dict[Tuple[str, str, str], Dict[str, str]] | None = None
min_idx: Dict[Tuple[str, str], int] | None = None
bct_metadata: Dict[str, Any]
captured_at: datetime

class propflow.snapshots.Jacobians(idxQ, idxR, A, P, B, block_norms=None)[source]

Bases: object

Holds Jacobian-related matrices and associated metadata.

idxQ: Dict[Tuple[str, str, int], int]
idxR: Dict[Tuple[str, str, int], int]
A: csr_matrix
P: csr_matrix
B: csr_matrix
block_norms: Dict[str, float] | None = None

class propflow.snapshots.CycleMetrics(num_cycles, aligned_hops_total, has_certified_contraction, details=None)[source]

Bases: object

Compact summary of cycle analysis for a given step.

num_cycles: int
aligned_hops_total: int
has_certified_contraction: bool
details: List[Dict[str, Any]] | None = None

to_dict()[source]

Return type:

Dict[str, Any]

class propflow.snapshots.SnapshotManager[source]

Bases: object

Lightweight helper that captures a snapshot for a single engine step.

capture_step(step_index, step, engine)[source]

Capture the engine state after a completed step.

Return type:

EngineSnapshot

class propflow.snapshots.SnapshotAnalyzer(snapshots, *, domain=None, max_cycle_len=12)[source]

Bases: object

Analyze convergence dynamics from propflow snapshots.

This class provides methods to derive Jacobian matrices, dependency graphs, cycle metrics, and other structural properties from snapshot sequences.

beliefs_per_variable()[source]

Return the argmin trajectory for each variable across snapshots.

Return type:

Dict[str, List[int | None]]
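
An argmin trajectory can be illustrated with plain dictionaries of belief vectors. The input layout below is a simplified stand-in for a list of EngineSnapshot objects, and using None for a variable missing at a step is an assumption suggested by the return type.

```python
import numpy as np

# One dict of belief vectors per step (stand-in for snapshot.beliefs).
beliefs_by_step = [
    {"x1": np.array([3.0, 1.0]), "x2": np.array([0.5, 2.0])},
    {"x1": np.array([0.0, 1.0])},  # x2 has no belief at this step
]

variables = sorted({name for step in beliefs_by_step for name in step})
trajectories = {
    name: [
        int(np.argmin(step[name])) if name in step else None
        for step in beliefs_by_step
    ]
    for name in variables
}
```

Here x1 switches from value index 1 to index 0 between the two steps, which is the kind of flip these trajectories are meant to expose.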

difference_coordinates(step_idx)[source]

Compute ΔQ and ΔR (difference coordinates) for a snapshot.

Return type:

tuple[Dict[tuple[str, str], float | ndarray], Dict[tuple[str, str], float | ndarray]]

jacobian(step_idx)[source]

Construct the Jacobian matrix in difference coordinates.

Return type:

ndarray | spmatrix

dependency_digraph(step_idx)[source]

Construct the dependency digraph induced by the Jacobian.

Return type:

DiGraph

cycle_metrics(step_idx)[source]

Compute cycle metrics for convergence analysis.

Return type:

Dict[str, Any]

nilpotent_index(step_idx)[source]

Compute the nilpotent index of the Jacobian matrix.

Return type:

int | None
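
For reference, the nilpotent index of a square matrix J is the smallest k with J^k = 0. A dense NumPy sketch follows; the function name, the tolerance, and the dense representation are assumptions, and the library may compute the index differently.

```python
import numpy as np


def nilpotent_index_sketch(matrix, tol=1e-12):
    """Return the smallest k with matrix**k == 0, or None if none exists."""
    size = matrix.shape[0]
    power = np.eye(size)
    # A nilpotent n-by-n matrix always vanishes by the n-th power.
    for k in range(1, size + 1):
        power = power @ matrix
        if np.all(np.abs(power) <= tol):
            return k
    return None


shift = np.array([[0.0, 1.0, 0.0], [0.0, 0.0, 1.0], [0.0, 0.0, 0.0]])
index = nilpotent_index_sketch(shift)
```

A finite index means the linearised message update dies out after that many steps, while None indicates that some dependency cycle keeps feeding back.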

block_norms(step_idx)[source]

Compute infinity norms of Jacobian blocks.

Return type:

Dict[str, float]
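
The infinity norm of a matrix is its largest absolute row sum. The sketch below computes it for two made-up dense blocks; the block names echo the A and B fields of Jacobians, but the values and the dense representation are illustrative assumptions.

```python
import numpy as np

blocks = {
    "A": np.array([[1.0, -2.0], [3.0, 4.0]]),
    "B": np.array([[0.5, 0.25]]),
}

# ord=np.inf on a 2-D array gives the maximum absolute row sum.
norms = {name: float(np.linalg.norm(block, ord=np.inf)) for name, block in blocks.items()}
```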

class propflow.snapshots.AnalysisReport(analyzer)[source]

Bases: object

Generate analysis reports from snapshots.

to_json(step_idx)[source]

Generate a JSON-serializable analysis report for a snapshot.

Parameters:

step_idx (int) – The index of the snapshot to analyze.

Return type:

Dict[str, Any]

Returns:

A dictionary containing analysis results.

to_csv(base_dir, *, step_idx)[source]

Export analysis results to CSV files.

Parameters:
  • base_dir (str | Path) – Directory to save CSV files.

  • step_idx (int) – The step index for the analysis.

Return type:

None

class propflow.snapshots.SnapshotVisualizer(snapshots)[source]

Bases: object

Visualize belief propagation snapshot trajectories.

variables()[source]

Return sorted list of all variables in the snapshots.

Return type:

List[str]

steps()[source]

Return the ordered simulation steps captured in the snapshots.

Return type:

List[int]

argmin_series(vars_filter=None)[source]

Get argmin trajectories for selected variables.

Parameters:

vars_filter (Optional[List[str]]) – Optional list of variable names to include. If None, all variables are included.

Return type:

Dict[str, List[int | None]]

Returns:

Dictionary mapping variable names to their argmin trajectories.

global_cost_series(*, include_missing=False, fill_value=nan)[source]

Return the global cost trajectory extracted from the snapshots.

Parameters:
  • include_missing (bool) – If True, include steps with missing costs using fill_value.

  • fill_value (float) – Value to substitute whenever a snapshot lacks a global cost.

Return type:

Tuple[List[int], List[float]]

Returns:

A tuple (steps, costs) with matching lengths.

Raises:

ValueError – If no snapshots contain global cost information.
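
The effect of include_missing and fill_value can be sketched on (step, cost) pairs. The function cost_series and the pair-based input are hypothetical simplifications of the snapshot objects.

```python
import math


def cost_series(records, include_missing=False, fill_value=math.nan):
    """Split (step, cost) pairs into parallel step and cost lists."""
    if all(cost is None for _, cost in records):
        raise ValueError("no snapshot contains a global cost")
    steps, costs = [], []
    for step, cost in records:
        if cost is None:
            if not include_missing:
                continue  # drop steps that have no recorded cost
            cost = fill_value
        steps.append(step)
        costs.append(float(cost))
    return steps, costs


records = [(0, 5.0), (1, None), (2, 3.0)]
dense = cost_series(records)
padded = cost_series(records, include_missing=True, fill_value=0.0)
```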

factor_cost_matrix(factor, step)[source]

Return a copy of a factor’s cost table at a given step.

Return type:

ndarray

factor_cost_labels(factor, step)[source]

Return type:

Tuple[List[str], List[str]]

static plot_agent_cost_table(agent, *, connections=None, fmt='{:.3g}', plot=True, show=True, savepath=None, cmap='viridis', annotate=True)[source]

Static method to show a cost table for a FactorAgent without snapshots.

Parameters:
  • agent (FactorAgent) – The FactorAgent to visualize.

  • connections (Optional[Dict[FactorAgent, Tuple[str, str]]]) – Optional dictionary mapping FactorAgent objects to (row_var, col_var) tuples. Used to provide variable names for agents without explicit connections.

  • fmt (str) – Numeric format string for printing tables and annotations.

  • plot (bool) – If True, generate a heatmap using matplotlib. If False, print a text table.

  • show (bool) – Whether to display the plot (if plot=True).

  • savepath (str | None) – Optional path to save the figure (if plot=True).

  • cmap (str) – Colormap for the heatmap (if plot=True).

  • annotate (bool) – Whether to annotate cells with values (if plot=True).

Return type:

Figure | str

Returns:

The formatted table string (if plot=False) or a matplotlib Figure (if plot=True).

plot_cost_tables(factor=None, step=None, *, show=True, savepath=None, cmap='viridis', annotate=True, fmt='{:.3g}', connections=None)[source]

Pretty-print or plot factor cost tables from a snapshot.

  • When factor is provided, prints a labeled table for that factor and returns the formatted string.

  • When factor is None, plots all cost tables in a grid with axis labels derived from the factor’s variable ordering. Titles include the factor name.

Parameters:
  • factor (str | FactorAgent | None) – Optional factor name/agent. If provided, only that factor is shown.

  • step (int | None) – Snapshot step to use. Defaults to the last recorded step.

  • show (bool) – Whether to display the plot (when plotting all factors).

  • savepath (str | None) – Optional path to save the plotted figure (when plotting all factors).

  • cmap (str) – Matplotlib colormap name for heatmaps.

  • annotate (bool) – Whether to overlay numeric values on heatmaps.

  • fmt (str) – Numeric format string for printing tables and annotations.

  • connections (Optional[Dict[FactorAgent, Tuple[str, str]]]) – Optional dictionary mapping FactorAgent objects to (row_var, col_var) tuples. Used to provide variable names for agents without explicit connections.

Return type:

Figure | str

Returns:

A matplotlib Figure when plotting all factors, or the formatted table string when printing a single factor.

plot_factor_costs(from_variable, to_factor=None, step=None, *, mode='auto', cmap='viridis', annotate=True, show=True, savepath=None, return_data=False, highlight_color='tab:red', text_color='black', fmt='{:.3g}')[source]

Visualise factor cost tables induced by factor→variable messages.

Return type:

Union[Figure, Tuple[Figure, ndarray, ndarray]]

plot_global_cost(*, show=True, savepath=None, include_missing=False, fill_value=nan, rolling_window=None, return_data=False)[source]

Plot the evolution of the global cost captured in the snapshots.

Parameters:
  • show (bool) – Whether to display the plot window.

  • savepath (str | None) – Optional file path to save the figure.

  • include_missing (bool) – If True, include steps without a recorded cost using fill_value.

  • fill_value (float) – Value used when include_missing is True and a step lacks a cost.

  • rolling_window (int | None) – Size of a trailing window to compute and overlay a rolling mean.

  • return_data (bool) – If True, return the underlying data alongside the figure.

Return type:

Union[Figure, Tuple[Figure, Dict[str, Any]]]

Returns:

The created matplotlib figure, optionally accompanied by the plotted data.
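
A trailing rolling mean of the kind rolling_window describes could be computed as below. How PropFlow treats the first few points, where fewer than rolling_window values are available, is not stated; this sketch simply averages what exists so far.

```python
import numpy as np


def trailing_mean(values, window):
    """Mean of each value and up to `window - 1` values before it."""
    values = np.asarray(values, dtype=float)
    smoothed = np.empty_like(values)
    for i in range(len(values)):
        start = max(0, i - window + 1)
        smoothed[i] = values[start : i + 1].mean()
    return smoothed


smoothed = trailing_mean([4.0, 2.0, 6.0, 8.0], window=2)
```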

plot_message_norms(*, message_type='Q', pairs=None, norm='l2', show=True, savepath=None, return_data=False)[source]

Plot the per-step norms of Q or R messages.

Parameters:
  • message_type (Literal['Q', 'R']) – Select "Q" (variable→factor) or "R" (factor→variable) messages.

  • pairs (Optional[Sequence[tuple[str, str]]]) – Optional explicit list of message pairs to include. Each tuple is (sender, recipient).

  • norm (Literal['l2', 'l1', 'linf']) – Vector norm used to summarise each message. Supported values are "l2", "l1", "linf".

  • show (bool) – Whether to display the plot.

  • savepath (str | None) – Optional path to save the figure.

  • return_data (bool) – If True, include the computed series in the return value.

Return type:

Union[Figure, Tuple[Figure, Dict[str, Any]]]

Returns:

The created figure, optionally with the underlying message norm series.
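
The three supported norm names correspond to standard vector norms. A short NumPy sketch on a single message vector follows; the ORDERS mapping is an illustrative assumption about how the names translate, not the library's internal table.

```python
import numpy as np

# Map the documented norm names onto numpy.linalg.norm orders.
ORDERS = {"l2": 2, "l1": 1, "linf": np.inf}

message = np.array([3.0, -4.0])
norms = {
    name: float(np.linalg.norm(message, ord=order))
    for name, order in ORDERS.items()
}
```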

plot_assignment_heatmap(vars_filter=None, *, show=True, savepath=None, cmap='viridis', missing_value=nan, annotate=True, value_labels=None, return_data=False)[source]

Plot variable assignments over time as a heatmap.

Parameters:
  • vars_filter (Optional[List[str]]) – Optional subset of variables to include.

  • show (bool) – Whether to display the figure window.

  • savepath (str | None) – Optional path to save the generated figure.

  • cmap (str) – Matplotlib colormap name to use for the heatmap.

  • missing_value (float) – Value inserted when an assignment is missing for a step. Defaults to NaN so gaps appear as empty cells.

  • annotate (bool) – Whether to write assignment values inside each cell.

  • value_labels (Union[Mapping[int, str], Sequence[str], None]) – Optional mapping or ordered list that converts assignment indices to display labels (e.g., {0: "A", 1: "B"} or ["A", "B", "C"]).

  • return_data (bool) – If True, return the data used for plotting alongside the figure.

Return type:

Union[Figure, Tuple[Figure, Dict[str, Any]], None]

Returns:

The heatmap figure, optionally with the underlying matrix.
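
The two accepted shapes of value_labels can be sketched as a small lookup. The helper resolve_label is hypothetical, and falling back to the numeric index when no label is available is an assumption made for this example only.

```python
from collections.abc import Mapping

def resolve_label(index, value_labels=None):
    """Hypothetical helper: turn an assignment index into a display label."""
    if value_labels is None:
        return str(index)
    if isinstance(value_labels, Mapping):
        # Mapping form, e.g. {0: "A", 1: "B"}.
        return value_labels.get(index, str(index))
    # Sequence form, e.g. ["A", "B", "C"]; positions act as indices.
    if 0 <= index < len(value_labels):
        return value_labels[index]
    return str(index)  # assumed fallback when the index has no label

from_mapping = [resolve_label(i, {0: "A", 1: "B"}) for i in (0, 1, 2)]
from_sequence = [resolve_label(i, ["A", "B", "C"]) for i in (0, 1, 2)]
```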

plot_argmin_per_variable(vars_filter=None, *, figsize=None, show=True, savepath=None, combined_savepath=None, layout='separate')[source]

Plot argmin trajectories for selected variables.

Parameters:
  • vars_filter (Optional[List[str]]) – Optional list of variable names to plot.

  • figsize (tuple[float, float] | None) – Figure size tuple (width, height).

  • show (bool) – Whether to display the plot.

  • savepath (str | None) – Optional path to save individual variable plots (separate layout).

  • combined_savepath (str | None) – Optional path to save a combined plot.

  • layout (Literal['separate', 'combined']) – Choose "separate" for per-variable panels or "combined" for a single figure.

Return type:

None
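
A minimal sketch of what an argmin trajectory is, assuming it means the index of the lowest-cost domain value in a variable's belief vector at each step. The belief values below are made up for illustration and do not come from a PropFlow run.

```python
import numpy as np

# Hypothetical per-step belief (cost) vectors for one variable, domain size 3.
beliefs_per_step = [
    [2.0, 1.0, 3.0],
    [2.5, 0.5, 0.4],
    [1.0, 1.2, 0.9],
]

# One argmin per step; plotted against the step number this forms the trajectory.
trajectory = [int(np.argmin(belief)) for belief in beliefs_per_step]
```

A trajectory that stops changing suggests the variable's preferred assignment has settled, while oscillation between indices points to unstable beliefs.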

plot_bct(variable_name, iteration=None, *, value_index=None, steps_back=None, show=True, savepath=None, verbose=False)[source]

Plot a Backtrack Cost Tree (BCT) for a variable from snapshots.

Reconstructs BCT data from snapshot Q and R messages, then visualizes how costs and beliefs from earlier iterations contribute to the final belief of the specified variable.

Parameters:
  • variable_name (str) – The name of the variable to visualize the BCT for.

  • iteration (int | None) – The iteration index to trace back from. Defaults to None, which selects the last captured iteration (index -1).

  • steps_back (int | None) – Optional number of steps from the end to anchor the tree. For example, steps_back=10 traces the state 10 steps before the last recorded snapshot. When provided, overrides iteration.

  • show (bool) – Whether to display the plot.

  • savepath (str | None) – Optional path to save the generated figure.

  • verbose (bool) – If True, annotate edges with message costs that generated each contribution in addition to assignments and table costs.

Return type:

BCTCreator

Returns:

The BCTCreator object for further analysis (e.g., convergence analysis, variable comparisons). It can be used to call methods such as analyze_convergence(), compare_variables(), and export_analysis().

Raises:

ValueError – If the variable_name is not found in the snapshots.
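
The interaction between iteration and steps_back described above can be sketched as follows. The helper resolve_anchor is hypothetical; it treats both arguments as positions in the snapshot list and raises IndexError for out-of-range anchors, which are assumptions for this example rather than documented behaviour.

```python
def resolve_anchor(num_snapshots, iteration=None, steps_back=None):
    """Hypothetical helper: pick the snapshot position a BCT is anchored at."""
    last = num_snapshots - 1
    if steps_back is not None:
        # steps_back takes precedence over iteration when both are given.
        index = last - steps_back
    elif iteration is None:
        index = last
    else:
        # Allow Python-style negative indices such as -1 for the last snapshot.
        index = iteration if iteration >= 0 else num_snapshots + iteration
    if not 0 <= index <= last:
        raise IndexError(f"anchor {index} is outside 0..{last}")
    return index

default_anchor = resolve_anchor(50)
explicit_anchor = resolve_anchor(50, iteration=5)
overridden_anchor = resolve_anchor(50, iteration=5, steps_back=10)
```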

class propflow.snapshots.StepByStepFormatter(snapshots, normalize_messages=True, route_filter='both', route_order=None, ignore_pairs=None)[source]

Bases: object

Formats BP simulation steps in Excel-like tabular format.

This class takes a sequence of EngineSnapshots and provides methods to format them as readable step-by-step output showing:
  • Cost tables for all factors

  • Q messages (variable -> factor) per iteration

  • R messages (factor -> variable) per iteration

  • Variable assignments and beliefs per iteration

  • Solution cost per iteration

Example

>>> from propflow.snapshots import StepByStepFormatter
>>> formatter = StepByStepFormatter(engine.snapshot_manager.snapshots)
>>> print(formatter.format_all_steps())

property variables: List[str]

List of variable names in the problem.

property factors: List[str]

List of factor names in the problem.

property steps: List[int]

List of step numbers available.

property domain_size: int

Domain size of variables (from first snapshot).

format_cost_tables(use_letters=True)[source]

Format cost tables for all factors.

Parameters:

use_letters (bool) – If True, use letter labels (a, b, …) instead of numbers.

Return type:

str

Returns:

Formatted string showing all cost tables.
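
The effect of use_letters can be pictured with a small sketch. The helper domain_labels is hypothetical, and both the zero-based numbering in the numeric case and the 26-value limit for letters are assumptions for this example.

```python
import string

def domain_labels(domain_size, use_letters=True):
    """Hypothetical helper: build display labels for a variable's domain values."""
    if use_letters:
        if domain_size > len(string.ascii_lowercase):
            raise ValueError("this sketch only supports up to 26 letter labels")
        return list(string.ascii_lowercase[:domain_size])
    # Assumed numeric fallback: zero-based indices as strings.
    return [str(i) for i in range(domain_size)]

letter_labels = domain_labels(3)
number_labels = domain_labels(3, use_letters=False)
```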

format_iteration(step, use_letters=True, show='text')[source]

Format Q/R messages, assignments, and cost for one iteration.

Parameters:
  • step (int) – The step number to format.

  • use_letters (bool) – If True, use letter labels for domain values.

  • show (Literal['text', 'table']) – "text" (default) uses the standard text layout; "table" renders Q/R messages in a tabular layout.

Return type:

str

Returns:

Formatted string for the iteration.

format_all_steps(include_cost_tables=True, show='text')[source]

Return complete step-by-step output.

Parameters:
  • include_cost_tables (bool) – If True, include cost tables at the beginning.

  • show (Literal['text', 'table']) – "text" (default) uses the standard text layout; "table" renders Q/R messages in a tabular layout.

Return type:

str

Returns:

Complete formatted output for all iterations.

format_summary()[source]

Return a compact summary of the simulation.

Return type:

str

Returns:

Summary string with initial/final costs and assignments.