{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# PyTorch Tests On/Off" ] }, { "cell_type": "code", "execution_count": 33, "metadata": { "scrolled": false }, "outputs": [], "source": [ "import numpy as np\n", "import torch\n", "from torch.autograd import Variable\n", "\n", "\n", "def _log_poisson_impl(n, lam):\n", " normal = torch.distributions.Normal(lam, torch.sqrt(lam))\n", " return normal.log_prob(n)\n", "\n", "\n", "class shapesys_constraint:\n", " def __init__(self, nom_data, mod_data):\n", " self.auxdata = []\n", " self.bkg_over_db_squared = []\n", " for b, deltab in zip(nom_data, mod_data):\n", " bkg_over_bsq = b * b / deltab / deltab # tau*b\n", " self.bkg_over_db_squared.append(bkg_over_bsq)\n", " self.auxdata.append(bkg_over_bsq)\n", "\n", " def alphas(self, pars):\n", " return np.product([pars, self.bkg_over_db_squared], axis=0)\n", "\n", " def logpdf(self, a, alpha):\n", " return _log_poisson_impl(a, alpha)\n", "\n", " def expected_data(self, pars):\n", " return self.alphas(pars)\n", "\n", "\n", "class Model:\n", " def __init__(self, spec):\n", " self.samples = {}\n", " self.samples['sig'] = spec['sig']\n", " self.samples['bkg'] = spec['bkg']\n", " self.constraint = shapesys_constraint(spec['bkg'], spec['bkgerr'])\n", "\n", " def expected_actualdata(self, pars):\n", " return [\n", " pars[0] * s + pars[1] * b\n", " for s, b in zip(self.samples['sig'], self.samples['bkg'])\n", " ]\n", "\n", " def expected_auxdata(self, pars):\n", " modparslice = slice(1, 2)\n", " return self.constraint.expected_data(pars[modparslice])\n", "\n", " def expected_data(self, pars, include_auxdata=True):\n", " expected_actual = self.expected_actualdata(pars)\n", "\n", " if not include_auxdata:\n", " return expected_actual\n", " expected_constraints = self.expected_auxdata(pars)\n", " return np.concatenate([expected_actual, expected_constraints])\n", "\n", " def logpdf(self, data, pars):\n", " actual_data, auxdata = [data[0]], [data[1]]\n", " lambdas_data = self.expected_actualdata(pars)\n", "\n", " sum = 0\n", " for d, lam in zip(actual_data, lambdas_data):\n", " sum = sum + _log_poisson_impl(d, lam)\n", "\n", " modauxslice = slice(0, 1)\n", " thisauxdata = auxdata[slice(0, 1)]\n", "\n", " modparslice = slice(1, 2)\n", " modalphas = pdf.constraint.alphas(pars[modparslice])\n", "\n", " for a, alpha in zip(thisauxdata, modalphas):\n", " sum = sum + self.constraint.logpdf(a, alpha)\n", " return sum\n", "\n", "\n", "def loglambdav(pars, data, pdf):\n", " return -2 * pdf.logpdf(data, pars)\n", "\n", "\n", "def optimize(\n", " objective,\n", " init_pars,\n", " par_bounds,\n", " trf_to_unbounded=lambda x: x,\n", " trf_from_unbounded=lambda x: x,\n", " args=None,\n", " constrain_index_value=None,\n", "):\n", " data = args[0]\n", "\n", " poi_index = None\n", " constrained_var = None\n", " if constrain_index_value is not None:\n", " poi_index, poi_value = constrain_index_value\n", "\n", " upto = init_pars[:poi_index]\n", " fromon = init_pars[poi_index + 1 :]\n", " nuis_init = np.concatenate([upto, fromon])\n", "\n", " init_pars = nuis_init\n", " constrained_var = Variable(torch.Tensor([poi_value]), requires_grad=True)\n", "\n", " init_pars = trf_to_unbounded(init_pars)\n", "\n", " # print('init from: %s' % init_pars)\n", "\n", " parameters = Variable(torch.Tensor(init_pars), requires_grad=True)\n", " data = Variable(torch.Tensor(data))\n", "\n", " optimizer = torch.optim.Adam([parameters])\n", "\n", " def assemble_from_nuis(nuisance_pars):\n", " tocat = []\n", " if poi_index > 0:\n", " tocat.append(nuisance_pars[:poi_index])\n", " tocat.append(constrained_var)\n", " if poi_index < len(parameters):\n", " tocat.append(nuisance_pars[poi_index:])\n", " return torch.cat(tocat)\n", "\n", " for i in range(1000):\n", " objpars = None\n", " if constrain_index_value is None:\n", " objpars = parameters\n", " else:\n", " objpars = assemble_from_nuis(parameters)\n", " loss = objective(objpars, data, args[1])\n", " optimizer.zero_grad()\n", " loss.backward()\n", " optimizer.step()\n", " # if i % 1000 == 0:\n", " # print(trf_from_unbounded(parameters).data.numpy())\n", "\n", " result = parameters\n", " if constrain_index_value is not None:\n", " result = assemble_from_nuis(result)\n", " result = trf_from_unbounded(result).data.numpy()\n", " # print('result %s' % result)\n", " return result\n", "\n", "\n", "def unconstrained_bestfit(data, pdf, init_pars, par_bounds):\n", " # print 'fit', data, 'expected for init pars', np.array(pdf.expected_actualdata(init_pars))\n", " result = optimize(\n", " loglambdav,\n", " init_pars,\n", " par_bounds,\n", " args=(\n", " data,\n", " pdf,\n", " ),\n", " )\n", " return result\n", "\n", "\n", "def constrained_bestfit(constrained_mu, data, pdf, init_pars, par_bounds):\n", " # print 'fit', data, 'for pars', np.array(pdf.expected_actualdata(init_pars))\n", " result = optimize(\n", " loglambdav,\n", " init_pars,\n", " par_bounds,\n", " args=(\n", " data,\n", " pdf,\n", " ),\n", " constrain_index_value=(0, constrained_mu),\n", " )\n", " return result" ] }, { "cell_type": "code", "execution_count": 34, "metadata": { "scrolled": false }, "outputs": [], "source": [ "spec = {\n", " 'obs': [55.0],\n", " 'sig': [10.0],\n", " 'bkg': [50.0],\n", " 'bkgerr': [7.0],\n", "}\n", "\n", "pdf = Model(spec)\n", "\n", "data = np.concatenate([spec['obs'], pdf.constraint.auxdata])\n", "init_pars = [1.0, 1.0] # (mu, gamma)\n", "par_bounds = [[0, 10], [0, 10]]\n", "\n", "# constrained_bestfit(1.0, data, pdf, init_pars, par_bounds)\n", "# unconstrained_bestfit(data, pdf, init_pars, par_bounds)" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [], "source": [ "### The Test Statistic\n", "def generate_asimov_data(asimov_mu, data, pdf, init_pars, par_bounds):\n", " bestfit_nuisance_asimov = constrained_bestfit(\n", " asimov_mu, data, pdf, init_pars, par_bounds\n", " )\n", " return pdf.expected_data(bestfit_nuisance_asimov)\n", "\n", "\n", "def qmu(mu, data, pdf, init_pars, par_bounds):\n", " mubhathat = constrained_bestfit(mu, data, pdf, init_pars, par_bounds)\n", " muhatbhat = unconstrained_bestfit(data, pdf, init_pars, par_bounds)\n", "\n", " v_mbhh = Variable(torch.Tensor(mubhathat))\n", " v_mhbh = Variable(torch.Tensor(muhatbhat))\n", " v_data = Variable(torch.Tensor(data))\n", " qmu = loglambdav(v_mbhh, v_data, pdf) - loglambdav(v_mhbh, v_data, pdf)\n", " qmu = qmu.data.numpy()[0]\n", " if muhatbhat[0] > mu:\n", " return 0.0\n", " if -1e-6 < qmu < 0:\n", " log.warning('WARNING: qmu negative: %s', qmu)\n", " return 0.0\n", " return qmu\n", "\n", "\n", "def pvals_from_teststat(sqrtqmu_v, sqrtqmuA_v):\n", " from scipy.stats import norm\n", "\n", " CLsb = 1 - norm.cdf(sqrtqmu_v)\n", " CLb = norm.cdf(sqrtqmuA_v - sqrtqmu_v)\n", " CLs = CLb / CLsb\n", " return CLsb, CLb, CLs\n", "\n", "\n", "def runOnePoint(muTest, data, pdf, init_pars, par_bounds):\n", " asimov_mu = 0.0\n", " asimov_data = generate_asimov_data(asimov_mu, data, pdf, init_pars, par_bounds)\n", "\n", " qmu_v = qmu(muTest, data, pdf, init_pars, par_bounds)\n", " qmuA_v = qmu(muTest, asimov_data, pdf, init_pars, par_bounds)\n", "\n", " sqrtqmu_v = np.sqrt(qmu_v)\n", " sqrtqmuA_v = np.sqrt(qmuA_v)\n", "\n", " sigma = muTest / sqrtqmuA_v if sqrtqmuA_v > 0 else None\n", "\n", " CLsb, CLb, CLs = pvals_from_teststat(sqrtqmu_v, sqrtqmuA_v)\n", "\n", " CLs_exp = []\n", " for nsigma in [-2, -1, 0, 1, 2]:\n", " sqrtqmu_v_sigma = sqrtqmuA_v - nsigma\n", " CLs_exp.append(pvals_from_teststat(sqrtqmu_v_sigma, sqrtqmuA_v)[-1])\n", " return qmu_v, qmuA_v, sigma, CLsb, CLb, CLs, CLs_exp" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Populating the interactive namespace from numpy and matplotlib\n" ] }, { "data": { "image/png": "\n", "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%pylab inline\n", "\n", "testmus = np.linspace(0, 5, 11)\n", "results = []\n", "for mu in testmus:\n", " results.append(runOnePoint(mu, data, pdf, init_pars, par_bounds))\n", "\n", "obs = [1.0 / r[-2:][0] for r in results]\n", "exp = [[1.0 / r[-2:][1][i] for r in results] for i in range(5)]\n", "\n", "\n", "def plot_results(testmus, cls_obs, cls_exp, test_size=0.05):\n", " plt.plot(testmus, cls_obs, c='k')\n", " for i, c in zip(range(5), ['grey', 'grey', 'grey', 'grey', 'grey']):\n", " plt.plot(testmus, cls_exp[i], c=c)\n", " plt.plot(testmus, [test_size] * len(testmus), c='r')\n", " plt.ylim(0, 1)\n", "\n", "\n", "plot_results(testmus, obs, exp)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.14" } }, "nbformat": 4, "nbformat_minor": 2 }