# Source code for eqc_models.algorithms.alm

# (C) Quantum Computing Inc., 2025.
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple, Optional, Sequence, Union
import numpy as np
from collections import defaultdict
from eqc_models.base.polynomial import PolynomialModel

Array = np.ndarray
PolyTerm = Tuple[Tuple[int, ...], float]


@dataclass
class ALMConstraint:
    """One constraint family; fun returns a vector; jac returns its Jacobian.

    A single registered constraint (or vector-valued family of constraints)
    consumed by ``ALMAlgorithm.run``:
      - equality:   h(x) = 0 (kind == "eq")
      - inequality: g(x) <= 0 (kind == "ineq"; violations measured as max(0, g))
    """
    kind: str  # "eq" or "ineq"
    fun: Callable[[Array], Array]  # h(x) or g(x); must return a 1-D residual vector
    jac: Optional[Callable[[Array], Array]] = None  # Jacobian; finite-differenced when None
    name: str = ""  # human-readable label (used for logging/telemetry)
    family: str = ""  # grouping key for per-family penalty adaptation
@dataclass
class ALMBlock:
    """Lifted discrete variable block (optional).

    Encodes one native discrete variable as a one-hot group of lifted
    coordinates s, one per admissible level value.
    """
    idx: Sequence[int]  # indices of block in the full x
    levels: Array  # (k,) level values (b_i)
    enforce_sum_to_one: bool = True  # register as equality via helper (1^T s = 1)
    enforce_one_hot: bool = True  # ALM linearization with M = 11^T - I (s^T M s = 0)
@dataclass
class ALMConfig:
    """Tuning knobs for the ALM outer loop (penalties, adaptation, tolerances)."""
    # penalties
    rho_h: float = 50.0  # equalities
    rho_g: float = 50.0  # inequalities / one-hot
    rho_min: float = 1e-3
    rho_max: float = 1e3
    # per-family penalty overrides
    rho_eq_family: Dict[str, float] = field(default_factory=dict)
    rho_ineq_family: Dict[str, float] = field(default_factory=dict)
    # allow per-family adaptation toggle (optional)
    adapt_by_family: bool = True
    # adaptation toggles
    adapt: bool = True
    tau_up_h: float = 0.90  # increase rho_h when residual ratio exceeds this
    tau_down_h: float = 0.50  # decrease rho_h when residual ratio falls below this
    tau_up_g: float = 0.90
    tau_down_g: float = 0.50
    gamma_up: float = 2.0  # multiplicative penalty increase factor
    gamma_down: float = 1.0  # multiplicative penalty decrease factor (1.0 = no decrease)
    # tolerances & loop
    tol_h: float = 1e-6
    tol_g: float = 1e-6
    max_outer: int = 100
    # stagnation safety net
    use_stagnation_bump: bool = True
    patience_h: int = 10  # outer iterations without improvement before bumping rho_h
    patience_g: int = 10
    stagnation_factor: float = 1e-3  # relative improvement needed to reset patience
    # smoothing (optional)
    ema_alpha: float = 0.3  # EMA weight on the newest residual
    # finite diff (only used if jac=None)
    fd_eps: float = 1e-6
    # activation threshold for projected ALM
    act_tol: float = 1e-10
class ConstraintRegistry:
    """
    Holds constraints and block metadata; keeps ALMAlgorithm stateless.
    Register constraints and (optional) lifted-discrete blocks here.
    """

    def __init__(self):
        self.constraints: List[ALMConstraint] = []
        self.blocks: List[ALMBlock] = []

    def _append(self, kind, fun, jac, name, family):
        # Shared builder for both constraint kinds.
        self.constraints.append(ALMConstraint(kind, fun, jac, name, family))

    def add_equality(self, fun, jac=None, name="", family=""):
        """Register an equality family h(x) = 0 (fun returns a residual vector)."""
        self._append("eq", fun, jac, name, family)

    def add_inequality(self, fun, jac=None, name="", family=""):
        """Register an inequality family g(x) <= 0 (fun returns a residual vector)."""
        self._append("ineq", fun, jac, name, family)

    def add_block(self, idx: Sequence[int], levels: Array, sum_to_one=True, one_hot=True):
        """Register a lifted discrete block with its admissible level values."""
        level_values = np.asarray(levels, float)
        self.blocks.append(ALMBlock(list(idx), level_values, sum_to_one, one_hot))
class ALMAlgorithm:
    """Stateless ALM outer loop. Call `run(base_model, registry, solver, cfg, **solver_kwargs)`."""

    # ---- helpers (static) ----
    @staticmethod
    def _finite_diff_jac(fun: Callable[[Array], Array], x: Array, eps: float) -> Array:
        # Forward-difference Jacobian of `fun` at `x`; one function call per column.
        y0 = fun(x)
        m = int(np.prod(y0.shape))
        y0 = y0.reshape(-1)
        n = x.size
        J = np.zeros((m, n), dtype=float)
        for j in range(n):
            xp = x.copy()
            xp[j] += eps
            J[:, j] = (fun(xp).reshape(-1) - y0) / eps
        return J

    @staticmethod
    def _pairwise_M(k: int) -> Array:
        # M = 11^T - I: s^T M s sums all off-diagonal products, zero iff s is (at most) one-hot
        # over nonnegative s.
        return np.ones((k, k), dtype=float) - np.eye(k, dtype=float)

    @staticmethod
    def _sum_to_one_selector(n: int, idx: Sequence[int]) -> Array:
        # Row selector S (1, n) with ones at the block's lifted coordinates.
        S = np.zeros((1, n), dtype=float)
        S[0, np.array(list(idx), int)] = 1.0
        return S

    @staticmethod
    def _make_sum1_fun(S):
        # Residual of the sum-to-one equality: S x - 1.
        return lambda x: S @ x - np.array([1.0])

    @staticmethod
    def _make_sum1_jac(S):
        # Jacobian of a linear constraint is the constant selector row.
        return lambda x: S

    @staticmethod
    def _make_onehot_fun(sl, M):
        # Scalar one-hot residual s^T M s for the block slice `sl`.
        sl = np.array(sl, int)

        def _f(x):
            s = x[sl]
            return np.array([float(s @ (M @ s))])  # shape (1,)
        return _f

    @staticmethod
    def _make_onehot_jac(sl, M, n):
        # Gradient of s^T M s is 2 M s (M symmetric), scattered into a (1, n) row.
        sl = np.array(sl, int)

        def _J(x):
            s = x[sl]
            grad_blk = 2.0 * (M @ s)  # (k,)
            J = np.zeros((1, n), dtype=float)  # shape (1, n)
            J[0, sl] = grad_blk
            return J
        return _J

    @staticmethod
    def _poly_value(poly_terms: List[PolyTerm], x: Array) -> float:
        # Evaluate sum of monomials; index 0 is a placeholder (constant factor),
        # index j>0 refers to x[j-1] (1-based variable indexing).
        val = 0.0
        for inds, coeff in poly_terms:
            prod = 1.0
            for j in inds:
                if j == 0:
                    continue
                else:
                    prod *= x[j - 1]
            val += coeff * prod
        return float(val)

    @staticmethod
    def _merge_poly(poly_terms: Optional[List[PolyTerm]],
                    Q_aug: Optional[Array],
                    c_aug: Optional[Array]) -> List[PolyTerm]:
        """
        Merge ALM's quadratic/linear increments (Q_aug, c_aug) into the base
        polynomial term list `poly_terms`. If 'poly_terms' is None, then turn
        x^T Q_aug x + c_aug^T x into polynomial monomials.

        Terms are of the form: ((0, i), w) for linear, ((i, j), w) for quadratic.
        """
        merged = list(poly_terms) if poly_terms is not None else []
        if Q_aug is not None:
            # Symmetrize first so each unordered pair is emitted exactly once below.
            Qs = 0.5 * (Q_aug + Q_aug.T)
            n = Qs.shape[0]
            for i in range(n):
                # diagonal contributes Qii * x_i^2
                if Qs[i, i] != 0.0:
                    merged.append(((i + 1, i + 1), float(Qs[i, i])))
                for j in range(i + 1, n):
                    q = 2.0 * Qs[i, j]  # x^T Q x -> sum_{i<j} 2*Q_ij x_i x_j
                    if q != 0.0:
                        merged.append(((i + 1, j + 1), float(q)))
        if c_aug is not None:
            for i, ci in enumerate(c_aug):
                if ci != 0.0:
                    merged.append(((0, i + 1), float(ci)))
        return merged

    @staticmethod
    def _block_offsets(blocks: List[ALMBlock]) -> List[int]:
        """
        Return starting offsets (0-based) for each lifted block in the
        concatenated s-vector.
        """
        offs, pos = [], 0
        for blk in blocks:
            offs.append(pos)
            pos += len(blk.levels)  # each block contributes k lifted coordinates
        return offs
[docs] @staticmethod def lift_Qc_to_poly_terms( Q_native: np.ndarray, # shape (m, m) over original discrete vars (one block per var) c_native: np.ndarray, # shape (m,) blocks: List[ALMBlock], # ALM blocks (in the same order as the rows/cols of Q_native, c_native) ) -> Tuple[List[Tuple[int, ...]], List[float]]: """ Expand a quadratic/linear objective over m native discrete variables into the lifted (one-hot) s-space. Given `Q_native` and `c_native` over original m discrete vars (one block per var), and `blocks` (`list[ALMBlock]` in the same order as original vars), we enforce: - For variable i with level values b_i (length k_i), we create k_i lifted coords s_{i,a}. - Quadratic term: J_ij = Q_native[i,j] * (b_i b_j^T) contributes to pairs (s_{i,a}, s_{j,b}) - Linear term: C_i = c_native[i] * b_i contributes to s_{i,a} Returns polynomial (indices, coeffs) over concatenated s variables. """ m = len(blocks) assert Q_native.shape == (m, m), "Q_native must match number of blocks" assert c_native.shape == (m,), "c_native must match number of blocks" offs = ALMAlgorithm._block_offsets(blocks) terms_acc = defaultdict(float) # Quadratic lift: J = kron-expansion for i in range(m): bi = blocks[i].levels[:, None] # (k_i, 1) oi = offs[i] for j in range(m): bj = blocks[j].levels[:, None] # (k_j, 1) oj = offs[j] J_ij = Q_native[i, j] * (bi @ bj.T) # (k_i, k_j) if np.allclose(J_ij, 0.0): continue ki, kj = bi.shape[0], bj.shape[0] for a in range(ki): ia = oi + a + 1 for b in range(kj): jb = oj + b + 1 w = float(J_ij[a, b]) if w == 0.0: continue if ia == jb: terms_acc[(ia, ia)] += w else: # NOTE: we store each cross monomial once (i < j); for i==j block pairs, # double to represent J_ab + J_ba if i == j: # intra-block off-diagonal needs 2x w *= 2.0 i1, i2 = (ia, jb) if ia < jb else (jb, ia) terms_acc[(i1, i2)] += w # Linear lift: C_i = L_i * b_i for i in range(m): b = blocks[i].levels oi = offs[i] for a, val in enumerate(b): ia = oi + a + 1 w = float(c_native[i] * val) if w != 0.0: 
terms_acc[(0, ia)] += w # Pack to PolynomialModel format indices = [tuple(k) for k in terms_acc.keys()] coeffs = [float(v) for v in terms_acc.values()] return indices, coeffs
# ---- main entrypoint ----
    @staticmethod
    def run(
        base_model: PolynomialModel,
        registry: ConstraintRegistry,
        solver,
        cfg: ALMConfig = ALMConfig(),  # NOTE(review): mutable default instance shared across calls; safe today because run() only copies its dicts — confirm before mutating cfg here
        x0: Optional[Array] = None,
        *,
        parse_output=None,
        verbose: bool = True,
        **solver_kwargs,
    ) -> Dict[str, Union[Array, Dict[int, float], Dict]]:
        """
        Solve with ALM. Keep all ALM state local to this call (no global side-effects).

        Handles three modes:
          (A) No blocks -> continuous; use base_model as-is
          (B) Blocks + native base_model -> lift to s-space
          (C) Blocks + already-lifted base_model -> use as-is (compat)

        Returns:
          {
            "x": final iterate,
            "decoded": {start_idx_of_block: level_value, ...} for lifted blocks,
            "decoded_debug": {start_idx_of_block: native_device_value, ...},
            "hist": { "eq_inf": [...], "ineq_inf": [...], "obj": [...], "x": [...] }
          }
        """
        blocks = registry.blocks
        has_blocks = len(blocks) > 0

        # ---- choose working model + dimension n ----
        if not has_blocks:
            # (A) continuous case: use the provided model directly
            model = base_model
            # Prefer model.n, fall back to bounds; else infer from polynomial indices
            n = getattr(model, "n", None)
            if n is None:
                ub = getattr(model, "upper_bound", None)
                lb = getattr(model, "lower_bound", None)
                if ub is not None:
                    n = len(ub)
                elif lb is not None:
                    n = len(lb)
                else:
                    # infer from polynomial terms (largest 1-based variable index)
                    n = 0
                    for inds in getattr(model, "indices", getattr(model.polynomial, "indices", [])):
                        for j in inds:
                            if j > 0:
                                n = max(n, j)
            lifted_slices: List[List[int]] = []
        else:
            # (B/C) lifted (discrete) case
            target_lifted_n = sum(len(blk.levels) for blk in blocks)
            base_n = getattr(base_model, "n", None)

            def _infer_n_from_terms(pm: PolynomialModel) -> int:
                # Largest variable index appearing in the model's monomials.
                inds_list = getattr(pm, "indices", getattr(pm.polynomial, "indices", []))
                mx = 0
                for inds in inds_list:
                    for j in inds:
                        if j > mx:
                            mx = j
                return int(mx)

            if base_n is None:
                base_n = _infer_n_from_terms(base_model)

            # detect "already-lifted" native input (compat path)
            already_lifted = (base_n == target_lifted_n)
            if already_lifted:
                # (C) use provided model directly; assume bounds already sensible
                model = base_model
                n = target_lifted_n
            else:
                # (B) lift from native space
                # base_model must expose coefficients/indices compatible with this call
                c_base, Q_base = base_model._quadratic_polynomial_to_qubo_coefficients(
                    getattr(base_model, "coefficients", getattr(base_model.polynomial, "coefficients", [])),
                    getattr(base_model, "indices", getattr(base_model.polynomial, "indices", [])),
                    getattr(base_model, "n")
                )
                assert Q_base.shape[0] == Q_base.shape[1]
                assert c_base.shape[0] == Q_base.shape[0]
                indices_lifted, coeffs_lifted = ALMAlgorithm.lift_Qc_to_poly_terms(Q_base, c_base, blocks)
                model = PolynomialModel(coeffs_lifted, indices_lifted)
                n = target_lifted_n
                # set canonical [0,1] bounds for lifted s
                setattr(model, "lower_bound", np.zeros(n, float))
                setattr(model, "upper_bound", np.ones(n, float))

            # ---- n and lifted_slices ----
            lifted_slices = []
            pos = 0
            for blk in blocks:
                k = len(blk.levels)  # number of lifted coords for this block
                lifted_slices.append(list(range(pos, pos + k)))  # 0-based in lifted x
                pos += k

        # Algorithm initial solution and bounds
        lb = getattr(model, "lower_bound", None)
        ub = getattr(model, "upper_bound", None)
        if x0 is not None:
            x = np.asarray(x0, float).copy()
        else:
            # default init: bound midpoint when finite bounds exist, else zeros
            if (lb is not None) and (ub is not None) and np.all(np.isfinite(lb)) and np.all(np.isfinite(ub)):
                x = 0.5 * (np.asarray(lb, float) + np.asarray(ub, float))
            else:
                x = np.zeros(n, float)

        # ---- collect constraints ----
        problem_eqs = [c for c in registry.constraints if c.kind == "eq"]
        problem_ineqs = [c for c in registry.constraints if c.kind == "ineq"]

        # auto-install sum-to-one and one-hot as equalities
        # (One-hot: s^T (11^T - I) s = 0))
        def _install_block_equalities() -> List[ALMConstraint]:
            if not has_blocks:
                return []
            eqs: List[ALMConstraint] = []
            for blk, lift_idx in zip(registry.blocks, lifted_slices):
                if blk.enforce_sum_to_one:
                    S = ALMAlgorithm._sum_to_one_selector(n, lift_idx)
                    eqs.append(ALMConstraint(
                        "eq",
                        fun=ALMAlgorithm._make_sum1_fun(S),
                        jac=ALMAlgorithm._make_sum1_jac(S),
                        name=f"sum_to_one_block_{lift_idx[0]}",
                        family="block_sum1",
                    ))
                if blk.enforce_one_hot:
                    k = len(lift_idx)
                    M = ALMAlgorithm._pairwise_M(k)
                    eqs.append(ALMConstraint(
                        "eq",
                        fun=ALMAlgorithm._make_onehot_fun(lift_idx, M),
                        jac=ALMAlgorithm._make_onehot_jac(lift_idx, M, n),
                        name=f"onehot_block_{lift_idx[0]}",
                        family="block_onehot",
                    ))
            return eqs

        block_eqs = _install_block_equalities()

        # Unified equality list (order is fixed for whole run)
        full_eqs = problem_eqs + block_eqs

        # Allocate multipliers for every equality in full_eqs
        lam_eq = []
        for csp in full_eqs:
            r0 = csp.fun(x).reshape(-1)
            lam_eq.append(np.zeros_like(r0, dtype=float))

        # Inequality multipliers per user inequality
        mu_ineq = []
        for csp in problem_ineqs:
            r0 = csp.fun(x).reshape(-1)
            mu_ineq.append(np.zeros_like(r0, dtype=float))

        # Local copies so per-family adaptation never mutates cfg.
        rho_eq_family = dict(getattr(cfg, "rho_eq_family", {}) or {})
        rho_ineq_family = dict(getattr(cfg, "rho_ineq_family", {}) or {})

        def _rho_for(csp_k: ALMConstraint) -> float:
            # Family override if present, else the (current, adaptive) group penalty.
            # Closes over rho_h/rho_g, which are (re)bound below — late binding is intentional.
            fam = getattr(csp_k, "family", "") or ""
            if csp_k.kind == "eq":
                return float(rho_eq_family.get(fam, rho_h))
            else:
                return float(rho_ineq_family.get(fam, rho_g))

        # -------- running stats for adaptive penalties --------
        rho_h, rho_g = cfg.rho_h, cfg.rho_g
        best_eq, best_ineq = np.inf, np.inf
        no_imp_eq = no_imp_ineq = 0
        prev_eq_inf, prev_ineq_inf = np.inf, np.inf
        eps = 1e-12
        prev_eq_inf_by_family, prev_ineq_inf_by_family = {}, {}

        # ---- per-family stagnation tracking (only used when cfg.adapt_by_family=True)
        best_eq_by_family: Dict[str, float] = {}
        best_ineq_by_family: Dict[str, float] = {}
        no_imp_eq_by_family: Dict[str, int] = {}
        no_imp_ineq_by_family: Dict[str, int] = {}

        hist = {"eq_inf": [], "ineq_inf": [], "obj": [], "x": [],
                # per-iteration logs for parameters/multipliers
                "rho_h": [], "rho_g": [],
                "rho_eq_family": [], "rho_ineq_family": [],
                "eq_inf_by_family": [], "ineq_inf_by_family": [],
                # critical for debugging hardware compatibility (per-iteration)
                "dynamic_range": [],
                }
        for k_idx, csp in enumerate(full_eqs):
            if csp.kind != "eq":
                continue
            hist[f"lam_eq_max_idx{k_idx}"] = []
            hist[f"lam_eq_min_idx{k_idx}"] = []
        for k_idx, csp in enumerate(problem_ineqs):
            if csp.kind != "ineq":
                continue
            hist[f"mu_ineq_max_idx{k_idx}"] = []
            hist[f"mu_ineq_min_idx{k_idx}"] = []

        for it in range(cfg.max_outer):
            # -------- base polynomial (does not include fixed penalties here) --------
            base_terms: List[PolyTerm] = list(zip(model.polynomial.indices, model.polynomial.coefficients))

            # -------- ALM quadratic/linear pieces (assembled here, kept separate) --------
            Q_aug = np.zeros((n, n), dtype=float)
            c_aug = np.zeros(n, dtype=float)
            have_aug = False

            # (A) Equalities: linearize h near x^t => (rho/2)||A x - b||^2 + lam^T(Ax - b)
            for k_idx, csp in enumerate(full_eqs):
                if csp.kind != "eq":
                    continue
                h = csp.fun(x).reshape(-1)
                A = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
                A = np.atleast_2d(A)
                assert A.shape[1] == n, f"A has {A.shape[1]} cols, expected {n}"
                # linearization about current x: residual model r(x) = A x - b, with b = A x - h
                b = A @ x - h
                rho_k = _rho_for(csp)
                Qk = 0.5 * rho_k * (A.T @ A)
                ck = (A.T @ lam_eq[k_idx]) - rho_k * (A.T @ b)
                Q_aug += Qk
                c_aug += ck
                have_aug = True

            # (B) Inequalities: projected ALM. Linearize g near x^t.
            for k_idx, csp in enumerate(problem_ineqs):
                if csp.kind != "ineq":
                    continue
                g = csp.fun(x).reshape(-1)
                G = csp.jac(x) if csp.jac is not None else ALMAlgorithm._finite_diff_jac(csp.fun, x, cfg.fd_eps)
                G = np.atleast_2d(G)
                assert G.shape[1] == n, f"G has {G.shape[1]} cols, expected {n}"
                d = G @ x - g
                rho_k = _rho_for(csp)
                # Activation measure at current iterate; meaning, the current violating
                # inequality components: g(x) + mu/rho — the Powell-Hestenes-Rockafellar
                # shifted residual (note G @ x - d == g exactly).
                y = G @ x - d + mu_ineq[k_idx] / rho_k
                active = (y > cfg.act_tol)
                if np.any(active):
                    GA = G[active, :]
                    muA = mu_ineq[k_idx][active]
                    gA = g[active]
                    # Q += (rho/2) * GA^T GA
                    Qk = 0.5 * rho_k * (GA.T @ GA)
                    # c += GA^T mu - rho * GA^T (GA x - gA); where GA x - gA is active measures of d = G @ x - g
                    ck = (GA.T @ muA) - rho_k * (GA.T @ (GA @ x - gA))
                    Q_aug += Qk
                    c_aug += ck
                    have_aug = True

            # -------- build merged polynomial for the core solver --------
            all_terms = ALMAlgorithm._merge_poly(base_terms, Q_aug if have_aug else None, c_aug if have_aug else None)
            idxs, coeffs = zip(*[(inds, w) for (inds, w) in all_terms]) if all_terms else ([], [])
            poly_model = PolynomialModel(list(coeffs), list(idxs))
            if lb is not None and hasattr(poly_model, "lower_bound"):
                poly_model.lower_bound = np.asarray(lb, float)
            if ub is not None and hasattr(poly_model, "upper_bound"):
                poly_model.upper_bound = np.asarray(ub, float)
            x_ws = x.copy()
            # Convention: many cores look for one of these fields if present.
            # Use one or more to be future-proof; harmless if ignored.
            setattr(poly_model, "initial_guess", x_ws)
            setattr(poly_model, "warm_start", x_ws)
            setattr(poly_model, "x0", x_ws)

            # -------- inner solve --------
            out = solver.solve(poly_model, **solver_kwargs)

            # -------- parse --------
            if parse_output:
                x = parse_output(out)
            else:
                # default: support (value, x) or `.x` or raw x
                if isinstance(out, tuple) and len(out) == 2:
                    _, x = out
                elif isinstance(out, dict) and "results" in out and "solutions" in out["results"]:
                    x = out["results"]["solutions"][0]
                elif isinstance(out, dict) and "x" in out:
                    x = out["x"]
                else:
                    x = getattr(out, "x", out)
            x = np.asarray(x, float)

            # ---- multiplier tracking (logged BEFORE this iteration's update) ----
            for k_idx, csp in enumerate(full_eqs):
                if csp.kind != "eq":
                    continue
                hist[f"lam_eq_max_idx{k_idx}"].append(float(np.max(lam_eq[k_idx])))
                hist[f"lam_eq_min_idx{k_idx}"].append(float(np.min(lam_eq[k_idx])))
            for k_idx, csp in enumerate(problem_ineqs):
                if csp.kind != "ineq":
                    continue
                hist[f"mu_ineq_max_idx{k_idx}"].append(float(np.max(mu_ineq[k_idx])))
                hist[f"mu_ineq_min_idx{k_idx}"].append(float(np.min(mu_ineq[k_idx])))

            # -------- residuals + multiplier updates --------
            eq_infs = []
            for k_idx, csp in enumerate(full_eqs):
                if csp.kind != "eq":
                    continue
                r = csp.fun(x).reshape(-1)
                rho_k = _rho_for(csp)
                lam_eq[k_idx] = lam_eq[k_idx] + rho_k * r
                if r.size:
                    eq_infs.append(np.max(np.abs(r)))
            eq_inf = float(np.max(eq_infs)) if eq_infs else 0.0

            ineq_infs = []
            for k_idx, csp in enumerate(problem_ineqs):
                if csp.kind != "ineq":
                    continue
                r = csp.fun(x).reshape(-1)
                rho_k = _rho_for(csp)
                # Projected update keeps inequality multipliers nonnegative.
                mu_ineq[k_idx] = np.maximum(0.0, mu_ineq[k_idx] + rho_k * r)
                if r.size:
                    ineq_infs.append(np.max(np.maximum(0.0, r)))
            ineq_inf = float(np.max(ineq_infs)) if ineq_infs else 0.0

            assert len(lam_eq) == len(full_eqs)
            assert len(mu_ineq) == len(problem_ineqs)

            # ---- per-family residual telemetry ----
            eq_inf_by_family = {}
            for k_idx, csp in enumerate(full_eqs):
                if csp.kind != "eq":
                    continue
                fam = getattr(csp, "family", "") or ""
                r = csp.fun(x).reshape(-1)
                v = float(np.max(np.abs(r))) if r.size else 0.0
                eq_inf_by_family[fam] = max(eq_inf_by_family.get(fam, 0.0), v)
            ineq_inf_by_family = {}
            for k_idx, csp in enumerate(problem_ineqs):
                if csp.kind != "ineq":
                    continue
                fam = getattr(csp, "family", "") or ""
                r = csp.fun(x).reshape(-1)
                v = float(np.max(np.maximum(0.0, r))) if r.size else 0.0
                ineq_inf_by_family[fam] = max(ineq_inf_by_family.get(fam, 0.0), v)

            # evaluate base polynomial only (can add aug value if want to track full L_A)
            f_val = ALMAlgorithm._poly_value(base_terms, x)
            # evaluate dynamic range
            dynamic_range = poly_model.dynamic_range

            hist["eq_inf"].append(eq_inf); hist["ineq_inf"].append(ineq_inf)
            hist["obj"].append(float(f_val)); hist["x"].append(x.copy())
            hist["dynamic_range"].append(float(dynamic_range))
            # parameter tracking
            hist["rho_h"].append(float(rho_h)); hist["rho_g"].append(float(rho_g))
            hist["rho_eq_family"].append(dict(rho_eq_family))
            hist["rho_ineq_family"].append(dict(rho_ineq_family))
            hist["eq_inf_by_family"].append(dict(eq_inf_by_family))
            hist["ineq_inf_by_family"].append(dict(ineq_inf_by_family))

            if verbose:
                # show worst 3 equality families by residual, with their rhos
                eq_items = sorted(eq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
                eq_top = eq_items[:3]
                eq_str = ",".join([
                    f"{fam or 'eq'}:{val:.2e}@"
                    f"{_rho_for(next(c for c in full_eqs if (getattr(c,'family','') or '')==fam and c.kind=='eq')):.1g}"
                    for fam, val in eq_top
                ])
                # show worst 3 inequality families by residual, with their rhos
                ineq_items = sorted(ineq_inf_by_family.items(), key=lambda kv: kv[1], reverse=True)
                ineq_top = ineq_items[:3]
                ineq_str = ",".join([
                    f"{fam or 'ineq'}:{val:.2e}@"
                    f"{_rho_for(next(c for c in problem_ineqs if (getattr(c, 'family', '') or '') == fam and c.kind == 'ineq')):.1g}"
                    for fam, val in ineq_top
                ])
                print(f"[ALM {it:02d}] f={f_val:.6g} | eq_inf={eq_inf:.2e} | ineq_inf={ineq_inf:.2e} "
                      f"| rho_h={rho_h:.2e} | rho_g={rho_g:.2e} | eq_fam[{eq_str}] | ineq_fam[{ineq_str}] "
                      f"| dynamic_range={dynamic_range:.4g}")

            # stopping
            if eq_inf <= cfg.tol_h and ineq_inf <= cfg.tol_g:
                if verbose:
                    print(f"[ALM] converged at iter {it}")
                break

            # EMA smoothing to reduce jitter
            if it == 0:
                eq_inf_smooth = eq_inf
                ineq_inf_smooth = ineq_inf
            else:
                eq_inf_smooth = cfg.ema_alpha * eq_inf + (1 - cfg.ema_alpha) * eq_inf_smooth
                ineq_inf_smooth = cfg.ema_alpha * ineq_inf + (1 - cfg.ema_alpha) * ineq_inf_smooth

            # -------- Residual-ratio controller --------
            if cfg.adapt and it > 0:
                # per-family rho adaptation for equalities
                if getattr(cfg, "adapt_by_family", False):
                    # Equality family
                    for fam, cur in eq_inf_by_family.items():
                        prev = max(prev_eq_inf_by_family.get(fam, cur), eps)
                        # rho value for family: if absent, start at rho_h
                        rho_f = float(rho_eq_family.get(fam, rho_h))
                        if cur > cfg.tau_up_h * prev:
                            rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
                        elif cur < cfg.tau_down_h * prev:
                            rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
                        rho_eq_family[fam] = rho_f
                    # update prev map
                    prev_eq_inf_by_family = dict(eq_inf_by_family)
                    # Inequality family
                    for fam, cur in ineq_inf_by_family.items():
                        prev = max(prev_ineq_inf_by_family.get(fam, cur), eps)
                        # rho value for family: if absent, start at rho_g
                        rho_f = float(rho_ineq_family.get(fam, rho_g))
                        if cur > cfg.tau_up_g * prev:
                            rho_f = min(cfg.gamma_up * rho_f, cfg.rho_max)
                        elif cur < cfg.tau_down_g * prev:
                            rho_f = max(cfg.gamma_down * rho_f, cfg.rho_min)
                        rho_ineq_family[fam] = rho_f
                    # update prev map
                    prev_ineq_inf_by_family = dict(ineq_inf_by_family)
                else:
                    # Equality group
                    if eq_inf_smooth > cfg.tau_up_h * max(prev_eq_inf, eps):
                        # stalled or not shrinking
                        rho_h = min(cfg.gamma_up * rho_h, cfg.rho_max)
                    elif eq_inf_smooth < cfg.tau_down_h * max(prev_eq_inf, eps):
                        # fast progress, allow relaxation
                        rho_h = max(cfg.gamma_down * rho_h, cfg.rho_min)
                    # Inequality group
                    if ineq_inf_smooth > cfg.tau_up_g * max(prev_ineq_inf, eps):
                        rho_g = min(cfg.gamma_up * rho_g, cfg.rho_max)
                    elif ineq_inf_smooth < cfg.tau_down_g * max(prev_ineq_inf, eps):
                        rho_g = max(cfg.gamma_down * rho_g, cfg.rho_min)
            else:
                # per-family rho adaptation for equalities
                if getattr(cfg, "adapt_by_family", False):
                    # update prev map
                    prev_eq_inf_by_family = dict(eq_inf_by_family)
                    prev_ineq_inf_by_family = dict(ineq_inf_by_family)

            # -------- Stagnation bump (safety net) --------
            if cfg.use_stagnation_bump:
                if getattr(cfg, "adapt_by_family", False):
                    # ---- Equality families ----
                    for fam, cur in eq_inf_by_family.items():
                        # initialize
                        if fam not in best_eq_by_family:
                            best_eq_by_family[fam] = float(cur)
                            no_imp_eq_by_family[fam] = 0
                        if cur <= best_eq_by_family[fam] * (1 - cfg.stagnation_factor):
                            best_eq_by_family[fam] = float(cur)
                            no_imp_eq_by_family[fam] = 0
                        else:
                            no_imp_eq_by_family[fam] += 1
                            if no_imp_eq_by_family[fam] >= cfg.patience_h:
                                rho_f = float(rho_eq_family.get(fam, rho_h))
                                rho_eq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
                                no_imp_eq_by_family[fam] = 0
                    # ---- Inequality families ----
                    for fam, cur in ineq_inf_by_family.items():
                        # initialize
                        if fam not in best_ineq_by_family:
                            best_ineq_by_family[fam] = float(cur)
                            no_imp_ineq_by_family[fam] = 0
                        if cur <= best_ineq_by_family[fam] * (1 - cfg.stagnation_factor):
                            best_ineq_by_family[fam] = float(cur)
                            no_imp_ineq_by_family[fam] = 0
                        else:
                            no_imp_ineq_by_family[fam] += 1
                            if no_imp_ineq_by_family[fam] >= cfg.patience_g:
                                rho_f = float(rho_ineq_family.get(fam, rho_g))
                                rho_ineq_family[fam] = min(2.0 * rho_f, cfg.rho_max)
                                no_imp_ineq_by_family[fam] = 0
                else:
                    # Equality stagnation
                    if eq_inf <= best_eq * (1 - cfg.stagnation_factor):
                        best_eq = eq_inf; no_imp_eq = 0
                    else:
                        no_imp_eq += 1
                        if no_imp_eq >= cfg.patience_h:
                            rho_h = min(2.0 * rho_h, cfg.rho_max); no_imp_eq = 0
                    # Inequality stagnation
                    if ineq_inf <= best_ineq * (1 - cfg.stagnation_factor):
                        best_ineq = ineq_inf; no_imp_ineq = 0
                    else:
                        no_imp_ineq += 1
                        if no_imp_ineq >= cfg.patience_g:
                            rho_g = min(2.0 * rho_g, cfg.rho_max); no_imp_ineq = 0

            # -------- finalize for next iteration --------
            prev_eq_inf = max(eq_inf_smooth, eps)
            prev_ineq_inf = max(ineq_inf_smooth, eps)

        # ---- decoding back to native levels (only if blocks) ----
        decoded_native: Dict[int, float] = {}  # maps original var anchor -> chosen level value
        decoded_lifted: Dict[int, int] = {}  # maps lifted start index -> argmax position (optional)
        if has_blocks:
            for blk, lift_idx in zip(registry.blocks, lifted_slices):
                if not lift_idx:
                    continue
                sl = np.array(lift_idx, int)
                if len(sl) == 0:
                    continue
                sblk = x[sl]
                j = int(np.argmax(sblk))  # which level got selected in the block
                orig_anchor = int(blk.idx[0])  # anchor original var id for this block
                decoded_native[orig_anchor] = float(blk.levels[j])
                decoded_lifted[sl[0]] = j  # optional: lifted index -> chosen slot

        return {
            "x": x,
            "decoded": decoded_native if has_blocks else {},
            "decoded_debug": decoded_lifted if has_blocks else {},
            "hist": hist,
        }