from typing import Tuple
import numpy as np
import scipy.sparse as sp
import networkx as nx
from math import modf
from eqc_models.graph.base import GraphModel
[docs]
class GraphPartitionModel(GraphModel):
"""
A model for graph partitioning into `k` parts with objective and constraints
derived from the Laplacian matrix and additional penalties for balance and constraints.
"""
def __init__(self, G: nx.Graph, k: int = 2, weight: str = "weight", alpha: float = 1.0, beta_obj: float = 1.0,
gamma: float = 1.0):
"""
Parameters:
-----------
G : nx.Graph
The graph to partition.
k : int
The number of partitions.
weight : str
The key for edge weights in the graph.
alpha : float
The penalty multiplier for balance constraints.
beta_obj : float
The penalty multiplier for minimizing edge cuts (Laplacian term).
gamma : float
The penalty multiplier for assignment constraints.
"""
self._G = G
self._k = k
self._weight = weight
self._alpha = alpha
self._beta_obj = beta_obj
self._gamma = gamma
self._laplacian = nx.laplacian_matrix(G, weight=weight)
self._num_nodes = G.number_of_nodes()
self._sorted_nodes = sorted(G.nodes)
self._constraints_offset = 0
self._balanced_partition_offset = 0
self.set_and_validate_k()
self._objective_matrix = self.initialize_model()
super().__init__(self._G)
[docs]
def set_and_validate_k(self):
"""
Sets k and encoding length for a graph problem
"""
# modf(x) = (fractional, integer) decomposition.
# Make sure fractional portion is zero. Convert to int if so.
assert modf(self._k)[0] == 0, "'k' must be an integer."
# it's an int, so set self.k
self._k = int(self._k)
# Verify k >= 2
assert self._k >= 2, f"ERROR, k={self._k}: k must be greater than or equal to 2."
# Verify that k makes sense
assert self._k <= self._num_nodes, (
f"ERROR, k={self._k}: k must be less than number of nodes or variables. k = {self._k} and "
f"number of nodes = {self._num_nodes}"
)
[docs]
def initialize_model(self):
"""
Build the objective matrix and constraints for the k-partition problem.
"""
if self._k == 2:
# For 2 partitions, construct a simpler QUBO from the Laplacian matrix
return self.get_two_partition_qubo()
else:
# For k > 2, construct a block-diagonal Laplacian with balance and constraints
laplacian_blocks = 0.5 * sp.block_diag([self._laplacian] * self._k, format="csr")
balance_term = self.get_balanced_partition_term()
constraints = self.get_constraints()
return (
self._alpha * balance_term
+ self._gamma * constraints
+ self._beta_obj * laplacian_blocks
)
[docs]
def get_balanced_partition_term(self) -> sp.spmatrix:
"""
Construct the quadratic penalty term for balanced partitions.
"""
I_k = sp.identity(self._k)
Ones_n = np.ones((self._num_nodes, self._num_nodes))
balanced_partition_term = sp.kron(I_k, Ones_n, format="csr")
balanced_partition_term -= (
2 * self._num_nodes / self._k * sp.identity(balanced_partition_term.shape[0])
)
self._balanced_partition_offset = self._num_nodes**2 / self._k
return balanced_partition_term
[docs]
def get_constraints(self) -> sp.spmatrix:
"""
Construct the quadratic penalty term for assignment constraints.
"""
I_n = sp.identity(self._num_nodes)
Ones_k = np.ones((self._k, self._k))
constraints = sp.kron(Ones_k, I_n, format="csr")
constraints -= 2 * sp.identity(constraints.shape[0])
self._constraints_offset = self._num_nodes
return constraints
[docs]
def get_two_partition_qubo(self) -> sp.spmatrix:
"""
Construct the QUBO matrix for two partitions using adjacency and penalties.
"""
Garr = nx.to_scipy_sparse_matrix(self._G, weight=self._weight, nodelist=self._sorted_nodes)
Q = (
self._alpha * np.ones(Garr.shape, dtype=np.float32)
- self._beta_obj * Garr
)
degrees = Garr.sum(axis=1).A1 # Convert sparse matrix to 1D array
diag = self._beta_obj * degrees - self._alpha * (self._num_nodes - 1)
np.fill_diagonal(Q, diag)
return sp.csr_matrix(Q)
[docs]
def evaluate(self, solution: np.ndarray) -> float:
"""
Evaluate the objective function for a given solution.
"""
assert len(solution) == self._objective_matrix.shape[0], "Solution size mismatch."
return float(solution.T @ self._objective_matrix @ solution)
[docs]
def decode(self, solution: np.ndarray) -> dict:
"""
Decode the solution vector into a partition assignment.
"""
if self._k == 2:
return {node: int(solution[i]) for i, node in enumerate(self._sorted_nodes)}
else:
partitions, nodes = np.where(solution.reshape((self._k, self._num_nodes)) == 1)
return {self._sorted_nodes[node]: int(partition) for partition, node in zip(partitions, nodes)}
[docs]
def costFunction(self) -> Tuple[np.ndarray, np.ndarray]:
"""
Return the linear and quadratic components of the objective function.
"""
Q = self._objective_matrix
h = Q.diagonal()
J = 2 * sp.triu(Q, k=1).tocsr() # Extract upper triangular part for quadratic terms
return h, J