Source code for eqc_models.algorithms.alm

# (C) Quantum Computing Inc., 2025.
"""
============================
Augmented Lagrangian Method
============================

Overview
============================

The augmented Lagrangian method algorithm in eqc-models is for automating parameter
adjustments to achieve feasible solutions to constrained optimization models using
entropy quantum computing devices. It supports fractional (continuous), binary and
general integer variables. ALM also supports enforcement of one-hot constraints,
inequality constraints and equality constraints without introduciton of slack 
variables.

This method requires a different approach to modeling than used in model implementations
of constrained optimization models as direct subclasses of `EqcModel`. To use this approach,
implement an objective function in one of the subclasses of `EqcModel`. The next step is to 
use a `ConstraintRegistry` object to describe the model constraints. There are many 
parameters available inside the `ALMConfig` class. Only a few of these parameters should 
be adjusted by novice users. Once the model, constraint registry and config objects are 
ready, pass them to an `ALMAlgorithm` instance with a solver and it will iterate over 
parameter adjustments until feasibility tolerance is found or maximum iteration count 
is exceeded.

The following example is a basic outline of how to use the ALM implementation to
solve models.

:: 

    model = PolynomialModel()
    solver = Dirac3ContinuousCloudSolver()
    solver_kwargs = {} # add solver parameters as dict here
    config = ALMConfig() # add configuration parameters here
    registry = ConstraintRegistry()
    # add variable domains and constraints here
    algorithm = ALMAlgorithm()
    response = algorithm.run(model, registry, solver, config, **solver_kwargs) # keyword arguments are passed to the solver

Model Creation
============================

Definining the objective function for the optimization problem can be done in a number of ways.
There are two base classes on which all other models are based and they define how 
function coefficients are specified. With a `QuadraticModel`, the coefficients are
specified using a one-dimensional array for the linear terms and a two-dimensional,
symmetric array or Hermitian matrix for quadratic terms. The `PolynomialModel` way uses two lists,
one for indices of terms and another for the coefficients of terms. These terms can be 
up to degree five. The two classes just described are preferred for use with ALM because 
most other model classes incorporate constraints into the operator. In the future, ALM will 
provide a way to extract the objective function and constraints from the model separately,
populating the `ConstraintRegistry` automatically. 
First, choose variable definitions by describing the variable names in a single list. 
Then organize the terms in ascending lexicographic order by index. Finally, create a
list with coefficient entries corresponding to the index list.
The example problem used here is a simple logical decision of one variable among 10. 
Those familiar with QUBO formulations
will recognize the penalties in the quadratic coefficients. This first example sets
the one-hot constraint as penalties in the objective function and adds block constraints
to each of the variables to enforce 0 or 1. This has the effect of using two time bins
per binary variable. 

>>> import warnings; warnings.filterwarnings("ignore") # silence warnings
>>> n = 10
>>> variables = [f"x_{i}" for i in range(n)]

Define the indices and coefficients. The solution is `x_1 == 1` and all other `x_i == 0`.
Variable indices sent to the device are 1-based.

>>> indices = [(0, i+1) for i in range(n)] + [(i+1, j+1) for i in range(10) for j in range(i, n)]
>>> coefficients = [-i if i==n-1 else 0 for i in range(n)] + [2 for i in range(n) for j in range(i, n)]

Define the model, setting the upper bound to 1 on all varaibles.

>>> model = PolynomialModel(coefficients, indices)
>>> model.variables = variables
>>> model.variables[0] == "x_0"
True
>>> model.upper_bound = np.ones((n,))
>>> model.is_discrete = [True for i in range(n)] # for future modeling support

Begin the ALM setup. One of the features of this algorithm in eqc-models is the ability to enforce
discrete variables while using the continuous solver. This can allow for mixed integer formulations.
For binary variables, add a block for each one so that it is forced to 0 or 1. The `sum_to_one`
parameter is normally true and setting to false here is done because the simplicity of the problem
allows a more efficient approach.

>>> reg = ConstraintRegistry()
>>> for i in range(n): reg.add_block([i], levels=[0, 1], sum_to_one=False)

Configure `rho_h` (equality constraints) to start at 1 and only allow 4 iterations. Also
set tolerance high.

>>> config = ALMConfig(tol_h=0.5, max_outer=4, rho_h=1, rho_g=1, gamma_up=2.0, gamma_down=1.0)
>>> from eqc_models.solvers import Dirac3ContinuousCloudSolver
>>> solver = Dirac3ContinuousCloudSolver()
>>> algo = ALMAlgorithm()
>>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
 20...COMPLETE...
[ALM 00] ...
...
>>> results["decoded"] # had to hard-code to n==10
{0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 1.0}

Equality constraints
========================

This ALM algorithm can seek feasible solutions with targeted multipliers
rather than adding penalty multipliers into the objective coefficients and 
adjusting manually, which would be required for more complex problems. 
These constraints should be specified in the constraint registry. Specification
of the binary variables using the `add_block` call sets up the domain.
The objective function is formulated directly, purely as the cost of each variable,
which is the negative value of the variable index.

>>> indices = [(0, i+1) for i in range(n)]
>>> coefficients = [-i for i in range(n)]
>>> model = PolynomialModel(coefficients, indices)
>>> model.upper_bound = np.ones((n, ))
>>> model.is_discrete = [True for i in range(n)]
>>> reg = ConstraintRegistry()
>>> for i in range(n): reg.add_block([i], levels=[0, 1], sum_to_one=False)

Now, a new concept. Instead of placing the penalties explicitly in the objective, this statement
enforces the one-hot constraint by defining a function which is 0 when true and 
either positive or negative when false. The lambda function is passed an encoded 
array for the variable values, so the array must be decoded before evaluating
the function. To assist in this, a helper function called `level_decoder` is
available for import. Use this decoder and the solution x to extract the 
values of the variables.

>>> decoder = level_decoder(n, [0, 1])
>>> test_x = np.zeros((20,))
>>> test_x[::2] = 1
>>> (decoder@test_x==0).all()
True
>>> config.max_outer = 15 # added complexity in the definition requires more steps
>>> config.gamma_up = 1.2
>>> config.gamma_down = 0.9
>>> reg.add_equality(lambda x: np.sum(decoder@x) - 1)

Everything else is the same as in the previous example.

>>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
 20...COMPLETE...
[ALM 00] ...
...
>>> results["decoded"] # had to hard-code to n==10
{0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 1.0}

Inequality Constraints
========================

Incorporating inequality constraints is not much different. Specify an inequality
constraint by using the `ConstraintRegistry.add_inequality` method to pass a 
function which is zero or negative when the constraint is met and positive when
the constraint is not met. This is straightforward for less than or equal to 
constraints and requires negating the function when the constraint is for greater
than or equal. Here is an example showing a model where less than two variables
are non zero. When adding the binary variable blocks, the `sum_to_one` parameter
is left out, which results in the default of true. The use of the sum to one
parameter gives the algorithm more control over the dual variable computations.

>>> reg = ConstraintRegistry()
>>> for i in range(n): reg.add_block([i], levels=[0, 1])
>>> reg.add_inequality(lambda x: np.sum(decoder@x) - 2) # reuse the decoder from the previous example
>>> config.gamma_up = 1.2 # slower adjustment
>>> config.gamma_down = 0.8 # slower adjustment
>>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
 20...COMPLETED...
 [ALM 00] ...
 [ALM 05] ...
 ...
>>> results["decoded"]
{0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 1.0, 9: 1.0}

"""
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple, Optional, Sequence, Union
import numpy as np
from collections import defaultdict
from eqc_models.base.polynomial import PolynomialModel
from eqc_models.base.operators import Polynomial
from eqc_models.base.results import SolutionResults
from eqc_models.algorithms.bcd import (
    BCDConfig,
    BCDRegistry,
    BlockCoordinateDescentAlgorithm,
)

Array = np.ndarray
PolyTerm = Tuple[Tuple[int, ...], float]

[docs] def level_decoder(n, levels): """ Return an operator that transforms an encoded solution to a solution with a single value per variable. For instance, a sequence of two level variables with levels 0 and 1 has values `[0, 1, 1, 0]` is decoded into `[1, 0]` with the operator:: [[ 0, 1], [ 0, 1]] and used with the expression `op@variable`. >>> decoder = level_decoder(3, [0, 1, 2]) >>> values = np.array([1, 0, 0, 0, 1, 0, 0, 0, 1]) >>> decoder@values array([0., 1., 2.]) """ level_count = len(levels) decoder = np.zeros((n, n*level_count)) for i in range(n): decoder[i, i*level_count:(i+1)*level_count] = levels return decoder
[docs] @dataclass class ALMConstraint: """One constraint family; fun returns a vector; jac returns its Jacobian.""" kind: str # "eq" or "ineq" fun: Callable[[Array], Array] # h(x) or g(x) jac: Optional[Callable[[Array], Array]] = None name: str = "" family: str = ""
[docs] @dataclass class ALMBlock: """Lifted discrete variable block (optional).""" idx: Sequence[int] # indices of block in the full x levels: Array # (k,) level values (b_i) enforce_sum_to_one: bool = True # register as equality via helper enforce_one_hot: bool = True # ALM linearization with M = 11^T - I
[docs] @dataclass class ALMConfig: # penalties rho_h: float = 1.0 # equalities rho_g: float = 1.0 # inequalities / one-hot rho_min: float = 1e-3 rho_max: float = 1e3 # per-family penalty overrides rho_eq_family: Dict[str, float] = field(default_factory=dict) rho_ineq_family: Dict[str, float] = field(default_factory=dict) # allow per-family adaptation toggle (optional) adapt_by_family: bool = True # adaptation toggles adapt: bool = True tau_up_h: float = 1.05 tau_down_h: float = 0.95 tau_up_g: float = 0.90 tau_down_g: float = 0.50 gamma_up: float = 1.2 gamma_down: float = 0.9 # tolerances & loop tol_h: float = 1e-6 tol_g: float = 1e-6 max_outer: int = 100 # stagnation safety net use_stagnation_bump: bool = False patience_h: int = 10 patience_g: int = 10 stagnation_factor: float = 1e-3 # smoothing (optional) ema_alpha: float = 1.0 # finite diff (only used if jac=None) fd_eps: float = 1e-6 # activation threshold for projected ALM act_tol: float = 1e-10 # Optional: BCD inner solve (decomposition) bcd: BCDConfig = field(default_factory=BCDConfig)
[docs] class ConstraintRegistry: """ Holds constraints and block metadata; keeps ALMAlgorithm stateless. Register constraints and (optional) lifted-discrete blocks here. """ def __init__(self): self.constraints: List[ALMConstraint] = [] self.blocks: List[ALMBlock] = []
[docs] def add_equality(self, fun, jac=None, name="", family=""): self.constraints.append(ALMConstraint("eq", fun, jac, name, family))
[docs] def add_inequality(self, fun, jac=None, name="", family=""): self.constraints.append(ALMConstraint("ineq", fun, jac, name, family))
[docs] def add_block(self, idx: Sequence[int], levels: Array, sum_to_one=True, one_hot=True): self.blocks.append(ALMBlock(list(idx), np.asarray(levels, float), sum_to_one, one_hot))
[docs] class ALMAlgorithm: """Stateless ALM outer loop. Call `run(model, registry, core, cfg, **core_kwargs)`.""" # ---- helpers (static) ---- @staticmethod def _finite_diff_jac(fun: Callable[[Array], Array], x: Array, eps: float) -> Array: y0 = fun(x) m = int(np.prod(y0.shape)) y0 = y0.reshape(-1) n = x.size J = np.zeros((m, n), dtype=float) for j in range(n): xp = x.copy() xp[j] += eps J[:, j] = (fun(xp).reshape(-1) - y0) / eps return J @staticmethod def _pairwise_M(k: int) -> Array: return np.ones((k, k), dtype=float) - np.eye(k, dtype=float) @staticmethod def _sum_to_one_selector(n: int, idx: Sequence[int]) -> Array: S = np.zeros((1, n), dtype=float) S[0, np.array(list(idx), int)] = 1.0 return S @staticmethod def _make_sum1_fun(S): return lambda x: S @ x - np.array([1.0]) @staticmethod def _make_sum1_jac(S): return lambda x: S @staticmethod def _make_onehot_fun(sl, M): sl = np.array(sl, int) def _f(x): s = x[sl] return np.array([float(s @ (M @ s))]) # shape (1,) return _f @staticmethod def _make_onehot_jac(sl, M, n): sl = np.array(sl, int) def _J(x): s = x[sl] grad_blk = 2.0 * (M @ s) # (k,) J = np.zeros((1, n), dtype=float) # shape (1, n) J[0, sl] = grad_blk return J return _J @staticmethod def _as_float(value) -> float: arr = np.asarray(value, dtype=float).reshape(-1) return float(arr[0]) if arr.size else 0.0 @staticmethod def _polynomial_from_terms(poly_terms: List[PolyTerm]) -> Polynomial: if not poly_terms: return Polynomial([], []) idxs, coeffs = zip(*poly_terms) return Polynomial(list(coeffs), list(idxs)) @staticmethod def _poly_value(poly_terms: List[PolyTerm], x: Array) -> float: """Backward-compatible wrapper around Polynomial.evaluate.""" if not poly_terms: return 0.0 poly = ALMAlgorithm._polynomial_from_terms(poly_terms) return ALMAlgorithm._as_float(poly.evaluate(np.asarray(x, dtype=float))) @staticmethod def _merge_poly(poly_terms: Optional[List[PolyTerm]], Q_aug: Optional[Array], c_aug: Optional[Array]) -> List[PolyTerm]: """ Merge ALM's quadratic/linear increments (Q_aug, c_aug) into the base polynomial term list `poly_terms`. If 'poly_terms' is None, then turn x^T Q_aug x + c_aug^T x into polynomial monomials. Terms are of the form: ((0, i), w) for linear, ((i, j), w) for quadratic. """ merged = list(poly_terms) if poly_terms is not None else [] if Q_aug is not None: Qs = 0.5 * (Q_aug + Q_aug.T) n = Qs.shape[0] for i in range(n): # diagonal contributes Qii * x_i^2 if Qs[i, i] != 0.0: merged.append(((i + 1, i + 1), float(Qs[i, i]))) for j in range(i + 1, n): q = 2.0 * Qs[i, j] # x^T Q x -> sum_{i<j} 2*Q_ij x_i x_j if q != 0.0: merged.append(((i + 1, j + 1), float(q))) if c_aug is not None: for i, ci in enumerate(c_aug): if ci != 0.0: merged.append(((0, i + 1), float(ci))) return merged @staticmethod def _block_offsets(blocks: List[ALMBlock]) -> List[int]: """ Return starting offsets (0-based) for each lifted block in the concatenated s-vector. """ offs, pos = [], 0 for blk in blocks: offs.append(pos) pos += len(blk.levels) # each block contributes k lifted coordinates return offs
[docs] @staticmethod def lift_Qc_to_poly_terms( Q_native: np.ndarray, # shape (m, m) over original discrete vars (one block per var) c_native: np.ndarray, # shape (m,) blocks: List[ALMBlock], # ALM blocks (in the same order as the rows/cols of Q_native, c_native) ) -> Tuple[List[Tuple[int, ...]], List[float]]: """ Expand a quadratic/linear objective over m native discrete variables into the lifted (one-hot) s-space. Given `Q_native` and `c_native` over original m discrete vars (one block per var), and `blocks` (`list[ALMBlock]` in the same order as original vars), we enforce: - For variable i with level values b_i (length k_i), we create k_i lifted coords s_{i,a}. - Quadratic term: J_ij = Q_native[i,j] * (b_i b_j^T) contributes to pairs (s_{i,a}, s_{j,b}) - Linear term: C_i = c_native[i] * b_i contributes to s_{i,a} Returns polynomial (indices, coeffs) over concatenated s variables. """ m = len(blocks) assert Q_native.shape == (m, m), "Q_native must match number of blocks" assert c_native.shape == (m,), "c_native must match number of blocks" offs = ALMAlgorithm._block_offsets(blocks) terms_acc = defaultdict(float) # Quadratic lift: J = kron-expansion for i in range(m): bi = blocks[i].levels[:, None] # (k_i, 1) oi = offs[i] for j in range(m): bj = blocks[j].levels[:, None] # (k_j, 1) oj = offs[j] J_ij = Q_native[i, j] * (bi @ bj.T) # (k_i, k_j) if np.allclose(J_ij, 0.0): continue ki, kj = bi.shape[0], bj.shape[0] for a in range(ki): ia = oi + a + 1 for b in range(kj): jb = oj + b + 1 w = float(J_ij[a, b]) if w == 0.0: continue if ia == jb: terms_acc[(ia, ia)] += w else: # NOTE: we store each cross monomial once (i < j); for i==j block pairs, # double to represent J_ab + J_ba if i == j: # intra-block off-diagonal needs 2x w *= 2.0 i1, i2 = (ia, jb) if ia < jb else (jb, ia) terms_acc[(i1, i2)] += w # Linear lift: C_i = L_i * b_i for i in range(m): b = blocks[i].levels oi = offs[i] for a, val in enumerate(b): ia = oi + a + 1 w = float(c_native[i] * val) if w != 0.0: terms_acc[(0, ia)] += w # Pack to PolynomialModel format indices = [tuple(k) for k in terms_acc.keys()] coeffs = [float(v) for v in terms_acc.values()] return indices, coeffs
@staticmethod def _make_bcd_registry( registry: "ConstraintRegistry", *, lifted_slices: Optional[List[List[int]]] = None, ) -> BCDRegistry: """ Translate ALM's registry into the standalone BCD registry. Important: - In native/non-lifted mode, BCD blocks should use the original `blk.idx`. - In lifted mode, BCD blocks must use the current working-coordinate slices (`lifted_slices`), not the original native indices. """ breg = BCDRegistry() for i, blk in enumerate(registry.blocks): if lifted_slices is None: bcd_idx = list(int(j) for j in blk.idx) else: bcd_idx = list(int(j) for j in lifted_slices[i]) breg.add_block( bcd_idx, name=f"block_{i}", levels=None if blk.levels is None else np.asarray(blk.levels, float), ) for c in registry.constraints: if c.kind == "eq": breg.add_equality(c.fun, name=c.name, family=c.family) elif c.kind == "ineq": breg.add_inequality(c.fun, name=c.name, family=c.family) return breg # ---- main entrypoint ----
[docs] @staticmethod def run( base_model: PolynomialModel, registry: ConstraintRegistry, solver, cfg: ALMConfig = ALMConfig(), x0: Optional[Array] = None, *, parse_output=None, verbose: bool = True, **solver_kwargs, ) -> Dict[str, Union[Array, Dict[int, float], Dict]]: """ Solve with ALM. Keep all ALM state local to this call (no global side-effects). Handles three modes: (A) No blocks -> continuous; use base_model as-is (B) Blocks + native base_model -> lift to s-space (C) Blocks + already-lifted base_model -> use as-is (compat) Returns: { "x": final iterate, "decoded": {start_idx_of_block: level_value, ...} for lifted blocks, "decoded_debug": {start_idx_of_block: native_device_value, ...}, "hist": { "eq_inf": [...], "ineq_inf": [...], "obj": [...], "x": [...] } } """ blocks = registry.blocks has_blocks = len(blocks) > 0 # ---- choose working model + dimension n ---- if not has_blocks: # (A) continuous case: use the provided model directly model = base_model # Prefer model.n, fall back to bounds; else infer from polynomial indices n = getattr(model, "n", None) if n is None: ub = getattr(model, "upper_bound", None) lb = getattr(model, "lower_bound", None) if ub is not None: n = len(ub) elif lb is not None: n = len(lb) else: # infer from polynomial terms n = 0 for inds in getattr(model, "indices", getattr(model.polynomial, "indices", [])): for j in inds: if j > 0: n = max(n, j) lifted_slices: List[List[int]] = [] else: # (B/C) lifted (discrete) case target_lifted_n = sum(len(blk.levels) for blk in blocks) base_n = getattr(base_model, "n", None) def _infer_n_from_terms(pm: PolynomialModel) -> int: inds_list = getattr(pm, "indices", getattr(pm.polynomial, "indices", [])) mx = 0 for inds in inds_list: for j in inds: if j > mx: mx = j return int(mx) if base_n is None: base_n = _infer_n_from_terms(base_model) # detect "already-lifted" native input (compat path) already_lifted = (base_n == target_lifted_n) if already_lifted: # (C) use provided model directly; assume bounds already sensible model = base_model n = target_lifted_n else: # (B) lift from native space # base_model must expose coefficients/indices compatible with this call c_base, Q_base = base_model._quadratic_polynomial_to_qubo_coefficients( getattr(base_model, "coefficients", getattr(base_model.polynomial, "coefficients", [])), getattr(base_model, "indices", getattr(base_model.polynomial, "indices", [])), getattr(base_model, "n") ) assert Q_base.shape[0] == Q_base.shape[1] assert c_base.shape[0] == Q_base.shape[0] indices_lifted, coeffs_lifted = ALMAlgorithm.lift_Qc_to_poly_terms(Q_base, c_base, blocks) model = PolynomialModel(coeffs_lifted, indices_lifted) n = target_lifted_n # set canonical [0,1] bounds for lifted s setattr(model, "lower_bound", np.zeros(n, float)) setattr(model, "upper_bound", np.ones(n, float)) # ---- n and lifted_slices ---- lifted_slices = [] pos = 0 for blk in blocks: k = len(blk.levels) # number of lifted coords for this block lifted_slices.append(list(range(pos, pos + k))) # 0-based in lifted x pos += k # Algorithm initial solution and bounds lb = getattr(model, "lower_bound", None) ub = getattr(model, "upper_bound", None) if x0 is not None: x = np.asarray(x0, float).copy() else: # default init if (lb is not None) and (ub is not None) and np.all(np.isfinite(lb)) and np.all(np.isfinite(ub)): x = 0.5 * (np.asarray(lb, float) + np.asarray(ub, float)) else: x = np.zeros(n, float) # ---- collect constraints ---- problem_eqs = [c for c in registry.constraints if c.kind == "eq"] problem_ineqs = [c for c in registry.constraints if c.kind == "ineq"] # auto-install sum-to-one and one-hot as equalities # (One-hot: s^T (11^T - I) s = 0)) def _install_block_equalities() -> List[ALMConstraint]: if not has_blocks: return [] eqs: List[ALMConstraint] = [] for blk, lift_idx in zip(registry.blocks, lifted_slices): if blk.enforce_sum_to_one: S = ALMAlgorithm._sum_to_one_selector(n, lift_idx) eqs.append(ALMConstraint( "eq", fun=ALMAlgorithm._make_sum1_fun(S), jac=ALMAlgorithm._make_sum1_jac(S), name=f"sum_to_one_block_{lift_idx[0]}", family="block_sum1", )) if blk.enforce_one_hot: k = len(lift_idx) M = ALMAlgorithm._pairwise_M(k) eqs.append(ALMConstraint( "eq", fun=ALMAlgorithm._make_onehot_fun(lift_idx, M), jac=ALMAlgorithm._make_onehot_jac(lift_idx, M, n), name=f"onehot_block_{lift_idx[0]}", family="block_onehot", )) return eqs block_eqs = _install_block_equalities() # Unified equality list (order is fixed for whole run) full_eqs = problem_eqs + block_eqs # Allocate multipliers for every equality in full_eqs lam_eq = [] for csp in full_eqs: r0 = csp.fun(x).reshape(-1) lam_eq.append(np.zeros_like(r0, dtype=float)) # Inequality multipliers per user inequality mu_ineq = [] for csp in problem_ineqs: r0 = csp.fun(x).reshape(-1) mu_ineq.append(np.zeros_like(r0, dtype=float)) rho_eq_family = dict(getattr(cfg, "rho_eq_family", {}) or {}) rho_ineq_family = dict(getattr(cfg, "rho_ineq_family", {}) or {}) def _rho_for(csp_k: ALMConstraint) -> float: fam = getattr(csp_k, "family", "") or "" if csp_k.kind == "eq": return float(rho_eq_family.get(fam, rho_h)) else: return float(rho_ineq_family.get(fam, rho_g)) # -------- running stats for adaptive penalties -------- rho_h, rho_g = cfg.rho_h, cfg.rho_g best_eq, best_ineq = np.inf, np.inf no_imp_eq = no_imp_ineq = 0 prev_eq_inf, prev_ineq_inf = np.inf, np.inf eps = 1e-12 prev_eq_inf_by_family, prev_ineq_inf_by_family = {}, {} # ---- per-family stagnation tracking (only used when cfg.adapt_by_family=True) best_eq_by_family: Dict[str, float] = {} best_ineq_by_family: Dict[str, float] = {} no_imp_eq_by_family: Dict[str, int] = {} no_imp_ineq_by_family: Dict[str, int] = {} hist = {"eq_inf": [], "ineq_inf": [], "obj": [], "x": [], # per-iteration logs for parameters/multipliers "rho_h": [], "rho_g": [], "rho_eq_family": [], "rho_ineq_family": [], "eq_inf_by_family": [], "ineq_inf_by_family": [], # critical for debugging hardware compatibility (per-iteration) "dynamic_range": [], } for k_idx, csp in enumerate(full_eqs): if csp.kind != "eq": continue hist[f"lam_eq_max_idx{k_idx}"] = [] hist[f"lam_eq_min_idx{k_idx}"] = [] for k_idx, csp in enumerate(problem_ineqs): if csp.kind != "ineq": continue hist[f"mu_ineq_max_idx{k_idx}"] = [] hist[f"mu_ineq_min_idx{k_idx}"] = [] for it in range(cfg.max_outer): # -------- base polynomial (does not include fixed penalties here) -------- base_terms: List[PolyTerm] = list(zip(model.indices, model.coefficients)) base_poly = ALMAlgorithm._polynomial_from_terms(base_terms) # -------- ALM quadratic/linear pieces (assembled here, kept separate) -------- Q_aug = np.zeros((n, n), dtype=float) c_aug = np.zeros(n, dtype=float) have_aug = False # (A) Equalities: linearize h near x^t => (rho/2)||A x - b||^2 + lam^T(Ax - b) for k_idx, csp in enumerate(full_eqs): if csp.kind != "eq": continue h = csp.fun(x).reshape(-1) A = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps) A = np.atleast_2d(A) assert A.shape[1] == n, f"A has {A.shape[1]} cols, expected {n}" # linearization about current x: residual model r(x) = A x - b, with b = A x - h b = A @ x - h rho_k = _rho_for(csp) Qk = 0.5 * rho_k * (A.T @ A) ck = (A.T @ lam_eq[k_idx]) - rho_k * (A.T @ b) Q_aug += Qk c_aug += ck have_aug = True # (B) Inequalities: projected ALM. Linearize g near x^t. for k_idx, csp in enumerate(problem_ineqs): if csp.kind != "ineq": continue g = csp.fun(x).reshape(-1) G = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps) G = np.atleast_2d(G) assert G.shape[1] == n, f"G has {G.shape[1]} cols, expected {n}" d = G @ x - g rho_k = _rho_for(csp) # Activation measure at current iterate; meaning, the current violating inequality components: # g(x) + mu/rho; Powell-Hestenes-Rockafellar shifted residual y = G @ x - d + mu_ineq[k_idx] / rho_k active = (y > cfg.act_tol) if np.any(active): GA = G[active, :] muA = mu_ineq[k_idx][active] gA = g[active] # Q += (rho/2) * GA^T GA Qk = 0.5 * rho_k * (GA.T @ GA) # c += GA^T mu - rho * GA^T (GA x - gA); where GA x - gA is active measures of d = G @ x - g ck = (GA.T @ muA) - rho_k * (GA.T @ (GA @ x - gA)) Q_aug += Qk c_aug += ck have_aug = True # -------- build merged polynomial for the core solver -------- all_terms = ALMAlgorithm._merge_poly(base_terms, Q_aug if have_aug else None, c_aug if have_aug else None) idxs, coeffs = zip(*[(inds, w) for (inds, w) in all_terms]) if all_terms else ([], []) poly_model = PolynomialModel(list(coeffs), list(idxs)) if lb is not None and hasattr(poly_model, "lower_bound"): poly_model.lower_bound = np.asarray(lb, float) if ub is not None and hasattr(poly_model, "upper_bound"): poly_model.upper_bound = np.asarray(ub, float) x_ws = x.copy() # Convention: many cores look for one of these fields if present. # Use one or more to be future-proof; harmless if ignored. setattr(poly_model, "initial_guess", x_ws) setattr(poly_model, "warm_start", x_ws) setattr(poly_model, "x0", x_ws) # -------- inner solve -------- if getattr(cfg, "bcd", None) is not None and getattr(cfg.bcd, "enabled", False): # If ALM is operating in lifted space, BCD must receive lifted block # coordinates rather than the original native block indices. if lifted_slices is not None and len(lifted_slices) != len(registry.blocks): raise ValueError("lifted_slices must align 1-to-1 with registry.blocks") breg = ALMAlgorithm._make_bcd_registry( registry, lifted_slices=lifted_slices if has_blocks else None, ) bcd_res = BlockCoordinateDescentAlgorithm.run( poly_model, breg, solver, cfg=cfg.bcd, x0=x_ws, verbose=verbose, **solver_kwargs, ) x = np.asarray(bcd_res["x"], float).reshape(-1) hist.setdefault("bcd_inner", []).append(bcd_res["hist"]) else: out = solver.solve(poly_model, **solver_kwargs) # -------- parse -------- if parse_output: x = parse_output(out) else: # default: support (value, x) or `.x` or raw x if isinstance(out, SolutionResults): x = out.solutions[0] elif isinstance(out, tuple) and len(out) == 2: _, x = out elif isinstance(out, dict) and "results" in out and "solutions" in out["results"]: x = out["results"]["solutions"][0] elif isinstance(out, dict) and "x" in out: x = out["x"] else: x = getattr(out, "x", out) x = np.asarray(x, float) # ---- multiplier tracking ---- for k_idx, csp in enumerate(full_eqs): if csp.kind != "eq": continue hist[f"lam_eq_max_idx{k_idx}"].append(float(np.max(lam_eq[k_idx]))) hist[f"lam_eq_min_idx{k_idx}"].append(float(np.min(lam_eq[k_idx]))) for k_idx, csp in enumerate(problem_ineqs): if csp.kind != "ineq": continue hist[f"mu_ineq_max_idx{k_idx}"].append(float(np.max(mu_ineq[k_idx]))) hist[f"mu_ineq_min_idx{k_idx}"].append(float(np.min(mu_ineq[k_idx]))) # -------- residuals + multiplier updates -------- eq_infs = [] for k_idx, csp in enumerate(full_eqs): if csp.kind != "eq": continue r = csp.fun(x).reshape(-1) rho_k = _rho_for(csp) lam_eq[k_idx] = lam_eq[k_idx] + rho_k * r if r.size: eq_infs.append(np.max(np.abs(r))) eq_inf = float(np.max(eq_infs)) if eq_infs else 0.0 ineq_infs = [] for k_idx, csp in enumerate(problem_ineqs): if csp.kind != "ineq": continue r = csp.fun(x).reshape(-1) rho_k = _rho_for(csp) mu_ineq[k_idx] = np.maximum(0.0, mu_ineq[k_idx] + rho_k * r) if r.size: ineq_infs.append(np.max(np.maximum(0.0, r))) ineq_inf = float(np.max(ineq_infs)) if ineq_infs else 0.0 assert len(lam_eq) == len(full_eqs) assert len(mu_ineq) == len(problem_ineqs) # ---- per-family residual telemetry ---- eq_inf_by_family = {} for k_idx, csp in enumerate(full_eqs): if csp.kind != "eq": continue fam = getattr(csp, "family", "") or "" r = csp.fun(x).reshape(-1) v = float(np.max(np.abs(r))) if r.size else 0.0 eq_inf_by_family[fam] = max(eq_inf_by_family.get(fam, 0.0), v) ineq_inf_by_family = {} for k_idx, csp in enumerate(problem_ineqs): if csp.kind != "ineq": continue fam = getattr(csp, "family", "") or "" r = csp.fun(x).reshape(-1) v = float(np.max(np.maximum(0.0, r))) if r.size else 0.0 ineq_inf_by_family[fam] = max(ineq_inf_by_family.get(fam, 0.0), v) # evaluate base polynomial only (ca add aug value if want to track full L_A) f_val = ( ALMAlgorithm._as_float(base_poly.evaluate(np.asarray(x, dtype=float))) if base_terms else 0.0 ) # evaluate dynamic range dynamic_range = poly_model.dynamic_range hist["eq_inf"].append(eq_inf); hist["ineq_inf"].append(ineq_inf) hist["obj"].append(float(f_val)); hist["x"].append(x.copy()) hist["dynamic_range"].append(float(dynamic_range)) # parameter tracking hist["rho_h"].append(float(rho_h)); hist["rho_g"].append(float(rho_g)) hist["rho_eq_family"].append(dict(rho_eq_family)) hist["rho_ineq_family"].append(dict(rho_ineq_family)) hist["eq_inf_by_family"].append(dict(eq_inf_by_family)) hist["ineq_inf_by_family"].append(dict(ineq_inf_by_family)) if verbose: # show worst 3 equality families by residual, with their rhos eq_items = sorted(eq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True) eq_top = eq_items[:3] eq_str = ",".join([ f"{fam or 'eq'}:{val:.2e}@" f"{_rho_for(next(c for c in full_eqs if (getattr(c,'family','') or '')==fam and c.kind=='eq')):.1g}" for fam, val in eq_top ]) # show worst 3 inequality families by residual, with their rhos ineq_items = sorted(ineq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True) ineq_top = ineq_items[:3] ineq_str = ",".join([ f"{fam or 'ineq'}:{val:.2e}@" f"{_rho_for(next(c for c in problem_ineqs if (getattr(c, 'family', '') or '') == fam and c.kind == 'ineq')):.1g}" for fam, val in ineq_top ]) print(f"[ALM {it:02d}] f={f_val:.6g} | eq_inf={eq_inf:.2e} | ineq_inf={ineq_inf:.2e} " f"| rho_h={rho_h:.2e} | rho_g={rho_g:.2e} | eq_fam[{eq_str}] | ineq_fam[{ineq_str}] " f"| dynamic_range={dynamic_range:.4g}") # stopping if eq_inf <= cfg.tol_h and ineq_inf <= cfg.tol_g: if verbose: print(f"[ALM] converged at iter {it}") break # EMA smoothing to reduce jitter if it == 0: eq_inf_smooth = eq_inf ineq_inf_smooth = ineq_inf else: eq_inf_smooth = cfg.ema_alpha * eq_inf + (1 - cfg.ema_alpha) * eq_inf_smooth ineq_inf_smooth = cfg.ema_alpha * ineq_inf + (1 - cfg.ema_alpha) * ineq_inf_smooth # -------- Residual-ratio controller -------- if cfg.adapt and it > 0: # per-family rho adaptation for equalities if getattr(cfg, "adapt_by_family", False): # Equality family for fam, cur in eq_inf_by_family.items(): prev = max(prev_eq_inf_by_family.get(fam, cur), eps) # rho value for family: if absent, start at rho_h rho_f = float(rho_eq_family.get(fam, rho_h)) if cur > cfg.tau_up_h * prev: rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max) elif cur < cfg.tau_down_h * prev: rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min) rho_eq_family[fam] = rho_f # update prev map prev_eq_inf_by_family = dict(eq_inf_by_family) # Inequality family for fam, cur in ineq_inf_by_family.items(): prev = max(prev_ineq_inf_by_family.get(fam, cur), eps) # rho value for family: if absent, start at rho_g rho_f = float(rho_ineq_family.get(fam, rho_g)) if cur > cfg.tau_up_g * prev: rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max) elif cur < cfg.tau_down_g * prev: rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min) rho_ineq_family[fam] = rho_f # update prev map prev_ineq_inf_by_family = dict(ineq_inf_by_family) else: # Equality group if eq_inf_smooth > cfg.tau_up_h * max(prev_eq_inf, eps): # stalled or not shrinking rho_h = min(cfg.gamma_up * rho_h, cfg.rho_max) elif eq_inf_smooth < cfg.tau_down_h * max(prev_eq_inf, eps): # fast progress, allow relaxation rho_h = max(cfg.gamma_down * rho_h, cfg.rho_min) # Inequality group if ineq_inf_smooth > cfg.tau_up_g * max(prev_ineq_inf, eps): rho_g = min(cfg.gamma_up * rho_g, cfg.rho_max) elif ineq_inf_smooth < cfg.tau_down_g * max(prev_ineq_inf, eps): rho_g = max(cfg.gamma_down * rho_g, cfg.rho_min) else: # per-family rho adaptation for equalities if getattr(cfg, "adapt_by_family", False): # update prev map prev_eq_inf_by_family = dict(eq_inf_by_family) prev_ineq_inf_by_family = dict(ineq_inf_by_family) # -------- Stagnation bump (safety net) -------- if cfg.use_stagnation_bump: if getattr(cfg, "adapt_by_family", False): # ---- Equality families ---- for fam, cur in eq_inf_by_family.items(): # initialize if fam not in best_eq_by_family: best_eq_by_family[fam] = float(cur) no_imp_eq_by_family[fam] = 0 if cur <= best_eq_by_family[fam] * (1 - cfg.stagnation_factor): best_eq_by_family[fam] = float(cur) no_imp_eq_by_family[fam] = 0 else: no_imp_eq_by_family[fam] += 1 if no_imp_eq_by_family[fam] >= cfg.patience_h: rho_f = float(rho_eq_family.get(fam, rho_h)) rho_eq_family[fam] = min(2.0 * rho_f, cfg.rho_max) no_imp_eq_by_family[fam] = 0 # ---- Inequality families ---- for fam, cur in ineq_inf_by_family.items(): # initialize if fam not in best_ineq_by_family: best_ineq_by_family[fam] = float(cur) no_imp_ineq_by_family[fam] = 0 if cur <= best_ineq_by_family[fam] * (1 - cfg.stagnation_factor): best_ineq_by_family[fam] = float(cur) no_imp_ineq_by_family[fam] = 0 else: no_imp_ineq_by_family[fam] += 1 if no_imp_ineq_by_family[fam] >= cfg.patience_g: rho_f = float(rho_ineq_family.get(fam, rho_g)) rho_ineq_family[fam] = min(2.0 * rho_f, cfg.rho_max) no_imp_ineq_by_family[fam] = 0 else: # Equality stagnation if eq_inf <= best_eq * (1 - cfg.stagnation_factor): best_eq = eq_inf; no_imp_eq = 0 else: no_imp_eq += 1 if no_imp_eq >= cfg.patience_h: rho_h = min(2.0 * rho_h, cfg.rho_max); no_imp_eq = 0 # Inequality stagnation if ineq_inf <= best_ineq * (1 - cfg.stagnation_factor): best_ineq = ineq_inf; no_imp_ineq = 0 else: no_imp_ineq += 1 if no_imp_ineq >= cfg.patience_g: rho_g = min(2.0 * rho_g, cfg.rho_max); no_imp_ineq = 0 # -------- finalize for next iteration -------- prev_eq_inf = max(eq_inf_smooth, eps) prev_ineq_inf = max(ineq_inf_smooth, eps) # ---- decoding back to native levels (only if blocks) ---- decoded_native: Dict[int, float] = {} # maps original var anchor -> chosen level value decoded_lifted: Dict[int, int] = {} # maps lifted start index -> argmax position (optional) if has_blocks: for blk, lift_idx in zip(registry.blocks, lifted_slices): if not lift_idx: continue sl = np.array(lift_idx, int) if len(sl) == 0: continue sblk = x[sl] j = int(np.argmax(sblk)) # which level got selected in the block orig_anchor = int(blk.idx[0]) # anchor original var id for this block decoded_native[orig_anchor] = float(blk.levels[j]) decoded_lifted[sl[0]] = j # optional: lifted index -> chosen slot return { "x": x, "decoded": decoded_native if has_blocks else {}, "decoded_debug": decoded_lifted if has_blocks else {}, "hist": hist, }