# (C) Quantum Computing Inc., 2025.
"""
============================
Augmented Lagrangian Method
============================
Overview
============================
The augmented Lagrangian method (ALM) implementation in eqc-models automates parameter
adjustments to find feasible solutions to constrained optimization models using
entropy quantum computing devices. It supports fractional (continuous), binary and
general integer variables. ALM also supports enforcement of one-hot constraints,
inequality constraints and equality constraints without the introduction of slack
variables.
This method requires a different modeling approach than implementing constrained
optimization models as direct subclasses of `EqcModel`. To use this approach,
implement an objective function in one of the subclasses of `EqcModel`. The next step is to
use a `ConstraintRegistry` object to describe the model constraints. The `ALMConfig`
class exposes many parameters, but novice users should only need to adjust a few of them.
Once the model, constraint registry and config objects are ready, pass them to an
`ALMAlgorithm` instance along with a solver; the algorithm iterates over parameter
adjustments until the feasibility tolerance is met or the maximum iteration count
is exceeded.
The following example is a basic outline of how to use the ALM implementation to
solve models.
::
model = PolynomialModel()
solver = Dirac3ContinuousCloudSolver()
solver_kwargs = {} # add solver parameters as dict here
config = ALMConfig() # add configuration parameters here
registry = ConstraintRegistry()
# add variable domains and constraints here
algorithm = ALMAlgorithm()
response = algorithm.run(model, registry, solver, config, **solver_kwargs) # keyword arguments are passed to the solver
Model Creation
============================
Defining the objective function for the optimization problem can be done in a number of ways.
There are two base classes on which all other models are based, and they define how
function coefficients are specified. With a `QuadraticModel`, the coefficients are
specified using a one-dimensional array for the linear terms and a two-dimensional,
symmetric array or Hermitian matrix for the quadratic terms. A `PolynomialModel` instead
uses two lists, one for the indices of terms and another for the coefficients of terms.
These terms can be up to degree five. The two classes just described are preferred for use
with ALM because most other model classes incorporate constraints into the operator. In the
future, ALM will provide a way to extract the objective function and constraints from the
model separately, populating the `ConstraintRegistry` automatically.
First, choose the variable definitions by listing the variable names in a single list.
Then organize the terms in ascending lexicographic order by index. Finally, create a
list of coefficient entries corresponding to the index list.
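As a minimal illustration of this term layout (plain Python, independent of eqc-models), here is how the index and coefficient lists line up for the two-variable objective `-x_1 + 2*x_1*x_2`, using 1-based variable indices and a leading 0 to pad linear terms; the `evaluate` helper is hypothetical, written only to show what the lists mean:

```python
# Sketch: assemble PolynomialModel-style term lists for -x_1 + 2*x_1*x_2.
# Linear terms use a leading 0, e.g. (0, 1) -> x_1; quadratic terms pair
# the 1-based variable indices, e.g. (1, 2) -> x_1*x_2.
indices = [(0, 1), (0, 2), (1, 2)]
coefficients = [-1.0, 0.0, 2.0]

# The index list is kept in ascending lexicographic order, with one
# coefficient entry per index entry.
assert indices == sorted(indices)
assert len(indices) == len(coefficients)

def evaluate(indices, coefficients, x):
    """Evaluate the polynomial at x (x is 0-based; index 0 is a pad slot)."""
    total = 0.0
    for inds, w in zip(indices, coefficients):
        term = w
        for j in inds:
            if j > 0:  # index 0 pads lower-degree terms and contributes no factor
                term *= x[j - 1]
        total += term
    return total

print(evaluate(indices, coefficients, [1.0, 1.0]))  # -1 + 2 = 1.0
```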
The example problem used here is a simple logical decision: choose one variable among 10.
Those familiar with QUBO formulations
will recognize the penalties in the quadratic coefficients. This first example sets
the one-hot constraint as penalties in the objective function and adds block constraints
to each of the variables to enforce values of 0 or 1. This has the effect of using two
time bins per binary variable.
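The penalty structure mentioned above can be checked numerically. This sketch (plain NumPy, separate from eqc-models) verifies that the standard QUBO one-hot penalty `(sum(x) - 1)**2` vanishes exactly when a single binary variable is set:

```python
import numpy as np

n = 10
# One-hot penalty as a QUBO: (sum(x) - 1)^2 = x^T (1 1^T) x - 2 * 1^T x + 1,
# which places 2s on the off-diagonal quadratic coefficients for binary x.
Q = np.ones((n, n))      # quadratic part: all pairs, including the diagonal
c = -2.0 * np.ones(n)    # linear part
offset = 1.0

def penalty(x):
    return float(x @ Q @ x + c @ x + offset)

one_hot = np.zeros(n); one_hot[3] = 1.0
all_zero = np.zeros(n)
two_hot = np.zeros(n); two_hot[[1, 4]] = 1.0

print(penalty(one_hot), penalty(all_zero), penalty(two_hot))  # 0.0 1.0 1.0
```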
>>> import warnings; warnings.filterwarnings("ignore") # silence warnings
>>> n = 10
>>> variables = [f"x_{i}" for i in range(n)]
Define the indices and coefficients. The solution is `x_9 == 1` and all other `x_i == 0`.
Variable indices sent to the device are 1-based.
>>> indices = [(0, i+1) for i in range(n)] + [(i+1, j+1) for i in range(n) for j in range(i, n)]
>>> coefficients = [-i if i==n-1 else 0 for i in range(n)] + [2 for i in range(n) for j in range(i, n)]
Define the model, setting the upper bound to 1 on all variables.
>>> model = PolynomialModel(coefficients, indices)
>>> model.variables = variables
>>> model.variables[0] == "x_0"
True
>>> model.upper_bound = np.ones((n,))
>>> model.is_discrete = [True for i in range(n)] # for future modeling support
Begin the ALM setup. One feature of this algorithm in eqc-models is the ability to enforce
discrete variables while using the continuous solver, which allows for mixed-integer
formulations. For binary variables, add a block for each one so that it is forced to 0 or 1.
The `sum_to_one` parameter is normally true; it is set to false here because the simplicity
of the problem allows a more efficient approach.
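To make the lifted encoding concrete, the following sketch (plain NumPy, not using eqc-models) shows how n binary variables with levels `[0, 1]` become a vector of `2n` slots, two per variable, where the position of the chosen slot carries the level value:

```python
import numpy as np

n = 3
levels = [0, 1]
k = len(levels)

# Lifted solution: each variable occupies a block of k slots; exactly one
# slot per block is set, and the slot position selects the level value.
s = np.zeros(n * k)
s[0 * k + 1] = 1.0   # variable 0 -> level 1
s[1 * k + 0] = 1.0   # variable 1 -> level 0
s[2 * k + 1] = 1.0   # variable 2 -> level 1

# Decode by weighting each block's slots with the level values.
native = np.array([np.dot(levels, s[i * k:(i + 1) * k]) for i in range(n)])
print(native)  # [1. 0. 1.]
```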
>>> reg = ConstraintRegistry()
>>> for i in range(n): reg.add_block([i], levels=[0, 1], sum_to_one=False)
Configure `rho_h` (the equality-constraint penalty) to start at 1 and allow only 4
iterations. Also set the tolerance high.
>>> config = ALMConfig(tol_h=0.5, max_outer=4, rho_h=1, rho_g=1, gamma_up=2.0, gamma_down=1.0)
>>> from eqc_models.solvers import Dirac3ContinuousCloudSolver
>>> solver = Dirac3ContinuousCloudSolver()
>>> algo = ALMAlgorithm()
>>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
20...COMPLETE...
[ALM 00] ...
...
>>> results["decoded"] # decoded values; output shown for n == 10
{0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 1.0}
Equality Constraints
========================
The ALM algorithm can seek feasible solutions by adjusting targeted multipliers
rather than folding penalty multipliers into the objective coefficients and tuning
them manually, which would otherwise be required for more complex problems.
These constraints should be specified in the constraint registry. Specifying
the binary variables via the `add_block` call sets up the domain.
The objective function is formulated directly, purely as the cost of each variable,
which is the negative of the variable index.
>>> indices = [(0, i+1) for i in range(n)]
>>> coefficients = [-i for i in range(n)]
>>> model = PolynomialModel(coefficients, indices)
>>> model.upper_bound = np.ones((n, ))
>>> model.is_discrete = [True for i in range(n)]
>>> reg = ConstraintRegistry()
>>> for i in range(n): reg.add_block([i], levels=[0, 1], sum_to_one=False)
Now for a new concept. Instead of placing the penalties explicitly in the objective,
this statement enforces the one-hot constraint by defining a function which is 0 when
the constraint is satisfied and either positive or negative when it is violated. The
lambda function is passed an encoded array of the variable values, so the array must
be decoded before evaluating the function. To assist with this, a helper function
called `level_decoder` is available for import. Use this decoder and the solution `x`
to extract the values of the variables.
>>> decoder = level_decoder(n, [0, 1])
>>> test_x = np.zeros((20,))
>>> test_x[::2] = 1
>>> (decoder@test_x==0).all()
True
>>> config.max_outer = 15 # added complexity in the definition requires more steps
>>> config.gamma_up = 1.2
>>> config.gamma_down = 0.9
>>> reg.add_equality(lambda x: np.sum(decoder@x) - 1)
Everything else is the same as in the previous example.
>>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
20...COMPLETE...
[ALM 00] ...
...
>>> results["decoded"] # decoded values; output shown for n == 10
{0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 0.0, 9: 1.0}
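The outer-loop mechanics behind these runs can be sketched on a scalar toy problem (pure Python, independent of eqc-models and of any quantum solver): minimize `x**2` subject to `x = 1`, using the classical augmented-Lagrangian multiplier update `lam <- lam + rho * h(x)` with a growing penalty. The closed-form inner solve below is specific to this toy objective, not part of the library:

```python
# Toy ALM: minimize f(x) = x^2 subject to h(x) = x - 1 = 0.
# The inner solve is exact here: argmin_x x^2 + lam*(x-1) + (rho/2)*(x-1)^2
# has the closed form x = (rho - lam) / (2 + rho).
lam, rho = 0.0, 1.0
x = 0.0
for it in range(50):
    x = (rho - lam) / (2.0 + rho)
    h = x - 1.0
    lam += rho * h             # multiplier update, as in the ALM outer loop
    rho = min(rho * 1.2, 1e3)  # grow the penalty while infeasible

print(round(x, 4), round(lam, 4))  # x -> 1.0, lam -> -2.0 (the exact multiplier)
```

At the optimum, stationarity of the Lagrangian gives `2*x + lam = 0` with `x = 1`, so the multiplier converges to `-2`.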
Inequality Constraints
========================
Incorporating inequality constraints is not much different. Specify an inequality
constraint by using the `ConstraintRegistry.add_inequality` method to pass a
function which is zero or negative when the constraint is met and positive when
it is not. This is straightforward for less-than-or-equal constraints; for
greater-than-or-equal constraints, negate the function. Here is an example
of a model where at most two variables are nonzero. When adding the binary
variable blocks, the `sum_to_one` parameter is left out, so it defaults to
true. Using the sum-to-one parameter gives the algorithm more control over
the dual variable computations.
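The sign convention can be checked directly: a `<=` constraint is passed as-is, while a `>=` constraint is negated so that positive values always indicate violation. A plain-NumPy sketch (separate from eqc-models):

```python
import numpy as np

# Convention: g(x) <= 0 means feasible, g(x) > 0 means violated.
# "sum(x) <= 2" is passed directly:
g_le = lambda x: np.sum(x) - 2.0
# "sum(x) >= 1" must be negated into "1 - sum(x) <= 0":
g_ge = lambda x: 1.0 - np.sum(x)

x_ok = np.array([1.0, 1.0, 0.0])   # sum = 2: satisfies both constraints
x_bad = np.array([1.0, 1.0, 1.0])  # sum = 3: violates the <= constraint

print(g_le(x_ok), g_ge(x_ok), g_le(x_bad))  # 0.0 -1.0 1.0
```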
>>> reg = ConstraintRegistry()
>>> for i in range(n): reg.add_block([i], levels=[0, 1])
>>> reg.add_inequality(lambda x: np.sum(decoder@x) - 2) # reuse the decoder from the previous example
>>> config.gamma_up = 1.2 # slower adjustment
>>> config.gamma_down = 0.8 # slower adjustment
>>> results = algo.run(model, reg, solver, config, sum_constraint=n, relaxation_schedule=1) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
20...COMPLETED...
[ALM 00] ...
[ALM 05] ...
...
>>> results["decoded"]
{0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0, 5: 0.0, 6: 0.0, 7: 0.0, 8: 1.0, 9: 1.0}
"""
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple, Optional, Sequence, Union
import numpy as np
from collections import defaultdict
from eqc_models.base.polynomial import PolynomialModel
from eqc_models.base.operators import Polynomial
from eqc_models.base.results import SolutionResults
from eqc_models.algorithms.bcd import (
BCDConfig,
BCDRegistry,
BlockCoordinateDescentAlgorithm,
)
Array = np.ndarray
PolyTerm = Tuple[Tuple[int, ...], float]
def level_decoder(n, levels):
"""
    Return an operator that transforms an encoded solution into a solution
    with a single value per variable. For instance, a sequence of two
    two-level variables with levels 0 and 1 and encoded values `[0, 1, 1, 0]`
    is decoded into `[1, 0]` with the operator::

        [[ 0, 1, 0, 0],
         [ 0, 0, 0, 1]]

    which is used with the expression `op@variable`.
>>> decoder = level_decoder(3, [0, 1, 2])
>>> values = np.array([1, 0, 0, 0, 1, 0, 0, 0, 1])
>>> decoder@values
array([0., 1., 2.])
"""
level_count = len(levels)
decoder = np.zeros((n, n*level_count))
for i in range(n):
decoder[i, i*level_count:(i+1)*level_count] = levels
return decoder
@dataclass
class ALMConstraint:
"""One constraint family; fun returns a vector; jac returns its Jacobian."""
kind: str # "eq" or "ineq"
fun: Callable[[Array], Array] # h(x) or g(x)
jac: Optional[Callable[[Array], Array]] = None
name: str = ""
family: str = ""
@dataclass
class ALMBlock:
"""Lifted discrete variable block (optional)."""
idx: Sequence[int] # indices of block in the full x
levels: Array # (k,) level values (b_i)
enforce_sum_to_one: bool = True # register as equality via helper
enforce_one_hot: bool = True # ALM linearization with M = 11^T - I
@dataclass
class ALMConfig:
# penalties
rho_h: float = 1.0 # equalities
rho_g: float = 1.0 # inequalities / one-hot
rho_min: float = 1e-3
rho_max: float = 1e3
# per-family penalty overrides
rho_eq_family: Dict[str, float] = field(default_factory=dict)
rho_ineq_family: Dict[str, float] = field(default_factory=dict)
# allow per-family adaptation toggle (optional)
adapt_by_family: bool = True
# adaptation toggles
adapt: bool = True
tau_up_h: float = 1.05
tau_down_h: float = 0.95
tau_up_g: float = 0.90
tau_down_g: float = 0.50
gamma_up: float = 1.2
gamma_down: float = 0.9
# tolerances & loop
tol_h: float = 1e-6
tol_g: float = 1e-6
max_outer: int = 100
# stagnation safety net
use_stagnation_bump: bool = False
patience_h: int = 10
patience_g: int = 10
stagnation_factor: float = 1e-3
# smoothing (optional)
ema_alpha: float = 1.0
# finite diff (only used if jac=None)
fd_eps: float = 1e-6
# activation threshold for projected ALM
act_tol: float = 1e-10
# Optional: BCD inner solve (decomposition)
bcd: BCDConfig = field(default_factory=BCDConfig)
class ConstraintRegistry:
"""
Holds constraints and block metadata; keeps ALMAlgorithm stateless. Register constraints and
(optional) lifted-discrete blocks here.
"""
def __init__(self):
self.constraints: List[ALMConstraint] = []
self.blocks: List[ALMBlock] = []
def add_equality(self, fun, jac=None, name="", family=""):
self.constraints.append(ALMConstraint("eq", fun, jac, name, family))
def add_inequality(self, fun, jac=None, name="", family=""):
self.constraints.append(ALMConstraint("ineq", fun, jac, name, family))
def add_block(self, idx: Sequence[int], levels: Array, sum_to_one=True, one_hot=True):
self.blocks.append(ALMBlock(list(idx), np.asarray(levels, float), sum_to_one, one_hot))
class ALMAlgorithm:
"""Stateless ALM outer loop. Call `run(model, registry, core, cfg, **core_kwargs)`."""
# ---- helpers (static) ----
@staticmethod
def _finite_diff_jac(fun: Callable[[Array], Array], x: Array, eps: float) -> Array:
y0 = fun(x)
m = int(np.prod(y0.shape))
y0 = y0.reshape(-1)
n = x.size
J = np.zeros((m, n), dtype=float)
for j in range(n):
xp = x.copy()
xp[j] += eps
J[:, j] = (fun(xp).reshape(-1) - y0) / eps
return J
@staticmethod
def _pairwise_M(k: int) -> Array:
return np.ones((k, k), dtype=float) - np.eye(k, dtype=float)
@staticmethod
def _sum_to_one_selector(n: int, idx: Sequence[int]) -> Array:
S = np.zeros((1, n), dtype=float)
S[0, np.array(list(idx), int)] = 1.0
return S
@staticmethod
def _make_sum1_fun(S):
return lambda x: S @ x - np.array([1.0])
@staticmethod
def _make_sum1_jac(S):
return lambda x: S
@staticmethod
def _make_onehot_fun(sl, M):
sl = np.array(sl, int)
def _f(x):
s = x[sl]
return np.array([float(s @ (M @ s))]) # shape (1,)
return _f
@staticmethod
def _make_onehot_jac(sl, M, n):
sl = np.array(sl, int)
def _J(x):
s = x[sl]
grad_blk = 2.0 * (M @ s) # (k,)
J = np.zeros((1, n), dtype=float) # shape (1, n)
J[0, sl] = grad_blk
return J
return _J
@staticmethod
def _as_float(value) -> float:
arr = np.asarray(value, dtype=float).reshape(-1)
return float(arr[0]) if arr.size else 0.0
@staticmethod
def _polynomial_from_terms(poly_terms: List[PolyTerm]) -> Polynomial:
if not poly_terms:
return Polynomial([], [])
idxs, coeffs = zip(*poly_terms)
return Polynomial(list(coeffs), list(idxs))
@staticmethod
def _poly_value(poly_terms: List[PolyTerm], x: Array) -> float:
"""Backward-compatible wrapper around Polynomial.evaluate."""
if not poly_terms:
return 0.0
poly = ALMAlgorithm._polynomial_from_terms(poly_terms)
return ALMAlgorithm._as_float(poly.evaluate(np.asarray(x, dtype=float)))
@staticmethod
def _merge_poly(poly_terms: Optional[List[PolyTerm]], Q_aug: Optional[Array],
c_aug: Optional[Array]) -> List[PolyTerm]:
"""
Merge ALM's quadratic/linear increments (Q_aug, c_aug) into the base polynomial term list `poly_terms`.
If 'poly_terms' is None, then turn x^T Q_aug x + c_aug^T x into polynomial monomials.
Terms are of the form:
((0, i), w) for linear, ((i, j), w) for quadratic.
"""
merged = list(poly_terms) if poly_terms is not None else []
if Q_aug is not None:
Qs = 0.5 * (Q_aug + Q_aug.T)
n = Qs.shape[0]
for i in range(n):
# diagonal contributes Qii * x_i^2
if Qs[i, i] != 0.0:
merged.append(((i + 1, i + 1), float(Qs[i, i])))
for j in range(i + 1, n):
q = 2.0 * Qs[i, j] # x^T Q x -> sum_{i<j} 2*Q_ij x_i x_j
if q != 0.0:
merged.append(((i + 1, j + 1), float(q)))
if c_aug is not None:
for i, ci in enumerate(c_aug):
if ci != 0.0:
merged.append(((0, i + 1), float(ci)))
return merged
@staticmethod
def _block_offsets(blocks: List[ALMBlock]) -> List[int]:
"""
Return starting offsets (0-based) for each lifted block in the
concatenated s-vector.
"""
offs, pos = [], 0
for blk in blocks:
offs.append(pos)
pos += len(blk.levels) # each block contributes k lifted coordinates
return offs
@staticmethod
def lift_Qc_to_poly_terms(
Q_native: np.ndarray, # shape (m, m) over original discrete vars (one block per var)
c_native: np.ndarray, # shape (m,)
blocks: List[ALMBlock], # ALM blocks (in the same order as the rows/cols of Q_native, c_native)
) -> Tuple[List[Tuple[int, ...]], List[float]]:
"""
Expand a quadratic/linear objective over m native discrete variables into the lifted
(one-hot) s-space.
Given `Q_native` and `c_native` over original m discrete vars (one block per var), and
`blocks` (`list[ALMBlock]` in the same order as original vars), we enforce:
- For variable i with level values b_i (length k_i), we create k_i lifted coords s_{i,a}.
- Quadratic term: J_ij = Q_native[i,j] * (b_i b_j^T) contributes to pairs (s_{i,a}, s_{j,b})
- Linear term: C_i = c_native[i] * b_i contributes to s_{i,a}
Returns polynomial (indices, coeffs) over concatenated s variables.
"""
m = len(blocks)
assert Q_native.shape == (m, m), "Q_native must match number of blocks"
assert c_native.shape == (m,), "c_native must match number of blocks"
offs = ALMAlgorithm._block_offsets(blocks)
terms_acc = defaultdict(float)
# Quadratic lift: J = kron-expansion
for i in range(m):
bi = blocks[i].levels[:, None] # (k_i, 1)
oi = offs[i]
for j in range(m):
bj = blocks[j].levels[:, None] # (k_j, 1)
oj = offs[j]
J_ij = Q_native[i, j] * (bi @ bj.T) # (k_i, k_j)
if np.allclose(J_ij, 0.0):
continue
ki, kj = bi.shape[0], bj.shape[0]
for a in range(ki):
ia = oi + a + 1
for b in range(kj):
jb = oj + b + 1
w = float(J_ij[a, b])
if w == 0.0:
continue
if ia == jb:
terms_acc[(ia, ia)] += w
else:
# NOTE: we store each cross monomial once (i < j); for i==j block pairs,
# double to represent J_ab + J_ba
if i == j: # intra-block off-diagonal needs 2x
w *= 2.0
i1, i2 = (ia, jb) if ia < jb else (jb, ia)
terms_acc[(i1, i2)] += w
# Linear lift: C_i = L_i * b_i
for i in range(m):
b = blocks[i].levels
oi = offs[i]
for a, val in enumerate(b):
ia = oi + a + 1
w = float(c_native[i] * val)
if w != 0.0:
terms_acc[(0, ia)] += w
# Pack to PolynomialModel format
indices = [tuple(k) for k in terms_acc.keys()]
coeffs = [float(v) for v in terms_acc.values()]
return indices, coeffs
@staticmethod
def _make_bcd_registry(
registry: "ConstraintRegistry",
*,
lifted_slices: Optional[List[List[int]]] = None,
) -> BCDRegistry:
"""
Translate ALM's registry into the standalone BCD registry.
Important:
- In native/non-lifted mode, BCD blocks should use the original `blk.idx`.
- In lifted mode, BCD blocks must use the current working-coordinate slices
(`lifted_slices`), not the original native indices.
"""
breg = BCDRegistry()
for i, blk in enumerate(registry.blocks):
if lifted_slices is None:
bcd_idx = list(int(j) for j in blk.idx)
else:
bcd_idx = list(int(j) for j in lifted_slices[i])
breg.add_block(
bcd_idx,
name=f"block_{i}",
levels=None if blk.levels is None else np.asarray(blk.levels, float),
)
for c in registry.constraints:
if c.kind == "eq":
breg.add_equality(c.fun, name=c.name, family=c.family)
elif c.kind == "ineq":
breg.add_inequality(c.fun, name=c.name, family=c.family)
return breg
# ---- main entrypoint ----
@staticmethod
def run(
base_model: PolynomialModel,
registry: ConstraintRegistry,
solver,
cfg: ALMConfig = ALMConfig(),
x0: Optional[Array] = None,
*,
parse_output=None,
verbose: bool = True,
**solver_kwargs,
) -> Dict[str, Union[Array, Dict[int, float], Dict]]:
"""
Solve with ALM. Keep all ALM state local to this call (no global side-effects).
Handles three modes:
(A) No blocks -> continuous; use base_model as-is
(B) Blocks + native base_model -> lift to s-space
(C) Blocks + already-lifted base_model -> use as-is (compat)
Returns:
{
"x": final iterate,
"decoded": {start_idx_of_block: level_value, ...} for lifted blocks,
"decoded_debug": {start_idx_of_block: native_device_value, ...},
"hist": { "eq_inf": [...], "ineq_inf": [...], "obj": [...], "x": [...] }
}
"""
blocks = registry.blocks
has_blocks = len(blocks) > 0
# ---- choose working model + dimension n ----
if not has_blocks:
# (A) continuous case: use the provided model directly
model = base_model
# Prefer model.n, fall back to bounds; else infer from polynomial indices
n = getattr(model, "n", None)
if n is None:
ub = getattr(model, "upper_bound", None)
lb = getattr(model, "lower_bound", None)
if ub is not None:
n = len(ub)
elif lb is not None:
n = len(lb)
else:
# infer from polynomial terms
n = 0
for inds in getattr(model, "indices", getattr(model.polynomial, "indices", [])):
for j in inds:
if j > 0:
n = max(n, j)
lifted_slices: List[List[int]] = []
else:
# (B/C) lifted (discrete) case
target_lifted_n = sum(len(blk.levels) for blk in blocks)
base_n = getattr(base_model, "n", None)
def _infer_n_from_terms(pm: PolynomialModel) -> int:
inds_list = getattr(pm, "indices", getattr(pm.polynomial, "indices", []))
mx = 0
for inds in inds_list:
for j in inds:
if j > mx:
mx = j
return int(mx)
if base_n is None:
base_n = _infer_n_from_terms(base_model)
# detect "already-lifted" native input (compat path)
already_lifted = (base_n == target_lifted_n)
if already_lifted:
# (C) use provided model directly; assume bounds already sensible
model = base_model
n = target_lifted_n
else:
# (B) lift from native space
# base_model must expose coefficients/indices compatible with this call
c_base, Q_base = base_model._quadratic_polynomial_to_qubo_coefficients(
getattr(base_model, "coefficients", getattr(base_model.polynomial, "coefficients", [])),
getattr(base_model, "indices", getattr(base_model.polynomial, "indices", [])),
getattr(base_model, "n")
)
assert Q_base.shape[0] == Q_base.shape[1]
assert c_base.shape[0] == Q_base.shape[0]
indices_lifted, coeffs_lifted = ALMAlgorithm.lift_Qc_to_poly_terms(Q_base, c_base, blocks)
model = PolynomialModel(coeffs_lifted, indices_lifted)
n = target_lifted_n
# set canonical [0,1] bounds for lifted s
setattr(model, "lower_bound", np.zeros(n, float))
setattr(model, "upper_bound", np.ones(n, float))
# ---- n and lifted_slices ----
lifted_slices = []
pos = 0
for blk in blocks:
k = len(blk.levels) # number of lifted coords for this block
lifted_slices.append(list(range(pos, pos + k))) # 0-based in lifted x
pos += k
# Algorithm initial solution and bounds
lb = getattr(model, "lower_bound", None)
ub = getattr(model, "upper_bound", None)
if x0 is not None:
x = np.asarray(x0, float).copy()
else:
# default init
if (lb is not None) and (ub is not None) and np.all(np.isfinite(lb)) and np.all(np.isfinite(ub)):
x = 0.5 * (np.asarray(lb, float) + np.asarray(ub, float))
else:
x = np.zeros(n, float)
# ---- collect constraints ----
problem_eqs = [c for c in registry.constraints if c.kind == "eq"]
problem_ineqs = [c for c in registry.constraints if c.kind == "ineq"]
# auto-install sum-to-one and one-hot as equalities
        # (one-hot: s^T (11^T - I) s = 0)
def _install_block_equalities() -> List[ALMConstraint]:
if not has_blocks:
return []
eqs: List[ALMConstraint] = []
for blk, lift_idx in zip(registry.blocks, lifted_slices):
if blk.enforce_sum_to_one:
S = ALMAlgorithm._sum_to_one_selector(n, lift_idx)
eqs.append(ALMConstraint(
"eq",
fun=ALMAlgorithm._make_sum1_fun(S),
jac=ALMAlgorithm._make_sum1_jac(S),
name=f"sum_to_one_block_{lift_idx[0]}",
family="block_sum1",
))
if blk.enforce_one_hot:
k = len(lift_idx)
M = ALMAlgorithm._pairwise_M(k)
eqs.append(ALMConstraint(
"eq",
fun=ALMAlgorithm._make_onehot_fun(lift_idx, M),
jac=ALMAlgorithm._make_onehot_jac(lift_idx, M, n),
name=f"onehot_block_{lift_idx[0]}",
family="block_onehot",
))
return eqs
block_eqs = _install_block_equalities()
# Unified equality list (order is fixed for whole run)
full_eqs = problem_eqs + block_eqs
# Allocate multipliers for every equality in full_eqs
lam_eq = []
for csp in full_eqs:
r0 = csp.fun(x).reshape(-1)
lam_eq.append(np.zeros_like(r0, dtype=float))
# Inequality multipliers per user inequality
mu_ineq = []
for csp in problem_ineqs:
r0 = csp.fun(x).reshape(-1)
mu_ineq.append(np.zeros_like(r0, dtype=float))
rho_eq_family = dict(getattr(cfg, "rho_eq_family", {}) or {})
rho_ineq_family = dict(getattr(cfg, "rho_ineq_family", {}) or {})
def _rho_for(csp_k: ALMConstraint) -> float:
fam = getattr(csp_k, "family", "") or ""
if csp_k.kind == "eq":
return float(rho_eq_family.get(fam, rho_h))
else:
return float(rho_ineq_family.get(fam, rho_g))
# -------- running stats for adaptive penalties --------
rho_h, rho_g = cfg.rho_h, cfg.rho_g
best_eq, best_ineq = np.inf, np.inf
no_imp_eq = no_imp_ineq = 0
prev_eq_inf, prev_ineq_inf = np.inf, np.inf
eps = 1e-12
prev_eq_inf_by_family, prev_ineq_inf_by_family = {}, {}
# ---- per-family stagnation tracking (only used when cfg.adapt_by_family=True)
best_eq_by_family: Dict[str, float] = {}
best_ineq_by_family: Dict[str, float] = {}
no_imp_eq_by_family: Dict[str, int] = {}
no_imp_ineq_by_family: Dict[str, int] = {}
hist = {"eq_inf": [], "ineq_inf": [], "obj": [], "x": [],
# per-iteration logs for parameters/multipliers
"rho_h": [], "rho_g": [],
"rho_eq_family": [], "rho_ineq_family": [],
"eq_inf_by_family": [], "ineq_inf_by_family": [],
# critical for debugging hardware compatibility (per-iteration)
"dynamic_range": [],
}
for k_idx, csp in enumerate(full_eqs):
if csp.kind != "eq":
continue
hist[f"lam_eq_max_idx{k_idx}"] = []
hist[f"lam_eq_min_idx{k_idx}"] = []
for k_idx, csp in enumerate(problem_ineqs):
if csp.kind != "ineq":
continue
hist[f"mu_ineq_max_idx{k_idx}"] = []
hist[f"mu_ineq_min_idx{k_idx}"] = []
for it in range(cfg.max_outer):
# -------- base polynomial (does not include fixed penalties here) --------
base_terms: List[PolyTerm] = list(zip(model.indices, model.coefficients))
base_poly = ALMAlgorithm._polynomial_from_terms(base_terms)
# -------- ALM quadratic/linear pieces (assembled here, kept separate) --------
Q_aug = np.zeros((n, n), dtype=float)
c_aug = np.zeros(n, dtype=float)
have_aug = False
# (A) Equalities: linearize h near x^t => (rho/2)||A x - b||^2 + lam^T(Ax - b)
for k_idx, csp in enumerate(full_eqs):
if csp.kind != "eq":
continue
h = csp.fun(x).reshape(-1)
A = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
A = np.atleast_2d(A)
assert A.shape[1] == n, f"A has {A.shape[1]} cols, expected {n}"
# linearization about current x: residual model r(x) = A x - b, with b = A x - h
b = A @ x - h
rho_k = _rho_for(csp)
Qk = 0.5 * rho_k * (A.T @ A)
ck = (A.T @ lam_eq[k_idx]) - rho_k * (A.T @ b)
Q_aug += Qk
c_aug += ck
have_aug = True
# (B) Inequalities: projected ALM. Linearize g near x^t.
for k_idx, csp in enumerate(problem_ineqs):
if csp.kind != "ineq":
continue
g = csp.fun(x).reshape(-1)
G = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
G = np.atleast_2d(G)
assert G.shape[1] == n, f"G has {G.shape[1]} cols, expected {n}"
d = G @ x - g
rho_k = _rho_for(csp)
# Activation measure at current iterate; meaning, the current violating inequality components:
# g(x) + mu/rho; Powell-Hestenes-Rockafellar shifted residual
y = G @ x - d + mu_ineq[k_idx] / rho_k
active = (y > cfg.act_tol)
if np.any(active):
GA = G[active, :]
muA = mu_ineq[k_idx][active]
gA = g[active]
# Q += (rho/2) * GA^T GA
Qk = 0.5 * rho_k * (GA.T @ GA)
# c += GA^T mu - rho * GA^T (GA x - gA); where GA x - gA is active measures of d = G @ x - g
ck = (GA.T @ muA) - rho_k * (GA.T @ (GA @ x - gA))
Q_aug += Qk
c_aug += ck
have_aug = True
# -------- build merged polynomial for the core solver --------
all_terms = ALMAlgorithm._merge_poly(base_terms, Q_aug if have_aug else None,
c_aug if have_aug else None)
idxs, coeffs = zip(*[(inds, w) for (inds, w) in all_terms]) if all_terms else ([], [])
poly_model = PolynomialModel(list(coeffs), list(idxs))
if lb is not None and hasattr(poly_model, "lower_bound"):
poly_model.lower_bound = np.asarray(lb, float)
if ub is not None and hasattr(poly_model, "upper_bound"):
poly_model.upper_bound = np.asarray(ub, float)
x_ws = x.copy()
# Convention: many cores look for one of these fields if present.
# Use one or more to be future-proof; harmless if ignored.
setattr(poly_model, "initial_guess", x_ws)
setattr(poly_model, "warm_start", x_ws)
setattr(poly_model, "x0", x_ws)
# -------- inner solve --------
if getattr(cfg, "bcd", None) is not None and getattr(cfg.bcd, "enabled", False):
# If ALM is operating in lifted space, BCD must receive lifted block
# coordinates rather than the original native block indices.
if lifted_slices is not None and len(lifted_slices) != len(registry.blocks):
raise ValueError("lifted_slices must align 1-to-1 with registry.blocks")
breg = ALMAlgorithm._make_bcd_registry(
registry,
lifted_slices=lifted_slices if has_blocks else None,
)
bcd_res = BlockCoordinateDescentAlgorithm.run(
poly_model,
breg,
solver,
cfg=cfg.bcd,
x0=x_ws,
verbose=verbose,
**solver_kwargs,
)
x = np.asarray(bcd_res["x"], float).reshape(-1)
hist.setdefault("bcd_inner", []).append(bcd_res["hist"])
else:
out = solver.solve(poly_model, **solver_kwargs)
# -------- parse --------
if parse_output:
x = parse_output(out)
else:
# default: support (value, x) or `.x` or raw x
if isinstance(out, SolutionResults):
x = out.solutions[0]
elif isinstance(out, tuple) and len(out) == 2:
_, x = out
elif isinstance(out, dict) and "results" in out and "solutions" in out["results"]:
x = out["results"]["solutions"][0]
elif isinstance(out, dict) and "x" in out:
x = out["x"]
else:
x = getattr(out, "x", out)
x = np.asarray(x, float)
# ---- multiplier tracking ----
for k_idx, csp in enumerate(full_eqs):
if csp.kind != "eq": continue
hist[f"lam_eq_max_idx{k_idx}"].append(float(np.max(lam_eq[k_idx])))
hist[f"lam_eq_min_idx{k_idx}"].append(float(np.min(lam_eq[k_idx])))
for k_idx, csp in enumerate(problem_ineqs):
if csp.kind != "ineq": continue
hist[f"mu_ineq_max_idx{k_idx}"].append(float(np.max(mu_ineq[k_idx])))
hist[f"mu_ineq_min_idx{k_idx}"].append(float(np.min(mu_ineq[k_idx])))
# -------- residuals + multiplier updates --------
eq_infs = []
for k_idx, csp in enumerate(full_eqs):
if csp.kind != "eq": continue
r = csp.fun(x).reshape(-1)
rho_k = _rho_for(csp)
lam_eq[k_idx] = lam_eq[k_idx] + rho_k * r
if r.size:
eq_infs.append(np.max(np.abs(r)))
eq_inf = float(np.max(eq_infs)) if eq_infs else 0.0
ineq_infs = []
for k_idx, csp in enumerate(problem_ineqs):
if csp.kind != "ineq": continue
r = csp.fun(x).reshape(-1)
rho_k = _rho_for(csp)
mu_ineq[k_idx] = np.maximum(0.0, mu_ineq[k_idx] + rho_k * r)
if r.size:
ineq_infs.append(np.max(np.maximum(0.0, r)))
ineq_inf = float(np.max(ineq_infs)) if ineq_infs else 0.0
assert len(lam_eq) == len(full_eqs)
assert len(mu_ineq) == len(problem_ineqs)
# ---- per-family residual telemetry ----
eq_inf_by_family = {}
for k_idx, csp in enumerate(full_eqs):
if csp.kind != "eq": continue
fam = getattr(csp, "family", "") or ""
r = csp.fun(x).reshape(-1)
v = float(np.max(np.abs(r))) if r.size else 0.0
eq_inf_by_family[fam] = max(eq_inf_by_family.get(fam, 0.0), v)
ineq_inf_by_family = {}
for k_idx, csp in enumerate(problem_ineqs):
if csp.kind != "ineq": continue
fam = getattr(csp, "family", "") or ""
r = csp.fun(x).reshape(-1)
v = float(np.max(np.maximum(0.0, r))) if r.size else 0.0
ineq_inf_by_family[fam] = max(ineq_inf_by_family.get(fam, 0.0), v)
            # evaluate the base polynomial only (can add the augmented value here to track the full L_A)
f_val = (
ALMAlgorithm._as_float(base_poly.evaluate(np.asarray(x, dtype=float)))
if base_terms else 0.0
)
# evaluate dynamic range
dynamic_range = poly_model.dynamic_range
hist["eq_inf"].append(eq_inf); hist["ineq_inf"].append(ineq_inf)
hist["obj"].append(float(f_val)); hist["x"].append(x.copy())
hist["dynamic_range"].append(float(dynamic_range))
# parameter tracking
hist["rho_h"].append(float(rho_h)); hist["rho_g"].append(float(rho_g))
hist["rho_eq_family"].append(dict(rho_eq_family))
hist["rho_ineq_family"].append(dict(rho_ineq_family))
hist["eq_inf_by_family"].append(dict(eq_inf_by_family))
hist["ineq_inf_by_family"].append(dict(ineq_inf_by_family))
if verbose:
# show worst 3 equality families by residual, with their rhos
eq_items = sorted(eq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
eq_top = eq_items[:3]
eq_str = ",".join([
f"{fam or 'eq'}:{val:.2e}@"
f"{_rho_for(next(c for c in full_eqs if (getattr(c,'family','') or '')==fam and c.kind=='eq')):.1g}"
for fam, val in eq_top
])
# show worst 3 inequality families by residual, with their rhos
ineq_items = sorted(ineq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
ineq_top = ineq_items[:3]
ineq_str = ",".join([
f"{fam or 'ineq'}:{val:.2e}@"
f"{_rho_for(next(c for c in problem_ineqs if (getattr(c, 'family', '') or '') == fam and c.kind == 'ineq')):.1g}"
for fam, val in ineq_top
])
print(f"[ALM {it:02d}] f={f_val:.6g} | eq_inf={eq_inf:.2e} | ineq_inf={ineq_inf:.2e} "
f"| rho_h={rho_h:.2e} | rho_g={rho_g:.2e} | eq_fam[{eq_str}] | ineq_fam[{ineq_str}] "
f"| dynamic_range={dynamic_range:.4g}")
# stopping test: feasible once both residual groups are within tolerance
if eq_inf <= cfg.tol_h and ineq_inf <= cfg.tol_g:
if verbose:
print(f"[ALM] converged at iter {it}")
break
# EMA smoothing to reduce jitter
if it == 0:
eq_inf_smooth = eq_inf
ineq_inf_smooth = ineq_inf
else:
eq_inf_smooth = cfg.ema_alpha * eq_inf + (1 - cfg.ema_alpha) * eq_inf_smooth
ineq_inf_smooth = cfg.ema_alpha * ineq_inf + (1 - cfg.ema_alpha) * ineq_inf_smooth
# -------- Residual-ratio controller --------
if cfg.adapt and it > 0:
# per-family rho adaptation for equality and inequality families
if getattr(cfg, "adapt_by_family", False):
# Equality family
for fam, cur in eq_inf_by_family.items():
prev = max(prev_eq_inf_by_family.get(fam, cur), eps)
# rho value for family: if absent, start at rho_h
rho_f = float(rho_eq_family.get(fam, rho_h))
if cur > cfg.tau_up_h * prev:
rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
elif cur < cfg.tau_down_h * prev:
rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
rho_eq_family[fam] = rho_f
# update prev map
prev_eq_inf_by_family = dict(eq_inf_by_family)
# Inequality family
for fam, cur in ineq_inf_by_family.items():
prev = max(prev_ineq_inf_by_family.get(fam, cur), eps)
# rho value for family: if absent, start at rho_g
rho_f = float(rho_ineq_family.get(fam, rho_g))
if cur > cfg.tau_up_g * prev:
rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
elif cur < cfg.tau_down_g * prev:
rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
rho_ineq_family[fam] = rho_f
# update prev map
prev_ineq_inf_by_family = dict(ineq_inf_by_family)
else:
# Equality group
if eq_inf_smooth > cfg.tau_up_h * max(prev_eq_inf, eps): # stalled or not shrinking
rho_h = min(cfg.gamma_up * rho_h, cfg.rho_max)
elif eq_inf_smooth < cfg.tau_down_h * max(prev_eq_inf, eps): # fast progress, allow relaxation
rho_h = max(cfg.gamma_down * rho_h, cfg.rho_min)
# Inequality group
if ineq_inf_smooth > cfg.tau_up_g * max(prev_ineq_inf, eps):
rho_g = min(cfg.gamma_up * rho_g, cfg.rho_max)
elif ineq_inf_smooth < cfg.tau_down_g * max(prev_ineq_inf, eps):
rho_g = max(cfg.gamma_down * rho_g, cfg.rho_min)
else:
# adaptation disabled or first iteration: still seed the previous-residual maps
if getattr(cfg, "adapt_by_family", False):
# update prev map
prev_eq_inf_by_family = dict(eq_inf_by_family)
prev_ineq_inf_by_family = dict(ineq_inf_by_family)
# -------- Stagnation bump (safety net) --------
if cfg.use_stagnation_bump:
if getattr(cfg, "adapt_by_family", False):
# ---- Equality families ----
for fam, cur in eq_inf_by_family.items():
# first time this family is seen: initialize its best residual and patience counter
if fam not in best_eq_by_family:
best_eq_by_family[fam] = float(cur)
no_imp_eq_by_family[fam] = 0
if cur <= best_eq_by_family[fam] * (1 - cfg.stagnation_factor):
best_eq_by_family[fam] = float(cur)
no_imp_eq_by_family[fam] = 0
else:
no_imp_eq_by_family[fam] += 1
if no_imp_eq_by_family[fam] >= cfg.patience_h:
rho_f = float(rho_eq_family.get(fam, rho_h))
rho_eq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
no_imp_eq_by_family[fam] = 0
# ---- Inequality families ----
for fam, cur in ineq_inf_by_family.items():
# first time this family is seen: initialize its best residual and patience counter
if fam not in best_ineq_by_family:
best_ineq_by_family[fam] = float(cur)
no_imp_ineq_by_family[fam] = 0
if cur <= best_ineq_by_family[fam] * (1 - cfg.stagnation_factor):
best_ineq_by_family[fam] = float(cur)
no_imp_ineq_by_family[fam] = 0
else:
no_imp_ineq_by_family[fam] += 1
if no_imp_ineq_by_family[fam] >= cfg.patience_g:
rho_f = float(rho_ineq_family.get(fam, rho_g))
rho_ineq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
no_imp_ineq_by_family[fam] = 0
else:
# Equality stagnation
if eq_inf <= best_eq * (1 - cfg.stagnation_factor):
best_eq = eq_inf; no_imp_eq = 0
else:
no_imp_eq += 1
if no_imp_eq >= cfg.patience_h:
rho_h = min(2.0 * rho_h, cfg.rho_max); no_imp_eq = 0
# Inequality stagnation
if ineq_inf <= best_ineq * (1 - cfg.stagnation_factor):
best_ineq = ineq_inf; no_imp_ineq = 0
else:
no_imp_ineq += 1
if no_imp_ineq >= cfg.patience_g:
rho_g = min(2.0 * rho_g, cfg.rho_max); no_imp_ineq = 0
# -------- finalize for next iteration --------
prev_eq_inf = max(eq_inf_smooth, eps)
prev_ineq_inf = max(ineq_inf_smooth, eps)
# ---- decoding back to native levels (only when one-hot blocks are present) ----
decoded_native: Dict[int, float] = {} # maps original var anchor -> chosen level value
decoded_lifted: Dict[int, int] = {} # maps lifted start index -> argmax position (optional)
if has_blocks:
for blk, lift_idx in zip(registry.blocks, lifted_slices):
if not lift_idx:
continue
sl = np.array(lift_idx, int)
if len(sl) == 0:
continue
sblk = x[sl]
j = int(np.argmax(sblk)) # which level got selected in the block
orig_anchor = int(blk.idx[0]) # anchor original var id for this block
decoded_native[orig_anchor] = float(blk.levels[j])
decoded_lifted[sl[0]] = j # optional: lifted index -> chosen slot
return {
"x": x,
"decoded": decoded_native if has_blocks else {},
"decoded_debug": decoded_lifted if has_blocks else {},
"hist": hist,
}