# (C) Quantum Computing Inc., 2025.
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple, Optional, Sequence, Union
import numpy as np
from collections import defaultdict
from eqc_models.base.polynomial import PolynomialModel
# Shared type aliases for this module.
Array = np.ndarray  # any numpy array (vector or matrix)
PolyTerm = Tuple[Tuple[int, ...], float]  # (1-based variable indices, coefficient); index 0 is a placeholder, e.g. ((0, i), w) is linear
@dataclass
class ALMConstraint:
    """One constraint family for the ALM loop.

    ``fun`` maps the iterate x to a residual vector (h(x) for equalities,
    g(x) for inequalities); ``jac`` optionally returns its Jacobian, and when
    it is None a finite-difference fallback is used by the algorithm.
    """
    kind: str                                                  # "eq" or "ineq"
    fun: Callable[[np.ndarray], np.ndarray]                    # residual h(x) or g(x)
    jac: Optional[Callable[[np.ndarray], np.ndarray]] = None   # analytic Jacobian, if available
    name: str = ""                                             # human-readable label
    family: str = ""                                           # grouping key for per-family penalties
@dataclass
class ALMBlock:
    """Metadata for one lifted discrete variable (optional one-hot block)."""
    idx: Sequence[int]               # indices this block occupies in the full x
    levels: np.ndarray               # (k,) admissible level values b_i
    enforce_sum_to_one: bool = True  # auto-register sum(s) == 1 as an equality
    enforce_one_hot: bool = True     # ALM linearization with M = 11^T - I
@dataclass
class ALMConfig:
    """Tunable knobs for the ALM outer loop (penalties, adaptation, stopping)."""
    # -- penalty weights --
    rho_h: float = 50.0    # equality penalty
    rho_g: float = 50.0    # inequality / one-hot penalty
    rho_min: float = 1e-3  # lower clamp for adapted penalties
    rho_max: float = 1e3   # upper clamp for adapted penalties
    # -- per-family penalty overrides --
    rho_eq_family: Dict[str, float] = field(default_factory=dict)
    rho_ineq_family: Dict[str, float] = field(default_factory=dict)
    # per-family adaptation toggle (optional)
    adapt_by_family: bool = True
    # -- residual-ratio adaptation --
    adapt: bool = True
    tau_up_h: float = 0.90    # equality: grow rho when residual stays above this ratio of previous
    tau_down_h: float = 0.50  # equality: allow shrink on fast progress
    tau_up_g: float = 0.90    # inequality counterparts
    tau_down_g: float = 0.50
    gamma_up: float = 2.0     # multiplicative growth factor
    gamma_down: float = 1.0   # multiplicative shrink factor (1.0 = no shrink)
    # -- tolerances & outer loop --
    tol_h: float = 1e-6
    tol_g: float = 1e-6
    max_outer: int = 100
    # -- stagnation safety net --
    use_stagnation_bump: bool = True
    patience_h: int = 10
    patience_g: int = 10
    stagnation_factor: float = 1e-3
    # -- smoothing (optional) --
    ema_alpha: float = 0.3  # EMA weight for residual smoothing
    # -- finite differences (only used when a constraint has jac=None) --
    fd_eps: float = 1e-6
    # -- activation threshold for projected ALM inequalities --
    act_tol: float = 1e-10
class ConstraintRegistry:
    """Container for constraint families and lifted-block metadata.

    Keeps :class:`ALMAlgorithm` stateless: register equality/inequality
    constraints and (optionally) lifted-discrete blocks here, then pass the
    registry to ``ALMAlgorithm.run``.
    """

    def __init__(self):
        self.constraints: List[ALMConstraint] = []
        self.blocks: List[ALMBlock] = []

    def add_equality(self, fun, jac=None, name="", family=""):
        """Register an equality constraint family h(x) = 0."""
        self.constraints.append(ALMConstraint("eq", fun, jac, name, family))

    def add_inequality(self, fun, jac=None, name="", family=""):
        """Register an inequality constraint family (residuals > 0 are treated as violations)."""
        self.constraints.append(ALMConstraint("ineq", fun, jac, name, family))

    def add_block(self, idx: Sequence[int], levels: Array, sum_to_one=True, one_hot=True):
        """Register a lifted discrete block with the given admissible level values."""
        self.blocks.append(ALMBlock(list(idx), np.asarray(levels, float), sum_to_one, one_hot))
class ALMAlgorithm:
"""Stateless ALM outer loop. Call `run(model, registry, core, cfg, **core_kwargs)`."""
# ---- helpers (static) ----
@staticmethod
def _finite_diff_jac(fun: Callable[[Array], Array], x: Array, eps: float) -> Array:
y0 = fun(x)
m = int(np.prod(y0.shape))
y0 = y0.reshape(-1)
n = x.size
J = np.zeros((m, n), dtype=float)
for j in range(n):
xp = x.copy()
xp[j] += eps
J[:, j] = (fun(xp).reshape(-1) - y0) / eps
return J
@staticmethod
def _pairwise_M(k: int) -> Array:
return np.ones((k, k), dtype=float) - np.eye(k, dtype=float)
@staticmethod
def _sum_to_one_selector(n: int, idx: Sequence[int]) -> Array:
S = np.zeros((1, n), dtype=float)
S[0, np.array(list(idx), int)] = 1.0
return S
@staticmethod
def _make_sum1_fun(S):
return lambda x: S @ x - np.array([1.0])
@staticmethod
def _make_sum1_jac(S):
return lambda x: S
@staticmethod
def _make_onehot_fun(sl, M):
sl = np.array(sl, int)
def _f(x):
s = x[sl]
return np.array([float(s @ (M @ s))]) # shape (1,)
return _f
@staticmethod
def _make_onehot_jac(sl, M, n):
sl = np.array(sl, int)
def _J(x):
s = x[sl]
grad_blk = 2.0 * (M @ s) # (k,)
J = np.zeros((1, n), dtype=float) # shape (1, n)
J[0, sl] = grad_blk
return J
return _J
@staticmethod
def _poly_value(poly_terms: List[PolyTerm], x: Array) -> float:
val = 0.0
for inds, coeff in poly_terms:
prod = 1.0
for j in inds:
if j == 0:
continue
else:
prod *= x[j - 1]
val += coeff * prod
return float(val)
@staticmethod
def _merge_poly(poly_terms: Optional[List[PolyTerm]], Q_aug: Optional[Array],
c_aug: Optional[Array]) -> List[PolyTerm]:
"""
Merge ALM's quadratic/linear increments (Q_aug, c_aug) into the base polynomial term list `poly_terms`.
If 'poly_terms' is None, then turn x^T Q_aug x + c_aug^T x into polynomial monomials.
Terms are of the form:
((0, i), w) for linear, ((i, j), w) for quadratic.
"""
merged = list(poly_terms) if poly_terms is not None else []
if Q_aug is not None:
Qs = 0.5 * (Q_aug + Q_aug.T)
n = Qs.shape[0]
for i in range(n):
# diagonal contributes Qii * x_i^2
if Qs[i, i] != 0.0:
merged.append(((i + 1, i + 1), float(Qs[i, i])))
for j in range(i + 1, n):
q = 2.0 * Qs[i, j] # x^T Q x -> sum_{i<j} 2*Q_ij x_i x_j
if q != 0.0:
merged.append(((i + 1, j + 1), float(q)))
if c_aug is not None:
for i, ci in enumerate(c_aug):
if ci != 0.0:
merged.append(((0, i + 1), float(ci)))
return merged
@staticmethod
def _block_offsets(blocks: List[ALMBlock]) -> List[int]:
"""
Return starting offsets (0-based) for each lifted block in the
concatenated s-vector.
"""
offs, pos = [], 0
for blk in blocks:
offs.append(pos)
pos += len(blk.levels) # each block contributes k lifted coordinates
return offs
@staticmethod
def lift_Qc_to_poly_terms(
    Q_native: np.ndarray,       # (m, m) quadratic coefficients over native discrete vars
    c_native: np.ndarray,       # (m,) linear coefficients over native discrete vars
    blocks: "List[ALMBlock]",   # one block per native variable, same ordering as Q/c rows
) -> Tuple[List[Tuple[int, ...]], List[float]]:
    """Expand x^T Q x + c^T x over m native discrete variables into the lifted
    (one-hot) s-space, where x_i = b_i^T s_i.

    For the native pair (i, j), the term Q[i, j] x_i x_j becomes
    s_i^T (Q[i, j] * b_i b_j^T) s_j; the linear term c[i] x_i becomes
    (c[i] * b_i)^T s_i.  Monomials are accumulated under canonical 1-based
    index pairs; index 0 marks the linear slot, matching the
    PolynomialModel convention.

    Returns:
        (indices, coeffs) lists over the concatenated s variables.
    """
    m = len(blocks)
    assert Q_native.shape == (m, m), "Q_native must match number of blocks"
    assert c_native.shape == (m,), "c_native must match number of blocks"
    # Starting offset of each block in the concatenated s-vector.
    offs: List[int] = []
    pos = 0
    for blk in blocks:
        offs.append(pos)
        pos += len(blk.levels)
    terms_acc = defaultdict(float)
    # Quadratic lift: every ordered block pair (i, j) contributes J_ij = Q[i, j] * b_i b_j^T.
    for i in range(m):
        bi = blocks[i].levels[:, None]           # (k_i, 1)
        oi = offs[i]
        for j in range(m):
            bj = blocks[j].levels[:, None]       # (k_j, 1)
            oj = offs[j]
            J_ij = Q_native[i, j] * (bi @ bj.T)  # (k_i, k_j)
            if np.allclose(J_ij, 0.0):
                continue
            ki, kj = bi.shape[0], bj.shape[0]
            for a in range(ki):
                ia = oi + a + 1                  # 1-based lifted index
                for b in range(kj):
                    jb = oj + b + 1
                    w = float(J_ij[a, b])
                    if w == 0.0:
                        continue
                    if ia == jb:
                        terms_acc[(ia, ia)] += w  # diagonal: s^2 monomial
                    else:
                        # BUGFIX: the ordered double loop already visits both
                        # (a, b) and (b, a) — including within a single block —
                        # so each canonical key (i1 < i2) accumulates
                        # J_ab + J_ba without any extra factor.  The previous
                        # intra-block `w *= 2.0` double-counted cross terms
                        # (e.g. levels [1, 2], Q = [[3]] gave 24*s1*s2 instead
                        # of the exact 12*s1*s2 from 3*(s1 + 2*s2)^2).
                        i1, i2 = (ia, jb) if ia < jb else (jb, ia)
                        terms_acc[(i1, i2)] += w
    # Linear lift: c[i] x_i -> (c[i] * b_i)^T s_i, stored with the leading 0 index.
    for i in range(m):
        b = blocks[i].levels
        oi = offs[i]
        for a, val in enumerate(b):
            ia = oi + a + 1
            w = float(c_native[i] * val)
            if w != 0.0:
                terms_acc[(0, ia)] += w
    # Pack into PolynomialModel's parallel (indices, coefficients) lists.
    indices = [tuple(k) for k in terms_acc.keys()]
    coeffs = [float(v) for v in terms_acc.values()]
    return indices, coeffs
# ---- main entrypoint ----
@staticmethod
def run(
    base_model: PolynomialModel,
    registry: ConstraintRegistry,
    solver,
    cfg: Optional[ALMConfig] = None,
    x0: Optional[Array] = None,
    *,
    parse_output=None,
    verbose: bool = True,
    **solver_kwargs,
) -> Dict[str, Union[Array, Dict[int, float], Dict]]:
    """
    Solve with ALM. Keep all ALM state local to this call (no global side-effects).

    Handles three modes:
      (A) No blocks -> continuous; use base_model as-is
      (B) Blocks + native base_model -> lift to s-space
      (C) Blocks + already-lifted base_model -> use as-is (compat)

    Args:
        base_model: polynomial objective (native or already lifted).
        registry:   constraints and lifted-block metadata.
        solver:     inner solver exposing ``solve(model, **kwargs)``.
        cfg:        ALM configuration; a fresh ``ALMConfig()`` when None.
        x0:         optional initial iterate.
        parse_output: optional callable mapping the solver output to x.
        verbose:    print per-iteration telemetry.

    Returns:
        {
          "x": final iterate,
          "decoded": {anchor_native_var: level_value, ...} for lifted blocks,
          "decoded_debug": {lifted_start_idx: argmax_slot, ...},
          "hist": { "eq_inf": [...], "ineq_inf": [...], "obj": [...], "x": [...], ... }
        }
    """
    # BUGFIX: the signature previously used `cfg: ALMConfig = ALMConfig()`,
    # a mutable default evaluated once at import time and shared by every
    # call.  Construct a fresh config per call instead.
    if cfg is None:
        cfg = ALMConfig()
    blocks = registry.blocks
    has_blocks = len(blocks) > 0
    # ---- choose working model + dimension n ----
    if not has_blocks:
        # (A) continuous case: use the provided model directly
        model = base_model
        # Prefer model.n, fall back to bounds; else infer from polynomial indices
        n = getattr(model, "n", None)
        if n is None:
            ub = getattr(model, "upper_bound", None)
            lb = getattr(model, "lower_bound", None)
            if ub is not None:
                n = len(ub)
            elif lb is not None:
                n = len(lb)
            else:
                # infer from polynomial terms (largest 1-based variable index)
                n = 0
                for inds in getattr(model, "indices", getattr(model.polynomial, "indices", [])):
                    for j in inds:
                        if j > 0:
                            n = max(n, j)
        lifted_slices: List[List[int]] = []
    else:
        # (B/C) lifted (discrete) case
        target_lifted_n = sum(len(blk.levels) for blk in blocks)
        base_n = getattr(base_model, "n", None)

        def _infer_n_from_terms(pm: PolynomialModel) -> int:
            # Largest variable index appearing in the polynomial's terms.
            inds_list = getattr(pm, "indices", getattr(pm.polynomial, "indices", []))
            mx = 0
            for inds in inds_list:
                for j in inds:
                    if j > mx:
                        mx = j
            return int(mx)

        if base_n is None:
            base_n = _infer_n_from_terms(base_model)
        # detect "already-lifted" native input (compat path)
        already_lifted = (base_n == target_lifted_n)
        if already_lifted:
            # (C) use provided model directly; assume bounds already sensible
            model = base_model
            n = target_lifted_n
        else:
            # (B) lift from native space
            # base_model must expose coefficients/indices compatible with this call
            c_base, Q_base = base_model._quadratic_polynomial_to_qubo_coefficients(
                getattr(base_model, "coefficients", getattr(base_model.polynomial, "coefficients", [])),
                getattr(base_model, "indices", getattr(base_model.polynomial, "indices", [])),
                getattr(base_model, "n")
            )
            assert Q_base.shape[0] == Q_base.shape[1]
            assert c_base.shape[0] == Q_base.shape[0]
            indices_lifted, coeffs_lifted = ALMAlgorithm.lift_Qc_to_poly_terms(Q_base, c_base, blocks)
            model = PolynomialModel(coeffs_lifted, indices_lifted)
            n = target_lifted_n
            # set canonical [0,1] bounds for lifted s
            setattr(model, "lower_bound", np.zeros(n, float))
            setattr(model, "upper_bound", np.ones(n, float))
        # ---- n and lifted_slices ----
        lifted_slices = []
        pos = 0
        for blk in blocks:
            k = len(blk.levels)  # number of lifted coords for this block
            lifted_slices.append(list(range(pos, pos + k)))  # 0-based in lifted x
            pos += k
    # Algorithm initial solution and bounds
    lb = getattr(model, "lower_bound", None)
    ub = getattr(model, "upper_bound", None)
    if x0 is not None:
        x = np.asarray(x0, float).copy()
    else:
        # default init: midpoint of finite bounds, otherwise the origin
        if (lb is not None) and (ub is not None) and np.all(np.isfinite(lb)) and np.all(np.isfinite(ub)):
            x = 0.5 * (np.asarray(lb, float) + np.asarray(ub, float))
        else:
            x = np.zeros(n, float)
    # ---- collect constraints ----
    problem_eqs = [c for c in registry.constraints if c.kind == "eq"]
    problem_ineqs = [c for c in registry.constraints if c.kind == "ineq"]

    # auto-install sum-to-one and one-hot as equalities
    # (One-hot: s^T (11^T - I) s = 0)
    def _install_block_equalities() -> List[ALMConstraint]:
        if not has_blocks:
            return []
        eqs: List[ALMConstraint] = []
        for blk, lift_idx in zip(registry.blocks, lifted_slices):
            if blk.enforce_sum_to_one:
                S = ALMAlgorithm._sum_to_one_selector(n, lift_idx)
                eqs.append(ALMConstraint(
                    "eq",
                    fun=ALMAlgorithm._make_sum1_fun(S),
                    jac=ALMAlgorithm._make_sum1_jac(S),
                    name=f"sum_to_one_block_{lift_idx[0]}",
                    family="block_sum1",
                ))
            if blk.enforce_one_hot:
                k = len(lift_idx)
                M = ALMAlgorithm._pairwise_M(k)
                eqs.append(ALMConstraint(
                    "eq",
                    fun=ALMAlgorithm._make_onehot_fun(lift_idx, M),
                    jac=ALMAlgorithm._make_onehot_jac(lift_idx, M, n),
                    name=f"onehot_block_{lift_idx[0]}",
                    family="block_onehot",
                ))
        return eqs

    block_eqs = _install_block_equalities()
    # Unified equality list (order is fixed for whole run)
    full_eqs = problem_eqs + block_eqs
    # Allocate multipliers for every equality in full_eqs
    lam_eq = []
    for csp in full_eqs:
        r0 = csp.fun(x).reshape(-1)
        lam_eq.append(np.zeros_like(r0, dtype=float))
    # Inequality multipliers per user inequality
    mu_ineq = []
    for csp in problem_ineqs:
        r0 = csp.fun(x).reshape(-1)
        mu_ineq.append(np.zeros_like(r0, dtype=float))
    rho_eq_family = dict(getattr(cfg, "rho_eq_family", {}) or {})
    rho_ineq_family = dict(getattr(cfg, "rho_ineq_family", {}) or {})

    def _rho_for(csp_k: ALMConstraint) -> float:
        # Per-family penalty if configured, else the adapted group value.
        # NOTE: closes over rho_h / rho_g, which are bound below and
        # updated each outer iteration.
        fam = getattr(csp_k, "family", "") or ""
        if csp_k.kind == "eq":
            return float(rho_eq_family.get(fam, rho_h))
        else:
            return float(rho_ineq_family.get(fam, rho_g))

    # -------- running stats for adaptive penalties --------
    rho_h, rho_g = cfg.rho_h, cfg.rho_g
    best_eq, best_ineq = np.inf, np.inf
    no_imp_eq = no_imp_ineq = 0
    prev_eq_inf, prev_ineq_inf = np.inf, np.inf
    eps = 1e-12
    prev_eq_inf_by_family, prev_ineq_inf_by_family = {}, {}
    # ---- per-family stagnation tracking (only used when cfg.adapt_by_family=True)
    best_eq_by_family: Dict[str, float] = {}
    best_ineq_by_family: Dict[str, float] = {}
    no_imp_eq_by_family: Dict[str, int] = {}
    no_imp_ineq_by_family: Dict[str, int] = {}
    hist = {"eq_inf": [], "ineq_inf": [], "obj": [], "x": [],
            # per-iteration logs for parameters/multipliers
            "rho_h": [], "rho_g": [],
            "rho_eq_family": [], "rho_ineq_family": [],
            "eq_inf_by_family": [], "ineq_inf_by_family": [],
            # critical for debugging hardware compatibility (per-iteration)
            "dynamic_range": [],
            }
    for k_idx, csp in enumerate(full_eqs):
        if csp.kind != "eq":
            continue
        hist[f"lam_eq_max_idx{k_idx}"] = []
        hist[f"lam_eq_min_idx{k_idx}"] = []
    for k_idx, csp in enumerate(problem_ineqs):
        if csp.kind != "ineq":
            continue
        hist[f"mu_ineq_max_idx{k_idx}"] = []
        hist[f"mu_ineq_min_idx{k_idx}"] = []
    for it in range(cfg.max_outer):
        # -------- base polynomial (does not include fixed penalties here) --------
        base_terms: List[PolyTerm] = list(zip(model.polynomial.indices, model.polynomial.coefficients))
        # -------- ALM quadratic/linear pieces (assembled here, kept separate) --------
        Q_aug = np.zeros((n, n), dtype=float)
        c_aug = np.zeros(n, dtype=float)
        have_aug = False
        # (A) Equalities: linearize h near x^t => (rho/2)||A x - b||^2 + lam^T(Ax - b)
        for k_idx, csp in enumerate(full_eqs):
            if csp.kind != "eq":
                continue
            h = csp.fun(x).reshape(-1)
            A = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
            A = np.atleast_2d(A)
            assert A.shape[1] == n, f"A has {A.shape[1]} cols, expected {n}"
            # linearization about current x: residual model r(x) = A x - b, with b = A x - h
            b = A @ x - h
            rho_k = _rho_for(csp)
            Qk = 0.5 * rho_k * (A.T @ A)
            ck = (A.T @ lam_eq[k_idx]) - rho_k * (A.T @ b)
            Q_aug += Qk
            c_aug += ck
            have_aug = True
        # (B) Inequalities: projected ALM. Linearize g near x^t.
        for k_idx, csp in enumerate(problem_ineqs):
            if csp.kind != "ineq":
                continue
            g = csp.fun(x).reshape(-1)
            G = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
            G = np.atleast_2d(G)
            assert G.shape[1] == n, f"G has {G.shape[1]} cols, expected {n}"
            d = G @ x - g
            rho_k = _rho_for(csp)
            # Activation measure at current iterate; meaning, the current violating inequality components:
            # g(x) + mu/rho; Powell-Hestenes-Rockafellar shifted residual
            y = G @ x - d + mu_ineq[k_idx] / rho_k
            active = (y > cfg.act_tol)
            if np.any(active):
                GA = G[active, :]
                muA = mu_ineq[k_idx][active]
                gA = g[active]
                # Q += (rho/2) * GA^T GA
                Qk = 0.5 * rho_k * (GA.T @ GA)
                # c += GA^T mu - rho * GA^T (GA x - gA); where GA x - gA is active measures of d = G @ x - g
                ck = (GA.T @ muA) - rho_k * (GA.T @ (GA @ x - gA))
                Q_aug += Qk
                c_aug += ck
                have_aug = True
        # -------- build merged polynomial for the core solver --------
        all_terms = ALMAlgorithm._merge_poly(base_terms, Q_aug if have_aug else None,
                                             c_aug if have_aug else None)
        idxs, coeffs = zip(*[(inds, w) for (inds, w) in all_terms]) if all_terms else ([], [])
        poly_model = PolynomialModel(list(coeffs), list(idxs))
        if lb is not None and hasattr(poly_model, "lower_bound"):
            poly_model.lower_bound = np.asarray(lb, float)
        if ub is not None and hasattr(poly_model, "upper_bound"):
            poly_model.upper_bound = np.asarray(ub, float)
        x_ws = x.copy()
        # Convention: many cores look for one of these fields if present.
        # Use one or more to be future-proof; harmless if ignored.
        setattr(poly_model, "initial_guess", x_ws)
        setattr(poly_model, "warm_start", x_ws)
        setattr(poly_model, "x0", x_ws)
        # -------- inner solve --------
        out = solver.solve(poly_model, **solver_kwargs)
        # -------- parse --------
        if parse_output:
            x = parse_output(out)
        else:
            # default: support (value, x) or `.x` or raw x
            if isinstance(out, tuple) and len(out) == 2:
                _, x = out
            elif isinstance(out, dict) and "results" in out and "solutions" in out["results"]:
                x = out["results"]["solutions"][0]
            elif isinstance(out, dict) and "x" in out:
                x = out["x"]
            else:
                x = getattr(out, "x", out)
        x = np.asarray(x, float)
        # ---- multiplier tracking (values BEFORE this iteration's update) ----
        for k_idx, csp in enumerate(full_eqs):
            if csp.kind != "eq":
                continue
            hist[f"lam_eq_max_idx{k_idx}"].append(float(np.max(lam_eq[k_idx])))
            hist[f"lam_eq_min_idx{k_idx}"].append(float(np.min(lam_eq[k_idx])))
        for k_idx, csp in enumerate(problem_ineqs):
            if csp.kind != "ineq":
                continue
            hist[f"mu_ineq_max_idx{k_idx}"].append(float(np.max(mu_ineq[k_idx])))
            hist[f"mu_ineq_min_idx{k_idx}"].append(float(np.min(mu_ineq[k_idx])))
        # -------- residuals + multiplier updates --------
        eq_infs = []
        for k_idx, csp in enumerate(full_eqs):
            if csp.kind != "eq":
                continue
            r = csp.fun(x).reshape(-1)
            rho_k = _rho_for(csp)
            lam_eq[k_idx] = lam_eq[k_idx] + rho_k * r
            if r.size:
                eq_infs.append(np.max(np.abs(r)))
        eq_inf = float(np.max(eq_infs)) if eq_infs else 0.0
        ineq_infs = []
        for k_idx, csp in enumerate(problem_ineqs):
            if csp.kind != "ineq":
                continue
            r = csp.fun(x).reshape(-1)
            rho_k = _rho_for(csp)
            mu_ineq[k_idx] = np.maximum(0.0, mu_ineq[k_idx] + rho_k * r)
            if r.size:
                ineq_infs.append(np.max(np.maximum(0.0, r)))
        ineq_inf = float(np.max(ineq_infs)) if ineq_infs else 0.0
        assert len(lam_eq) == len(full_eqs)
        assert len(mu_ineq) == len(problem_ineqs)
        # ---- per-family residual telemetry ----
        eq_inf_by_family = {}
        for k_idx, csp in enumerate(full_eqs):
            if csp.kind != "eq":
                continue
            fam = getattr(csp, "family", "") or ""
            r = csp.fun(x).reshape(-1)
            v = float(np.max(np.abs(r))) if r.size else 0.0
            eq_inf_by_family[fam] = max(eq_inf_by_family.get(fam, 0.0), v)
        ineq_inf_by_family = {}
        for k_idx, csp in enumerate(problem_ineqs):
            if csp.kind != "ineq":
                continue
            fam = getattr(csp, "family", "") or ""
            r = csp.fun(x).reshape(-1)
            v = float(np.max(np.maximum(0.0, r))) if r.size else 0.0
            ineq_inf_by_family[fam] = max(ineq_inf_by_family.get(fam, 0.0), v)
        # evaluate base polynomial only (can add aug value if we want to track full L_A)
        f_val = ALMAlgorithm._poly_value(base_terms, x)
        # evaluate dynamic range
        dynamic_range = poly_model.dynamic_range
        hist["eq_inf"].append(eq_inf); hist["ineq_inf"].append(ineq_inf)
        hist["obj"].append(float(f_val)); hist["x"].append(x.copy())
        hist["dynamic_range"].append(float(dynamic_range))
        # parameter tracking
        hist["rho_h"].append(float(rho_h)); hist["rho_g"].append(float(rho_g))
        hist["rho_eq_family"].append(dict(rho_eq_family))
        hist["rho_ineq_family"].append(dict(rho_ineq_family))
        hist["eq_inf_by_family"].append(dict(eq_inf_by_family))
        hist["ineq_inf_by_family"].append(dict(ineq_inf_by_family))
        if verbose:
            # show worst 3 equality families by residual, with their rhos
            eq_items = sorted(eq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
            eq_top = eq_items[:3]
            eq_str = ",".join([
                f"{fam or 'eq'}:{val:.2e}@"
                f"{_rho_for(next(c for c in full_eqs if (getattr(c,'family','') or '')==fam and c.kind=='eq')):.1g}"
                for fam, val in eq_top
            ])
            # show worst 3 inequality families by residual, with their rhos
            ineq_items = sorted(ineq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
            ineq_top = ineq_items[:3]
            ineq_str = ",".join([
                f"{fam or 'ineq'}:{val:.2e}@"
                f"{_rho_for(next(c for c in problem_ineqs if (getattr(c, 'family', '') or '') == fam and c.kind == 'ineq')):.1g}"
                for fam, val in ineq_top
            ])
            print(f"[ALM {it:02d}] f={f_val:.6g} | eq_inf={eq_inf:.2e} | ineq_inf={ineq_inf:.2e} "
                  f"| rho_h={rho_h:.2e} | rho_g={rho_g:.2e} | eq_fam[{eq_str}] | ineq_fam[{ineq_str}] "
                  f"| dynamic_range={dynamic_range:.4g}")
        # stopping
        if eq_inf <= cfg.tol_h and ineq_inf <= cfg.tol_g:
            if verbose:
                print(f"[ALM] converged at iter {it}")
            break
        # EMA smoothing to reduce jitter
        if it == 0:
            eq_inf_smooth = eq_inf
            ineq_inf_smooth = ineq_inf
        else:
            eq_inf_smooth = cfg.ema_alpha * eq_inf + (1 - cfg.ema_alpha) * eq_inf_smooth
            ineq_inf_smooth = cfg.ema_alpha * ineq_inf + (1 - cfg.ema_alpha) * ineq_inf_smooth
        # -------- Residual-ratio controller --------
        if cfg.adapt and it > 0:
            # per-family rho adaptation for equalities
            if getattr(cfg, "adapt_by_family", False):
                # Equality family
                for fam, cur in eq_inf_by_family.items():
                    prev = max(prev_eq_inf_by_family.get(fam, cur), eps)
                    # rho value for family: if absent, start at rho_h
                    rho_f = float(rho_eq_family.get(fam, rho_h))
                    if cur > cfg.tau_up_h * prev:
                        rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
                    elif cur < cfg.tau_down_h * prev:
                        rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
                    rho_eq_family[fam] = rho_f
                # update prev map
                prev_eq_inf_by_family = dict(eq_inf_by_family)
                # Inequality family
                for fam, cur in ineq_inf_by_family.items():
                    prev = max(prev_ineq_inf_by_family.get(fam, cur), eps)
                    # rho value for family: if absent, start at rho_g
                    rho_f = float(rho_ineq_family.get(fam, rho_g))
                    if cur > cfg.tau_up_g * prev:
                        rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
                    elif cur < cfg.tau_down_g * prev:
                        rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
                    rho_ineq_family[fam] = rho_f
                # update prev map
                prev_ineq_inf_by_family = dict(ineq_inf_by_family)
            else:
                # Equality group
                if eq_inf_smooth > cfg.tau_up_h * max(prev_eq_inf, eps):  # stalled or not shrinking
                    rho_h = min(cfg.gamma_up * rho_h, cfg.rho_max)
                elif eq_inf_smooth < cfg.tau_down_h * max(prev_eq_inf, eps):  # fast progress, allow relaxation
                    rho_h = max(cfg.gamma_down * rho_h, cfg.rho_min)
                # Inequality group
                if ineq_inf_smooth > cfg.tau_up_g * max(prev_ineq_inf, eps):
                    rho_g = min(cfg.gamma_up * rho_g, cfg.rho_max)
                elif ineq_inf_smooth < cfg.tau_down_g * max(prev_ineq_inf, eps):
                    rho_g = max(cfg.gamma_down * rho_g, cfg.rho_min)
        else:
            # per-family rho adaptation for equalities
            if getattr(cfg, "adapt_by_family", False):
                # update prev map
                prev_eq_inf_by_family = dict(eq_inf_by_family)
                prev_ineq_inf_by_family = dict(ineq_inf_by_family)
        # -------- Stagnation bump (safety net) --------
        if cfg.use_stagnation_bump:
            if getattr(cfg, "adapt_by_family", False):
                # ---- Equality families ----
                for fam, cur in eq_inf_by_family.items():
                    # initialize
                    if fam not in best_eq_by_family:
                        best_eq_by_family[fam] = float(cur)
                        no_imp_eq_by_family[fam] = 0
                    if cur <= best_eq_by_family[fam] * (1 - cfg.stagnation_factor):
                        best_eq_by_family[fam] = float(cur)
                        no_imp_eq_by_family[fam] = 0
                    else:
                        no_imp_eq_by_family[fam] += 1
                        if no_imp_eq_by_family[fam] >= cfg.patience_h:
                            rho_f = float(rho_eq_family.get(fam, rho_h))
                            rho_eq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
                            no_imp_eq_by_family[fam] = 0
                # ---- Inequality families ----
                for fam, cur in ineq_inf_by_family.items():
                    # initialize
                    if fam not in best_ineq_by_family:
                        best_ineq_by_family[fam] = float(cur)
                        no_imp_ineq_by_family[fam] = 0
                    if cur <= best_ineq_by_family[fam] * (1 - cfg.stagnation_factor):
                        best_ineq_by_family[fam] = float(cur)
                        no_imp_ineq_by_family[fam] = 0
                    else:
                        no_imp_ineq_by_family[fam] += 1
                        if no_imp_ineq_by_family[fam] >= cfg.patience_g:
                            rho_f = float(rho_ineq_family.get(fam, rho_g))
                            rho_ineq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
                            no_imp_ineq_by_family[fam] = 0
            else:
                # Equality stagnation
                if eq_inf <= best_eq * (1 - cfg.stagnation_factor):
                    best_eq = eq_inf; no_imp_eq = 0
                else:
                    no_imp_eq += 1
                    if no_imp_eq >= cfg.patience_h:
                        rho_h = min(2.0 * rho_h, cfg.rho_max); no_imp_eq = 0
                # Inequality stagnation
                if ineq_inf <= best_ineq * (1 - cfg.stagnation_factor):
                    best_ineq = ineq_inf; no_imp_ineq = 0
                else:
                    no_imp_ineq += 1
                    if no_imp_ineq >= cfg.patience_g:
                        rho_g = min(2.0 * rho_g, cfg.rho_max); no_imp_ineq = 0
        # -------- finalize for next iteration --------
        prev_eq_inf = max(eq_inf_smooth, eps)
        prev_ineq_inf = max(ineq_inf_smooth, eps)
    # ---- decoding back to native levels (only if blocks) ----
    decoded_native: Dict[int, float] = {}  # maps original var anchor -> chosen level value
    decoded_lifted: Dict[int, int] = {}    # maps lifted start index -> argmax position (optional)
    if has_blocks:
        for blk, lift_idx in zip(registry.blocks, lifted_slices):
            if not lift_idx:
                continue
            sl = np.array(lift_idx, int)
            if len(sl) == 0:
                continue
            sblk = x[sl]
            j = int(np.argmax(sblk))  # which level got selected in the block
            orig_anchor = int(blk.idx[0])  # anchor original var id for this block
            decoded_native[orig_anchor] = float(blk.levels[j])
            decoded_lifted[sl[0]] = j  # optional: lifted index -> chosen slot
    return {
        "x": x,
        "decoded": decoded_native if has_blocks else {},
        "decoded_debug": decoded_lifted if has_blocks else {},
        "hist": hist,
    }