API Reference

This section contains the complete API documentation for PropFlow.

Core Modules

Engines

class propflow.bp.engine_base.BPEngine(factor_graph, computator=<propflow.bp.computators.MinSumComputator object>, init_normalization=<function dummy_func>, name='BPEngine', convergence_config=None, monitor_performance=None, normalize_messages=None, anytime=None, use_bct_history=None, snapshots_config=None)[source]

Bases: object

The core engine for running belief propagation simulations.

This class orchestrates the belief propagation process on a factor graph. It manages the simulation loop, message passing schedule, history tracking, convergence checking, and performance monitoring. It is designed to be extended by other engine classes that implement specific BP variants or policies.

computator

The computator instance for message calculation.

Type:

Computator

anytime

If True, the engine tracks the best cost found so far.

Type:

bool

normalize_messages[source]

If True, messages are normalized each cycle.

Type:

bool

graph

The factor graph on which the simulation runs.

Type:

FactorGraph

var_nodes

A list of variable agent nodes in the graph.

Type:

list

factor_nodes

A list of factor agent nodes in the graph.

Type:

list

history

An object for tracking the simulation’s history.

Type:

History

graph_diameter

The diameter of the factor graph.

Type:

int

convergence_monitor

The monitor for checking convergence.

Type:

ConvergenceMonitor

performance_monitor

An optional monitor for performance.

Type:

PerformanceMonitor

step(i=0)[source]

Runs one full step of the synchronous belief propagation algorithm.

A step consists of two main phases: 1. Variable nodes compute and send messages to factor nodes. 2. Factor nodes compute and send messages to variable nodes.

Parameters:

i (int) – The current step number.

Return type:

Step

Returns:

A Step object containing information about the messages exchanged.

run(max_iter=None, save_json=False, save_csv=True, filename=None, config_name=None)[source]

Runs the simulation until convergence or max iterations is reached.

Parameters:
  • max_iter (int) – The maximum number of iterations to run.

  • save_json (bool) – Whether to save the full history as a JSON file.

  • save_csv (bool) – Whether to save the cost history as a CSV file.

  • filename (str) – The base name for the output files.

  • config_name (str) – The name of the configuration to use for saving.

Return type:

Optional[str]

Returns:

An optional string, typically for results or status.

property name: str

The name of the engine instance.

Type:

str

property iteration_count: int

The number of iterations completed so far.

Type:

int

get_beliefs()[source]

Retrieves the current beliefs of all variable nodes.

Return type:

Dict[str, ndarray]

Returns:

A dictionary mapping variable names to their belief vectors.

property assignments: Dict[str, int | float]

The current assignments of all variable nodes.

Type:

dict

calculate_global_cost()[source]

Calculates the global cost based on the current variable assignments.

This method uses the original, unmodified factor cost tables to ensure the true cost is computed, independent of any runtime cost modifications.

Return type:

float

Returns:

The total cost of the current assignments.

post_init()[source]

Hook for logic to be executed after engine initialization.

Return type:

None

post_factor_cycle()[source]

Hook for logic after a full message passing cycle.

Return type:

None

post_two_cycles()[source]

Hook for logic after the first two message passing cycles.

Return type:

None

pre_factor_compute(factor, iteration=0)[source]

Hook for logic before a factor computes its messages.

Parameters:
  • factor (FactorAgent) – The factor agent about to compute messages.

  • iteration (int) – The current simulation iteration.

Return type:

None

post_factor_compute(factor, iteration)[source]

Hook for logic after a factor computes its messages.

Parameters:
  • factor (FactorAgent) – The factor agent that just computed messages.

  • iteration (int) – The current simulation iteration.

Return type:

None

pre_var_compute(var)[source]

Hook for logic before a variable computes its messages.

Parameters:

var (VariableAgent) – The variable agent about to compute messages.

Return type:

None

post_var_compute(var)[source]

Hook for logic after a variable computes its messages.

Parameters:

var (VariableAgent) – The variable agent that just computed messages.

Return type:

None

init_normalize()[source]

Hook for initial normalization logic.

Return type:

None

update_global_cost()[source]

Calculates and records the global cost for the current step.

If in “anytime” mode, it ensures the cost recorded does not increase.

Return type:

None

normalize_messages()[source]

Placeholder hook for message normalization logic.

Return type:

None

latest_snapshot()[source]

Returns the latest snapshot record if snapshots are enabled.

Returns:

The latest snapshot object, or None if snapshots are disabled.

get_snapshot(step_index)[source]

Returns the snapshot record for a given step index.

Parameters:

step_index (int) – The step for which to retrieve the snapshot.

Returns:

The snapshot object for the given step, or None if not available.

class propflow.bp.engines.Engine(factor_graph, computator=<propflow.bp.computators.MinSumComputator object>, init_normalization=<function dummy_func>, name='BPEngine', convergence_config=None, monitor_performance=None, normalize_messages=None, anytime=None, use_bct_history=None, snapshots_config=None)[source]

Bases: BPEngine

A basic belief propagation engine.

This is a direct alias for BPEngine and provides the standard, unmodified belief propagation behavior.

class propflow.bp.engines.SplitEngine(*args, split_factor=0.6, **kwargs)[source]

Bases: BPEngine

A BP engine that applies the factor splitting policy.

This engine modifies the factor graph by splitting each factor into two, distributing the original cost between them. This can sometimes help with convergence.

post_init()[source]

Applies the factor splitting policy after initialization.

Return type:

None

class propflow.bp.engines.CostReductionOnceEngine(*args, reduction_factor=0.5, **kwargs)[source]

Bases: BPEngine

A BP engine that applies a one-time cost reduction policy.

This engine reduces the costs in the factor tables at the beginning of the simulation and then applies a discount to outgoing messages from factors.

post_init()[source]

Applies the one-time cost reduction after initialization.

post_factor_compute(factor, iteration)[source]

Applies a discount to outgoing messages from factors.

class propflow.bp.engines.DampingEngine(*args, damping_factor=0.9, **kwargs)[source]

Bases: BPEngine

A BP engine that applies message damping.

Damping averages the message from the previous iteration with the newly computed message. This can help prevent oscillations and improve convergence.

post_var_compute(var)[source]

Applies damping after a variable node computes its messages.

class propflow.bp.engines.DampingSCFGEngine(*args, **kwargs)[source]

Bases: DampingEngine, SplitEngine

A BP engine that combines message damping and factor splitting.

class propflow.bp.engines.DampingCROnceEngine(*args, **kwargs)[source]

Bases: DampingEngine, CostReductionOnceEngine

A BP engine that combines message damping and one-time cost reduction.

class propflow.bp.engines.MessagePruningEngine(*args, prune_threshold=0.0001, min_iterations=5, adaptive_threshold=True, **kwargs)[source]

Bases: BPEngine

A BP engine that applies a message pruning policy to reduce memory usage.

post_init()[source]

Initializes and sets the message pruning policy on agent mailers.

Return type:

None

class propflow.bp.engines.DiscountEngine(*args, **kwargs)[source]

Bases: BPEngine

A BP engine that applies a discount factor to messages over time.

post_factor_cycle()[source]

Applies the discount policy after each message passing cycle.

Factor Graphs

class propflow.bp.factor_graph.FactorGraph(variable_li, factor_li, edges)[source]

Bases: object

Represents a bipartite factor graph for belief propagation.

This class encapsulates the structure of a factor graph, which consists of variable nodes and factor nodes. It enforces a bipartite structure where variables are only connected to factors and vice versa. It uses a networkx.Graph to manage the underlying connections.

variables

A list of all variable agents in the graph.

Type:

List[VariableAgent]

factors

A list of all factor agents in the graph.

Type:

List[FactorAgent]

G

The underlying networkx graph structure.

Type:

nx.Graph

property lb: int | float

The lower bound of the problem, can be set externally.

property global_cost: int | float

Calculates the global cost based on current variable assignments.

This property queries each variable for its current assignment and uses the original, unmodified factor cost tables to compute the total cost.

property curr_assignment: Dict[VariableAgent, int]

The current assignment for all variables in the graph.

Type:

dict

property edges: Dict[FactorAgent, List[VariableAgent]]

Reconstructs the edge dictionary mapping factors to variables.

Type:

dict

set_computator(computator, **kwargs)[source]

Assigns a computator to all nodes in the graph.

Parameters:
  • computator (Computator) – The computator instance to assign.

  • **kwargs – Additional arguments (not currently used).

Return type:

None

normalize_messages()[source]

Normalizes all incoming messages for all variable nodes.

This is a common technique to prevent numerical instability in belief propagation algorithms by shifting message values.

Return type:

None

visualize()[source]

Visualizes the factor graph using matplotlib.

Variable nodes are drawn as circles, and factor nodes are drawn as squares.

Return type:

None

get_variable_agents()[source]

Returns a list of all variable agents in the graph.

Return type:

List[VariableAgent]

get_factor_agents()[source]

Returns a list of all factor agents in the graph.

Return type:

List[FactorAgent]

property diameter: int

The diameter of the factor graph.

If the graph is not connected, it returns the diameter of the largest connected component.

Type:

int

property original_factors: List[FactorAgent]

A deep copy of the original factor agents.

This is preserved to allow for calculating the true global cost, even if factor costs are modified during a simulation run.

Type:

list[FactorAgent]

Agents

class propflow.core.agents.FGAgent(name, node_type, domain)[source]

Bases: Agent, ABC

Abstract base class for belief propagation (BP) nodes.

Extends the Agent class with methods relevant to message passing, updating local belief, and retrieving that belief. It serves as a foundation for both VariableAgent and FactorAgent classes.

domain

The size of the variable domain.

Type:

int

mailer

Handles incoming and outgoing messages.

Type:

MailHandler

receive_message(message)[source]

Receives a message and adds it to the mailer’s inbox.

Parameters:

message (Message) – The message to be received.

Return type:

None

send_message(message)[source]

Sends a message to its recipient via the mailer.

Parameters:

message (Message) – The message to be sent.

Return type:

None

empty_mailbox()[source]

Clears all messages from the mailer’s inbox.

Return type:

None

empty_outgoing()[source]

Clears all messages from the mailer’s outbox.

property inbox: List[Message]

A list of incoming messages.

Type:

list[Message]

property outbox: List[Message]

A list of outgoing messages.

Type:

list[Message]

abstractmethod compute_messages()[source]

Abstract method to compute outgoing messages.

This method must be implemented by subclasses to define how messages are calculated based on the agent’s current state and incoming messages.

Return type:

List[Message]

Returns:

A list of messages to be sent.

property last_iteration: List[Message]

The last list of messages sent.

Type:

list[Message]

last_cycle(diameter=1)[source]

Retrieves messages from a previous cycle.

Parameters:

diameter (int) – The number of iterations in a cycle. Defaults to 1.

Return type:

List[Message]

Returns:

A list of messages from the specified previous cycle.

append_last_iteration()[source]

Appends the current outbox to the history.

Maintains a history of sent messages, limited by _max_history.

class propflow.core.agents.VariableAgent(name, domain)[source]

Bases: FGAgent

Represents a variable node in a factor graph.

This agent is responsible for aggregating messages from neighboring factor nodes to compute its belief over its domain.

computator

An object that handles the computation of messages and beliefs.

compute_messages()[source]

Computes outgoing messages to factor nodes.

Uses the assigned computator to calculate messages based on the contents of the inbox.

Return type:

None

property belief: ndarray

The current belief distribution over the variable’s domain.

Type:

np.ndarray

property curr_assignment: int | float

The current assignment for the variable.

Type:

int | float

class propflow.core.agents.FactorAgent(name, domain, ct_creation_func, param=None, cost_table=None)[source]

Bases: FGAgent

Represents a factor node in a factor graph.

This agent stores a cost function (or utility function) that defines the relationship between a set of connected variable nodes.

cost_table

The table of costs for each combination of assignments.

Type:

CostTable

connection_number

A mapping from variable names to their dimension index.

Type:

dict

ct_creation_func

A function to create the cost table.

Type:

Callable

ct_creation_params

Parameters for the cost table creation function.

Type:

dict

classmethod create_from_cost_table(name, cost_table)[source]

Creates a FactorAgent from an existing cost table.

Parameters:
  • name (str) – The name of the factor.

  • cost_table (CostTable) – The cost table to use.

Return type:

FactorAgent

Returns:

A new FactorAgent instance.

compute_messages()[source]

Computes messages to be sent to variable nodes.

Uses the assigned computator to calculate messages based on the cost table and incoming messages from variable nodes.

Return type:

None

initiate_cost_table()[source]

Creates the cost table using the provided creation function.

Raises:

ValueError – If the cost table already exists or if no connections are set.

Return type:

None

set_dim_for_variable(variable, dim)[source]

Maps a variable to a dimension in the cost table.

Parameters:
  • variable (VariableAgent) – The variable agent to map.

  • dim (int) – The dimension index in the cost table.

Return type:

None

set_name_for_factor()[source]

Sets the factor’s name based on its connected variables.

Raises:

ValueError – If no connections are set.

Return type:

None

save_original(ct=None)[source]

Saves a copy of the original cost table.

Parameters:

ct (CostTable, optional) – An external cost table to save. Defaults to None.

Return type:

None

property mean_cost: float

The mean value of the costs in the cost table.

Type:

float

property total_cost: float

The sum of all costs in the cost table.

Type:

float

property original_cost_table: ndarray | None

The original, unmodified cost table, if saved.

Type:

np.ndarray | None

Components

class propflow.core.components.Message(data, sender, recipient)[source]

Bases: object

Represents a message passed between agents in the belief propagation algorithm.

data

The content of the message, typically a numpy array representing costs or beliefs.

Type:

np.ndarray

sender

The agent sending the message.

Type:

Agent

recipient

The agent receiving the message.

Type:

Agent

copy()[source]

Creates a deep copy of the message.

Returns:

A new Message instance with copied data.

Return type:

Message

class propflow.core.components.MailHandler(_domain_size)[source]

Bases: object

Handles message passing with deduplication and synchronization.

This class manages an agent’s incoming and outgoing messages, ensuring that only the latest message from each sender is stored.

pruning_policy

An optional policy for selectively discarding messages.

set_pruning_policy(policy)[source]

Sets a message pruning policy.

Parameters:

policy – An object with a should_accept_message method.

Return type:

None

set_first_message(owner, neighbor)[source]

Initializes the inbox with a zero-message from a neighbor.

This is used to ensure that an agent has a message from each neighbor before computation begins.

Parameters:
  • owner (FGAgent) – The agent who owns this mail handler.

  • neighbor (FGAgent) – The neighboring agent to initialize a message from.

Return type:

None

receive_messages(messages)[source]

Receives and handles one or more messages.

Applies a pruning policy if one is set and stores the message, overwriting any previous message from the same sender.

Parameters:

messages (Message | list[Message]) – A single Message or a list of Message objects.

Return type:

None

send()[source]

Sends all staged outgoing messages to their recipients.

Return type:

None

stage_sending(messages)[source]

Stages a list of messages to be sent.

Parameters:

messages (List[Message]) – The messages to be sent.

Return type:

None

prepare()[source]

Clears the outbox, typically after messages have been sent.

Return type:

None

clear_inbox()[source]

Clears all messages from the inbox.

Return type:

None

clear_outgoing()[source]

Clears all messages from the outbox.

Return type:

None

property inbox: List[Message]

A list of incoming messages.

Type:

list[Message]

property outbox: List[Message]

A list of outgoing messages.

Type:

list[Message]

Policies

Policies used by belief propagation bp.

propflow.policies.damp(variable, x=None)[source]

Applies damping to the outgoing messages of a single variable agent.

This function updates each outgoing message in the variable’s outbox by blending it with the corresponding message from the previous iteration.

The update rule is: new_message = x * previous_iteration_message + (1 - x) * current_message

Parameters:
  • variable (VariableAgent) – The VariableAgent whose outbox messages will be damped.

  • x (float) – The damping factor, representing the weight of the previous message. If None, the default from POLICY_DEFAULTS is used.

Return type:

None

propflow.policies.TD(variables, x=None, diameter=None)[source]

Applies temporal damping to the outgoing messages of a list of variables.

This function applies damping using messages from a previous cycle, determined by the diameter. The new message is a weighted average of the message from diameter iterations ago and the current message.

The update rule is: new_message = x * previous_cycle_message + (1 - x) * current_message

Parameters:
  • variables (List[VariableAgent]) – A list of VariableAgent objects to apply damping to.

  • x (float) – The damping factor, representing the weight of the previous message. If None, the default from POLICY_DEFAULTS is used.

  • diameter (int) – The number of iterations in a cycle, used to retrieve the message from the previous cycle. If None, the default from POLICY_DEFAULTS is used.

Return type:

None

propflow.policies.cost_reduction_all_factors_once(fg, x)[source]

Applies a one-time cost reduction to all factors in the graph.

This function iterates through all factor agents in the provided factor graph and multiplies their cost tables by a given factor. It also saves the original cost table before modification.

Parameters:
  • fg (FactorGraph) – The FactorGraph to modify.

  • x (float) – The multiplicative factor to apply to the cost tables.

Return type:

None

propflow.policies.discount_attentive(fg)[source]

Applies an attentive discount to incoming messages for all variable nodes.

The discount factor for each variable’s messages is inversely proportional to the variable’s degree in the factor graph. This means variables with more connections (higher degree) will have their incoming messages discounted more heavily.

Parameters:

fg (FactorGraph) – The FactorGraph containing the variables to be updated.

Return type:

None

propflow.policies.split_all_factors(fg, p=None)[source]

Performs an in-place replacement of every factor with two cloned factors.

Each factor f with cost table C is replaced by two new factors, f’ and f’’, with cost tables p*C and (1-p)*C, respectively. The new factors inherit all the connections of the original factor.

This function directly modifies the provided FactorGraph object, including its underlying networkx.Graph and its list of factors.

Parameters:
  • fg (FactorGraph) – The FactorGraph to modify.

  • p (float) – The splitting proportion, which must be between 0 and 1. This determines how the original cost is distributed between the two new factors. If None, the default from POLICY_DEFAULTS is used.

Raises:

AssertionError – If p is not in the range (0, 1).

Return type:

None

class propflow.policies.MessagePruningPolicy(prune_threshold=None, min_iterations=5, adaptive_threshold=True)[source]

Bases: Policy

A policy that prunes messages that have not changed significantly.

This policy helps to optimize belief propagation by avoiding the processing of messages that are redundant. It compares the norm of the difference between a new message and the previous message from the same sender.

prune_threshold

The threshold for pruning.

Type:

float

min_iterations

The number of initial iterations during which no pruning will occur.

Type:

int

adaptive_threshold

If True, the threshold is scaled by the magnitude of the message.

Type:

bool

iteration_count

The current iteration number.

Type:

int

pruned_count

The total number of pruned messages.

Type:

int

total_count

The total number of messages considered for pruning.

Type:

int

get_stats()[source]

Returns statistics about the pruning process.

Return type:

Dict[str, float]

Returns:

A dictionary containing the pruning rate, total messages considered, number of pruned messages, and total iterations.

reset()[source]

Resets the policy’s internal state for a new simulation run.

Return type:

None

should_accept_message(agent, new_message)[source]

Determines whether to accept or prune an incoming message.

Parameters:
  • agent (FGAgent) – The agent receiving the message.

  • new_message (Message) – The new Message object.

Return type:

bool

Returns:

True if the message should be accepted, False if it should be pruned.

step_completed()[source]

Signals that a simulation step has completed, incrementing the iteration count.

Return type:

None

propflow.policies.normalize_inbox(variables)[source]

Normalizes all messages in the inboxes of a list of variable agents.

This function iterates through each variable’s inbox and last iteration’s messages, normalizing them by subtracting the minimum value. This centers the message values around zero without changing their relative differences.

Parameters:

variables (List[VariableAgent]) – A list of VariableAgent objects whose messages will be normalized.

Return type:

None

propflow.policies.normalize_soft_max(cost_table)[source]

Normalizes a cost table using the softmax function.

This ensures all values are positive and sum to 1, effectively converting costs into a probability distribution. A stability trick (subtracting the max value) is used to prevent overflow with large cost values.

Parameters:

cost_table (ndarray) – An n-dimensional numpy array representing the cost table.

Return type:

ndarray

Returns:

The normalized cost table.

propflow.policies.normalize_cost_table_sum(cost_table)[source]

Normalizes a cost table so that the sum across all dimensions is equal.

Note

The implementation of this function appears to be incorrect for its stated purpose and may not produce the intended normalization.

Parameters:

cost_table (ndarray) – An n-dimensional numpy array representing the cost table.

Return type:

ndarray

Returns:

The modified cost table.

propflow.policies.init_normalization(li)[source]

Performs an initial normalization of cost tables for a list of factors.

This function divides each factor’s cost table by the total number of factors in the list.

Parameters:

li (List[FactorAgent]) – A list of FactorAgent objects to be normalized.

Return type:

None

Utilities

propflow.utils.dummy_func(*args, **kwargs)[source]

A dummy function that does nothing and returns nothing.

This function serves as a placeholder for callbacks or functions that are optional or not yet implemented, allowing the main logic to proceed without raising errors.

Parameters:
  • *args (Any) – Accepts any positional arguments.

  • **kwargs (Any) – Accepts any keyword arguments.

Return type:

None

class propflow.utils.FGBuilder[source]

Bases: object

A builder class providing static methods to construct factor graphs.

static build_cycle_graph(num_vars, domain_size, ct_factory, ct_params, **kwargs)[source]

Builds a factor graph with a simple cycle topology.

The graph structure is x1 – f12 – x2 – … – xn – fn1 – x1.

Parameters:
  • num_vars (int) – The number of variables in the cycle.

  • domain_size (int) – The size of the domain for each variable.

  • ct_factory (Union[Callable, CTFactory, str]) – The factory for creating cost tables.

  • ct_params (Dict[str, Any]) – Parameters for the cost table factory.

  • **kwargs – Catches unused arguments like density for API consistency.

Return type:

FactorGraph

Returns:

A FactorGraph instance with a cycle topology.

static build_random_graph(num_vars, domain_size, ct_factory, ct_params, density)[source]

Builds a factor graph with random binary constraints.

Parameters:
  • num_vars (int) – The number of variables in the graph.

  • domain_size (int) – The size of the domain for each variable.

  • ct_factory (Union[Callable, CTFactory, str]) – The factory for creating cost tables.

  • ct_params (Dict[str, Any]) – Parameters for the cost table factory.

  • density (float) – The density of the graph (probability of an edge).

Return type:

FactorGraph

Returns:

A FactorGraph instance with a random topology.

propflow.utils.find_project_root()[source]

Finds the project root directory by searching for common markers.

This function starts from the current working directory and traverses up the directory tree, looking for files or directories like .git, pyproject.toml, or setup.py that typically indicate the root of a project.

Return type:

Path

Returns:

A Path object representing the project root directory, or the current working directory if no project root markers are found (e.g., when installed as a package).

class propflow.utils.CTFactory(*values)

Bases: Enum

poisson = 'poisson'
random_int = 'random_int'
uniform_float = 'uniform_float'

Simulator

A parallelized simulator for running and comparing multiple engine configurations.

This module provides a Simulator class that can run multiple belief propagation engine configurations across a set of factor graphs in parallel. It uses Python’s multiprocessing module to distribute the simulation runs, collects the results, and provides a simple plotting utility to visualize and compare the performance of different engines.

class propflow.simulator.Simulator(engine_configs, log_level=None)[source]

Bases: object

Orchestrates parallel execution of multiple simulation configurations.

This class takes a set of engine configurations and a list of factor graphs, runs each engine on each graph in parallel, collects the cost history from each run, and provides methods to visualize the aggregated results.

engine_configs

A dictionary mapping engine names to their configurations.

Type:

dict

logger

A configured logger instance.

Type:

Logger

results

A dictionary to store the results of the simulations.

Type:

dict

timeout

The timeout in seconds for multiprocessing tasks.

Type:

int

run_simulations(graphs, max_iter=None)[source]

Runs all engine configurations on all provided graphs in parallel.

Parameters:
  • graphs (List[Any]) – A list of factor graph objects to run simulations on.

  • max_iter (Optional[int]) – The maximum number of iterations for each simulation run.

Return type:

Dict[str, List[List[float]]]

Returns:

A dictionary containing the collected results, where keys are engine names and values are lists of cost histories for each run.

plot_results(max_iter=None, verbose=False)[source]

Plots the average cost convergence for each engine configuration.

Parameters:
  • max_iter (Optional[int]) – The maximum number of iterations to display on the plot.

  • verbose (bool) – If True, plots individual simulation runs with transparency and standard deviation bands around the average.

Return type:

None

set_log_level(level)[source]

Sets the logging level for the simulator’s logger.

Parameters:

level (str) – The desired logging level (e.g., ‘INFO’, ‘DEBUG’).

Return type:

None